Java 中 Kafka 的简单应用
1 使用 org.springframework.kafka.core 包(KafkaTemplate 所在的包)
2 调用地方
// Call site: inject the MessageService bean named "kafkaMessageServiceImpl" (selected via @Qualifier).
@Autowired @Qualifier("kafkaMessageServiceImpl") private MessageService messageService;
// Build the MessageSendLog via getMessageSendLog(...) and pass it to MessageService.commonPlatform.
MessageSendLog messageSendLog = getMessageSendLog(subject, content, taskSubQuery.getTaskExeId(), info.getUserId(), taskExeName, taskConfig.getCreatorId()); messageService.commonPlatform(messageSendLog);
3 原始类
/**
 * MessageService implementation registered as the Spring bean "kafkaMessageServiceImpl".
 * The @ConditionalOnExpression is true only when the `environment.mode` property is not 'pre'.
 * The injected field is the KafkaTemplate bean qualified as "kafkaTemplateSj"; the methods of this
 * class call its send(...) to publish messages.
 */
@Slf4j @Service("kafkaMessageServiceImpl") @ConditionalOnExpression("!'${environment.mode}'.equals('pre')") public class KafkaMessageServiceImpl implements MessageService { @Autowired @Qualifier("kafkaTemplateSj") private KafkaTemplate<String, Object> kafkaTemplateSj;
/**
 * Sends one platform notification per receiver and stores an {@code ItemNotificationRecord}
 * for each attempt.
 *
 * <p>For every {@code AddMessageDTO} returned by {@code conformityMessageDto}, the DTO's receiver
 * is copied onto {@code notificationParam}, a record is built from the parameter, and the DTO is
 * sent as a JSON string to {@code MqConstant.ICWP_MSG_RECEIVE} (no record key is supplied).
 * The record is inserted once the outcome is known; failures are stored with status
 * {@code "fail"}.
 *
 * <p>The fail callback is notified both when the send future fails and when the send throws on
 * the calling thread, so callers receive a failure signal on either path.
 *
 * @param notificationParam notification to deliver; its receiver id is overwritten on every
 *                          iteration and its optional success/fail callbacks are invoked per message
 */
@Override
public void platform(NotificationParam notificationParam) {
    List<AddMessageDTO> dtoList = conformityMessageDto(notificationParam);
    for (AddMessageDTO dto : dtoList) {
        // The record is built right after the receiver is set, before the next iteration
        // overwrites the receiver on the shared parameter object.
        notificationParam.setReceverid(dto.getReceiver());
        ItemNotificationRecord itemNotificationRecord =
                TaskItemTranslator.saveTaskItemNotificationRel(notificationParam);
        try {
            kafkaTemplateSj.send(MqConstant.ICWP_MSG_RECEIVE, JSON.toJSONString(dto))
                    .addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                        @Override
                        public void onFailure(Throwable ex) {
                            if (notificationParam.getFailCallback() != null) {
                                notificationParam.getFailCallback().accept(ex.getMessage());
                            }
                            itemNotificationRecord.setStatus("fail");
                            itemNotificationRecordMapper.insert(itemNotificationRecord);
                            log.error("platform sendKafkaMsg log : send topic error, msg = {}", ex.getMessage(), ex);
                        }

                        @Override
                        public void onSuccess(SendResult<String, Object> result) {
                            if (notificationParam.getSuccCallback() != null) {
                                notificationParam.getSuccCallback().accept("success");
                            }
                            // NOTE(review): no status is set on this path; presumably
                            // saveTaskItemNotificationRel pre-populates it — confirm.
                            itemNotificationRecordMapper.insert(itemNotificationRecord);
                            log.info("platform sendKafkaMsg log : send topic success !");
                        }
                    });
        } catch (Exception e) {
            // Reached when anything inside the try throws on the calling thread
            // (JSON serialization, send(), or registering the callback).
            itemNotificationRecord.setStatus("fail");
            itemNotificationRecordMapper.insert(itemNotificationRecord);
            log.error("record cron sendKafkaMsgForChangeEventStatus error!", e);
            notifyPlatformSendFailure(notificationParam, e);
        }
    }
}

/** Reports a calling-thread send failure to the fail callback, if one is registered. */
private void notifyPlatformSendFailure(NotificationParam notificationParam, Exception cause) {
    if (notificationParam.getFailCallback() == null) {
        return;
    }
    try {
        notificationParam.getFailCallback().accept(cause.getMessage());
    } catch (RuntimeException callbackError) {
        // A throwing callback must not abort delivery to the remaining receivers.
        log.warn("platform sendKafkaMsg log : fail callback threw an exception", callbackError);
    }
}
/**
 * Sends one message per {@code AddMessageDTO} derived from the given log entry and stores a
 * {@code MessageSendLog} row for each attempt.
 *
 * <p>For every DTO returned by {@code getByAddMessageNewDtOlist}, a per-message log object is
 * built with {@code messageSendLogVo}, and the DTO is sent as a JSON string to
 * {@code MqConstant.ICWP_MSG_RECEIVE}. The row is inserted with status {@code "success"} or
 * {@code "fail"} once the outcome is known.
 *
 * <p>The fail callback is notified both when the send future fails and when the send throws on
 * the calling thread, so callers receive a failure signal on either path.
 *
 * @param messageSendLog template for the messages to send; its optional success/fail callbacks
 *                       are invoked once per message
 */
@Override
public void commonPlatform(MessageSendLog messageSendLog) {
    List<AddMessageDTO> dtoList = getByAddMessageNewDtOlist(messageSendLog);
    for (AddMessageDTO dto : dtoList) {
        MessageSendLog messageInfo = messageSendLogVo(messageSendLog, dto);
        try {
            kafkaTemplateSj.send(MqConstant.ICWP_MSG_RECEIVE, JSON.toJSONString(dto))
                    .addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                        @Override
                        public void onFailure(Throwable ex) {
                            if (messageSendLog.getFailCallback() != null) {
                                messageSendLog.getFailCallback().accept(ex.getMessage());
                            }
                            messageInfo.setStatus("fail");
                            messageSendLogMapper.insert(messageInfo);
                            log.error("platform commonPlatKafkaMsg log : send topic error, msg = {}", ex.getMessage(), ex);
                        }

                        @Override
                        public void onSuccess(SendResult<String, Object> result) {
                            if (messageSendLog.getSuccCallback() != null) {
                                messageSendLog.getSuccCallback().accept("success");
                            }
                            messageInfo.setStatus("success");
                            messageSendLogMapper.insert(messageInfo);
                            log.info("platform commonPlatKafkaMsg log : send topic success !");
                        }
                    });
        } catch (Exception e) {
            // Reached when anything inside the try throws on the calling thread
            // (JSON serialization, send(), or registering the callback).
            messageInfo.setStatus("fail");
            messageSendLogMapper.insert(messageInfo);
            log.error("record cron commonPlatKafkaMsg error!", e);
            notifyCommonPlatformSendFailure(messageSendLog, e);
        }
    }
}

/** Reports a calling-thread send failure to the fail callback, if one is registered. */
private void notifyCommonPlatformSendFailure(MessageSendLog messageSendLog, Exception cause) {
    if (messageSendLog.getFailCallback() == null) {
        return;
    }
    try {
        messageSendLog.getFailCallback().accept(cause.getMessage());
    } catch (RuntimeException callbackError) {
        // A throwing callback must not abort delivery of the remaining messages.
        log.warn("platform commonPlatKafkaMsg log : fail callback threw an exception", callbackError);
    }
}
/** Topic name passed (as MqConstant.ICWP_MSG_RECEIVE) to the KafkaTemplate send calls. */
public static final String ICWP_MSG_RECEIVE = "icwp-message-receive";
package com.sf.gis.common.entity; import com.baomidou.mybatisplus.annotation.*; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.Getter; import lombok.Setter; import java.time.LocalDateTime; import java.util.function.Consumer; /** * <p> * * </p> * * @author xxx * @since 2024-04-22 */ @Getter @Setter @TableName("message_send_log") @ApiModel(value = "MessageSendLog对象", description = "") public class MessageSendLog { @ApiModelProperty("UUID") @TableId(value = "message_send_log_id", type = IdType.UUID) private String messageSendLogId; @ApiModelProperty("消息主题") @TableField("subject") private String subject; @ApiModelProperty("消息来源来源的业务系统(1:数字网格+、2:政法平安、3、事件分拨、 4、决策分析、5、三级工作平台)") @TableField("source") private Integer source; @ApiModelProperty("消息内容") @TableField("content") private String content; @ApiModelProperty("消息类型(0 普通、1 预警、2 待办、 3 公告) 5 领导批示 6 领导关注") @TableField("type") private Integer type; @ApiModelProperty("链接地址支持跳转到业务系统的详 情地址") @TableField("link") private String link; @ApiModelProperty("移动端链接地址") @TableField("mobile_link") private String mobileLink; @ApiModelProperty("消息等级(0 一般、1 紧急)") @TableField("priority") private Integer priority; @ApiModelProperty("发起人 ID") @TableField("creator") private String creator; @ApiModelProperty("接收者(根据 receiverType 来传参) 到人:用户 ID 到部门:部门 Code 到角色:角色 Code ") @TableField("receiver") private String receiver; @ApiModelProperty("接收者类型 0 到人、1 到部门、2 到 角色(默认 0)") @TableField("receiver_type") private Integer receiverType; @ApiModelProperty("业务类型code") @TableField("bus_code") private String busCode; @ApiModelProperty("环节状态名称") @TableField("link_state_name") private String linkStateName; @ApiModelProperty("消息发送状态") @TableField("status") private String status; @ApiModelProperty("流水号") @TableField("rel_id") private String relId; @TableField(value = "create_time", fill = FieldFill.INSERT) private LocalDateTime createTime; /** * 失败回调 */ @TableField(exist = false) private Consumer<String> 
failCallback; /** * 成功回调 */ @TableField(exist = false) private Consumer<String> succCallback; }