Explore Apache Flink SQL Client

Rashid · March 27, 2023

In my last blog post, I used Apache Flink to act on streams in real time using the Java DataStream API. In today’s post, I will introduce the Apache Flink SQL Client.

The SQL client aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of Java or Scala code. The SQL Client CLI allows for retrieving and visualizing real-time results from the running distributed application on the command line.

Let’s put things into action:

  • First, I want to update the previous streaming application to sink into the TSINK topic. The events are written as plain JSON strings, which is what the json format of the SQL table created below expects.
KafkaSink<String> ksink = KafkaSink.<String>builder()
        .setBootstrapServers(brokers)
        // Serialize each event as a plain string into the TSINK topic
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("TSINK")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        // SASL_SSL settings for the secured Kafka cluster
        .setProperty("security.protocol", "SASL_SSL")
        .setProperty("ssl.truststore.location", "<path>/es-cert.p12")
        .setProperty("ssl.truststore.password", "****")
        .setProperty("sasl.mechanism", "SCRAM-SHA-512")
        .setProperty("ssl.protocol", "TLSv1.2")
        .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"*****\" password=\"*****\";")
        .build();

stream.sinkTo(ksink);
  • Then, download the Apache Flink distribution, which ships with sql-client.sh in its bin directory. You can download it from here: https://flink.apache.org/downloads.

  • Obtain the required dependencies: flink-sql-connector-kafka-1.17.0.jar and kafka-clients-3.4.0.jar. You can download them from the Maven repository.

  • Run the SQL client and pass the required JAR files:

flink-1.17.0/bin/sql-client.sh --jar flink-sql-connector-kafka-1.17.0.jar --jar kafka-clients-3.4.0.jar
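
Optionally, once the client is up, you can change how it renders results. The default interactive table view can be swapped for tableau mode, which prints rows straight to the terminal and is handy for quick checks; this is the corresponding setting in the 1.17 client:

SET 'sql-client.execution.result-mode' = 'tableau';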
  • The Flink SQL terminal will open; you can then start by creating the SQL table:
CREATE TEMPORARY TABLE flaggedEvents (
    transaction BIGINT,
    action STRING,
    account STRING,
    status STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'TSINK',
  'properties.group.id' = 'flink-consumer-group',
  'properties.bootstrap.servers' = '<bootstrap_route>:443',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset',
  'json.ignore-parse-errors' = 'true',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.ssl.truststore.location' = '<path>/es-cert.p12',
  'properties.ssl.truststore.password' = '*****',
  'properties.sasl.mechanism' = 'SCRAM-SHA-512',
  'properties.ssl.protocol' = 'TLSv1.2',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="*****" password="*****";'
);
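
You can sanity-check that the table was registered and inspect its schema before querying:

SHOW TABLES;
DESCRIBE flaggedEvents;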

  • Finally, you can display events using regular SQL queries like:
SELECT * FROM flaggedEvents;
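
Because the table is backed by a Kafka topic, these queries run continuously and their results update as new events arrive. As a sketch of what else is possible, assuming the status column carries a value such as 'FLAGGED' (a hypothetical example value; substitute whatever your pipeline actually emits):

-- Show only events with a given status
SELECT transaction, account, action
FROM flaggedEvents
WHERE status = 'FLAGGED';

-- Continuously updated event count per account
SELECT account, COUNT(*) AS event_count
FROM flaggedEvents
GROUP BY account;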