How to use the Native IBM MQ Client Receiver with Spark Streaming

October 14, 2018


After using Apache NiFi with IBM MQ, I noticed that NiFi cannot easily guarantee the order of incoming messages, because failover can occur at any time. This becomes a problem specifically with database and table replication, where the replicating software puts messages onto a queue with the same timestamp. One solution to this problem is to use something like Spark Streaming. I decided to write a native IBM MQ client receiver that accesses remote queues directly, i.e., in a non-JMS manner. This can be important for performance reasons, as described in previous articles. This MQ client receiver works with the IBM MQ 8.x and 9.x client libraries. The source code is kept at: https://github.com/gss2002/spark-ibm-mq
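The ordering guarantee here comes from funneling all messages through a single consumer rather than a multi-node flow that can fail over mid-stream. A stdlib-only toy sketch of that idea (this is not MQ or Spark code, just an illustration of why one FIFO queue drained by exactly one consumer preserves arrival order):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class SingleConsumerOrder {
    public static void main(String[] args) throws InterruptedException {
        // One FIFO queue, one consumer thread: arrival order is preserved.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        for (int i = 1; i <= 5; i++) {
            queue.put("msg-" + i);
        }

        List<String> seen = new ArrayList<>();
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    seen.add(queue.take());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.start();
        consumer.join();

        // Messages come out in exactly the order they went in.
        System.out.println(seen);
    }
}
```

With two consumer threads (or two receivers), the interleaving of `take()` calls would no longer be deterministic, which is the same reason the Spark job below is pinned to a single receiver and a single core.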

 

An example of how to use this in a Java Spark Streaming job:

How to use the Native IBM MQ Client Receiver

JavaStreamingContext ssc = new JavaStreamingContext(sparkconf, new Duration(1000));
JavaDStream<String> mqStream = ssc.receiverStream(
        new IBMMQReciever("mqp1.example.com", 1414, "qmgrName", "MQ.CLIENT.CHANNEL.NAME",
                "userName", "password", "mqTimeWaitMS", "keepMessagesTrue/False"));

mqStream.foreachRDD(rdd -> {
    rdd.foreach(record -> {
        JSONArray json = new JSONArray(record);
        String key = ((JSONObject) json.get(0)).getString("key");
        String value = ((JSONObject) json.get(1)).getString("value");
        System.out.println("Key: " + key + " :: Value: " + value);
    });
});

ssc.start();
ssc.awaitTermination();
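The `org.json` calls above assume each record arrives as a JSON array of the form `[{"key":"..."},{"value":"..."}]`. As a sanity check of that assumed layout, here is a stdlib-only sketch (the `extract` helper and the sample record are hypothetical, for illustration; in the real job you would use `JSONArray` as shown above):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RecordParseSketch {

    // Pulls a quoted string field out of a JSON-like record.
    // Illustrative only: a regex is not a real JSON parser.
    static String extract(String record, String field) {
        Matcher m = Pattern
                .compile("\"" + field + "\"\\s*:\\s*\"([^\"]*)\"")
                .matcher(record);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        // Assumed shape of one MQ record, matching the getString() calls
        // in the streaming job above.
        String record = "[{\"key\":\"K1\"},{\"value\":\"V1\"}]";
        System.out.println(extract(record, "key") + " :: " + extract(record, "value"));
    }
}
```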

Start the Spark Streaming job with the following parameters if you are using it to preserve message order from DB2QREP or some other queue-based data replication tool:

spark.executor.instances=1
spark.cores.max=1
spark.streaming.receiver.maxRate=1000
spark.cleaner.ttl=120
spark.streaming.concurrentJobs=1
spark.serializer=org.apache.spark.serializer.JavaSerializer
spark.executor.cores=1
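These settings can be supplied as `--conf` flags at submit time; for example (the class and jar names here are hypothetical placeholders):

```shell
# Single receiver, single core, sequential batches: preserves message order.
spark-submit \
  --conf spark.executor.instances=1 \
  --conf spark.cores.max=1 \
  --conf spark.executor.cores=1 \
  --conf spark.streaming.receiver.maxRate=1000 \
  --conf spark.streaming.concurrentJobs=1 \
  --conf spark.cleaner.ttl=120 \
  --conf spark.serializer=org.apache.spark.serializer.JavaSerializer \
  --class com.example.MQStreamingJob \
  spark-ibm-mq-job.jar
```

Pinning the job to one executor with one core forces a single receiver and sequential batch processing, which is what keeps the replicated messages in order.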