Spark结构化流媒体中的MQ源 [英] MQ Source in Spark Structured Streaming

查看:26
本文介绍了Spark结构化流媒体中的MQ源的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经在Spark Structure Streaming中实现了MQ源代码。我正在使用IBM MQ Core库Java 8

            <groupId>com.ibm.mq</groupId>
            <artifactId>com.ibm.mq</artifactId>
            <version>${mq.version}</version>
            <scope>provided</scope>

问题是读取操作太慢。我正在运行我的流应用程序,有4个执行器,每个执行器4 GB内存,4个内核。但我的应用程序需要10分钟才能从队列中读取1000条消息。每条消息的大小约为3kb。

从现在起,我的Spark应用程序逐个读取消息并并行处理这些消息。

有没有一种方法可以让我们同时阅读这些消息。我听说过MQ中的并发消费者。但我找不到任何有关这方面的适当文件。

有人能用Java跟我分享一些例子吗?

由于我的组织限制,我无法共享任何代码。对此我深表歉意。但我可以提供任何需要的信息来提供一些帮助。

推荐答案

但我的应用程序需要10分钟从队列中读取1000条消息。 每封邮件的大小约为3KB。

您是在与MQ队列管理器相同的服务器上运行应用程序,还是在远程服务器上运行应用程序?在循环获取下一条消息之前,您的应用程序是否正在做一些后端工作?您进行了哪些网络性能测试?可能您的网络速度很慢。

下面是一个简单的Java/MQ,它循环和检索队列上的所有消息。我只是将1000条3kb的消息放在队列中并运行它。以下是输出:

2018/06/28 17:18:21.191 MQTest12L: testReceive: successfully connected to MQWT1
2018/06/28 17:18:21.207 MQTest12L: testReceive: successfully opened TEST.Q1
2018/06/28 17:18:21.448 MQTest12L: testReceive: read 1000 messages
2018/06/28 17:18:21.448 MQTest12L: testReceive: closed: TEST.Q1
2018/06/28 17:18:21.450 MQTest12L: testReceive: disconnected from MQWT1

获取1000条3KB邮件花费了259毫秒。因此,如果您的程序需要10分钟才能收到1000条3kb的消息,则说明您的程序或您的网络有问题。

下面是一个功能齐全的Java/MQ应用程序,名为MQTest12L.java,它将循环并检索队列中的所有消息:

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Hashtable;

import com.ibm.mq.MQException;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.CMQC;

/**
 * Program Name
 *  MQTest12L
 *
 * Description
 *  This java class will connect to a remote queue manager with the 
 *  MQ setting stored in a HashTable, loop to retrieve all messages on a queue
 *  then close and disconnect.
 *
 * Sample Command Line Parameters
 *  -m MQA1 -h 127.0.0.1 -p 1414 -c TEST.CHL -q TEST.Q1 -u UserID -x Password
 *
 * @author Roger Lacroix
 */
public class MQTest12L
{
   private static final SimpleDateFormat  lOGGER_TIMESTAMP = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");

   private Hashtable<String,String> params;
   private Hashtable<String,Object> mqht;
   private String qMgrName;
   private String outputQName;

   /**
    * The constructor
    */
   public MQTest12L()
   {
      super();
      params = new Hashtable<String,String>();
      mqht = new Hashtable<String,Object>();
   }

   /**
    * Make sure the required parameters are present.
    * @return true/false
    */
   private boolean allParamsPresent()
   {
      boolean b = params.containsKey("-h") && params.containsKey("-p") &&
                  params.containsKey("-c") && params.containsKey("-m") &&
                  params.containsKey("-q") &&
                  params.containsKey("-u") && params.containsKey("-x");
      if (b)
      {
         try
         {
            Integer.parseInt((String) params.get("-p"));
         }
         catch (NumberFormatException e)
         {
            b = false;
         }
      }

      return b;
   }

   /**
    * Extract the command-line parameters and initialize the MQ HashTable.
    * @param args
    * @throws IllegalArgumentException
    */
   private void init(String[] args) throws IllegalArgumentException
   {
      int port = 1414;
      if (args.length > 0 && (args.length % 2) == 0)
      {
         for (int i = 0; i < args.length; i += 2)
         {
            params.put(args[i], args[i + 1]);
         }
      }
      else
      {
         throw new IllegalArgumentException();
      }

      if (allParamsPresent())
      {
         qMgrName = (String) params.get("-m");
         outputQName = (String) params.get("-q");

         try
         {
            port = Integer.parseInt((String) params.get("-p"));
         }
         catch (NumberFormatException e)
         {
            port = 1414;
         }

         mqht.put(CMQC.CHANNEL_PROPERTY, params.get("-c"));
         mqht.put(CMQC.HOST_NAME_PROPERTY, params.get("-h"));
         mqht.put(CMQC.PORT_PROPERTY, new Integer(port));
         mqht.put(CMQC.USER_ID_PROPERTY, params.get("-u"));
         mqht.put(CMQC.PASSWORD_PROPERTY, params.get("-x"));

         // I don't want to see MQ exceptions at the console.
         MQException.log = null;
      }
      else
      {
         throw new IllegalArgumentException();
      }
   }

   /**
    * Connect, open queue, loop and get all messages then close queue and disconnect.
    *
    * @throws MQException
    */
   private void testReceive()
   {
      MQQueueManager qMgr = null;
      MQQueue queue = null;
      int openOptions = CMQC.MQOO_INPUT_AS_Q_DEF + CMQC.MQOO_INQUIRE + CMQC.MQOO_FAIL_IF_QUIESCING;
      MQGetMessageOptions gmo = new MQGetMessageOptions();
      gmo.options = CMQC.MQGMO_NO_WAIT + CMQC.MQGMO_FAIL_IF_QUIESCING;
      MQMessage receiveMsg = null;
      int msgCount = 0;
      boolean getMore = true;

      try
      {
         qMgr = new MQQueueManager(qMgrName, mqht);
         MQTest12L.logger("successfully connected to "+ qMgrName);

         queue = qMgr.accessQueue(outputQName, openOptions);
         MQTest12L.logger("successfully opened "+ outputQName);

         while(getMore)
         {
            receiveMsg = new MQMessage();

            try
            {
               // get the message on the queue
               queue.get(receiveMsg, gmo);
               msgCount++;

               if (CMQC.MQFMT_STRING.equals(receiveMsg.format))
               {
                  String msgStr = receiveMsg.readStringOfByteLength(receiveMsg.getMessageLength());
//                  MQTest12L.logger("["+msgCount+"] " + msgStr);
               }
               else
               {
                  byte[] b = new byte[receiveMsg.getMessageLength()];
                  receiveMsg.readFully(b);
//                  MQTest12L.logger("["+msgCount+"] " + new String(b));
               }
            }
            catch (MQException e)
            {
               if ( (e.completionCode == CMQC.MQCC_FAILED) && 
                    (e.reasonCode == CMQC.MQRC_NO_MSG_AVAILABLE) )
               {
                  // All messages read.
                  getMore = false;
                  break;
               }
               else
               {
                  MQTest12L.logger("MQException: " + e.getLocalizedMessage());
                  MQTest12L.logger("CC=" + e.completionCode + " : RC=" + e.reasonCode);
                  getMore = false;
                  break;
               }
            }
            catch (IOException e)
            {
               MQTest12L.logger("IOException:" +e.getLocalizedMessage());
            }
         }
      }
      catch (MQException e)
      {
         MQTest12L.logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
      }
      finally
      {
         MQTest12L.logger("read " + msgCount + " messages");

         try
         {
            if (queue != null)
            {
               queue.close();
               MQTest12L.logger("closed: "+ outputQName);
            }
         }
         catch (MQException e)
         {
            MQTest12L.logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
         }
         try
         {
            if (qMgr != null)
            {
               qMgr.disconnect();
               MQTest12L.logger("disconnected from "+ qMgrName);
            }
         }
         catch (MQException e)
         {
            MQTest12L.logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
         }
      }
   }

   /**
    * A simple logger method
    * @param data
    */
   public static void logger(String data)
   {
      String className = Thread.currentThread().getStackTrace()[2].getClassName();

      // Remove the package info.
      if ( (className != null) && (className.lastIndexOf('.') != -1) )
         className = className.substring(className.lastIndexOf('.')+1);

      System.out.println(lOGGER_TIMESTAMP.format(new Date())+" "+className+": "+Thread.currentThread().getStackTrace()[2].getMethodName()+": "+data);
   }

   /**
    * main line
    * @param args
    */
   public static void main(String[] args)
   {
      MQTest12L write = new MQTest12L();

      try
      {
         write.init(args);
         write.testReceive();
      }
      catch (IllegalArgumentException e)
      {
         System.err.println("Usage: java MQTest12L -m QueueManagerName -h host -p port -c channel -q QueueName -u UserID -x Password");
         System.exit(1);
      }

      System.exit(0);
   }
}

这篇关于Spark结构化流媒体中的MQ源的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆