One API Connect feature is offloading API events to a third-party system. In this post, I will offload the events to IBM Event Streams and connect the Apache Flink SQL client to analyze them in real time.
- First, add the external system configuration to the APIConnectCluster CR:
analytics:
  external:
    offload:
      enabled: true
      output: |
        kafka {
          topic_id => "apic-analytics"
          bootstrap_servers => "hostname:port"
          codec => "json"
          id => "kafka_offload"
          ssl_truststore_location => "/etc/velox/external_certs/offload/truststore.p12"
          ssl_keystore_location => "/etc/velox/external_certs/offload/keystore.p12"
          ssl_truststore_type => "PKCS12"
          ssl_keystore_type => "PKCS12"
          security_protocol => "SSL"
        }
      secretName: offload-a7s-certificates
- Review the a7s-ingestion pod’s logs to confirm that the connection to the Kafka cluster succeeded.
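One way to run this check from a terminal is sketched below. It assumes you have kubectl access to the cluster. The helper names, the namespace argument, and the keyword filter are illustrative assumptions and not part of API Connect; the exact log messages vary by release, so treat the filter as a starting point only.

```shell
# Hypothetical helper: keep only log lines that mention Kafka or a failure.
filter_offload_logs() {
  grep -iE 'kafka|error|exception'
}

# Hypothetical helper: show filtered recent logs from the first a7s-ingestion pod.
# $1 is the namespace where API Connect analytics runs (an assumption; adjust it).
check_ingestion_logs() {
  namespace="$1"
  pod=$(kubectl get pods -n "$namespace" -o name | grep a7s-ingestion | head -n 1)
  kubectl logs -n "$namespace" "$pod" --tail=500 | filter_offload_logs
}
```

For example, `check_ingestion_logs apic` would print the matching lines for a deployment in a namespace named apic.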
- I will use the Apache Flink SQL client to analyze the incoming events. Download Apache Flink, which includes sql-client.sh, from https://flink.apache.org/downloads.
- Obtain the required dependencies: flink-sql-connector-kafka-1.17.0.jar and kafka-clients-3.4.0.jar. You can download them from the Maven repository.
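If you prefer to fetch the two JARs from the command line, the following is a minimal sketch. It assumes Maven Central is reachable at repo1.maven.org, that the artifacts are published under the group IDs org.apache.flink and org.apache.kafka, and that curl is installed; the function names are hypothetical.

```shell
# Build a Maven Central URL for a JAR from its group ID, artifact ID, and version.
maven_jar_url() {
  group_path=$(printf '%s' "$1" | tr '.' '/')
  printf 'https://repo1.maven.org/maven2/%s/%s/%s/%s-%s.jar\n' "$group_path" "$2" "$3" "$2" "$3"
}

# Download both JARs into the current directory (needs network access and curl).
download_flink_kafka_jars() {
  curl -fLO "$(maven_jar_url org.apache.flink flink-sql-connector-kafka 1.17.0)"
  curl -fLO "$(maven_jar_url org.apache.kafka kafka-clients 3.4.0)"
}
```

Calling `download_flink_kafka_jars` in the directory from which you start the SQL client keeps the relative JAR paths in the next step valid.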
- Run the SQL client and pass the required JAR files:
flink-1.17.0/bin/sql-client.sh --jar flink-sql-connector-kafka-1.17.0.jar --jar kafka-clients-3.4.0.jar
- The Flink SQL terminal opens. Start by creating the SQL table:
CREATE TEMPORARY TABLE sandboxEvents (
  api_name STRING,
  app_name STRING,
  developer_org_name STRING,
  request_method STRING,
  catalog_name STRING,
  org_name STRING,
  status_code STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'apic-analytics',
  'properties.group.id' = 'flink-consumer-group',
  'properties.bootstrap.servers' = '<bootstrap_route>:443',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset',
  'json.ignore-parse-errors' = 'true',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.ssl.truststore.location' = '<path>/es-cert.p12',
  'properties.ssl.truststore.password' = '*****',
  'properties.sasl.mechanism' = 'SCRAM-SHA-512',
  'properties.ssl.protocol' = 'TLSv1.2',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="*****" password="*****";'
);
- Finally, you can display and analyze the events with regular SQL queries, such as:
select * from sandboxEvents;