Integrating Apache Nifi with IBM MQ

By | May 10, 2018

Integrating Apache Nifi with IBM MQ

This would be a continuation of the IBM MQ and Hadoop integration article I first posted a few years ago. This explains how to integrate IBM MQ with Apache Nifi or Hortonworks HDF. IBM MQ is extremely important when attempting to integrate new technologies with legacy environments specifically mainframe environments where a lot of useful business data resides.

How to Integrate with Apache Nifi and Hadoop using IBM MQ

To integrate IBM MQ with Apache Nifi or Hortonworks HDF you will first need to download Apache Nifi or Hortonworks HDF and than you will want to download IBM MQ Advanced Developer Edition https://developer.ibm.com/messaging/mq-downloads/ which is free for developers or users to try out. So once you have Hortonworks HDF or Apache Nifi installed and configured you will proceed to install and configure IBM MQ.

In this example below I am installing and configuring a Local QMGR in Bindings Mode to interoperate with Apache Nifi’s ConsumeJMS Function and the PutHDFS function to send the messages to HDFS.

And the question will probably come up why Bindings vs using Clients with MQ. Bindings provides better performance. https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.0.0/com.ibm.mq.dev.doc/q030520_.htm and IBM even states Bindings provides about a 30% performance improvement over clients. https://www.ibm.com/support/knowledgecenter/en/SSAW57_9.0.0/com.ibm.websphere.nd.multiplatform.doc/ae/ucli_pqcfm.html and http://bjansen.github.io/java/2018/03/04/high-performance-mq-jms.html

 

 

Installing IBM MQ Local QMGR and Samples

Installing IBM MQ on Linux:
root@hdpclient tmp]# tar -xvf mqadv_dev80_linux_x86-64.tar.gz
root@hdpclient tmp]# cd MQServer
[root@hdpclient MQServer]# ./mqlicense.sh
[root@hdpclient MQServer]# yum install ./MQSeriesRuntime-8.0.0-4.x86_64.rpm
[root@hdpclient MQServer]# yum install MQSeriesServer-8.0.0-4.x86_64.rpm
[root@hdpclient MQServer]# yum install MQSeriesGSKit-8.0.0-4.x86_64.rpm MQSeriesJRE-8.0.0-4.x86_64.rpm
[root@hdpclient MQServer]# yum install MQSeriesJava-8.0.0-4.x86_64.rpm
[root@hdpclient MQServer]# yum install MQSeriesClient-8.0.0-4.x86_64.rpm
[root@hdpclient MQServer]# yum install MQSeriesAMQP-8.0.0-4.x86_64.rpm
[root@hdpclient MQServer]# yum install MQSeriesSamples-8.0.0-4.x86_64.rpm

Configuring you Local QMGR with IBM MQ 8.0+

#Switch to MQM User
su – mqm
# Source Local QMGR Environment Variables
. /opt/ibm/mqm/bin/setmqenv
# Create a Local QMGR if it doesn’t already exist
crtmqm GSSMQP1
# Start the Local QMGR if it’s not already started
strmqm GSSMQP1
# Create Local Queues if none are defined for testing
runmqsc GSSMQP1
define qlocal(GSS.REQUEST.REPLY.QUEUE)
end

Configuring IBM MQ to allow Nifi to read messages

Configuring the local QMGR to allow the “nifi” group or user-id you are using to run the local Nifi process on the same server as your IBM MQ QMGR.

#Switch to the mqm user:
su – mqm
#Source Local QMGR Environment Variables
. /opt/ibm/mqm/bin/setmqenv
#ACL the Local QMGR for Flume and the Queue(s) being used by Nifi
setmqaut -m GSSMQP1 -t qmgr -g nifi +connect +inq +dsp
setmqaut -m GSSMQP1 -t queue -n GSS.REQUEST.REPLY.QUEUE -g nifi +inq +browse +get +put +dsp

#Refresh Security on the Local QMGR
runmqsc GSSMQP1
refresh security
end

Setup Nifi Server nifi-bootstrap.conf. with the following additional java.args

java.arg.18=-Djavax.security.auth.useSubjectCredsOnly=true
+java.arg.19=-Djava.library.path=/opt/mqm/java/lib64/
+java.arg.20=-Dcom.ibm.mq.cfg.jmqi.libpath=/opt/mqm/java/lib64

A strange requirement after doing some testing disabling and re-enabling the IBMMQJMSController when in bindings mode causes a stack trace to be thrown I’ve raised the issue NIFI-5184 with the Apache Nifi Project

A solution to this problem is to force the IBM MQ libs to load with the system classloader but since Nifi uses a RunNifi class to build this classpath you must copy in the jars to nifi/lib or symlink in nifi/lib:

  • cd /usr/nifi/lib
  • MQJARS=`ls -1 /opt/mqm/java/lib/*.jar`; for mqjar in $MQJARS; do ln -fns $mqjar; done

NIFI IBM MQ Setup to take messages from MQ Queue and Put to HDFS:
1) Create a ConsumeJMS Processor and setting it up to consume IBM MQ with a Local QMGR using MQ Bindings:

  • Create a new Controller Service using JMSConnectionFactoryProvider from within the ConsumeJMS Processor
    • Settings “JMSConnectionFactoryProvider controller service:
      • Name -> IBMMQConnectionFactory
    • Properties IBMMQConnectionFactory controller service:
      • MQ ConnectionFactory Implementation -> com.ibm.mq.jms.MQConnectionFactory
      • MQ Client Libraries path (i.e., /usr/jms/lib) –> /opt/mqm/java/lib/
      • Broker URI -> mqm
      • SSL Context Service -> Not Set
      • queueManager -> GSSMQP1
      • transportType -> 0
  • Setup the ConsumeJMS Processor:
    • SettingsConsumeJMS:
      • Name Processor -> IBMMQConsumeJMS
    • Properties IBMMQConsumeJMS:
      • Connection Factory Service -> IBMMQConnectionFactory
      • Destination Name -> GSS.REQUEST.REPLY.QUEUE
      • Destination Type -> QUEUE
      • Session Cache size -> 1
      • Acknowledgement Mode -> 2

2) Create a PutHDFS Processor:

  • Settings:
    • Name Processor -> JMSPut2HDFS
    • Hadoop Configuration Resources -> /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
    • Kerberos Principal -> nifi/hdpclient.hdp.senia.org@HDP.SENIA.ORG
    • Kerberos Keytab -> /etc/security/keytabs/nifi.service.keytab
    • Directory -> /tmp/nifi_jms

Sending Sample Messages to the Local Queue on QMGR using Sample Put Command

  • cd /opt/mqm
  • . ./bin/setmqenv
  • ./samp/bin/amqsput GSS.REQUEST.REPLY.QUEUE GSSMQP1

    Sample AMQSPUT0 start

    target queue is GSS.REQUEST.REPLY.QUEUE

    GSS TEST END MESSAGE SEND MESSAGE

    end

    Sample AMQSPUT0 end

Screenshots showing the Nifi Configuration

Nifi IBM MQJMS to HDFS

Nifi IBM MQJMS to HDFS

Nifi Flow Controller for IBMMQConnectionFactory

Nifi Flow Controller for IBMMQConnectionFactory

Nifi IBMMQConnectionFactory Properties

Nifi IBMMQConnectionFactory Properties

Nifi ConsumeJMS IBMMQ Properties

Nifi ConsumeJMS IBMMQ Properties

Nifi PutHDFS Processor

Nifi PutHDFS Processor