# 消息中间件相关

# ActiveMQ生产者

简述

向指定的MQ队列发送消息。
消息格式:List<Map<String, String>>

   var ActivemqOut = Java.type("com.uinnova.di.common.producer.amq.AMQProducer");
    /**
     * 初始化Activemq连接
     * @param url          Activemq地址 (tcp://0.0.0.0:61616)
     * @param username     用户名
	 * @param password     密码 
     * @param queue        队列模式 (queue or topic)
     * @param queueName    队列名称 
	 * @param deliveryMode 非持久化或持久化 (1非持久化, 2持久化, 默认1非持久化)
     * 
     */
    var out = ActivemqOut.getInstance("url", "username", "password", "queue", "queueName", 1);
    /**
     * 发送数据
     * @param sendData 需要发送的数据 (数据类型 List<Map<String,Object>>)
     */
    out.sendList(sendData);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# ActiveMQ消费者

简述

接收来自MQ的消息。

    var ActivemqReceiver = Java.type("com.uinnova.dip.javascript.amq.ActivemqReceiver");
    /**
     * @param url       Activemq地址 (tcp://0.0.0.0:61616) 
     * @param username  用户名
     * @param password  密码
     * @param queue     队列模式 (queue or topic)
     * @param queueName 队列名称
     */
    var receiver = new ActivemqReceiver("url", "username", "password", "queue", "queueName");
    startReceiver(receiver);
    while (true) {
        try {
            var sourceData = receiver.getData();
            logger.info("接收到的MQ的数据: " + sourceData);
        } catch (error) {
            logger.error("脚本执行错误:" + error.message);
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18





# RabbitMQ生产者

简述

RabbitMQ生产者发送数据支持多种模式,有如下几种模式 (v5.13.0版本新增):

  • 简单队列模式
  • 订阅模式
  • 路由模式
  • 模式

⭐️ 特别注意,在使用rabbitmq发送数据时,并未限定发送数据的大小,如果发送数据较大时,可以自行限制单批次发送数据的数据量。

    var RabbitMQProducer = Java.type("com.uinnova.di.common.producer.rabbitmq.RabbitMQProducer");
    var HashMap = Java.type("java.util.HashMap");
    /**
     * 初始化Rabbitmq连接
     * @param url      Rabbitmq地址 (0.0.0.0)
     * @param port     端口号 (默认5672)
     * @param username 用户名
     * @param password 密码
     */
    var RMQ = new RabbitMQProducer("url", port, "username", "password");
    /**
     * 发送数据 (发送模式分为四种: 简单模式, 订阅模式, 路由模式, topic模式)
     * @param modelMap 模式集合 (集合类型 Map<String,String>), 不同模式传递不同参数按以下规范传参即可
     * - 简单模式  modelMap.put("model","simple")    modelMap.put("name","队列名称")
     * - 订阅模式  modelMap.put("model","subscribe") modelMap.put("name","交换机名称")
     * - 路由模式  modelMap.put("model","routing")   modelMap.put("name","交换机名称") modelMap.put("routingKey","路由键")
     * - topic模式 modelMap.put("model","topic")     modelMap.put("name","交换机名称") modelMap.put("routingKey","路由键")
     * @param durable    交换机是否持久化 (true:持久化, false:不持久化) || 队列是否持久化 (true:持久化, false:不持久化) (简单模式)
     * @param autoDelete 无服务使用时是否自动删除交换机 (true:删除, false:不删除) || 无服务使用时是否自动删除队列 (true:删除, false:不删除) (简单模式)
     * @param sendData   需要发送的数据 (数据类型 Object)
     */
    var modelMap = new HashMap();
    RMQ.sendByModel(modelMap, false, false, "test sendData");
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# RabbitMQ消费者

简述

RabbitMQ消费者与生产者一样支持多种模式,有如下几种模式(v5.13.0版本新增):

  • queue简单队列模式
  • subscribe订阅模式
  • routing路由模式
  • topic模式

# 简单模式

    var RabbitMqReceiver = Java.type("com.uinnova.dip.rabbitmq.RabbitMqReceiver");
    /**
     * 初始化连接
     * @param url         Rabbitmq地址 (0.0.0.0)
     * @param port        端口号 (默认是5672) 
     * @param username    用户名
     * @param password    密码
     * @param queueName   队列名称
     * @param durable     队列是否持久化 (true:持久化, false:不持久化)
     * @param autoDelete  当连接队列的消费者与队列断开连接时, 是否自动删除对列 (true:删除, false:不删除)
     */
    var receiver = RabbitMqReceiver.getInstance("url", port, "username", "password", "queueName", false, false);

    // 简单模式接收消息
    startReceiver(receiver);
    while (true) {
        var data = receiver.getData();
        if (data != null) {
            logger.info("从Rabbitmq接收到的消息是:" + data);
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 订阅模式

    var RabbitMqReceiver = Java.type("com.uinnova.dip.rabbitmq.RabbitMqReceiver");
    /**
     * 初始化连接
     * @param url          Rabbitmq地址 (0.0.0.0)
     * @param port         端口号 (默认是5672)
     * @param username     用户名
     * @param password     密码
     * @param queueName    队列名称
     * @param durable      队列是否持久化 (true:持久化, false:不持久化)
     * @param autoDelete   当连接队列的消费者与队列断开连接时, 是否自动删除对列 (true:删除, false:不删除)
     * @param exchangeName 交换机名称
     */
    var receiver = RabbitMqReceiver.getInstance("url", port, "username", "password", "queueName", false, false, "exchangeName");

    // 订阅模式接收消息
    startReceiver(receiver);
    while (true) {
        var data = receiver.getData();
        if (data != null) {
            logger.info("从Rabbitmq接收到的消息是:" + data);
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

# 路由模式

    var RabbitMqReceiver = Java.type("com.uinnova.dip.rabbitmq.RabbitMqReceiver");
    /**
     * 初始化连接
     * @param url          Rabbitmq地址 (0.0.0.0)
     * @param port         端口号 (默认是5672)
     * @param username     用户名
     * @param password     密码
     * @param queueName    队列名称
     * @param durable      队列是否持久化 (true:持久化, false:不持久化)
     * @param autoDelete   当连接队列的消费者与队列断开连接时, 是否自动删除对列 (true:删除, false:不删除)
     * @param exchangeName 交换机名称
     * @param routingKey   路由键
     * @param routingMode  路由模式 (direct 直接路由)   
     */
    var receiver = RabbitMqReceiver.getInstance("url", port, "username", "password", "queueName", false, false, "exchangeName", "routingKey", "direct");

    // 路由模式接收消息
    startReceiver(receiver);
    while (true) {
        var data = receiver.getData();
        if (data != null) {
            logger.info("从Rabbitmq接收到的消息是:" + data);
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

# Topic模式

    var RabbitMqReceiver = Java.type("com.uinnova.dip.rabbitmq.RabbitMqReceiver");
    /**
     * 初始化连接
     * @param url          Rabbitmq地址 (0.0.0.0)
     * @param port         端口号 (默认是5672)
     * @param username     用户名
     * @param password     密码
     * @param queueName    队列名称
     * @param durable      队列是否持久化 (true:持久化, false:不持久化)
     * @param autoDelete   当连接队列的消费者与队列断开连接时, 是否自动删除对列 (true:删除, false:不删除)
     * @param exchangeName 交换机名称
     * @param routingKey   路由键
     * @param routingMode  路由模式 (topic 通配符)   
     */
    var receiver = RabbitMqReceiver.getInstance("url", port, "username", "password", "queueName", false, false, "exchangeName", "routingKey", "topic");

    // Topic模式接收消息
    startReceiver(receiver);
    while (true) {
        var data = receiver.getData();
        if (data != null) {
            logger.info("从Rabbitmq接收到的消息是:" + data);
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24





# Kafka生产者

简述

向指定topic发送消息,分为4种方式:无认证方式,sasl-plain认证方式, sasl-scram认证方式,ssl认证方式。

    var KafkaUtil = Java.type("com.uinnova.di.common.producer.kafka.DIKafkaProducer");
    /**
     * 初始化Kafka连接无认证
     * @param hosts        Kafka服务地址 (0.0.0.0:9092, 多个地址之间用,隔开)
     * @param messageTopic 消息主题
     */
    var kafka = KafkaUtil.getInstance("hosts", "messageTopic");

    /**
     * 初始化Kafka连接 sasl-plain认证
     * @param hosts        Kafka服务地址 (0.0.0.0:9092, 多个地址之间用,隔开)
     * @param messageTopic 消息主题
     * @param saslConf     sasl-plain认证文件地址
     * @param delay        最晚提交时间
     */ 
    var kafka = KafkaUtil.getInstance("hosts", "messageTopic", "saslConf", 1);

    /**
     * 初始化Kafka连接 sasl-scram认证
     * @param hosts        Kafka服务地址 (0.0.0.0:9092, 多个地址之间用,隔开)
     * @param messageTopic 消息主题
     * @param username     sasl-scram 认证用户名
     * @param password     sasl-scram 认证密码
     * @param scramType    密码加密算法 选择 SCRAM-SHA-256 或 SCRAM-SHA-512
     * @param delay        最晚提交时间
     */
    var kafka = KafkaUtil.getInstance("hosts", "messageTopic", "username","password","scramType", 1);

    /**
     * 初始化Kafka连接 ssl认证
     * @param hosts        Kafka服务地址 (0.0.0.0:9092, 多个地址之间用,隔开)
     * @param messageTopic 消息主题
     * @param sslMap ssl认证配置项
     * @param delay
     *
     */
    var sslMap = new Java.util.HashMap();
    sslMap.put("ssl.protocol", "");
    sslMap.put("security.protocol", "");
    sslMap.put("ssl.keystore.location", "");
    sslMap.put("ssl.keystore.password","");
    sslMap.put("ssl.key.password", "");
    sslMap.put("ssl.truststore.location", "");
    sslMap.put("ssl.truststore.password", "");
    sslMap.put("ssl.truststore.type", "");
    sslMap.put("ssl.keystore.type", "");
    sslMap.put("ssl.endpoint.identification.algorithm", "");
    var kafka = KafkaUtil.getInstance("hosts", "messageTopic", sslMap, 1);
    
    /**
     * 发送数据
     * @param sendData 需要发送的数据 (数据类型 List<Map<String,Object>>)
     */
    kafka.sendList(sendData);   
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

# Kafka消费者

简述

接收来自kafka消息, 分为4种方式:无认证方式,sasl-plain认证方式, sasl-scram认证方式,ssl认证方式。

    var KafkaReceiver = Java.type("com.uinnova.dip.javascript.kafka.KafkaReceiver");
    // kafka对String的反序列化方式
    var keyDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
    var valueDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";

    /**
     * @param hosts             Kafka服务地址 (0.0.0.0:9092, 多个地址之间用,隔开)
     * @param messageTopic      消息主题
     * @param groupId           分组ID 
     * @param keyDeserializer   kafka对String的反序列化方式        
     * @param valueDeserializer kafka对String的反序列化方式   
     * @param timeOut           连接Kafka服务超时时间 (单位: 毫秒(ms), 默认200毫秒)
     */
    var kafkaReceiver = new KafkaReceiver("hosts", "messageTopic", "groupId", keyDeserializer, valueDeserializer, 200);

     /**
     * @param hosts             Kafka服务地址 (0.0.0.0:9092, 多个地址之间用,隔开)
     * @param messageTopic      消息主题
     * @param saslConf          sasl-plain认证文件地址
     * @param groupId           分组ID
     * @param keyDeserializer   kafka对String的反序列化方式        
     * @param valueDeserializer kafka对String的反序列化方式   
     * @param timeOut           连接Kafka服务超时时间 (单位: 毫秒(ms), 默认200毫秒)
     */
    var kafkaReceiver = new KafkaReceiver("hosts", "messageTopic","saslConf", "groupId", keyDeserializer, valueDeserializer, 200);

   /**
     * @param hosts             Kafka服务地址 (0.0.0.0:9092, 多个地址之间用,隔开)
     * @param messageTopic      消息主题
     * @param username          sasl-scram 认证用户名
     * @param password          sasl-scram 认证密码
     * @param scramType         密码加密类型,选择 SCRAM-SHA-256 或 SCRAM-SHA-512
     * @param groupId           分组ID
     * @param keyDeserializer   kafka对String的反序列化方式        
     * @param valueDeserializer kafka对String的反序列化方式   
     * @param timeOut           连接Kafka服务超时时间 (单位: 毫秒(ms), 默认200毫秒)
     */
    var kafkaReceiver = new KafkaReceiver("hosts", "messageTopic","username","password", "scramType","groupId", keyDeserializer, valueDeserializer, 200);


    /**
     * @param hosts             Kafka服务地址 (0.0.0.0:9092, 多个地址之间用,隔开)
     * @param messageTopic      消息主题
     * @param sslMap            ssl认证配置项
     * @param groupId           分组ID
     * @param keyDeserializer   kafka对String的反序列化方式        
     * @param valueDeserializer kafka对String的反序列化方式   
     * @param timeOut           连接Kafka服务超时时间 (单位: 毫秒(ms), 默认200毫秒)
     */
    var sslMap = new Java.util.HashMap();
    sslMap.put("ssl.protocol", "");
    sslMap.put("security.protocol", "");
    sslMap.put("ssl.keystore.location", "");
    sslMap.put("ssl.keystore.password","");
    sslMap.put("ssl.key.password", "");
    sslMap.put("ssl.truststore.location", "");
    sslMap.put("ssl.truststore.password", "");
    sslMap.put("ssl.truststore.type", "");
    sslMap.put("ssl.keystore.type", "");
    sslMap.put("ssl.endpoint.identification.algorithm", "");
    var kafkaReceiver = new KafkaReceiver("hosts", "messageTopic", sslMap, "groupId", keyDeserializer, valueDeserializer, 200);

    startReceiver(kafkaReceiver);
    while (true) {
        try {
            var data = kafkaReceiver.getData();
            logger.info("接收到的Kafka数据为:" + data);
        } catch (error) {
            logger.error("脚本执行错误:" + error.message);
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71





# MQTT生产者

简述

向指定MQTT Topic发送消息。

    var MQTTSender = Java.type("com.uinnova.di.common.producer.mqtt.MQTTProducer");
    /**
     * 初始化Mqtt连接
     * @param url      Mqtt地址 (tcp://0.0.0.0:1883)
     * @param clientID 客户端ID
     * @param username 用户名 (若无用户名填写"")
     * @param password 密码 (若无密码填写"")
     */
    var mqttSender = MQTTSender.getInstance("url", "clientID", "username", "password");
    try {
        /**
         * 消息发送
         * @param messageTopic 消息主题
         * @param message      需要发送的消息 (String类型)
         * @param QoS          服务质量 (可填 0:最多分发一次, 1:至少分发一次, 2:仅分发一次, 默认填写2)
         * @param charset      字符编码集 (默认填写utf-8)
         */
        var messageSend = mqttSender.publishMessage("messageTopic", "message", 2, "utf-8");
        if (messageSend) {
            heartBeat.addOutCount(1);
            logger.info("消息发送成功!");
        } else {
            logger.error("消息发送失败!");
        }
    } catch (error) {
        logger.error("脚本执行错误:" + error.message);
    }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# MQTT消费者

简述

接收MQTT消息。

   var MQTTReceiver = Java.type("com.uinnova.dip.javascript.mqtt.DixMqttClient");
    /**
     * 初始化Mqtt消费者
     * @param url          Mqtt地址 (tcp://0.0.0.0:1883)
     * @param clientID     客户端ID
     * @param username     用户名 (若无用户名填写"")
     * @param password     密码 (若无密码填写"")
     * @param messageTopic 消息主题
     * @param QoS          服务质量 (可填 0:最多分发一次, 1:至少分发一次, 2:仅分发一次, 默认填写2)
     * @param charset      字符编码集 (默认填写utf-8)
     */
    var receiver = MQTTReceiver.getInstance("url", "clientID", "username", "password", "messageTopic", 2, "utf-8");
    startReceiver(receiver);
    while (true) {
        try {
            var data = receiver.getData();
            if (data != null) {
                heartBeat.addInCount(1);
                var message = JSON.parse(data);
                logger.info("接收到Mqtt的数据为:" + message.recData + ",消息主题:" + message.fromTopic + ",服务质量:" + message.qos);
            }
        } catch (error) {
            logger.error("脚本执行错误:" + error.message);
        }
    }  
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25




# RocketMQ生产者

简述

向RocketMQ发送数据,支持同步发送,异步发送,单向发送三种模式,不同的方式应对不同的业务场景。

  • 同步发送:每发送一次数据需要等待MQ响应结果,才会发送第二次数据。
  • 异步发送:发送和响应为异步。
  • 单向发送:仅发送数据,无返回结果。

备注:开发时采用Rocket最新版本,版本号v4.7.1。

    var RocketMQProducer = Java.type("com.uinnova.di.common.producer.rocketmq.RocketMQProducer");
    /**
     * @param nameSrvAddr nameserver地址 (192.168.1.162:9876;192.168.1.161:9876) 多个分号隔开
     * @param instanceName 生产者实例名称
     * @param producerGroup 生产者组别
     * @param topic 主题名称
     * @param tags 消息标签
     * @param sendTimeOut 超时时间 默认30000毫秒
     * @param charset 编码集 默认UTF-8
     * */
    var producer = RocketMQProducer.getInstance("0.0.0.0:0000", "instanceName", "producerGroup", "topicA", "tagA", "30000", "UTF-8");

    // 同步发送
    producer.send("hello dix");
    // 异步发送
    producer.sendCallBack("hello dix");
    // 单向发送
    producer.sendOneWay("hello dix");

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# RocketMQ消费者

简述

接收RocketMQ数据(PUSH模式),消费模式支持广播和集群模式,消费方式支持并发和有序消费。

备注:开发时采用Rocket最新版本,版本号v4.7.1。

    var RocketMqReceiver = Java.type("com.uinnova.dip.javascript.rocketmq.RocketMqReceiver");
    /**
     * @param url              rocketmq服务地址 (0.0.0.0:9876;0.0.0.0:9876) 多个地址之间用分号隔开
     * @param instanceName     消费者实例名称
     * @param consumerGroup    消费者组别
     * @param topic            主题名称
     * @param tag              消息标签  tagA || tagB  多个标签用 || 隔开
     * @param consumeFromWhere 中断后重新消费位置,参数可填 first, last, timestamp 默认first
     * @param messageModel     消费模式(指同一组内消费者),参数可填 cluster, broadcas 默认集群消费
     * @param consumeMethod    消费方式, 参数可填 concurrently, orderly 默认并发消费
     * @param charset          编码集 默认UTF-8
     */
    var receiver = RocketMqReceiver.getInstance("url", "instanceName", "consumerGroup", "topicA", "tagA || tagB", "first", "cluster", "concurrently", "UTF-8");
    startReceiver(receiver);

    while (true) {
        try {
            var data = receiver.getData();
            if (data!=null) {
                logger.info("接收到的MQ的数据:" + data);
            }
        } catch (error) {
            logger.error("脚本执行错误:" + error.message);
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25




# RomaMQ生产者

简述

RomaMQ是华为基于RocketMQ v4.2.0封装的消息传输组件。

备注:由于同常规RocketMQ版本冲突,因此DIX正式发行版本中,无此方法,若有项目需求请联系开发者。

    var RocketMQProducer = Java.type("com.uinnova.di.common.producer.romamq.RomaMQProducer");
    /**
     * 初始化Rocketmq连接
     * @param url           rocketmq服务地址 (0.0.0.0:9876;0.0.0.0:9876) 多个地址之间用分号隔开
     * @param instanceName  生产者实例名称
     * @param producerGroup 生产者组别
     * @param topicName     主题名称
     * @param tag           消息标签
     * @param sendTimeOut   超时时间 默认30000毫秒
     * @param charset       编码集 默认UTF-8
     * */
    var producer = RocketMQProducer.getInstance("url", "instanceName", "producerGroup", "topicName", "tag", "30000", "UTF-8");
    // 同步发送
    producer.send("test synchronous send");
    // 异步发送
    producer.sendCallBack("test asynchronous send");
    // 单向发送
    producer.sendOneWay("test one way send");
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# RomaMQ消费者

简述

RomaMQ是华为基于RocketMQ v4.2.0封装的消息传输组件。

备注:由于同常规RocketMQ版本冲突,因此DIX正式发行版本中,无此方法,若有项目需求请联系开发者。

function run() {
    var RomaMqReceiver = Java.type("com.uinnova.di.common.receiver.RomaMqReceiver");
    /**
     * @param nameSrvAddr      统一消息平台的服务器地址 (192.168.1.162:9876;192.168.1.161:9876) 多个分号隔开
     * @param appId            客户端账号
     * @param appSecret        消费者组别
     * @param topic            主题名称 
     * @param tags             消息标签, 填写null时, 为匹配所有
     * @param encryptTransport 是否需要加密传输 默认为null, 若需开启 填写"true"
     * @param charset          编码集 默认 UTF-8
     */
    var receiver = RomaMqReceiver.getInstance("0.0.0.0:0000", "appId", "appSecret", "topic", null, null, "UTF-8");
    startReceiver(receiver);

    while (true) {
        try {
            var data = receiver.getData();
            if (data!=null) {
                logger.info("---" + data);
            }
        } catch (error) {
            logger.error("脚本执行错误:" + error.message);
        }
    }  
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
上次更新时间: 2/13/2023, 4:50:06 PM