Spring Integration SFTP集成
问题
需要通过Spring使用SFTP上传文件。
思路
集成Spring Integration SFTP进行文件上传。
步骤
pom.xml
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-sftp</artifactId>
<version>6.3.3</version>
</dependency>
SftpConfig.java
package com.xxx.auth.config;
import com.xxx.auth.service.ICxxpAuditLogService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.sshd.sftp.client.SftpClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.integration.message.AdviceMessage;
import org.springframework.integration.sftp.outbound.SftpMessageHandler;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
@RefreshScope
@Slf4j
@Configuration
public class SftpConfig {
/**
* log id key 调用的时候,通过消息传入进来的id数据key
*/
public static final String LOG_ID = "log_id";
@Value("${sftp.host:localhost}")
private String host;
@Value("${sftp.port:22}")
private int port;
@Value("${sftp.user:foo}")
private String user;
@Value("${sftp.password:foo}")
private String password;
@Value("${sftp.remote.directory:/}")
private String sftpRemoteDirectory;
@Value("${sftp.appCode:KCGLPT}")
private String appCode;
@Resource
private ICxxpAuditLogService iCxxpAuditLogService;
@Bean
public MessageChannel sftpSuccessChannel() {
return new DirectChannel(); // 成功的消息通道
}
@Bean
public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(host);
factory.setPort(port);
factory.setUser(user);
factory.setPassword(password);
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<>(factory);
}
@Bean
@ServiceActivator(inputChannel = "toSftpChannel", adviceChain = "after")
public MessageHandler handler() {
SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setRemoteDirectoryExpression(new LiteralExpression(sftpRemoteDirectory));
handler.setFileNameGenerator(message -> {
// 获取当前时间
LocalDateTime currentTime = LocalDateTime.now();
// 定义时间格式
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("_yyyy_MM_dd_HH_mm_ss");
// 将当前时间格式化为字符串
String formattedTime = currentTime.format(formatter);
// 定义上传文件名
return String.format("%s%s.log", appCode, formattedTime);
});
return handler;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setPropagateEvaluationFailures(true);
advice.setSuccessChannelName("sftpSuccessChannel"); // 设置成功通道的名称
advice.setFailureChannelName("sftpFailureChannel"); // 设置上传失败通道名称
return advice;
}
// 监听成功的消息通道
@ServiceActivator(inputChannel = "sftpSuccessChannel")
public void handleSftpSuccess(AdviceMessage<String> message) {
// 从消息头中获取 日志id
List<String> logIdListStr = (List<String>) message.getInputMessage().getHeaders().get(LOG_ID);
if (CollectionUtils.isEmpty(logIdListStr)) {
log.warn("处理上传成功的消息,日志 ID为空");
} else {
// TODO 上传文件完成的处理
// 归档日志
log.info("处理上传成功的消息,日志 ID: {}", String.join(", ", logIdListStr));
iCxxpAuditLogService.complete(logIdListStr);
}
}
// 监听失败的消息通道
@ServiceActivator(inputChannel = "sftpFailureChannel")
public void handleSftpFailure(ErrorMessage message) {
Throwable cause = message.getPayload();
log.error("上传SFTP错误原因:", cause);
}
@MessagingGateway
public interface SftpGateway {
@Gateway(requestChannel = "toSftpChannel")
void sendToSftp(Message<String> message);
}
}
这是配置SFTP上传文件的配置,下面主要是使用SftpGateway调用sendToSftp方法进行文件上传,这个方法也可以使用流,也可以使用File类,传入,这里直接使用字符串内容传入了。
使用上传
// 注入上传网关
@Resource
private SftpConfig.SftpGateway sftpGateway;
...
// sftp上传文件内容
String payload = "hello";
sftpGateway.sendToSftp(MessageBuilder.withPayload(payload)
.setHeader(LOG_ID, ids)
.build());
这里就是使用Spring Integration SFTP 上传文件内容。
总结
这里主要是配置Spring Integration SFTP完成SFTP上传,这里主要复杂的地方,就是这个消息链机制,有点让人晕,不过主要就是上传成功了又个专门消息处理,上传失败了又有另外一个消息处理。然后,就是在使用上传的时候,可以通过往消息头里面设置key来传数据。
参考
- SFTP Adapters
- Spring Integration: SFTP Upload Example Using Key-Based Authentication
- SFTP Inbound Channel Adapter
- SFTP Streaming Inbound Channel Adapter