
How to Stream XML messages from IBM MQ into Kafka into MongoDB

by Robin Moffatt, October 19th, 2020

Too Long; Didn't Read

Using Docker Compose, we can stream XML data from IBM MQ to Kafka using the IbmMQSourceConnector plugin and an XML transformation, and then ingest the data from Kafka into MongoDB using MongoDB's plugin for Kafka Connect. The same pattern for ingesting XML works with other connectors such as JMS and ActiveMQ. You can also use ksqlDB to work with the data, to an extent, although there's currently no support for handling the XML itself.

Let’s imagine we have XML data on a queue in IBM MQ, and we want to ingest it into Kafka to then use downstream, perhaps in an application or maybe to stream to a NoSQL store like MongoDB.

Note: This same pattern for ingesting XML will work with other connectors such as JMS and ActiveMQ.

I’ve got a Docker Compose stack running that includes:

‣ IBM MQ

‣ Apache Kafka (deployed as Confluent Platform to include the all-important Schema Registry)

‣ MongoDB
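
For orientation, here's a trimmed sketch of what such a Compose file might look like (image names and settings are illustrative assumptions; the full working file, including broker listener config and connector plugin installation, is in the GitHub repo linked at the end):

# Sketch only: see the repo for the complete, working stack
services:
  ibmmq:
    image: ibmcom/mq                        # IBM MQ developer image
    environment:
      LICENSE: accept
      MQ_QMGR_NAME: QM1
      MQ_APP_PASSWORD: password123
  broker:
    image: confluentinc/cp-server           # Kafka broker
  schema-registry:
    image: confluentinc/cp-schema-registry  # the all-important Schema Registry
  kafka-connect:
    image: confluentinc/cp-kafka-connect    # runs the source and sink connectors
    ports:
      - "8083:8083"                         # Kafka Connect REST API
  mongodb:
    image: mongo:4.4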

Loading some test data onto IBM MQ

Let’s load some messages onto the queue from an XML file:

docker exec --interactive ibmmq \
  /opt/mqm/samp/bin/amqsput DEV.QUEUE.1 QM1 < data/note.xml
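
For reference, amqsput puts each line from stdin onto the queue as a separate message, so data/note.xml holds one <note> document per line. Its contents aren't shown in this post; something like this (assumed, based on the records we'll see below) would do:

<note> <to>Tove</to> <from>Jani</from> <heading>Reminder 01</heading> <body>Don't forget me this weekend!</body> </note>
<note> <to>Jani</to> <from>Tove</from> <heading>Reminder 02</heading> <body>Of course I won't!</body> </note>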

Streaming from IBM MQ to Kafka and translating the XML messages

Now we can ingest this into Kafka using Kafka Connect with the IbmMQSourceConnector plugin and the XML transformation:

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-ibmmq-note-01/config \
    -d '{
    "connector.class": "io.confluent.connect.ibm.mq.IbmMQSourceConnector",
    "kafka.topic":"ibmmq-note-01",
    "mq.hostname":"ibmmq",
    "mq.port":"1414",
    "mq.queue.manager":"QM1",
    "mq.transport.type":"client",
    "mq.channel":"DEV.APP.SVRCONN",
    "mq.username":"app",
    "mq.password":"password123",
    "jms.destination.name":"DEV.QUEUE.1",
    "jms.destination.type":"queue",
    "confluent.license":"",
    "confluent.topic.bootstrap.servers":"broker:29092",
    "confluent.topic.replication.factor":"1",
    "transforms": "extractPayload,xml",
    "transforms.extractPayload.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.extractPayload.field": "text",
    "transforms.xml.type": "com.github.jcustenborder.kafka.connect.transform.xml.FromXml$Value",
    "transforms.xml.schema.path": "file:///data/note.xsd",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://schema-registry:8081"
    }'

NOTE: ExtractField is needed, otherwise the XML transform will fail with java.lang.UnsupportedOperationException: STRUCT is not a supported type, since it would be trying to operate on the entire payload from IBM MQ, which includes fields other than the XML that we're interested in.
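
The transform validates and parses the payload against the XSD at /data/note.xsd. That file isn't shown in this post, but for a payload like ours it would be a simple flat schema along these lines (a sketch, not necessarily the exact file):

<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
  <!-- a note is a flat record of four string fields -->
  <xs:element name="note">
    <xs:complexType>
      <xs:sequence>
        <xs:element name="to" type="xs:string"/>
        <xs:element name="from" type="xs:string"/>
        <xs:element name="heading" type="xs:string"/>
        <xs:element name="body" type="xs:string"/>
      </xs:sequence>
    </xs:complexType>
  </xs:element>
</xs:schema>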

The resulting Kafka topic holds the value of the text field in the messages, serialised in Avro:

docker exec kafkacat \
    kafkacat                            \
      -b broker:29092                   \
      -r http://schema-registry:8081    \
      -s key=s -s value=avro            \
      -t ibmmq-note-01                  \
      -C -o beginning -u -q -J | \
    jq -c '.payload'

{"Note":{"to":"Tove","from":"Jani","heading":"Reminder 01","body":"Don't forget me this weekend!"}}
{"Note":{"to":"Jani","from":"Tove","heading":"Reminder 02","body":"Of course I won't!"}}
…

To understand more about the concepts around getting XML data into Kafka, see here; I've written about the specifics of Kafka Connect and the XML transformation here.

Streaming the data from Kafka to MongoDB

We can then add another Kafka Connect connector to the pipeline, using the official plugin for Kafka Connect from MongoDB, which will stream data straight from a Kafka topic into MongoDB:

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/sink-mongodb-note-01/config \
    -d '{
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics":"ibmmq-note-01",
    "connection.uri":"mongodb://mongodb:27017",
    "database":"rmoff",
    "collection":"notes",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://schema-registry:8081"
    }'
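
Before querying MongoDB it's worth confirming that the connector and its task are actually running, using the standard Kafka Connect REST API; both states should report RUNNING:

curl -s http://localhost:8083/connectors/sink-mongodb-note-01/status | \
    jq '.connector.state, .tasks[].state'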

Check out the data in MongoDB:

docker exec --interactive mongodb mongo localhost:27017 <<EOF
use rmoff
db.notes.find()
EOF

MongoDB shell version v4.4.1
connecting to: mongodb://localhost:27017/test?compressors=disabled&gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("9aae83c4-0e25-43a9-aca5-7278d366423b") }
MongoDB server version: 4.4.1
switched to db rmoff
{ "_id" : ObjectId("5f77b64eee00df1cc80135a1"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 01", "body" : "Don't forget me this weekend!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a2"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 02", "body" : "Of course I won't!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a3"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 03", "body" : "Where are you?" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a4"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 04", "body" : "I forgot 🤷‍♂️" }
bye

Let’s check that this is actually streaming, by sending another record to the MQ:

echo "<note> <to>Tove</to> <from>Jani</from> <heading>Reminder 05</heading> <body>Srsly?</body> </note>" | \
  docker exec --interactive ibmmq /opt/mqm/samp/bin/amqsput DEV.QUEUE.1 QM1
Sample AMQSPUT0 start
target queue is DEV.QUEUE.1
Sample AMQSPUT0 end

And, behold, the new record in MongoDB:

docker exec --interactive mongodb mongo localhost:27017 <<EOF
use rmoff
db.notes.find()
EOF
MongoDB shell version v4.4.1
connecting to: mongodb://localhost:27017/test?compressors=disabled&gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("2641e93e-9c5d-4270-8f64-e52295a60309") }
MongoDB server version: 4.4.1
switched to db rmoff
{ "_id" : ObjectId("5f77b64eee00df1cc80135a1"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 01", "body" : "Don't forget me this weekend!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a2"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 02", "body" : "Of course I won't!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a3"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 03", "body" : "Where are you?" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a4"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 04", "body" : "I forgot 🤷‍♂️" }
{ "_id" : ObjectId("5f77b77cee00df1cc80135a6"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 05", "body" : "Srsly?" }
bye

What if my data’s not in XML? What if we want other fields from the payload?

In the example above we're taking data from the source system (IBM MQ) and Kafka Connect is applying a schema to the field called text within it (the XML transformation does this, based on the supplied XSD). When it's written to Kafka it's serialised using the selected converter, which, since it's Avro, stores the schema in the Schema Registry. This is a Good Way of doing things, since we retain the schema for use by any consumer. We could use Protobuf or JSON Schema here too if we wanted. If this doesn't all make sense to you then check out Schemas, Schmeeeemas and Why not just JSON?.
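
If you want to see what's been registered, you can pull the Avro schema straight back out of the Schema Registry (assuming the default subject naming of topic name plus -value):

curl -s http://localhost:8081/subjects/ibmmq-note-01-value/versions/latest | \
    jq '.schema | fromjson'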

But the full payload that comes through from IBM MQ looks like this:

messageID=ID:414d5120514d3120202020202020202060e67a5f06352924
messageType=text
timestamp=1601893142430
deliveryMode=1
redelivered=false
expiration=0
priority=0
properties={JMS_IBM_Format=Struct{propertyType=string,string=MQSTR   },
            JMS_IBM_PutDate=Struct{propertyType=string,string=20201005},
            JMS_IBM_Character_Set=Struct{propertyType=string,string=ISO-8859-1},
            JMSXDeliveryCount=Struct{propertyType=integer,integer=1},
            JMS_IBM_MsgType=Struct{propertyType=integer,integer=8},
            JMSXUserID=Struct{propertyType=string,string=mqm         },
            JMS_IBM_Encoding=Struct{propertyType=integer,integer=546},
            JMS_IBM_PutTime=Struct{propertyType=string,string=10190243},
            JMSXAppID=Struct{propertyType=string,string=amqsput                     },
            JMS_IBM_PutApplType=Struct{propertyType=integer,integer=6}}
text=<note> <to>Jani</to> <from>Tove</from> <heading>Reminder 02</heading> <body>Of course I won't!</body> </note>

If we want to retain some or all of these fields, we're going to have to approach things a different way. As things stand, there is no Single Message Transform that I'm aware of that can take both the non-XML fields and the XML field and wrangle them into a single structured schema (which would be the ideal outcome; putting the non-XML fields into the Kafka message header would be another option). By default, the IBM MQ Source Connector will write the full payload to a schema. This means that you still use a schema-supporting serialisation method, but the text payload field remains unparsed.

Here’s an example:

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-ibmmq-note-03/config \
    -d '{
    "connector.class": "io.confluent.connect.ibm.mq.IbmMQSourceConnector",
    "kafka.topic":"ibmmq-note-03",
    "mq.hostname":"ibmmq",
    "mq.port":"1414",
    "mq.queue.manager":"QM1",
    "mq.transport.type":"client",
    "mq.channel":"DEV.APP.SVRCONN",
    "mq.username":"app",
    "mq.password":"password123",
    "jms.destination.name":"DEV.QUEUE.1",
    "jms.destination.type":"queue",
    "confluent.license":"",
    "confluent.topic.bootstrap.servers":"broker:29092",
    "confluent.topic.replication.factor":"1",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://schema-registry:8081"
    }'

Now the full IBM MQ message is written to a Kafka topic, serialised with a schema. We can deserialise it with something like kafkacat:

docker exec kafkacat \
    kafkacat                            \
      -b broker:29092                   \
      -r http://schema-registry:8081    \
      -s key=s -s value=avro            \
      -t ibmmq-note-03                  \
      -C -c1 -o beginning -u -q -J | \
    jq '.'
{
  "topic": "ibmmq-note-03",
  "partition": 0,
  "offset": 0,
  "tstype": "create",
  "ts": 1601894073400,
  "broker": 1,
  "key": "Struct{messageID=ID:414d5120514d3120202020202020202060e67a5f033a2924}",
  "payload": {
    "messageID": "ID:414d5120514d3120202020202020202060e67a5f033a2924",
    "messageType": "text",
    "timestamp": 1601894073400,
    "deliveryMode": 1,
    "properties": {
      "JMS_IBM_Format": {
        "propertyType": "string",
        "boolean": null,
        "byte": null,
        "short": null,
        "integer": null,
        "long": null,
        "float": null,
        "double": null,
        "string": {
          "string": "MQSTR   "
        }
      },
    …
    "map": null,
    "text": {
      "string": "<note> <to>Tove</to> <from>Jani</from> <heading>Reminder 01</heading> <body>Don't forget me this weekend!</body> </note>"
    }
  }
}

Observe that the text field is just a string, holding [what happens to be] XML.

You can use ksqlDB to work with the data, to an extent, although there's currently no support for handling the XML itself:
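
First declare a stream over the topic; ksqlDB picks the Avro schema up from the Schema Registry automatically (the stream name here is just what I've assumed for this example):

CREATE STREAM IBMMQ_SOURCE WITH (KAFKA_TOPIC='ibmmq-note-03', VALUE_FORMAT='AVRO');

With that in place we can project the fields we want: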

SELECT "PROPERTIES"['JMSXAppID']->STRING as JMSXAppID,
       "PROPERTIES"['JMS_IBM_PutTime']->STRING as JMS_IBM_PutTime,
       "PROPERTIES"['JMSXDeliveryCount']->INTEGER as JMSXDeliveryCount,
       "PROPERTIES"['JMSXUserID']->STRING as JMSXUserID,
       text
  FROM IBMMQ_SOURCE
  EMIT CHANGES;
+-----------+-----------------+-------------------+------------+------------------------------------+
|JMSXAPPID  |JMS_IBM_PUTTIME  |JMSXDELIVERYCOUNT  |JMSXUSERID  |TEXT                                |
+-----------+-----------------+-------------------+------------+------------------------------------+
|amqsput    |10302905         |1                  |mqm         |<note> <to>Jani</to> <from>Tove</fro|
|           |                 |                   |            |m> <heading>Reminder 02</heading> <b|
|           |                 |                   |            |ody>Of course I won't!</body> </note|
|           |                 |                   |            |>                                   |
|amqsput    |10302905         |1                  |mqm         |<note> <to>Tove</to> <from>Jani</fro|
|           |                 |                   |            |m> <heading>Reminder 03</heading> <b|
|           |                 |                   |            |ody>Where are you?</body> </note>   |

👾 Try it out!

You can find the code to run this for yourself using Docker Compose on GitHub.
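
Clone it, then bring the whole stack up and follow along from the top:

docker-compose up -d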

Also published on: https://rmoff.net/2020/10/05/streaming-xml-messages-from-ibm-mq-into-kafka-into-mongodb/