Source file content to Kafka topics

Rashid · July 28, 2022

I recently came across a use case where a customer wanted to stream metadata from a file to a Kafka topic. I found it an interesting problem because of how cloud-native applications typically behave with respect to state and volumes. It raises specific questions like: where will those files be stored, and how do you update them dynamically and mount them into the source components?

The following steps solve this by using KafkaConnect and KafkaConnectors:

  • Create a ConfigMap with the data you want to source to the Kafka topic
cat <<EOF | kubectl apply -f -
  apiVersion: v1
  kind: ConfigMap
  data:
    configuration.json: |
      {"maxThreadCount":"10", "type":"GITHUB"},  
      {"maxThreadCount":"30", "type":"JSON"}  
  metadata:
    name: metadata-configuration
EOF
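
You can confirm the ConfigMap was created as expected:

$ kubectl get configmap metadata-configuration -o yaml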
  • Download the KafkaConnector libraries and dependencies; in my case it was the FileStream connector that ships with Apache Kafka (see the sketch below)
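
The FileStream connector JAR ships inside the Apache Kafka binary distribution under libs/. One way to fetch it (a sketch, pinned to Kafka 2.8.1 to match the Connect cluster version used below):

$ curl -LO https://archive.apache.org/dist/kafka/2.8.1/kafka_2.13-2.8.1.tgz
$ tar -xzf kafka_2.13-2.8.1.tgz
$ mkdir -p my-plugins
$ cp kafka_2.13-2.8.1/libs/connect-file-2.8.1.jar my-plugins/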

  • Create a custom KafkaConnect image

FROM cp.icr.io/cp/ibm-eventstreams-kafka:10.5.0
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001

Place the KafkaConnector libraries and dependencies inside the my-plugins folder, next to the Dockerfile, as shown below.
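
The build context should look like this (the JAR name assumes the Kafka 2.8.1 download sketched earlier):

.
├── Dockerfile
└── my-plugins/
    └── connect-file-2.8.1.jar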

  • Build, tag, and push the container image
$ docker build -t my-connect-cluster:1.0 .
$ docker tag my-connect-cluster:1.0 quay.io/rashid_aljohani/my-connect-cluster:1.0
$ docker push quay.io/rashid_aljohani/my-connect-cluster:1.0
  • Create the KafkaConnect environment with the custom image
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    eventstreams.ibm.com/use-connector-resources: "true"
spec:
  version: 2.8.1
  replicas: 1
  bootstrapServers: es-lite-kafka-bootstrap.cp4i.svc:9092
  image: quay.io/rashid_aljohani/my-connect-cluster:1.0
  template:
    pod:
      imagePullSecrets: []
      metadata:
        annotations:
          eventstreams.production.type: CloudPakForIntegrationNonProduction
          productID: 2a79e49111f44ec3acd89608e56138f5
          productName: IBM Event Streams for Non Production
          productVersion: 11.0.3
          productMetric: VIRTUAL_PROCESSOR_CORE
          productChargedContainers: my-connect-cluster-connect
          cloudpakId: c8b82d189e7545f0892db9ef2731b90d
          cloudpakName: IBM Cloud Pak for Integration
          cloudpakVersion: 2022.2.1
          productCloudpakRatio: "2:1"
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
  externalConfiguration:
    volumes:
      - name: metadata-configuration
        configMap:
          name: metadata-configuration
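
Apply the manifest with kubectl (assuming it is saved as kafka-connect.yaml; the filename is arbitrary):

$ kubectl apply -f kafka-connect.yaml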

The instance uses the custom image via spec.image.

The instance enables the use of KafkaConnectors via the metadata.annotations.eventstreams.ibm.com/use-connector-resources: "true" annotation.

The ConfigMap is mounted via spec.externalConfiguration; the files are stored under /opt/kafka/external-configuration/<volume-name>/, so in this example configuration.json ends up at /opt/kafka/external-configuration/metadata-configuration/configuration.json.
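
To verify the mount, list the directory inside the running Connect pod (the deployment name my-connect-cluster-connect assumes the usual <cluster-name>-connect naming convention):

$ kubectl exec deploy/my-connect-cluster-connect -- ls /opt/kafka/external-configuration/metadata-configuration/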

  • Create a KafkaConnector instance, FileStream in this case, and update the configuration to match your target system - such as the topic name and file name
cat <<EOF | kubectl apply -f -
  apiVersion: eventstreams.ibm.com/v1beta2 
  kind: KafkaConnector 
  metadata: 
    name: filestream-source-connector 
    labels: 
      eventstreams.ibm.com/cluster: my-connect-cluster 
  spec: 
    class: FileStreamSourceConnector
    tasksMax: 1
    config:
      file: "/opt/kafka/external-configuration/<configmap-name>"
      topic: <topic-name>
EOF
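
Once the connector is running, check its status and confirm that records arrive on the topic. The consumer command is a sketch: the pod name es-lite-kafka-0 follows the Strimzi naming convention for the es-lite cluster referenced in bootstrapServers, and <topic-name> should match the connector configuration.

$ kubectl get kafkaconnector filestream-source-connector -o yaml
$ kubectl exec -it es-lite-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 --topic <topic-name> --from-beginning

Each line of configuration.json becomes one record on the topic, which is the standard behavior of the FileStreamSourceConnector.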