
[ Spring ] Spring Cloud Alibaba Message Stream Binder for RocketMQ 2025

Table of Contents

          • Introduction
          • Project Structure
          • Declare Plugins and Modules
          • Apply Plugins and Add Dependencies
          • Sender Properties
          • Sender Application
          • Sender Controller
          • Receiver Properties
          • Receiver Application
          • Receiver Message Handler
          • Congratulations
          • Automatically Send Message By Interval
          • Type Adapter for Payload
          • Send Message Model as JSON
          • Receive JSON as Message Model

Introduction

Spring Cloud Stream changed significantly in version 4.x.

The annotation-based programming model, including @EnableBinding, @Input, @Output, and @StreamListener, was removed in favor of functional bindings.
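
As a rough illustration of the functional model that replaces those annotations, the sketch below uses only java.util.function types, with no Spring involved. The helper name pipe and the sample payload are illustrative assumptions; in a real application each role would be a bean that the binder connects to a destination.

```kotlin
import java.util.function.Consumer
import java.util.function.Function
import java.util.function.Supplier

// Hypothetical helper: wires the three functional roles together by hand.
// In Spring Cloud Stream the binder does this wiring through destinations.
fun pipe(
    source: Supplier<String>,               // produces messages (output binding)
    transform: Function<String, String>,    // maps input to output (both bindings)
    sink: Consumer<String>                  // handles messages (input binding)
) {
    sink.accept(transform.apply(source.get()))
}

fun main() {
    pipe(
        Supplier<String> { "config" },
        Function<String, String> { it.uppercase() },
        Consumer<String> { println("received: $it") }
    )
    // prints "received: CONFIG"
}
```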

This post uses the RocketMQ binder (stream-rocketmq), but the same approach also applies to the Kafka binder (stream-kafka).

To switch, replace the RocketMQ binder dependency with the Kafka one.
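
A minimal sketch of that dependency swap, written as a fragment of the dependencies block shown later in this post. It assumes the Kafka binder starter is available under the same version number as the other Spring Cloud artifact used here; check the release train you actually use. Binder-specific properties would need to change as well, for example spring.cloud.stream.default-binder=kafka and spring.cloud.stream.kafka.binder.brokers in place of spring.cloud.stream.rocketmq.binder.name-server.

```kotlin
// build.gradle.kts (fragment): swaps the RocketMQ binder for the Kafka binder.
dependencies {
    // Assumption: same version as spring-cloud-starter-bootstrap in this post.
    val springCloudVersion = "4.2.0"
    // Removed: com.alibaba.cloud:spring-cloud-starter-stream-rocketmq
    api("org.springframework.cloud:spring-cloud-starter-stream-kafka:$springCloudVersion")
}
```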

Project Structure
  • stream-binder-sender : RocketMQ message sender
  • stream-binder-receiver : RocketMQ message receiver
Declare Plugins and Modules
pluginManagement {
    repositories {
        gradlePluginPortal()
        google()
        mavenCentral()
    }
}

dependencyResolutionManagement {
    repositoriesMode = RepositoriesMode.PREFER_SETTINGS
    repositories {
        gradlePluginPortal()
        google()
        mavenCentral()
    }
}

buildscript {
    repositories {
        gradlePluginPortal()
        google()
        mavenCentral()
    }
}

plugins {
    id("org.jetbrains.kotlin.jvm") version "2.0.21" apply false
    id("org.jetbrains.kotlin.kapt") version "2.0.21" apply false
    id("org.jetbrains.kotlin.plugin.spring") version "2.0.21" apply false
    id("org.springframework.boot") version "3.4.1" apply false
}

include("stream-binder-sender")
include("stream-binder-receiver")
Apply Plugins and Add Dependencies
plugins {
    id("org.jetbrains.kotlin.jvm")
    id("org.jetbrains.kotlin.kapt")
    id("org.jetbrains.kotlin.plugin.spring")
    id("org.springframework.boot")
}

java {
    toolchain {
        languageVersion = JavaLanguageVersion.of(17)
    }
}

dependencies {
    val springBootVersion = "3.4.1"
    val springCloudVersion = "4.2.0"
    val springCloudAlibabaVersion = "2023.0.3.2"
    // commons
    api("io.github.hellogoogle2000:kotlin-commons:1.0.19")
    // kotlin
    api("org.jetbrains.kotlin:kotlin-reflect:2.0.21")
    // spring
    api("org.springframework.boot:spring-boot-starter:$springBootVersion")
    api("org.springframework.boot:spring-boot-starter-web:$springBootVersion")
    api("org.springframework.cloud:spring-cloud-starter-bootstrap:$springCloudVersion")
    // spring cloud stream binder
    api("com.alibaba.cloud:spring-cloud-starter-stream-rocketmq:$springCloudAlibabaVersion")
}
Sender Properties

configTopicProducer-out is the name of the custom output binding.

# service
server.port=10003
spring.application.name=stream-binder-sender
# stream binder
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.destination=topic-config
spring.cloud.stream.bindings.configTopicProducer-out.content-type=text/plain
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
Sender Application
package x.spring.hello

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

@SpringBootApplication
class StreamBinderSenderApplication

fun main(args: Array<String>) {
    runApplication<StreamBinderSenderApplication>(*args)
}
Sender Controller

The binding name passed to StreamBridge.send must match the output binding name in the properties.

package x.spring.hello.controller

import org.springframework.beans.factory.annotation.Autowired
import org.springframework.cloud.stream.function.StreamBridge
import org.springframework.messaging.support.MessageBuilder
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController

@RestController
class MessageSendController {

    @Autowired
    private lateinit var bridge: StreamBridge

    @GetMapping("send")
    fun send(): String {
        val payload = "config"
        val message = MessageBuilder.withPayload(payload).build()
        bridge.send("configTopicProducer-out", message)
        return "send successfully"
    }
}
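
StreamBridge.send returns a Boolean, which the controller above ignores. The sketch below checks it, assuming constructor injection instead of @Autowired. The class name CheckedSendController and the send-checked endpoint are hypothetical and not part of the original project.

```kotlin
package x.spring.hello.controller

import org.springframework.cloud.stream.function.StreamBridge
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController

// Hypothetical variant of MessageSendController that reports the send result.
@RestController
class CheckedSendController(private val bridge: StreamBridge) {

    @GetMapping("send-checked")
    fun sendChecked(): String {
        // A plain payload is accepted as well; StreamBridge wraps it in a Message.
        val accepted = bridge.send("configTopicProducer-out", "config")
        return if (accepted) "send successfully" else "send failed"
    }
}
```

Constructor injection keeps the field immutable and avoids lateinit, which is a common preference in Kotlin Spring code rather than a requirement.
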
Receiver Properties

configTopicConsumer is the name of the message handler function.

Remember this name, because you have to implement the function yourself.

You can define multiple message handler functions by separating their names with ;

configTopicConsumer-in-0 is the name of the input binding.

Its name must follow the format <functionName>-in-<index>.

# service
server.port=10004
spring.application.name=stream-binder-receiver
# stream binder
spring.cloud.function.definition=configTopicConsumer
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.destination=topic-config
spring.cloud.stream.bindings.configTopicConsumer-in-0.content-type=text/plain
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
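
The naming rule described above can be sketched in plain Kotlin. The helper inputBindingNames is hypothetical and not part of Spring Cloud Stream; it only mirrors the convention for simple consumers with a single input, and otherConsumer is a made-up second function name.

```kotlin
// Hypothetical helper: derives input binding names from the value of
// spring.cloud.function.definition, assuming one input per function.
fun inputBindingNames(definition: String): List<String> =
    definition.split(";")
        .map { it.trim() }
        .filter { it.isNotEmpty() }
        .map { "$it-in-0" }

fun main() {
    println(inputBindingNames("configTopicConsumer;otherConsumer"))
    // prints [configTopicConsumer-in-0, otherConsumer-in-0]
}
```
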
Receiver Application
package x.spring.hello

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

@SpringBootApplication
class StreamBinderReceiverApplication

fun main(args: Array<String>) {
    runApplication<StreamBinderReceiverApplication>(*args)
}
Receiver Message Handler

The bean name of the function must match the name given in the spring.cloud.function.definition property.

package x.spring.hello.component

import org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
import java.util.function.Consumer

@Component
class MessageConsumerObject {

    @Bean("configTopicConsumer")
    fun configTopicConsumer(): Consumer<Message<String>> {
        return Consumer<Message<String>> { message ->
            val payload = message.payload
            println("consumer receive config topic message: $payload")
        }
    }
}
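
When the headers are not needed, the handler can take the payload type directly and the framework passes in the unwrapped payload. The sketch below is an alternative to the class above, not an addition to it, since both register a bean named configTopicConsumer. The class name PayloadConsumerConfiguration is an assumption.

```kotlin
package x.spring.hello.component

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import java.util.function.Consumer

// Hypothetical replacement for MessageConsumerObject.
@Configuration
class PayloadConsumerConfiguration {

    // Same bean name as in the properties, so the binding stays unchanged.
    @Bean("configTopicConsumer")
    fun configTopicConsumer(): Consumer<String> {
        return Consumer<String> { payload ->
            println("consumer receive config topic message: $payload")
        }
    }
}
```
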
Congratulations

You now know the basic usage of the message binder.

Get the demos above running as written before you change anything; modifying them too early can cause failures that waste a lot of time.

Once they run, try your own variations.

With the basics working, let us look at some more advanced usage.

Automatically Send Message By Interval

Register a Supplier bean to generate heartbeat messages automatically. By default, Spring Cloud Stream polls a Supplier once per second.

package x.spring.hello.component

import org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.messaging.MessageHeaders
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Component
import org.springframework.util.MimeTypeUtils
import java.util.function.Supplier

@Component
class MessageSupplierObject {

    @Bean
    fun heartPacketProducer(): Supplier<Message<String>> {
        return Supplier<Message<String>> {
            println("send heart packet message")
            val payload = "heart"
            val message = MessageBuilder
                .withPayload(payload)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
                .build()
            return@Supplier message
        }
    }
}
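
Because the supplier is invoked again for every message, it can return a different value each time. The plain Kotlin sketch below shows the idea with a sequence number; the function name sequencedHeartbeat and the heart-N payload format are assumptions, and no Spring types are involved.

```kotlin
import java.util.concurrent.atomic.AtomicLong
import java.util.function.Supplier

// Hypothetical supplier: each call to get() yields the next heartbeat payload.
fun sequencedHeartbeat(): Supplier<String> {
    val counter = AtomicLong()
    return Supplier<String> { "heart-${counter.incrementAndGet()}" }
}

fun main() {
    val supplier = sequencedHeartbeat()
    // Simulates three polls by calling the supplier directly.
    repeat(3) { println(supplier.get()) }
    // prints heart-1, heart-2, heart-3 on separate lines
}
```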

Update the properties of the sender project to add an output binding named heartPacketProducer-out-0.

# service
server.port=10003
spring.application.name=stream-binder-sender
# stream binder
spring.cloud.function.definition=heartPacketProducer
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.destination=topic-config
spring.cloud.stream.bindings.configTopicProducer-out.content-type=text/plain
spring.cloud.stream.bindings.configTopicProducer-out.consumer.concurrency=100
spring.cloud.stream.bindings.heartPacketProducer-out-0.binder=rocketmq
spring.cloud.stream.bindings.heartPacketProducer-out-0.destination=topic-heart
spring.cloud.stream.bindings.heartPacketProducer-out-0.content-type=text/plain
spring.cloud.stream.bindings.heartPacketProducer-out-0.consumer.concurrency=100
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876

Update the properties of the receiver project to add an input binding named heartPacketConsumer-in-0.

# service
server.port=10004
spring.application.name=stream-binder-receiver
# stream binder
spring.cloud.function.definition=configTopicConsumer;heartPacketConsumer
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.destination=topic-config
spring.cloud.stream.bindings.configTopicConsumer-in-0.content-type=text/plain
spring.cloud.stream.bindings.heartPacketConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.heartPacketConsumer-in-0.destination=topic-heart
spring.cloud.stream.bindings.heartPacketConsumer-in-0.content-type=text/plain
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876

Register the message handler function in the receiver project.

package x.spring.hello.component

import org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
import java.util.function.Consumer

@Component
class MessageConsumerObject {

    @Bean("heartPacketConsumer")
    fun heartPacketConsumer(): Consumer<Message<String>> {
        return Consumer<Message<String>> { message ->
            val payload = message.payload
            println("consumer receive heart packet message: $payload")
        }
    }

    @Bean("configTopicConsumer")
    fun configTopicConsumer(): Consumer<Message<String>> {
        return Consumer<Message<String>> { message ->
            val payload = message.payload
            println("consumer receive config topic message: $payload")
        }
    }
}
Type Adapter for Payload

This lets you send and receive structured objects, such as a model class serialized as JSON or XML, instead of plain strings.

Put this adapter file into both the sender project and the receiver project.

package x.spring.hello.component

import org.springframework.context.annotation.Bean
import org.springframework.stereotype.Component
import x.kotlin.commons.serialize.JSON.fromJson
import x.kotlin.commons.serialize.JSON.toJson
import x.spring.hello.model.ConfigModel
import java.util.function.Function

@Component
class MessageModelAdapter {

    @Bean
    fun configModelConvertor1(): Function<ConfigModel, String> {
        return Function { it.toJson() }
    }

    @Bean
    fun configModelConvertor2(): Function<String, ConfigModel> {
        return Function { it.fromJson(ConfigModel::class.java) }
    }
}
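
The post never shows ConfigModel or the toJson and fromJson helpers from x.kotlin.commons. The sketch below is therefore an assumption: a model with the two fields used later in this post, and a round trip through Jackson's ObjectMapper standing in for the author's JSON helpers. In the real project the class would live in package x.spring.hello.model.

```kotlin
import com.fasterxml.jackson.databind.ObjectMapper

// Hypothetical model: only the username and password fields appear in the post.
class ConfigModel {
    var username: String = ""
    var password: String = ""
}

private val mapper = ObjectMapper()

// Stand-ins for the author's toJson/fromJson extension helpers.
fun toJson(model: ConfigModel): String = mapper.writeValueAsString(model)

fun fromJson(json: String): ConfigModel = mapper.readValue(json, ConfigModel::class.java)

fun main() {
    val original = ConfigModel().apply {
        username = "admin"
        password = "123456"
    }
    val restored = fromJson(toJson(original))
    // Both fields survive the round trip.
    println(restored.username == original.username && restored.password == original.password)
}
```
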
Send Message Model as JSON

Add this endpoint to MessageSendController. It also needs imports for MessageHeaders, MimeTypeUtils, and ConfigModel.

@GetMapping("send2")
fun send2(): String {
    val payload = ConfigModel()
    payload.username = "admin"
    payload.password = "123456"
    val message = MessageBuilder
        .withPayload(payload)
        .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
        .build()
    bridge.send("configModelProducer-out", message)
    return "send successfully"
}

Add the matching output binding to the sender properties.

spring.cloud.stream.bindings.configModelProducer-out.binder=rocketmq
spring.cloud.stream.bindings.configModelProducer-out.destination=topic-config-model
spring.cloud.stream.bindings.configModelProducer-out.content-type=application/json
spring.cloud.stream.bindings.configModelProducer-out.consumer.concurrency=100
Receive JSON as Message Model

Add this bean to MessageConsumerObject in the receiver project.

@Bean
fun configModelConsumer(): Consumer<Message<ConfigModel>> {
    return Consumer<Message<ConfigModel>> { message ->
        val payload = message.payload.toJson()
        println("consumer receive config model message: $payload")
    }
}

Add the matching function definition and input binding to the receiver properties.

spring.cloud.function.definition=configModelConsumer
spring.cloud.stream.bindings.configModelConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.configModelConsumer-in-0.destination=topic-config-model
spring.cloud.stream.bindings.configModelConsumer-in-0.content-type=application/json
