Vertx-EventBus篇
简介
EventBus,又称消息总线,是 Vert.x 内置的消息通信机制,类似我们常见的消息中间件。它支持点对点、请求与响应、发布订阅三种模式,支持跨服务、跨语言通讯,可以作为分布式消息系统使用。
常见用法
点对点send
消息发送到某个地址上,Vertx把消息分发到注册到这个地址上的某个消费者上,若存在多个消费者,使用轮询算法选择一个消费者接收消息。
Producer
/**
 * Demo producer for the point-to-point (send) example.
 */
public class Producer extends AbstractVerticle {

    /**
     * Registers a periodic timer (every 5000 ms) that sends one message per tick,
     * then hands the start promise to the parent implementation.
     *
     * @param startPromise promise passed through to {@code super.start}
     */
    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        vertx.setPeriodic(5000L, timerId -> sendGreeting());
        super.start(startPromise);
    }

    /** Prints a separator line and sends a single message to the "test" address. */
    private void sendGreeting() {
        System.out.println("***********************");
        vertx.eventBus().send("test", "你吃了吗");
    }
}
Consumer
/**
 * Demo consumer for the point-to-point (send) example.
 */
public class Consumer extends AbstractVerticle {

    /**
     * Registers a handler on the "test" address that prints every received body.
     *
     * @param startPromise promise passed through to {@code super.start}
     */
    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        // Random label in [0, 10000) so the output shows which instance handled the message.
        final int num = new Random().nextInt(10000);
        vertx.eventBus().<String>consumer("test", msg -> {
            String line = String.format("消费者%s接收到消息:%s", num, msg.body());
            System.out.println(line);
        });
        super.start(startPromise);
    }
}
App
/**
 * Entry point for the point-to-point example: deploys three consumer instances, then the producer.
 */
public class App {

    /**
     * Deploys the verticles in sequence and reports a failed deployment on stderr.
     *
     * @param args unused command-line arguments
     */
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(Consumer.class.getName(), new DeploymentOptions().setInstances(3))
            // The producer is deployed only after the consumer deployment has succeeded.
            .compose(res -> vertx.deployVerticle(Producer.class.getName()))
            // Without this handler a failed deployment would go completely unnoticed.
            .onFailure(err -> System.err.println("App Start Failed " + err.getMessage()));
    }
}
AppRunLog
***********************
消费者7414接收到消息:你吃了吗
***********************
消费者6421接收到消息:你吃了吗
***********************
消费者5802接收到消息:你吃了吗
请求响应request
请求与响应也是点对点模式的一种,区别在于消费者可以回复结果,两者可以进行会话交流。
Producer
/**
 * Demo producer for the request/response example.
 */
public class Producer extends AbstractVerticle {

    /**
     * Every 5000 ms sends a request to the "test" address and prints either the reply body
     * or the reason the request failed.
     *
     * @param startPromise promise passed through to {@code super.start}
     */
    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        vertx.setPeriodic(5000L, h -> {
            System.out.println("***********************");
            vertx.eventBus().<String>request("test", "你吃了吗")
                .onComplete(res -> {
                    if (res.succeeded()) {
                        System.out.println("生产者收到回应:" + res.result().body());
                    } else {
                        // Report failed requests instead of dropping them silently.
                        System.out.println("生产者发送消息失败:" + res.cause().getMessage());
                    }
                });
        });
        super.start(startPromise);
    }
}
Consumer
/**
 * Demo consumer for the request/response example.
 */
public class Consumer extends AbstractVerticle {

    /**
     * Registers a handler on the "test" address that prints the received body and replies to the sender.
     *
     * @param startPromise promise passed through to {@code super.start}
     */
    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        // Random label in [0, 10000); it is echoed back in the reply so the responder is visible.
        final int num = new Random().nextInt(10000);
        vertx.eventBus().<String>consumer("test", msg -> {
            String line = String.format("消费者%s接收到消息:%s", num, msg.body());
            System.out.println(line);
            String answer = num + "吃过啦";
            msg.reply(answer);
        });
        super.start(startPromise);
    }
}
App
/**
 * Entry point for the request/response example: deploys three consumer instances, then the producer.
 */
public class App {

    /**
     * Deploys the verticles in sequence and reports a failed deployment on stderr.
     *
     * @param args unused command-line arguments
     */
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(Consumer.class, new DeploymentOptions().setInstances(3))
            // The producer is deployed only after the consumer deployment has succeeded.
            .compose(res -> vertx.deployVerticle(Producer.class.getName()))
            // Without this handler a failed deployment would go completely unnoticed.
            .onFailure(err -> System.err.println("App Start Failed " + err.getMessage()));
    }
}
AppRunLog
***********************
消费者8585接收到消息:你吃了吗
生产者收到回应:8585吃过啦
***********************
消费者9672接收到消息:你吃了吗
生产者收到回应:9672吃过啦
***********************
消费者3615接收到消息:你吃了吗
生产者收到回应:3615吃过啦
发布订阅publish
消息发送到某个地址上,Vertx把消息分发到注册到这个地址上的所有消费者上。
Producer
/**
 * Demo producer for the publish/subscribe example.
 */
public class Producer extends AbstractVerticle {

    /**
     * Registers a periodic timer (every 5000 ms) that publishes one message per tick,
     * then hands the start promise to the parent implementation.
     *
     * @param startPromise promise passed through to {@code super.start}
     */
    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        vertx.setPeriodic(5000L, timerId -> publishGreeting());
        super.start(startPromise);
    }

    /** Prints a separator line and publishes a single message to the "test" address. */
    private void publishGreeting() {
        System.out.println("***********************");
        vertx.eventBus().publish("test", "你吃了吗");
    }
}
Consumer
/**
 * Demo consumer for the publish/subscribe example.
 */
public class Consumer extends AbstractVerticle {

    /**
     * Registers a handler on the "test" address that prints every received body.
     *
     * @param startPromise promise passed through to {@code super.start}
     */
    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        // Random label in [0, 10000) so the output shows which instances received the message.
        final int num = new Random().nextInt(10000);
        vertx.eventBus().<String>consumer("test", msg -> {
            String line = String.format("消费者%s接收到消息:%s", num, msg.body());
            System.out.println(line);
        });
        super.start(startPromise);
    }
}
App
/**
 * Entry point for the publish/subscribe example: deploys three consumer instances, then the producer.
 */
public class App {

    /**
     * Deploys the verticles in sequence and reports a failed deployment on stderr.
     *
     * @param args unused command-line arguments
     */
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(Consumer.class.getName(), new DeploymentOptions().setInstances(3))
            // The producer is deployed only after the consumer deployment has succeeded.
            .compose(res -> vertx.deployVerticle(Producer.class.getName()))
            // Without this handler a failed deployment would go completely unnoticed.
            .onFailure(err -> System.err.println("App Start Failed " + err.getMessage()));
    }
}
AppRunLog
***********************
消费者5195接收到消息:你吃了吗
消费者9192接收到消息:你吃了吗
消费者2572接收到消息:你吃了吗
***********************
消费者2572接收到消息:你吃了吗
消费者5195接收到消息:你吃了吗
消费者9192接收到消息:你吃了吗
编码解码器
Vertx消息总线默认只为一些基础类型的消息提供编解码器。若想要直接发送一个实体对象作为消息,就需要自定义消息编解码器并注册到消息总线上,否则发送时会报错:No message codec for type。
自定义单个编解码器
User
/**
 * Simple user entity used as the message body in the codec examples.
 */
@Data
public class User {
/** User identifier. */
private String id;
/** User name. */
private String name;
}
UserCodec
/**
 * Event-bus codec for {@link User} messages.
 *
 * <p>On the wire a user is written as a 4-byte length followed by its JSON bytes.
 */
public class UserCodec implements MessageCodec<User, User> {

    /**
     * Encodes a user for transmission over the wire.
     *
     * @param buffer buffer the encoded form is appended to
     * @param user   user to encode
     */
    @Override
    public void encodeToWire(Buffer buffer, User user) {
        Buffer jsonBytes = Json.CODEC.toBuffer(user, false);
        // Length prefix first, so the decoder knows how many bytes belong to this message.
        buffer.appendInt(jsonBytes.length()).appendBuffer(jsonBytes);
    }

    /**
     * Decodes a user that was received over the wire.
     *
     * @param pos    position in {@code buffer} where the length prefix starts
     * @param buffer buffer holding the encoded message
     * @return the decoded user
     */
    @Override
    public User decodeFromWire(int pos, Buffer buffer) {
        int jsonLength = buffer.getInt(pos);
        // The JSON payload starts right after the 4-byte length prefix.
        int jsonStart = pos + 4;
        int jsonEnd = jsonStart + jsonLength;
        String json = buffer.slice(jsonStart, jsonEnd).toString();
        return JacksonUtil.jsonToBean(json, User.class);
    }

    /**
     * Handles local delivery; the same instance is passed through unchanged.
     *
     * @param user user being sent locally
     * @return the same {@code user} instance
     */
    @Override
    public User transform(User user) {
        return user;
    }

    /**
     * @return the codec name, {@code "user"}
     */
    @Override
    public String name() {
        return "user";
    }

    /**
     * @return always {@code -1}
     */
    @Override
    public byte systemCodecID() {
        return -1;
    }
}
Producer
/**
 * Demo producer that sends a {@link User} entity as the request body.
 */
public class Producer extends AbstractVerticle {

    /**
     * After a 2000 ms delay sends one {@link User} request to the "test" address and prints either
     * the reply body or the reason the request failed.
     *
     * @param startPromise promise passed through to {@code super.start}
     */
    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        vertx.setTimer(2000L, h -> {
            System.out.println("***********************");
            User user = new User();
            user.setId("1");
            user.setName("张三");
            vertx.eventBus().<User>request("test", user)
                .onComplete(res -> {
                    if (res.succeeded()) {
                        System.out.println("生产者收到回应:" + res.result().body());
                    } else {
                        // Report failed requests (e.g. a missing codec) instead of dropping them silently.
                        System.out.println("生产者发送消息失败:" + res.cause().getMessage());
                    }
                });
        });
        super.start(startPromise);
    }
}
Consumer
/**
 * Demo consumer that receives a {@link User} entity and replies with another one.
 */
public class Consumer extends AbstractVerticle {

    /**
     * Registers a handler on the "test" address that prints the received body and replies
     * with a fixed user.
     *
     * @param startPromise promise passed through to {@code super.start}
     */
    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        vertx.eventBus().<User>consumer("test", msg -> {
            String line = String.format("消费者接收到消息:%s",msg.body());
            System.out.println(line);
            msg.reply(buildReplyUser());
        });
        super.start(startPromise);
    }

    /** Creates the user that is sent back as the reply. */
    private User buildReplyUser() {
        User replyUser = new User();
        replyUser.setId("2");
        replyUser.setName("李四");
        return replyUser;
    }
}
App
/**
 * Entry point for the single-codec example: registers {@link UserCodec}, then deploys the
 * consumer followed by the producer.
 */
public class App {

    /**
     * Registers the codec, deploys the verticles in sequence and reports a failed deployment on stderr.
     *
     * @param args unused command-line arguments
     */
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        // The codec is registered before any verticle is deployed, so it is in place for the first message.
        vertx.eventBus().registerDefaultCodec(User.class, new UserCodec());
        vertx.deployVerticle(Consumer.class.getName())
            .compose(res -> vertx.deployVerticle(Producer.class.getName()))
            // Without this handler a failed deployment would go completely unnoticed.
            .onFailure(err -> System.err.println("App Start Failed " + err.getMessage()));
    }
}
AppRunLog
***********************
消费者接收到消息:User(id=1, name=张三)
生产者收到回应:User(id=2, name=李四)
自定义公用编解码器
RestRequest
公用请求参数类
/**
 * Shared request envelope: a topic plus a typed request body.
 *
 * @param <T> type of the request body
 */
@Data
public class RestRequest<T> {
/** Topic of the request; the producer example uses it as the event-bus address. */
private String reqTopic;
/** Request payload. */
private T reqBody;
}
ReqMessageCodec
公用请求参数消息编解码类
/**
 * Shared event-bus codec for {@link RestRequest} envelopes.
 *
 * <p>The sender passes a {@code RestRequest<T>}; the consumer receives only the payload {@code T}.
 * On the wire the whole envelope is written as a 4-byte length followed by its JSON bytes.
 *
 * <p>{@code T} is erased at runtime, so a codec created with the no-arg constructor cannot rebuild
 * the payload as its original class when decoding from the wire. Pass the payload class to
 * {@link #ReqMessageCodec(Class)} to have the decoded payload converted to that class.
 *
 * @param <T> type of the request payload
 */
public class ReqMessageCodec<T> implements MessageCodec<RestRequest<T>, T> {

    /** Payload class to convert to after wire decoding; {@code null} leaves the payload as parsed. */
    private final Class<T> bodyType;

    /** Creates a codec that returns the wire-decoded payload exactly as the JSON parser produced it. */
    public ReqMessageCodec() {
        this(null);
    }

    /**
     * Creates a codec that converts the wire-decoded payload to the given class.
     *
     * @param bodyType class of the request payload, or {@code null} for no conversion
     */
    public ReqMessageCodec(Class<T> bodyType) {
        this.bodyType = bodyType;
    }

    /**
     * Encodes the whole request envelope for transmission over the wire.
     *
     * @param buffer  buffer the encoded form is appended to
     * @param request request envelope to encode
     */
    @Override
    public void encodeToWire(Buffer buffer, RestRequest<T> request) {
        Buffer encoded = Json.CODEC.toBuffer(request, false);
        // Length prefix first, so the decoder knows how many bytes belong to this message.
        buffer.appendInt(encoded.length());
        buffer.appendBuffer(encoded);
    }

    /**
     * Decodes a request envelope received over the wire and returns its payload.
     *
     * @param pos    position in {@code buffer} where the length prefix starts
     * @param buffer buffer holding the encoded message
     * @return the request payload, converted to the configured payload class when one was given
     */
    @Override
    public T decodeFromWire(int pos, Buffer buffer) {
        int length = buffer.getInt(pos);
        // The JSON payload starts right after the 4-byte length prefix.
        int start = pos + 4;
        String json = buffer.slice(start, start + length).toString();
        RestRequest<T> request = JacksonUtil.jsonToBean(json, new TypeReference<>() {
        });
        T body = request.getReqBody();
        if (bodyType == null || body == null || bodyType.isInstance(body)) {
            return body;
        }
        // The parser could not know T, so convert the generic JSON value to the requested class.
        return Json.CODEC.fromValue(body, bodyType);
    }

    /**
     * Handles local delivery by unwrapping the envelope.
     *
     * @param request request envelope being sent locally
     * @return the payload of {@code request}
     */
    @Override
    public T transform(RestRequest<T> request) {
        return request.getReqBody();
    }

    /**
     * @return the codec name, {@code "req"}
     */
    @Override
    public String name() {
        return "req";
    }

    /**
     * @return always {@code -1}
     */
    @Override
    public byte systemCodecID() {
        return -1;
    }
}
RestResponse
公用响应消息类
/**
 * Shared response envelope: a status code, a typed data payload and a message.
 *
 * @param <T> type of the response data
 */
@Data
public class RestResponse<T> {
/** Status code of the response; the consumer example sets 200. */
private int code;
/** Response payload. */
private T data;
/** Human-readable message. */
private String msg;
}
RespMessageCodec
公用响应消息编解码类
/**
 * Shared event-bus codec for {@link RestResponse} envelopes.
 *
 * <p>On the wire a response is written as a 4-byte length followed by its JSON bytes.
 *
 * @param <T> type of the response data
 */
public class RespMessageCodec<T> implements MessageCodec<RestResponse<T>, RestResponse<T>> {

    /**
     * Encodes a response for transmission over the wire.
     *
     * @param buffer   buffer the encoded form is appended to
     * @param response response to encode
     */
    @Override
    public void encodeToWire(Buffer buffer, RestResponse<T> response) {
        Buffer jsonBytes = Json.CODEC.toBuffer(response, false);
        // Length prefix first, so the decoder knows how many bytes belong to this message.
        buffer.appendInt(jsonBytes.length()).appendBuffer(jsonBytes);
    }

    /**
     * Decodes a response that was received over the wire.
     *
     * @param pos    position in {@code buffer} where the length prefix starts
     * @param buffer buffer holding the encoded message
     * @return the decoded response
     */
    @Override
    public RestResponse<T> decodeFromWire(int pos, Buffer buffer) {
        int jsonLength = buffer.getInt(pos);
        // The JSON payload starts right after the 4-byte length prefix.
        int jsonStart = pos + 4;
        String json = buffer.slice(jsonStart, jsonStart + jsonLength).toString();
        RestResponse<T> decoded = JacksonUtil.jsonToBean(json, new TypeReference<>() {
        });
        return decoded;
    }

    /**
     * Handles local delivery; the same instance is passed through unchanged.
     *
     * @param response response being sent locally
     * @return the same {@code response} instance
     */
    @Override
    public RestResponse<T> transform(RestResponse<T> response) {
        return response;
    }

    /**
     * @return the codec name, {@code "resp"}
     */
    @Override
    public String name() {
        return "resp";
    }

    /**
     * @return always {@code -1}
     */
    @Override
    public byte systemCodecID() {
        return -1;
    }
}
Producer
/**
 * Demo producer that wraps a {@link User} in a {@link RestRequest} envelope.
 */
public class Producer extends AbstractVerticle {

    /**
     * After a 2000 ms delay sends one request to the address named by the request topic and
     * prints either the reply body or the reason the request failed.
     *
     * @param startPromise promise passed through to {@code super.start}
     */
    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        vertx.setTimer(2000L, h -> {
            System.out.println("***********************");
            User user = new User();
            user.setId("1");
            user.setName("张三");
            RestRequest<User> req = new RestRequest<>();
            req.setReqTopic("addUser");
            req.setReqBody(user);
            // Wildcard instead of a raw type: the reply body is only printed, so its data type is irrelevant here.
            vertx.eventBus().<RestResponse<?>>request(req.getReqTopic(), req)
                .onComplete(res -> {
                    if (res.succeeded()) {
                        System.out.println("生产者收到回应:" + res.result().body());
                    } else {
                        System.out.println("生产者发送消息失败:" + res.cause().getMessage());
                    }
                });
        });
        super.start(startPromise);
    }
}
Consumer
/**
 * Demo consumer for the "addUser" address; replies with a {@link RestResponse} envelope.
 */
public class Consumer extends AbstractVerticle {

    /**
     * Registers a handler on the "addUser" address that prints the received body and replies
     * with a response carrying code 200.
     *
     * @param startPromise promise passed through to {@code super.start}
     */
    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        vertx.eventBus().<User>consumer("addUser", msg -> {
            System.out.println(String.format("消费者接收到消息:%s",msg.body()));
            // Diamond instead of the raw constructor, which caused an unchecked-conversion warning.
            RestResponse<String> response = new RestResponse<>();
            response.setCode(200);
            response.setMsg("插入成功");
            msg.reply(response);
        });
        super.start(startPromise);
    }
}
SingleApp
本地传输数据
/**
 * Entry point for the shared-codec example running in a single (non-clustered) Vert.x instance.
 */
public class SingleApp {

    /**
     * Registers the request/response codecs, deploys the consumer followed by the producer and
     * reports a failed deployment on stderr.
     *
     * @param args unused command-line arguments
     */
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        // NOTE(review): the codecs are deliberately created as raw types. RestRequest.class and
        // RestResponse.class are raw class literals, so a parameterized codec presumably would not
        // match registerDefaultCodec's signature; confirm before "fixing" the warning.
        vertx.eventBus().registerDefaultCodec(RestRequest.class, new ReqMessageCodec())
            .registerDefaultCodec(RestResponse.class, new RespMessageCodec());
        vertx.deployVerticle(Consumer.class.getName())
            .compose(res -> vertx.deployVerticle(Producer.class.getName()))
            // Without this handler a failed deployment would go completely unnoticed.
            .onFailure(err -> System.err.println("App Start Failed " + err.getMessage()));
    }
}
SingleAppRunLog
***********************
消费者接收到消息:User(id=1, name=张三)
生产者收到回应:RestResponse(code=200, data=null, msg=插入成功)
ClusterApp
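集群模式传输数据。注意:集群模式下消息会经过 encodeToWire / decodeFromWire 进行序列化与反序列化,reqBody 的泛型类型在反序列化时无法还原,因此下方消费者日志中打印的是 Map 形式的 {id=1, name=张三},而不是本地模式下的 User(id=1, name=张三)。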
/**
 * Shared bootstrap for the clustered examples: starts a clustered Vert.x instance,
 * registers the request/response codecs and deploys the given verticle.
 */
public class ClusterApp {

    /** Clustered Vert.x instance; assigned once the cluster has been joined. */
    static Vertx vertx;

    /**
     * Starts a Hazelcast-clustered Vert.x instance and deploys {@code service} on it.
     * The outcome is printed to stdout on success, or to stderr together with the stack trace on failure.
     *
     * @param service verticle to deploy once the clustered instance is available
     */
    public static void initCluster(Verticle service) {
        ClusterManager clusterManager = new HazelcastClusterManager();
        VertxOptions clusterOptions = new VertxOptions().setClusterManager(clusterManager);
        Vertx.clusteredVertx(clusterOptions)
            .compose(clustered -> {
                vertx = clustered;
                registerCodecs();
                return vertx.deployVerticle(service);
            })
            .onSuccess(deploymentId -> System.out.println("App Start Complete!"))
            .onFailure(ClusterApp::reportStartFailure);
    }

    /** Registers the shared request and response codecs on the event bus of {@link #vertx}. */
    private static void registerCodecs() {
        vertx.eventBus().registerDefaultCodec(RestRequest.class, new ReqMessageCodec())
            .registerDefaultCodec(RestResponse.class, new RespMessageCodec());
    }

    /** Prints the stack trace followed by a one-line failure message to stderr. */
    private static void reportStartFailure(Throwable err) {
        err.printStackTrace();
        System.err.println("App Start Failed "+ err.getMessage());
    }
}
ConsumerApp
消费服务
/**
 * Entry point of the consumer service in the clustered example.
 */
public class ConsumerApp extends ClusterApp {

    /**
     * Starts the cluster bootstrap with a new {@link Consumer} verticle.
     *
     * @param args unused command-line arguments
     */
    public static void main(String[] args) {
        Consumer consumerVerticle = new Consumer();
        initCluster(consumerVerticle);
    }
}
ConsumerAppRunLog
Members {size:2, ver:2} [
Member [127.0.0.1]:5703 - 29afc653-a4ef-4933-ab6a-ed0bbbb7041e this
Member [127.0.0.1]:5704 - 289a8535-cb6f-4fef-8501-04c40c56f189
]
消费者接收到消息:{id=1, name=张三}
ProducerApp
生产服务
/**
 * Entry point of the producer service in the clustered example.
 */
public class ProducerApp extends ClusterApp {

    /**
     * Starts the cluster bootstrap with a new {@link Producer} verticle.
     *
     * @param args unused command-line arguments
     */
    public static void main(String[] args) {
        Producer producerVerticle = new Producer();
        initCluster(producerVerticle);
    }
}
ProducerAppRunLog
Members {size:2, ver:2} [
Member [127.0.0.1]:5703 - 29afc653-a4ef-4933-ab6a-ed0bbbb7041e
Member [127.0.0.1]:5704 - 289a8535-cb6f-4fef-8501-04c40c56f189 this
]
***********************
生产者收到回应:RestResponse(code=200, data=null, msg=插入成功)
DeliveryOptions
设置请求响应超时时间
/**
 * Demo producer that sends a request with an explicit send timeout.
 */
public class Producer extends AbstractVerticle {

    /**
     * Sends one request to the "test" address with a send timeout of 5000 and prints, prefixed
     * with the current date, either the reply body or the failure message.
     *
     * @param startPromise promise passed through to {@code super.start}
     */
    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        System.out.println(new Date()+"***********************");
        DeliveryOptions timeoutOptions = new DeliveryOptions().setSendTimeout(5000);
        vertx.eventBus().<String>request("test", "你吃了吗", timeoutOptions)
            .onComplete(reply -> {
                if (reply.failed()) {
                    System.out.println(new Date()+"生产者发送消息失败"+reply.cause().getMessage());
                    return;
                }
                System.out.println(new Date()+"生产者接收应答消息:"+reply.result().body());
            });
        super.start(startPromise);
    }
}