The objective of this post is to showcase how to:
- Stream messages from an IBM MQ queue to a Kafka cluster.
- Build a streaming application that acts on those events in real time.
Technology used:
- IBM MQ
- IBM Event Streams
- Apache Flink
- OpenShift Container Platform
High level solution diagram
Messages arrive to the APP.QUEUE queue via the MVP.APP.SVRCONN channel. A streaming queue, APP.EVENTS.QUEUE, gets a copy of every message put to APP.QUEUE.
Example of an APP.QUEUE message:
<transaction>102</transaction>
<action>payment</action>
<account>checking</account>
<status>failed</status>
Kafka Connect defines an MQ source connector that sources messages from the streaming queue, APP.EVENTS.QUEUE, into a Kafka topic, TSOURCE. Then, an Apache Flink job listens to TSOURCE messages and performs two actions:
- Picks up failed events.
- Transforms the event structure from XML to JSON.
Example of the Flink job output:
{
"transaction": 101,
"action": "payment",
"account": "checking",
"status": "failed"
}
Deployment Steps
MQ
- Define a ConfigMap with the queue manager's MQSC configuration:
cat << EOF | oc apply -f -
kind: ConfigMap
apiVersion: v1
metadata:
  name: mqsc-setting
data:
  mq.mqsc: |
    DEFINE CHANNEL(MVP.APP.SVRCONN) CHLTYPE(SVRCONN)
    SET CHLAUTH(MVP.APP.SVRCONN) TYPE(BLOCKUSER) USERLIST('nobody')
    SET CHLAUTH('*') TYPE(ADDRESSMAP) ADDRESS('*') USERSRC(NOACCESS)
    SET CHLAUTH(MVP.APP.SVRCONN) TYPE(ADDRESSMAP) ADDRESS('*') USERSRC(CHANNEL) CHCKCLNT(REQUIRED)
    ALTER AUTHINFO(SYSTEM.DEFAULT.AUTHINFO.IDPWOS) AUTHTYPE(IDPWOS) ADOPTCTX(YES)
    REFRESH SECURITY TYPE(CONNAUTH)
    DEFINE QLOCAL(APP.EVENTS.QUEUE) MAXDEPTH(100) REPLACE
    DEFINE QLOCAL(APP.QUEUE) STRMQOS(MUSTDUP) STREAMQ(APP.EVENTS.QUEUE) MAXDEPTH(100) REPLACE
    SET AUTHREC OBJTYPE(QMGR) PRINCIPAL('1001') AUTHADD(CONNECT,INQ)
    SET AUTHREC PROFILE(APP.EVENTS.QUEUE) OBJTYPE(QUEUE) PRINCIPAL('1001') AUTHADD(ALLMQI)
EOF
- Build a QueueManager custom resource instance:
cat << EOF | oc apply -f -
apiVersion: mq.ibm.com/v1beta1
kind: QueueManager
metadata:
  name: qm1
spec:
  license:
    accept: true
    license: L-RJON-CJR2RX
    use: NonProduction
  queueManager:
    debug: true
    mqsc:
      - configMap:
          items:
            - mq.mqsc
          name: mqsc-setting
    name: QM1
    resources:
      limits:
        cpu: 500m
      requests:
        cpu: 500m
    storage:
      queueManager:
        type: ephemeral
  template:
    pod:
      containers:
        - env:
            - name: MQSNOAUT
              value: 'yes'
          name: qmgr
  version: 9.3.2.0-r1
  web:
    enabled: true
EOF
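Once the queue manager reports a Running phase, you can put a test message on APP.QUEUE and confirm that the streaming queue receives a copy. The commands below are a minimal sketch: the pod name qm1-ibm-mq-0 follows the operator's default naming and may differ in your cluster, and amqsput assumes the MQ samples are present in the image:
# Check the queue manager status
oc get queuemanager qm1 -o jsonpath='{.status.phase}'
# Put a test message on APP.QUEUE from inside the queue manager pod
oc exec qm1-ibm-mq-0 -- bash -c 'echo "<transaction>102</transaction><action>payment</action><account>checking</account><status>failed</status>" | /opt/mqm/samp/bin/amqsput APP.QUEUE QM1'
# Confirm the streaming queue received a copy
oc exec qm1-ibm-mq-0 -- bash -c 'echo "DISPLAY QLOCAL(APP.EVENTS.QUEUE) CURDEPTH" | runmqsc QM1'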
Event Streams
- Build an EventStreams custom resource instance:
cat << EOF | oc apply -f -
apiVersion: eventstreams.ibm.com/v1beta2
kind: EventStreams
metadata:
  name: es-development
spec:
  license:
    accept: true
    use: CloudPakForIntegrationNonProduction
  requestIbmServices:
    iam: true
    monitoring: true
  strimziOverrides:
    kafka:
      authorization:
        authorizerClass: com.ibm.eventstreams.runas.authorizer.RunAsAuthorizer
        supportsAdminApi: true
        type: custom
      config:
        default.replication.factor: 3
        inter.broker.protocol.version: '3.4'
        log.cleaner.threads: 6
        min.insync.replicas: 2
        num.io.threads: 24
        num.network.threads: 9
        num.replica.fetchers: 3
        offsets.topic.replication.factor: 3
      listeners:
        - authentication:
            type: scram-sha-512
          name: external
          port: 9094
          tls: true
          type: route
        - authentication:
            type: tls
          name: tls
          port: 9093
          tls: true
          type: internal
      metricsConfig:
        type: jmxPrometheusExporter
        valueFrom:
          configMapKeyRef:
            key: kafka-metrics-config.yaml
            name: development-metrics-config
      replicas: 3
      storage:
        type: ephemeral
    zookeeper:
      metricsConfig:
        type: jmxPrometheusExporter
        valueFrom:
          configMapKeyRef:
            key: zookeeper-metrics-config.yaml
            name: development-metrics-config
      replicas: 3
      storage:
        type: ephemeral
  adminUI: {}
  restProducer: {}
  apicurioRegistry: {}
  adminApi: {}
  collector: {}
  version: latest
EOF
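The connector and the Flink job both expect a Kafka topic named TSOURCE. One way to create it is with a KafkaTopic custom resource; the sketch below assumes the Event Streams KafkaTopic CRD and the 3-broker cluster defined above (adjust partitions and replicas as needed):
cat << EOF | oc apply -f -
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaTopic
metadata:
  name: tsource
  labels:
    eventstreams.ibm.com/cluster: es-development
spec:
  topicName: TSOURCE
  partitions: 3
  replicas: 3
EOF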
KafkaConnect
- Build a KafkaConnect custom resource instance to host the MQ source connector:
cat << EOF | oc apply -f -
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  authentication:
    certificateAndKey:
      certificate: user.crt
      key: user.key
      secretName: kafka-user
    type: tls
  bootstrapServers: 'bootstrapServers:port'
  config:
    config.storage.replication.factor: 3
    config.storage.topic: connect-cluster-configs
    group.id: connect-cluster
    offset.storage.replication.factor: 3
    offset.storage.topic: connect-cluster-offsets
    status.storage.replication.factor: 3
    status.storage.topic: connect-cluster-status
  image: 'kafka-connect-image:tag'
  replicas: 1
  template:
    pod:
      imagePullSecrets: []
      metadata:
        annotations:
          cloudpakId: <cloudpakId>
          productVersion: 11.1.5
          productID: <productID>
          cloudpakName: IBM Cloud Pak for Integration
          productChargedContainers: my-connect-cluster-connect
          productCloudpakRatio: '2:1'
          productName: IBM Event Streams for Non Production
          eventstreams.production.type: CloudPakForIntegrationNonProduction
          productMetric: VIRTUAL_PROCESSOR_CORE
  tls:
    trustedCertificates:
      - certificate: ca.crt
        secretName: es-development-cluster-ca-cert
EOF
Note:
- Make sure you update spec.bootstrapServers, spec.tls, and spec.authentication to match your cluster (see the sketch below for one way to create the referenced credentials).
- Make sure you build and reference your own Kafka Connect container image in spec.image.
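For reference, the kafka-user secret referenced in spec.authentication can come from a KafkaUser with TLS authentication; the operator then generates a secret of the same name containing user.crt and user.key, while the es-development-cluster-ca-cert secret referenced in spec.tls is created automatically with the cluster. The sketch below is one possible setup; depending on your authorization configuration you may also need to grant this user access to TSOURCE and the Connect internal topics:
cat << EOF | oc apply -f -
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaUser
metadata:
  name: kafka-user
  labels:
    eventstreams.ibm.com/cluster: es-development
spec:
  authentication:
    type: tls
EOF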
MQ Source Kafka Connector
- Define a KafkaConnector custom resource instance to source messages from APP.EVENTS.QUEUE into TSOURCE:
cat << EOF | oc apply -f -
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnector
metadata:
  name: mq-source-connector
  labels:
    eventstreams.ibm.com/cluster: my-connect-cluster
spec:
  class: com.ibm.eventstreams.connect.mqsource.MQSourceConnector
  config:
    mq.channel.name: MVP.APP.SVRCONN
    mq.record.builder.key.header: JMSMessageID
    value.converter: org.apache.kafka.connect.storage.StringConverter
    mq.user.name: 1001
    mq.queue.manager: QM1
    mq.queue: APP.EVENTS.QUEUE
    mq.record.builder: com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder
    topic: TSOURCE
    key.converter: org.apache.kafka.connect.storage.StringConverter
    mq.message.body.jms: true
    mq.connection.name.list: qm1-address(1414)
  tasksMax: 1
EOF
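After applying the connector, you can check that it reached a Ready state and inspect the Connect worker logs. The pod name below assumes the defaults used in this post and may differ depending on the operator version:
oc get kafkaconnector mq-source-connector -o yaml
oc logs my-connect-cluster-connect-0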
Note:
- The mq.record.builder.key.header property sets the Kafka record key from the JMSMessageID header. The value can also be any of the following keys: JMSCorrelationID, JMSCorrelationIDAsBytes, and JMSDestination.
- The RecordBuilder is a pluggable component, and you can customize it to support additional requirements.
Apache Flink Job
- Define the IBM Event Streams connection using the KafkaSource class:
// Bootstrap address of the Event Streams external (SCRAM over TLS) listener route
String bootstrap_route = "<bootstrap_route>:443";
KafkaSource<String> ksource = KafkaSource.<String>builder()
    .setBootstrapServers(bootstrap_route)
    .setTopics("TSOURCE")
    .setGroupId("flink-consumer-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .setProperty("security.protocol", "SASL_SSL")
    .setProperty("ssl.truststore.location", "<path>/es-cert.p12")
    .setProperty("ssl.truststore.password", "******")
    .setProperty("sasl.mechanism", "SCRAM-SHA-512")
    .setProperty("ssl.protocol", "TLSv1.2")
    .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"*****\" password=\"*****\";")
    .build();
- Define the DataStream object:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.fromSource(ksource, WatermarkStrategy.noWatermarks(), "Kafka Source");
- Filter events and transform them:
stream = stream.filter(s -> s.contains("failed"))   // keep only failed events
    .map(s -> new toJson(s).object);                 // convert the XML payload to JSON
- The toJson helper class wraps the org.json XML-to-JSON conversion:
public class toJson {
    public String object;

    public toJson(String xml) {
        try {
            JSONObject _json = XML.toJSONObject(xml);
            this.object = _json.toString();
        } catch (JSONException je) {
            System.out.println("Exception while converting XML to JSON: " + je.toString());
        }
    }
}
- Print the events using the PrintSink class:
PrintSink<String> sink = new PrintSink<>(true);
stream.sinkTo(sink);
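Putting the pieces together, a minimal job class could look like the sketch below. The class name and job name are illustrative, the SASL/TLS properties from the KafkaSource step above are omitted for brevity, and the toJson helper class shown earlier is assumed to be on the classpath:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSink;

public class FailedEventsJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka source for the TSOURCE topic; add the SASL_SSL/SCRAM properties shown earlier
        KafkaSource<String> ksource = KafkaSource.<String>builder()
                .setBootstrapServers("<bootstrap_route>:443")
                .setTopics("TSOURCE")
                .setGroupId("flink-consumer-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(ksource, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Keep only failed events and convert the XML payload to JSON
        stream = stream.filter(s -> s.contains("failed"))
                       .map(s -> new toJson(s).object);

        // Print the resulting JSON records to the task manager output
        stream.sinkTo(new PrintSink<>(true));

        env.execute("mq-failed-events-to-json");
    }
}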