# 消息中间件相关
# ActiveMQ生产者
简述
向指定的MQ队列发送消息。
消息格式:List<Map<String, String>>
var ActivemqOut = Java.type("com.uinnova.di.common.producer.amq.AMQProducer");
/**
* 初始化Activemq连接
* @param url Activemq地址 (tcp://0.0.0.0:61616)
* @param username 用户名
* @param password 密码
* @param queue 队列模式 (queue or topic)
* @param queueName 队列名称
* @param deliveryMode 非持久化或持久化 (1非持久化, 2持久化, 默认1非持久化)
*
*/
var out = ActivemqOut.getInstance("url", "username", "password", "queue", "queueName", 1);
/**
* 发送数据
* @param sendData 需要发送的数据 (数据类型 List<Map<String,Object>>)
*/
out.sendList(sendData);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# ActiveMQ消费者
简述
接收来自MQ的消息。
var ActivemqReceiver = Java.type("com.uinnova.dip.javascript.amq.ActivemqReceiver");
/**
* @param url Activemq地址 (tcp://0.0.0.0:61616)
* @param username 用户名
* @param password 密码
* @param queue 队列模式 (queue or topic)
* @param queueName 队列名称
*/
var receiver = new ActivemqReceiver("url", "username", "password", "queue", "queueName");
startReceiver(receiver);
while (true) {
try {
var sourceData = receiver.getData();
logger.info("接收到的MQ的数据: " + sourceData);
} catch (error) {
logger.error("脚本执行错误:" + error.message);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# RabbitMQ生产者
简述
RabbitMQ生产者发送数据支持多种模式,有如下几种模式 (v5.13.0
版本新增):
- 简单队列模式
- 订阅模式
- 路由模式
- 模式
⭐️ 特别注意,在使用rabbitmq发送数据时,并未限定发送数据的大小,如果发送数据较大时,可以自行限制单批次发送数据的数据量。
var RabbitMQProducer = Java.type("com.uinnova.di.common.producer.rabbitmq.RabbitMQProducer");
var HashMap = Java.type("java.util.HashMap");
/**
* 初始化Rabbitmq连接
* @param url Rabbitmq地址 (0.0.0.0)
* @param port 端口号 (默认5672)
* @param username 用户名
* @param password 密码
*/
var RMQ = new RabbitMQProducer("url", port, "username", "password");
/**
* 发送数据 (发送模式分为四种: 简单模式, 订阅模式, 路由模式, topic模式)
* @param modelMap 模式集合 (集合类型 Map<String,String>), 不同模式传递不同参数按以下规范传参即可
* - 简单模式 modelMap.put("model","simple") modelMap.put("name","队列名称")
* - 订阅模式 modelMap.put("model","subscribe") modelMap.put("name","交换机名称")
* - 路由模式 modelMap.put("model","routing") modelMap.put("name","交换机名称") modelMap.put("routingKey","路由键")
* - topic模式 modelMap.put("model","topic") modelMap.put("name","交换机名称") modelMap.put("routingKey","路由键")
* @param durable 交换机是否持久化 (true:持久化, false:不持久化) || 队列是否持久化 (true:持久化, false:不持久化) (简单模式)
* @param autoDelete 无服务使用时是否自动删除交换机 (true:删除, false:不删除) || 无服务使用时是否自动删除队列 (true:删除, false:不删除) (简单模式)
* @param sendData 需要发送的数据 (数据类型 Object)
*/
var modelMap = new HashMap();
RMQ.sendByModel(modelMap, false, false, "test sendData");
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# RabbitMQ消费者
简述
RabbitMQ消费者与生产者一样支持多种模式,有如下几种模式(v5.13.0
版本新增):
- queue简单队列模式
- subscribe订阅模式
- routing路由模式
- topic模式
# 简单模式
var RabbitMqReceiver = Java.type("com.uinnova.dip.rabbitmq.RabbitMqReceiver");
/**
* 初始化连接
* @param url Rabbitmq地址 (0.0.0.0)
* @param port 端口号 (默认是5672)
* @param username 用户名
* @param password 密码
* @param queueName 队列名称
* @param durable 队列是否持久化 (true:持久化, false:不持久化)
* @param autoDelete 当连接队列的消费者与队列断开连接时, 是否自动删除对列 (true:删除, false:不删除)
*/
var receiver = RabbitMqReceiver.getInstance("url", port, "username", "password", "queueName", false, false);
// 简单模式接收消息
startReceiver(receiver);
while (true) {
var data = receiver.getData();
if (data != null) {
logger.info("从Rabbitmq接收到的消息是:" + data);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 订阅模式
var RabbitMqReceiver = Java.type("com.uinnova.dip.rabbitmq.RabbitMqReceiver");
/**
* 初始化连接
* @param url Rabbitmq地址 (0.0.0.0)
* @param port 端口号 (默认是5672)
* @param username 用户名
* @param password 密码
* @param queueName 队列名称
* @param durable 队列是否持久化 (true:持久化, false:不持久化)
* @param autoDelete 当连接队列的消费者与队列断开连接时, 是否自动删除对列 (true:删除, false:不删除)
* @param exchangeName 交换机名称
*/
var receiver = RabbitMqReceiver.getInstance("url", port, "username", "password", "queueName", false, false, "exchangeName");
// 订阅模式接收消息
startReceiver(receiver);
while (true) {
var data = receiver.getData();
if (data != null) {
logger.info("从Rabbitmq接收到的消息是:" + data);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 路由模式
var RabbitMqReceiver = Java.type("com.uinnova.dip.rabbitmq.RabbitMqReceiver");
/**
* 初始化连接
* @param url Rabbitmq地址 (0.0.0.0)
* @param port 端口号 (默认是5672)
* @param username 用户名
* @param password 密码
* @param queueName 队列名称
* @param durable 队列是否持久化 (true:持久化, false:不持久化)
* @param autoDelete 当连接队列的消费者与队列断开连接时, 是否自动删除对列 (true:删除, false:不删除)
* @param exchangeName 交换机名称
* @param routingKey 路由键
* @param routingMode 路由模式 (direct 直接路由)
*/
var receiver = RabbitMqReceiver.getInstance("url", port, "username", "password", "queueName", false, false, "exchangeName", "routingKey", "direct");
// 路由模式接收消息
startReceiver(receiver);
while (true) {
var data = receiver.getData();
if (data != null) {
logger.info("从Rabbitmq接收到的消息是:" + data);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# Topic模式
var RabbitMqReceiver = Java.type("com.uinnova.dip.rabbitmq.RabbitMqReceiver");
/**
* 初始化连接
* @param url Rabbitmq地址 (0.0.0.0)
* @param port 端口号 (默认是5672)
* @param username 用户名
* @param password 密码
* @param queueName 队列名称
* @param durable 队列是否持久化 (true:持久化, false:不持久化)
* @param autoDelete 当连接队列的消费者与队列断开连接时, 是否自动删除对列 (true:删除, false:不删除)
* @param exchangeName 交换机名称
* @param routingKey 路由键
* @param routingMode 路由模式 (topic 通配符)
*/
var receiver = RabbitMqReceiver.getInstance("url", port, "username", "password", "queueName", false, false, "exchangeName", "routingKey", "topic");
// Topic模式接收消息
startReceiver(receiver);
while (true) {
var data = receiver.getData();
if (data != null) {
logger.info("从Rabbitmq接收到的消息是:" + data);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# Kafka生产者
简述
向指定topic发送消息,分为4种方式:无认证方式,sasl-plain认证方式, sasl-scram认证方式,ssl认证方式。
var KafkaUtil = Java.type("com.uinnova.di.common.producer.kafka.DIKafkaProducer");
/**
* 初始化Kafka连接无认证
* @param hosts Kafka服务地址 (0.0.0.0:9092, 多个地址之间用,隔开)
* @param messageTopic 消息主题
*/
var kafka = KafkaUtil.getInstance("hosts", "messageTopic");
/**
* 初始化Kafka连接 sasl-plain认证
* @param hosts Kafka服务地址 (0.0.0.0:9092, 多个地址之间用,隔开)
* @param messageTopic 消息主题
* @param saslConf sasl-plain认证文件地址
* @param delay 最晚提交时间
*/
var kafka = KafkaUtil.getInstance("hosts", "messageTopic", "saslConf", 1);
/**
* 初始化Kafka连接 sasl-scram认证
* @param hosts Kafka服务地址 (0.0.0.0:9092, 多个地址之间用,隔开)
* @param messageTopic 消息主题
* @param username sasl-scram 认证用户名
* @param password sasl-scram 认证密码
* @param scramType 密码加密算法 选择 SCRAM-SHA-256 或 SCRAM-SHA-512
* @param delay 最晚提交时间
*/
var kafka = KafkaUtil.getInstance("hosts", "messageTopic", "username","password","scramType", 1);
/**
* 初始化Kafka连接 ssl认证
* @param hosts Kafka服务地址 (0.0.0.0:9092, 多个地址之间用,隔开)
* @param messageTopic 消息主题
* @param sslMap ssl认证配置项
* @param delay
*
*/
var sslMap = new Java.util.HashMap();
sslMap.put("ssl.protocol", "");
sslMap.put("security.protocol", "");
sslMap.put("ssl.keystore.location", "");
sslMap.put("ssl.keystore.password","");
sslMap.put("ssl.key.password", "");
sslMap.put("ssl.truststore.location", "");
sslMap.put("ssl.truststore.password", "");
sslMap.put("ssl.truststore.type", "");
sslMap.put("ssl.keystore.type", "");
sslMap.put("ssl.endpoint.identification.algorithm", "");
var kafka = KafkaUtil.getInstance("hosts", "messageTopic", sslMap, 1);
/**
* 发送数据
* @param sendData 需要发送的数据 (数据类型 List<Map<String,Object>>)
*/
kafka.sendList(sendData);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# Kafka消费者
简述
接收来自kafka消息, 分为4种方式:无认证方式,sasl-plain认证方式, sasl-scram认证方式,ssl认证方式。
var KafkaReceiver = Java.type("com.uinnova.dip.javascript.kafka.KafkaReceiver");
// kafka对String的反序列化方式
var keyDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
var valueDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
/**
* @param hosts Kafka服务地址 (0.0.0.0:9092, 多个地址之间用,隔开)
* @param messageTopic 消息主题
* @param groupId 分组ID
* @param keyDeserializer kafka对String的反序列化方式
* @param valueDeserializer kafka对String的反序列化方式
* @param timeOut 连接Kafka服务超时时间 (单位: 毫秒(ms), 默认200毫秒)
*/
var kafkaReceiver = new KafkaReceiver("hosts", "messageTopic", "groupId", keyDeserializer, valueDeserializer, 200);
/**
* @param hosts Kafka服务地址 (0.0.0.0:9092, 多个地址之间用,隔开)
* @param messageTopic 消息主题
* @param saslConf sasl-plain认证文件地址
* @param groupId 分组ID
* @param keyDeserializer kafka对String的反序列化方式
* @param valueDeserializer kafka对String的反序列化方式
* @param timeOut 连接Kafka服务超时时间 (单位: 毫秒(ms), 默认200毫秒)
*/
var kafkaReceiver = new KafkaReceiver("hosts", "messageTopic","saslConf", "groupId", keyDeserializer, valueDeserializer, 200);
/**
* @param hosts Kafka服务地址 (0.0.0.0:9092, 多个地址之间用,隔开)
* @param messageTopic 消息主题
* @param username sasl-scram 认证用户名
* @param password sasl-scram 认证密码
* @param scramType 密码加密类型,选择 SCRAM-SHA-256 或 SCRAM-SHA-512
* @param groupId 分组ID
* @param keyDeserializer kafka对String的反序列化方式
* @param valueDeserializer kafka对String的反序列化方式
* @param timeOut 连接Kafka服务超时时间 (单位: 毫秒(ms), 默认200毫秒)
*/
var kafkaReceiver = new KafkaReceiver("hosts", "messageTopic","username","password", "scramType","groupId", keyDeserializer, valueDeserializer, 200);
/**
* @param hosts Kafka服务地址 (0.0.0.0:9092, 多个地址之间用,隔开)
* @param messageTopic 消息主题
* @param sslMap ssl认证配置项
* @param groupId 分组ID
* @param keyDeserializer kafka对String的反序列化方式
* @param valueDeserializer kafka对String的反序列化方式
* @param timeOut 连接Kafka服务超时时间 (单位: 毫秒(ms), 默认200毫秒)
*/
var sslMap = new Java.util.HashMap();
sslMap.put("ssl.protocol", "");
sslMap.put("security.protocol", "");
sslMap.put("ssl.keystore.location", "");
sslMap.put("ssl.keystore.password","");
sslMap.put("ssl.key.password", "");
sslMap.put("ssl.truststore.location", "");
sslMap.put("ssl.truststore.password", "");
sslMap.put("ssl.truststore.type", "");
sslMap.put("ssl.keystore.type", "");
sslMap.put("ssl.endpoint.identification.algorithm", "");
var kafkaReceiver = new KafkaReceiver("hosts", "messageTopic", sslMap, "groupId", keyDeserializer, valueDeserializer, 200);
startReceiver(kafkaReceiver);
while (true) {
try {
var data = kafkaReceiver.getData();
logger.info("接收到的Kafka数据为:" + data);
} catch (error) {
logger.error("脚本执行错误:" + error.message);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# MQTT生产者
简述
向指定MQTT Topic发送消息。
var MQTTSender = Java.type("com.uinnova.di.common.producer.mqtt.MQTTProducer");
/**
* 初始化Mqtt连接
* @param url Mqtt地址 (tcp://0.0.0.0:1883)
* @param clientID 客户端ID
* @param username 用户名 (若无用户名填写"")
* @param password 密码 (若无密码填写"")
*/
var mqttSender = MQTTSender.getInstance("url", "clientID", "username", "password");
try {
/**
* 消息发送
* @param messageTopic 消息主题
* @param message 需要发送的消息 (String类型)
* @param QoS 服务质量 (可填 0:最多分发一次, 1:至少分发一次, 2:仅分发一次, 默认填写2)
* @param charset 字符编码集 (默认填写utf-8)
*/
var messageSend = mqttSender.publishMessage("messageTopic", "message", 2, "utf-8");
if (messageSend) {
heartBeat.addOutCount(1);
logger.info("消息发送成功!");
} else {
logger.error("消息发送失败!");
}
} catch (error) {
logger.error("脚本执行错误:" + error.message);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# MQTT消费者
简述
接收MQTT消息。
var MQTTReceiver = Java.type("com.uinnova.dip.javascript.mqtt.DixMqttClient");
/**
* 初始化Mqtt消费者
* @param url Mqtt地址 (tcp://0.0.0.0:1883)
* @param clientID 客户端ID
* @param username 用户名 (若无用户名填写"")
* @param password 密码 (若无密码填写"")
* @param messageTopic 消息主题
* @param QoS 服务质量 (可填 0:最多分发一次, 1:至少分发一次, 2:仅分发一次, 默认填写2)
* @param charset 字符编码集 (默认填写utf-8)
*/
var receiver = MQTTReceiver.getInstance("url", "clientID", "username", "password", "messageTopic", 2, "utf-8");
startReceiver(receiver);
while (true) {
try {
var data = receiver.getData();
if (data != null) {
heartBeat.addInCount(1);
var message = JSON.parse(data);
logger.info("接收到Mqtt的数据为:" + message.recData + ",消息主题:" + message.fromTopic + ",服务质量:" + message.qos);
}
} catch (error) {
logger.error("脚本执行错误:" + error.message);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# RocketMQ生产者
简述
向RocketMQ发送数据,支持同步发送,异步发送,单向发送三种模式,不同的方式应对不同的业务场景。
- 同步发送:每发送一次数据需要等待MQ响应结果,才会发送第二次数据。
- 异步发送:发送和响应为异步。
- 单向发送:仅发送数据,无返回结果。
备注:开发时采用Rocket最新版本,版本号v4.7.1。
var RocketMQProducer = Java.type("com.uinnova.di.common.producer.rocketmq.RocketMQProducer");
/**
* @param nameSrvAddr nameserver地址 (192.168.1.162:9876;192.168.1.161:9876) 多个分号隔开
* @param instanceName 生产者实例名称
* @param producerGroup 生产者组别
* @param topic 主题名称
* @param tags 消息标签
* @param sendTimeOut 超时时间 默认30000毫秒
* @param charset 编码集 默认UTF-8
* */
var producer = RocketMQProducer.getInstance("0.0.0.0:0000", "instanceName", "producerGroup", "topicA", "tagA", "30000", "UTF-8");
// 同步发送
producer.send("hello dix");
// 异步发送
producer.sendCallBack("hello dix");
// 单向发送
producer.sendOneWay("hello dix");
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# RocketMQ消费者
简述
接收RocketMQ数据(PUSH模式
),消费模式支持广播和集群模式,消费方式支持并发和有序消费。
备注:开发时采用Rocket最新版本,版本号v4.7.1。
var RocketMqReceiver = Java.type("com.uinnova.dip.javascript.rocketmq.RocketMqReceiver");
/**
* @param url rocketmq服务地址 (0.0.0.0:9876;0.0.0.0:9876) 多个地址之间用分号隔开
* @param instanceName 消费者实例名称
* @param consumerGroup 消费者组别
* @param topic 主题名称
* @param tag 消息标签 tagA || tagB 多个标签用 || 隔开
* @param consumeFromWhere 中断后重新消费位置,参数可填 first, last, timestamp 默认first
* @param messageModel 消费模式(指同一组内消费者),参数可填 cluster, broadcas 默认集群消费
* @param consumeMethod 消费方式, 参数可填 concurrently, orderly 默认并发消费
* @param charset 编码集 默认UTF-8
*/
var receiver = RocketMqReceiver.getInstance("url", "instanceName", "consumerGroup", "topicA", "tagA || tagB", "first", "cluster", "concurrently", "UTF-8");
startReceiver(receiver);
while (true) {
try {
var data = receiver.getData();
if (data!=null) {
logger.info("接收到的MQ的数据:" + data);
}
} catch (error) {
logger.error("脚本执行错误:" + error.message);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# RomaMQ生产者
简述
RomaMQ是华为基于RocketMQ v4.2.0封装的消息传输组件。
备注:由于同常规RocketMQ版本冲突,因此DIX正式发行版本中,无此方法,若有项目需求请联系开发者。
var RocketMQProducer = Java.type("com.uinnova.di.common.producer.romamq.RomaMQProducer");
/**
* 初始化Rocketmq连接
* @param url rocketmq服务地址 (0.0.0.0:9876;0.0.0.0:9876) 多个地址之间用分号隔开
* @param instanceName 生产者实例名称
* @param producerGroup 生产者组别
* @param topicName 主题名称
* @param tag 消息标签
* @param sendTimeOut 超时时间 默认30000毫秒
* @param charset 编码集 默认UTF-8
* */
var producer = RocketMQProducer.getInstance("url", "instanceName", "producerGroup", "topicName", "tag", "30000", "UTF-8");
// 同步发送
producer.send("test synchronous send");
// 异步发送
producer.sendCallBack("test asynchronous send");
// 单向发送
producer.sendOneWay("test one way send");
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# RomaMQ消费者
简述
RomaMQ是华为基于RocketMQ v4.2.0封装的消息传输组件。
备注:由于同常规RocketMQ版本冲突,因此DIX正式发行版本中,无此方法,若有项目需求请联系开发者。
function run() {
var RomaMqReceiver = Java.type("com.uinnova.di.common.receiver.RomaMqReceiver");
/**
* @param nameSrvAddr 统一消息平台的服务器地址 (192.168.1.162:9876;192.168.1.161:9876) 多个分号隔开
* @param appId 客户端账号
* @param appSecret 消费者组别
* @param topic 主题名称
* @param tags 消息标签, 填写null时, 为匹配所有
* @param encryptTransport 是否需要加密传输 默认为null, 若需开启 填写"true"
* @param charset 编码集 默认 UTF-8
*/
var receiver = RomaMqReceiver.getInstance("0.0.0.0:0000", "appId", "appSecret", "topic", null, null, "UTF-8");
startReceiver(receiver);
while (true) {
try {
var data = receiver.getData();
if (data!=null) {
logger.info("---" + data);
}
} catch (error) {
logger.error("脚本执行错误:" + error.message);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
← TCP-IP协议相关 数据库相关 →