当前位置: 首页 > article >正文

SpringCloud + Spring AI Alibaba 整合阿里云百炼大模型

一、前言

记录一次自己使用微服务整合阿里云的百炼大模型,需要用到Redis来记录最近五条信息,已能够保证上下文的连通性,Ai和用户之间的对话是使用的MongoDB来进行存储。然后我这篇文章是介绍了两种请求方式,一种是通过Http请求,一种是通过WebSocket+Netty的方式,如果你还没有Redis可以先去安装对应环境或者可以将Redis改为通过使用List来对最近的消息进行存储。话不多说,开始。

二、引入依赖

(1)相关Maven依赖

 <!--            alibaba-ai-->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-ai</artifactId>
    <version>2023.0.1.2</version>
</dependency>

 <!--            redis-->
 <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
 </dependency>

 <!--            netty-->
 <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
 </dependency>

 <!--            mongodb-->

  <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb</artifactId>
  </dependency>

(2)yaml配置

spring:
  redis:
    host: 192.168.254.100
    port: 6379
    password: 123456
  cloud:
      ai:
        tongyi:
          connection:
            #这里的api-key需要你到阿里云大模型控制台获取,具体获取可以百度
            api-key: 您的api-key
  data:
    mongodb:
      host: 192.168.254.100
      port: 27017
      database: chat
      password: 'mongo' #注意这里的密码一定要加单引号包裹起来,不然会连接不上
      username: mongo
      authentication-database: admin 

 三、Http请求方式

(1)Controller层

@RestController
@RequestMapping("/ai")
@CrossOrigin
public class AiController {

	@Autowired
	private AiService aiService;

	/**
	 * 调用百炼大模型
	 * @param message 询问消息
	 * @return 结果
	 */
	@GetMapping("/tongyi")
	public R<String> completion(@RequestParam(value = "message", defaultValue = "") String message) {
		return aiService.handleAI(message,0L);
	}


	/**
	 * 获取与ai的聊天记录
	 * @param timestamp 时间戳(作为搜索游标)
	 * @param size 每页显示行数
	 * @return 结果
	 */
	@GetMapping("/getAiChatLogs")
	public TableDataInfo getAiChatLogs(@RequestParam(defaultValue = "0") Long timestamp,
	                                   @RequestParam(defaultValue = "20") int size){
		return aiService.getAiChatLogs(timestamp,size);
	}
}

(2)Service层

public interface AiService {

	/**
	 * 调用百炼大模型
	 * @param message 询问消息
	 * @return 结果
	 */
	R<String> handleAI(String message, Long userId);

	/**
	 * 获取与ai的聊天记录
	 * @param timestamp 时间戳(作为搜索游标)
	 * @param size 每页显示行数
	 * @return 结果
	 */
	TableDataInfo getAiChatLogs(Long timestamp, int size);
}

(3)ServiceImpl层

@Service
@Slf4j
public class AiServiceImpl implements AiService {

	private static final String REDIS_KEY_PREFIX = "ai:chat:history:"; // Redis键前缀
	private static final int MAX_HISTORY_ROUNDS = 5; // 保留最近5轮对话

	@Autowired
	private RedisService redisService;

	@Autowired
	private MongoTemplate mongoTemplate;

	/**
	 * 调用百炼大模型
	 * @param message 询问消息
	 * @return 结果
	 */
	@Override
	public R<String> handleAI(String message, Long userId) {
		Generation gen = new Generation();

		// 从 Redis 中获取历史消息
		List<Message> messages = getHistoryFromRedis(userId);
		// 添加用户消息
		Message userMsg = Message.builder()
				.role(Role.USER.getValue())
				.content(message)
				.build();
		messages.add(userMsg);

		// 构建请求参数
		GenerationParam param = GenerationParam.builder()
				.model("qwen-turbo")  //指定使用的 AI 模型名称
				.messages(messages) //设置对话上下文 (说明:接收一个 List<Message> 对象,包含用户与 AI 的历史对话记录。模型会根据完整的消息列表理解上下文关系,生成连贯的回复)
				.resultFormat(GenerationParam.ResultFormat.MESSAGE) //指定响应格式(MESSAGE 表示返回结构化消息对象(包含角色、内容等元数据))
				.topK(50) //控制候选词范围(每个生成步骤仅考虑概率最高的前 50 个候选词。增大该值会提高多样性但可能降低准确性,减小则使输出更集中)
				.temperature(0.8f) //调节输出随机性(0.8 属于中等偏高随机性,适合需要创造性但保持一定连贯性的场景)
				.topP(0.8) //动态候选词选择(核采样)
				.seed(1234) //设置随机种子(固定种子(如 1234)可使生成结果可重复)
				.build();

		try {
			// 调用API并获取回复
			GenerationResult result = gen.call(param);
			Message aiMessage = result.getOutput().getChoices().get(0).getMessage();
			String content = aiMessage.getContent();
			// 将AI回复加入对话历史
			messages.add(aiMessage);
			// 保存对话历史到 Redis
			saveHistoryToRedis(userId, messages);
			return R.ok(content);
		} catch (NoApiKeyException | InputRequiredException e) {
			log.error("调用模型出错---->{}",e.getMessage());
			throw new RuntimeException(e);
		}
	}

	/**
	 * 获取与ai的聊天记录
	 * @param timestamp 时间戳(作为搜索游标)
	 * @param size 每页显示行数
	 * @return 结果
	 */
	@Override
	public TableDataInfo getAiChatLogs(Long timestamp, int size) {
		// 创建分页请求,按create_time降序
		Query query = new Query()
				.with(Sort.by(Sort.Direction.DESC, "timestamp"))
				.limit(size);
		//添加用户作为条件
		Long userId = SecurityUtils.getUserId();
		query.addCriteria(Criteria.where("userId").is(userId));
		if (timestamp != null && timestamp>0) {
			// 添加条件:timestamp < 上一页最后一条记录的 timestamp
			query.addCriteria(Criteria.where("timestamp").lt(timestamp));
		}
		List<AiChat> aiChats = mongoTemplate.find(query, AiChat.class);
		Collections.reverse(aiChats);
		TableDataInfo tableDataInfo = new TableDataInfo();
		tableDataInfo.setCode(200);
		tableDataInfo.setMsg("成功");
		tableDataInfo.setRows(aiChats);
		return tableDataInfo;
	}

	/**
	 * 从 Redis 获取历史对话记录
	 */
	private List<Message> getHistoryFromRedis(Long userId) {
		String redisKey = REDIS_KEY_PREFIX + userId;
		Object obj = redisService.get(redisKey);
		if (obj instanceof String) {
			return JSON.parseArray((String) obj, Message.class);
		}
		List<Message> objects = new ArrayList<>();
		// 添加系统消息(只在会话首次建立时添加)
		Message systemMsg = Message.builder()
				.role(Role.SYSTEM.getValue())
				.content("你的身份是一名AI教练,你只回答关于健身方面的问题,其他问题可以委婉表明自己只能回答健身有关的问题!")
				.build();
		objects.add(systemMsg);
		return objects;
	}

	/**
	 * 保存对话历史到 Redis
	 */
	private void saveHistoryToRedis(Long userId, List<Message> messages) {
		truncateHistory(messages);
		String redisKey = REDIS_KEY_PREFIX + userId;
		// 转换为JSON字符串存储
		String jsonString = JSON.toJSONString(messages);
		redisService.set(redisKey, jsonString, 30 * 60);
	}

	/**
	 * 截断历史记录,保留最近的对话轮次
	 */
	private void truncateHistory(List<Message> messages) {
		int maxSize = 1 + MAX_HISTORY_ROUNDS * 2;
		if (messages.size() > maxSize) {
			List<Message> truncated = new ArrayList<>();
			// 添加类型校验
			if (messages.get(0) != null) {
				truncated.add(messages.get(0));
			}
			int start = Math.max(1, messages.size() - MAX_HISTORY_ROUNDS * 2);
			truncated.addAll(messages.subList(start, messages.size()));
			messages.clear();
			messages.addAll(truncated);
		}
	}
}

四、WebSocket+Netty方式

(1)创建Session层用于保存连接与用户的关联

  •         创建AiSession
/**
 * 存储ai业务中用户与netty之间的关联关系
 */
public interface AiSession {

	void save(Long userId, Channel channel);

	Channel getChannelByUserId(Long userId);

	Long getUserIdByChannel(Channel channel);

	void removeSessionByUserId(Long userId);

	void removeSessionByChannel(Channel channel);

	void clearAllSession();

}
  •         AiSession对应的实现类
@Service
public class AiSessionImpl implements AiSession {

	//用户id与Channel连接(key:用户id,value:channel)
	private final Map<Long, Channel> userIdLinkChannel = new HashMap<>();

	//Channel与用户id连接(key:channel,value:用户id)
	private final Map<Channel, Long> channelLinkUserId = new HashMap<>();


	/**
	 * 保存userId和Channel关系
	 * @param userId 用户id
	 * @param channel channel
	 */
	@Override
	public void save(Long userId, Channel channel) {
		userIdLinkChannel.put(userId,channel);
		channelLinkUserId.put(channel,userId);
	}

	/**
	 * 根据用户id获取Channel
	 * @param userId 用户id
	 * @return 结果
	 */
	@Override
	public Channel getChannelByUserId(Long userId) {
		return userIdLinkChannel.get(userId);
	}

	/**
	 * 根据Channel获取用户id
	 * @param channel Channel
	 * @return 结果
	 */
	@Override
	public Long getUserIdByChannel(Channel channel) {
		return channelLinkUserId.get(channel);
	}

	/**
	 * 根据用户id删除userId和Channel相互关联
	 * @param userId 用户id
	 */
	@Override
	public void removeSessionByUserId(Long userId) {
		Channel channelByUserId = getChannelByUserId(userId);
		channelLinkUserId.remove(channelByUserId);
		userIdLinkChannel.remove(userId);
	}

	/**
	 * 根据用户Channel删除userId和Channel相互关联
	 * @param channel channel
	 */
	@Override
	public void removeSessionByChannel(Channel channel) {
		Long userIdByChannel = getUserIdByChannel(channel);
		userIdLinkChannel.remove(userIdByChannel);
		channelLinkUserId.remove(channel);
	}

	/**
	 * 清空所有关联关系
	 */
	@Override
	public void clearAllSession() {
		userIdLinkChannel.clear();
		channelLinkUserId.clear();
	}


}

(2)Netty配置

  • 创建WebSocketNettyServer
@Slf4j
@Component
public class WebSocketNettyServer {

	@Autowired
	private AiInitializer aiInitializer;

	private final ServerBootstrap aiServerBootstrap = new ServerBootstrap();

	private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);

	private final EventLoopGroup workerGroup = new NioEventLoopGroup();

	@PostConstruct
	public void WebSocketNettyServerInit() {
/*		// 初始化服务器启动对象
		// 主线程池
		NioEventLoopGroup mainGrp = new NioEventLoopGroup();
		// 从线程池
		NioEventLoopGroup subGrp = new NioEventLoopGroup();*/

		aiServerBootstrap
				// 指定使用上面创建的两个线程池
				.group(bossGroup, workerGroup)
				// 指定Netty通道类型
				.channel(NioServerSocketChannel.class)
				// 指定通道初始化器用来加载当Channel收到事件消息后
				.childHandler(aiInitializer);


	}

	public void start() throws InterruptedException {
		// 绑定服务器端口,以异步的方式启动服务器
		ChannelFuture futureRelays = aiServerBootstrap.bind("0.0.0.0",6000).sync();
		if (futureRelays.isSuccess()){
			log.info("ai-netty初始化完成,端口6000)");
		}
	}
}
  • 创建AiInitializer
@Component
public class AiInitializer extends ChannelInitializer<SocketChannel> {

	@Autowired
	private AiHandler aiHandler;
	@Override
	protected void initChannel(SocketChannel socketChannel) throws Exception {
		//获取对应的管道
		ChannelPipeline pipeline = socketChannel.pipeline();
		pipeline
		        .addLast(new HttpServerCodec())
				//添加对大数据流的支持
				.addLast(new ChunkedWriteHandler())
				//添加聚合器
				.addLast(new HttpObjectAggregator(1024 * 64*64))
				//设置websocket连接前缀前缀
				//心跳检查(30秒)
				.addLast(new IdleStateHandler(30, 0, 0))
				//添加自定义处理器
				.addLast(new WebSocketServerProtocolHandler("/ws",null,true))
				.addLast(aiHandler);
	}
}
  • 创建AiHandler
@Component
@Slf4j
public class AiHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

	@Autowired
	private AiSession aiSession;

	@Autowired
	private AiService tongYiService;

	@Autowired
	private MongoTemplate mongoTemplate;

	@Override
	public void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
		JSONObject jsonObject = JSON.parseObject(msg.text());
		//获取消息类型
		Object method = jsonObject.get("method");
		// 处理消息
		//ping
		if ("ping".equals(method)){
			LoginUser loginUser = AuthUtil.getLoginUser(jsonObject.get("Authorization").toString());
			if (Objects.isNull(loginUser)){
				//表明重新登陆
				AiResponse responseData = new AiResponse();
				responseData.setCode(10002);
				responseData.setValue("relogin");
				responseData.setMethod("error");
				ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(responseData)));
				return;
			}
			//返回ack,表示心跳正常
			aiSession.save(loginUser.getUserid(),ctx.channel());
			AiResponse responseData = new AiResponse();
			responseData.setValue(String.valueOf(System.currentTimeMillis()));
			responseData.setMethod("ack");
			ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(responseData)));
		}else if ("send".equals(method)){ //消息发送
			Long userId = aiSession.getUserIdByChannel(ctx.channel());
			if (Objects.nonNull(userId)){
				Object value = jsonObject.get("value");
				log.info("发送的内容------->{}",value);
				//请求大模型api
				R<String> result = tongYiService.handleAI(value.toString().trim(),userId);
				//封装回复消息
				String aiReponseText = result.getData();
				log.info("Ai回复的内容-------->{}",aiReponseText);
				AiResponse responseData = new AiResponse();
				responseData.setCode(200);
				responseData.setValue(aiReponseText);
				responseData.setMethod("response");
				//返回消息
				ChannelFuture channelFuture = ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(responseData)));
				channelFuture.addListener(listener->{
					if (listener.isSuccess()){
						//封装用户发送的消息
						AiChat userChat = new AiChat();
						userChat.setId(IdUtils.randomUUID());
						userChat.setShowText(value.toString());
						userChat.setIsUser(true);
						userChat.setText(value.toString());
						userChat.setTimestamp((Long) jsonObject.get("timestamp"));
						userChat.setUserId(userId);

						//封装ai回复消息
						AiChat aiChat = new AiChat();
						aiChat.setId(IdUtils.randomUUID());
						aiChat.setShowText(aiReponseText);
						aiChat.setText(aiReponseText);
						aiChat.setIsUser(false);
						aiChat.setTimestamp(System.currentTimeMillis());
						aiChat.setUserId(userId);
						//保存回复的消息
						mongoTemplate.insertAll(Arrays.asList(userChat,aiChat));
					}
				});
			}else{
				//重新登陆
				AiResponse responseData = new AiResponse();
				responseData.setCode(10002);
				responseData.setValue("relogin");
				responseData.setMethod("error");
				ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(responseData)));
			}
		}
	}

	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		// 添加连接
		System.out.println("新连接: " + ctx.channel().id().asShortText());
	}

	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		// 断开连接
		System.out.println("断开连接: " + ctx.channel().id().asShortText());
		aiSession.removeSessionByChannel(ctx.channel());
	}

	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
		if (evt instanceof IdleStateEvent) {
			IdleState state = ((IdleStateEvent) evt).state();
			if (state == IdleState.READER_IDLE) {
				log.info("{}---心跳超时--->{}", ctx.channel().id().asShortText(), LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
				ctx.channel().close();
			}
		} else {
			super.userEventTriggered(ctx, evt);
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		// 异常处理
		cause.printStackTrace();
//		ctx.close();
	}
}
  • 创建StartListener
@Component
public class StartListener implements ApplicationListener<ContextRefreshedEvent> {


	@Autowired
	private WebSocketNettyServer webSocketNettyServer;

	@Override
	public void onApplicationEvent(ContextRefreshedEvent event) {
		try {
			//启动netty服务
			webSocketNettyServer.start();
		} catch (Exception ignored) {

		}
	}
}

然后netty相关配置就搭建完成了,前端通过websocket请求路径ws://主机:6000/ws就可以连接到netty上来了,然后就可以通过websocket进行消息的发送和对回复的消息进推送了。

我使用的是uniapp搭建的小程序和app端,实测是可以的,PC的也肯定是可以

五、效果展示


http://www.kler.cn/a/569240.html

相关文章:

  • leetcode第40题组合总和Ⅱ
  • 安装Maven配置阿里云地址 详细教程
  • P8720 [蓝桥杯 2020 省 B2] 平面切分--set、pair
  • spring boot整合flyway实现数据的动态维护
  • 【FAQ】HarmonyOS SDK 闭源开放能力 —Push Kit(9)
  • 【无标题】Ubuntu22.04编译视觉十四讲slambook2 ch4时fmt库的报错
  • Apollo canbus 学习笔记
  • 【Jenkins】个人向-Jenkinsfile如何写
  • git 中 commit 的修改
  • Ubuntu 防火墙iptables和 ufw
  • C# 13与.NET 9革新及工业开发应用
  • 经验分享:用一张表解决并发冲突!数据库事务锁的核心实现逻辑
  • 2024年第十五届蓝桥杯大赛软件赛省赛Python大学A组真题解析《更新中》
  • 大白话面试遇难题,应对策略是什么?
  • LeetCode:132. 分割回文串 II(DP Java)
  • OpenCV计算摄影学(7)HDR成像之多帧图像对齐的类cv::AlignMTB
  • 选开源CMS建站系统时,插件越多越好吗?
  • Apache部署Vue操作手册(SSL部分)
  • 【Linux文件IO】系统IO详情
  • torch.einsum 的 10 个常见用法详解以及多头注意力实现