编码技巧——基于Socket通信的接口调用
背景
在新公司项目开发,发现内部服务之间竟然有基于Socket通信的"上古"接口,它甚至都不是Http,难找到一些类似Apache的工具包提供好用的操作api,所以需要自己实现;以下是代码示例;
代码
(1)配置property文件
sms_config.properties
# 短信发送相关配置
# 接口鉴权账号
sms.account=62340C4CC23F46C0915E492E39473492
# 接口鉴权token
sms.token=915e46a0cdff4f33824ce69e7b8056b4
# socket参数
sms.socket.maxMsgPackLength=2048
sms.socket.socTimeOut=500
sms.socket.connect.timeOut=1000
# 编码集
sms.socket.encoding=GBK
# 发送文本格式
sms.sendStr.format=%08d
# 短信接口地址端口 测试用localhost启动server来mock
sms.socket.serverIP=127.0.0.1
sms.socket.serverPort=29999
(2)Socket Server服务mock
/**
* mock socket server
*/
public class TestSms {
@Test
public void testServer() {
// 启动socketServer
ExecutorService executorService = Executors.newFixedThreadPool(3);
try {
// 初始化
ServerSocket serverSocket = new ServerSocket(29999);
System.out.println("socket server 启动成功!");
//noinspection InfiniteLoopStatement
while (true) {
Socket accept = serverSocket.accept();
System.out.println(accept.getInetAddress().getHostAddress());
executorService.execute((new CustomRunnable(accept)));
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 用来处理客户端请求的线程
*/
class CustomRunnable implements Runnable {
private Socket socket;
public CustomRunnable(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
byte[] buffer = new byte[2048];
InputStream is = socket.getInputStream();
is.read(buffer, 0, buffer.length);
String gbkStr = new String(buffer, "GBK");
System.out.println("接收到client " + socket.getInetAddress() + " port " + socket.getPort() + " msg:" + gbkStr);
socket.shutdownInput();
OutputStream outputStream = socket.getOutputStream();
String respStr = JSON.toJSONString(FacadeDefaultDTO.success(null));
String respEncode = String.format("%08d", respStr.getBytes("GBK").length) + respStr;
byte[] gbks = respEncode.getBytes("GBK");
outputStream.write(gbks, 0, gbks.length);
// socket.shutdownOutput();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
(3)调用方示例
/**
* 基于socket的短信服务
*/
@Slf4j
@Service
@PropertySource({"classpath:META-INF/sms_config.properties"})
public class SmsServiceImpl implements SmsService {
@Value("${sms.account}")
private String smsAccount;
@Value("${sms.token}")
private String smsToken;
@Value("${sms.socket.maxMsgPackLength}")
private int maxMsgPackLength;
@Value("${sms.socket.encoding}")
private String encoding;
@Value("${sms.socket.serverIP}")
private String serverIP;
@Value("${sms.socket.serverPort}")
private int serverPort;
@Value("${sms.socket.socTimeOut}")
private int socTimeOut;
@Value("${sms.socket.connect.timeOut}")
private int connTimeout;
@Value("${sms.sendStr.format}")
private String sendStrFormat;
/**
* 向指定手机号发送微信消息
*
* @param phoneNumber
* @param content
* @return
*/
@Override
public boolean sendWechatSms(String phoneNumber, String content) {
String sendNo = generateSendNo(phoneNumber, content, LocalDateTime.now());
return sendWechatSms(phoneNumber, content, sendNo);
}
/**
* 生成发送流水号 用于幂等或疲劳度控制
*
* @param mobile
* @param content
* @param now
* @return
*/
@Override
public String generateSendNo(String mobile, String content, LocalDateTime now) {
String md5short = MD5Utils.encode32(content).substring(0, Math.min(content.length(), 5));
String minuteTime = DateUtils.date2String(DateUtils.getNowAccurate2minute(now), DateUtils.DATE_FORMAT_7);
return String.join(CacheKeyUtils.SEPARATOR, mobile, minuteTime, md5short);
}
/**
* 向指定手机号发送微信消息
*
* @param phoneNumber
* @param content
* @param sendNo 可用于幂等
* @return
*/
@Override
public boolean sendWechatSms(String phoneNumber, String content, String sendNo) {
Map<String, Object> paramsMap = Maps.newLinkedHashMap();
paramsMap.put("account", smsAccount);
paramsMap.put("token", smsToken);
paramsMap.put("mobile", phoneNumber);
paramsMap.put("tempid", sendNo);
paramsMap.put("content", content);
String reqStr = JSON.toJSONString(paramsMap);
log.info("smsService#send. [req={}]", reqStr);
String respStr = sendBySocket(reqStr);
if (StringUtils.isBlank(respStr)) {
log.error("发送微信短信失败 Socket请求返回为null! [req={} resp={}]", reqStr, respStr);
return false;
}
FacadeDefaultDTO<Object> facadeDefaultDTO = JSON.parseObject(respStr, new TypeReference<FacadeDefaultDTO<Object>>() {
});
boolean success = facadeDefaultDTO.isSuccess();
if (success) {
log.warn("smsService#send_suc. [respDTO={} respStr={}]", JSON.toJSONString(facadeDefaultDTO), respStr);
} else {
log.error("smsService#send_fail! [respDTO={} respStr={}]", JSON.toJSONString(facadeDefaultDTO), respStr);
}
return success;
}
/**
* 调用该方法发送短信
* 参数是按接口格式组装好的JSON串
* 返回null说明通讯异常,不为空时,根据返回串内容中相关标志判断是否发送成功
*/
private String sendBySocket(String inStr) {
String res = null;
Socket smsSocket = null;
byte[] recBuffer = new byte[maxMsgPackLength];
byte[] sendBuffer;
try {
// 编码
String sendStr = encodedSendStr(inStr);
// 建立服务连接
// smsSocket = new Socket(serverIP, serverPort); // 此方式不能设置建立连接超时时间
smsSocket = new Socket();
// 此方法支持设置建立连接超时时间
smsSocket.connect(new InetSocketAddress(serverIP, serverPort), connTimeout);
smsSocket.setSoTimeout(socTimeOut);
sendBuffer = sendStr.getBytes(encoding);
// 发送和接收
res = sendThenGet(smsSocket, sendBuffer, recBuffer);
} catch (Exception e) {
log.error("socket_sendThenGet_error!", e);
} finally {
try {
if (smsSocket != null) smsSocket.close();
} catch (IOException e1) {
log.error("socket_close_error!", e1);
}
}
return res;
}
/**
* 发送和接收 含返回结果解码
*
* @param socket
* @param sendBuffer
* @param recBuffer
* @return
*/
private String sendThenGet(Socket socket, byte[] sendBuffer, byte[] recBuffer) {
String respStr = null;
OutputStream os = null;
int reqLength = sendBuffer.length;
InputStream is = null;
int respLength = -1;
try {
os = socket.getOutputStream();
os.write(sendBuffer, 0, reqLength);
is = socket.getInputStream();
respLength = is.read(recBuffer, 0, 8);
if (respLength > 0) {
respLength += is.read(recBuffer, 8, recBuffer.length - 8);
respStr = new String(recBuffer, 8, respLength - 8, encoding);
}
// socket关闭输入输出流
socket.shutdownInput();
socket.shutdownOutput();
} catch (IOException ie) {
System.out.println("从socket读写数据异常!" + ie);
} finally {
try {
if (is != null) {
is.close();
}
if (os != null) {
os.close();
}
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return respStr;
}
/**
* 编码 加上长度前缀
*/
private String encodedSendStr(String inStr) {
try {
return String.format(sendStrFormat, inStr.getBytes(encoding).length) + inStr;
} catch (UnsupportedEncodingException e2) {
log.error("send_sendStrFormat_error!", e2);
throw new RuntimeException("encodedSendStr_error! [inStr=" + inStr + "]");
}
}
}
注意关闭io的顺序,如果提前在server关闭了io,可能导致client读取数据超时;所以建议是:谁读取,谁来关闭io;
以上,希望能帮助到你;
参考:
java Socket IO流关闭问题(java.net.SocketException: Socket is closed)
springboot中使用@Value读取配置文件 - 菜鸟学院