How to use the Native IBM MQ Client Receiver with Spark Streaming

Technology Blog

How to use the Native IBM MQ Client Receiver with Spark Streaming

How to use the Native IBM MQ Client Receiver with Spark Streaming

After using Apache Nifi and IBM MQ I noticed that Nifi could not easily guarantee order of incoming messages as failover can occur at anytime. This becomes a problem specifically with database and table replication when the replicating software puts messages to a queue with the same timestamp. A solution to this problem is to utilize something like Spark Streaming. I decided to write a Native IBM MQ Client to access remote queues aka accessing them in a Non JMS manner. This can be important for performance reasons as described in previous articles.  This MQ Client Receiver works with IBM MQ 8.x and 9.x client libraries. The source code is kept at: https://github.com/gss2002/spark-ibm-mq

 

An example how to utilize this in a Java Spark Streaming Job:

How to use the Native IBM MQ Client Receiver

JavaStreamingContext ssc = new JavaStreamingContext(sparkconf, new Duration(1000));
JavaDStream mqStream = ssc.receiverStream(new IBMMQReciever(“mqp1.example.com”, 1414, “qmgrName”, “MQ.CLIENT.CHANNEL.NAME”, “userName”, “password”, “mqTimeWaitMS”, “keepMessagesTrue/False”));

mqStream.foreach(rdd -> {
rdd.foreach(record -> {
JSONArray json = new JSONArray(record);
String key ((JSONObject) json.get(0)).getString(“key”);
String value ((JSONObject) json.get(1)).getString(“value”);
System.out.println(“Key: “+key” :: Value:”+value);
});
});

ssc.start();
ssc.awaitTermination();

Set Spark Streaming Job with the following parameters at start time if using this to keep order from say DB2QREP or some other Queue based Data Replication Tool

spark.executor.instances=1
spark.cores.max=2
spark.streaming.receiver.maxRate=1000
spark.cleaner.ttl=120
spark.streaming.concurrentJobs=1
spark.serializer=org.apache.spark.serializer.JavaSerializer
spark.executor.cores=2
spark.hadoop.fs.hdfs.impl.disable.cache=true
spark.dynamicAllocation.enabled=false
spark.streaming.backpressure.enabled=true

Options for Enabling Write Ahead Logs (WALs)

spark.streaming.receiver.blockStoreTimeout=300
spark.streaming.receiver.writeAheadLog.enable=true
spark.streaming.driver.writeAheadLog.closeFileAfterWrite=true

For long running Spark or Flink Jobs on YARN the following yarn-site.xml entry needs to be set:

yarn.resourcemanager.proxy-user-privileges.enabled=true

If true, ResourceManager will have proxy-user privileges. Use case: In a secure cluster, YARN requires the user hdfs delegation-tokens to do localization and log-aggregation on behalf of the user. If this is set to true, ResourceManager is able to request new hdfs delegation tokens on behalf of the user. This is needed by long-running-service, because the hdfs tokens will eventually expire and YARN requires new valid tokens to do localization and log-aggregation. Note that to enable this use case, the corresponding HDFS NameNode has to configure ResourceManager as the proxy-user so that ResourceManager can itself ask for new tokens on behalf of the user when tokens are past their max-life-time.

Tags: , , ,