当前位置: 首页 > article >正文

Spring Integration SFTP集成

问题

需要通过Spring使用SFTP上传文件。

思路

集成Spring Integration SFTP进行文件上传。

步骤

pom.xml

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-sftp</artifactId>
    <version>6.3.3</version>
</dependency>

SftpConfig.java

package com.xxx.auth.config;

import com.xxx.auth.service.ICxxpAuditLogService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.sshd.sftp.client.SftpClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.integration.message.AdviceMessage;
import org.springframework.integration.sftp.outbound.SftpMessageHandler;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.CollectionUtils;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

@RefreshScope
@Slf4j
@Configuration
public class SftpConfig {

    /**
     * log id key 调用的时候,通过消息传入进来的id数据key
     */
    public static final String LOG_ID = "log_id";

    @Value("${sftp.host:localhost}")
    private String host;

    @Value("${sftp.port:22}")
    private int port;

    @Value("${sftp.user:foo}")
    private String user;

    @Value("${sftp.password:foo}")
    private String password;

    @Value("${sftp.remote.directory:/}")
    private String sftpRemoteDirectory;

    @Value("${sftp.appCode:KCGLPT}")
    private String appCode;

    @Resource
    private ICxxpAuditLogService iCxxpAuditLogService;

    @Bean
    public MessageChannel sftpSuccessChannel() {
        return new DirectChannel(); // 成功的消息通道
    }


    @Bean
    public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost(host);
        factory.setPort(port);
        factory.setUser(user);
        factory.setPassword(password);
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<>(factory);
    }

    @Bean
    @ServiceActivator(inputChannel = "toSftpChannel", adviceChain = "after")
    public MessageHandler handler() {

        SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
        handler.setRemoteDirectoryExpression(new LiteralExpression(sftpRemoteDirectory));
        handler.setFileNameGenerator(message -> {
            // 获取当前时间
            LocalDateTime currentTime = LocalDateTime.now();
            // 定义时间格式
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("_yyyy_MM_dd_HH_mm_ss");
            // 将当前时间格式化为字符串
            String formattedTime = currentTime.format(formatter);
            // 定义上传文件名
            return String.format("%s%s.log", appCode, formattedTime);
        });
        return handler;
    }

    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice after() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setPropagateEvaluationFailures(true);
        advice.setSuccessChannelName("sftpSuccessChannel"); // 设置成功通道的名称
        advice.setFailureChannelName("sftpFailureChannel"); // 设置上传失败通道名称
        return advice;
    }

    // 监听成功的消息通道
    @ServiceActivator(inputChannel = "sftpSuccessChannel")
    public void handleSftpSuccess(AdviceMessage<String> message) {
        // 从消息头中获取 日志id
        List<String> logIdListStr = (List<String>) message.getInputMessage().getHeaders().get(LOG_ID);
        if (CollectionUtils.isEmpty(logIdListStr)) {
            log.warn("处理上传成功的消息,日志 ID为空");
        } else {
            // TODO 上传文件完成的处理
            // 归档日志
            log.info("处理上传成功的消息,日志 ID: {}", String.join(", ", logIdListStr));
            iCxxpAuditLogService.complete(logIdListStr);
        }
    }

    // 监听失败的消息通道
    @ServiceActivator(inputChannel = "sftpFailureChannel")
    public void handleSftpFailure(ErrorMessage message) {
    	Throwable cause = message.getPayload();
    	log.error("上传SFTP错误原因:", cause);
    }

    @MessagingGateway
    public interface SftpGateway {

        @Gateway(requestChannel = "toSftpChannel")
        void sendToSftp(Message<String> message);

    }
}

这是配置SFTP上传文件的配置,下面主要是使用SftpGateway调用sendToSftp方法进行文件上传,这个方法也可以使用流,也可以使用File类,传入,这里直接使用字符串内容传入了。

使用上传

// 注入上传网关
	@Resource
    private SftpConfig.SftpGateway sftpGateway;
...
// sftp上传文件内容
String payload = "hello";
sftpGateway.sendToSftp(MessageBuilder.withPayload(payload)
                    .setHeader(LOG_ID, ids)
                    .build());

这里就是使用Spring Integration SFTP 上传文件内容。

总结

这里主要是配置Spring Integration SFTP完成SFTP上传,这里主要复杂的地方,就是这个消息链机制,有点让人晕,不过主要就是上传成功了又个专门消息处理,上传失败了又有另外一个消息处理。然后,就是在使用上传的时候,可以通过往消息头里面设置key来传数据。

参考

  • SFTP Adapters
  • Spring Integration: SFTP Upload Example Using Key-Based Authentication
  • SFTP Inbound Channel Adapter
  • SFTP Streaming Inbound Channel Adapter

http://www.kler.cn/a/299023.html

相关文章:

  • 2024年港澳台华侨生联考师范类院校录取情况来
  • Mybatis中使用MySql触发器报错:You have an error in your SQL syntax; ‘DELIMITER $$
  • imu相机EKF
  • 你好Python
  • Vue+element 回车查询页面刷新
  • OpenSSL 心脏滴血漏洞(CVE-2014-0160)
  • 前端框架有哪些?全面解析主流前端框架
  • 【H2O2|全栈】关于CSS(1)CSS基础(一)
  • Android13默认开启电池百分比数字显示Framework
  • 项目答辩总结
  • NISP 一级 | 3.3 网络安全防护与实践
  • 取指操作流程
  • JavaWeb案例-登录认证
  • 【MRI基础】回波序列长度-echo train length ETL概念
  • 基于python+大数据爬虫技术+数据可视化+Spark的电力能耗数据分析与可视化平台设计与实现
  • UnLua调用蓝图变量、动画、函数
  • CSP-J 算法基础 排序算法的基本概念
  • 【Rust练习】12.枚举
  • SLM561A​​系列 60V 10mA到50mA线性恒流LED驱动芯片 为智能家居照明注入新活力
  • 【C-实践】文件服务器(1.0)
  • 如何识别和防范跨站请求伪造(CSRF)?
  • 动手学深度学习(pytorch)学习记录27-深度卷积神经网络(AlexNet)[学习记录]
  • 智能指针怎么就智能了?
  • 【Qt网络编程基础】Tcp服务器和客户端(只支持一对一)
  • MybatisPlus的学习
  • 通过脚本监控MySQL是否正常启动