async-http-client使用示例
文章目录
-
- 概要
- 整体架构流程
- 技术名词解释
- 技术细节
- 小结
概要
async-http-client是一个用于 Java 平台的高性能、非阻塞 HTTP 客户端库,它允许开发者以异步的方式发送 HTTP 请求并处理响应,从而提高应用程序的性能和响应性。
主要特点
-
异步处理:基于 Netty 框架实现,支持异步发送 HTTP 请求和处理响应,避免了传统同步请求中的阻塞。
-
支持多种请求方法:支持 GET、POST、PUT、DELETE 等常见的 HTTP 请求方法。
-
灵活的配置:通过 AsyncHttpClientConfig 类可以配置连接池、超时时间、代理服务器等。
-
多种响应处理方式:支持通过回调函数、Future 或 CompletableFuture 来处理异步响应。
项目准备
首先创建一个 Maven 工程,引入以下依赖:
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>com.ning</groupId>
<artifactId>async-http-client</artifactId>
<version>1.9.40</version>
</dependency>
技术细节
创建一个工具类:
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.util.zip.GZIPInputStream;
/**
 * Charset conversion helpers for strings and byte arrays, plus a small gzip
 * decompression utility.
 *
 * <p>Every method is null-tolerant: a {@code null} input yields {@code null}.
 */
public class EncodingUtil {

    /** Pseudo charset name that marks input bytes as gzip-compressed. */
    private static final String GZIP = "gzip";

    /** Charset assumed for gzip payloads once they have been decompressed. */
    private static final String GZIP_PAYLOAD_CHARSET = "UTF-8";

    /** Chunk size used while draining a gzip stream. */
    private static final int GZIP_BUFFER_SIZE = 4096;

    /**
     * Re-decodes a string: encodes {@code str} with the platform default charset and
     * decodes the resulting bytes as {@code newCharset}.
     *
     * @param str        the text to re-decode, may be {@code null}
     * @param newCharset name of the charset used to decode the bytes
     * @return the re-decoded text, or {@code null} if {@code str} is {@code null}
     * @throws UnsupportedEncodingException if {@code newCharset} is not supported
     */
    public static String changeCharset(String str, String newCharset)
            throws UnsupportedEncodingException {
        if (str == null) {
            return null;
        }
        byte[] encoded = str.getBytes(Charset.defaultCharset());
        return new String(encoded, newCharset);
    }

    /**
     * Re-decodes a string: encodes {@code str} as {@code oldCharset} and decodes the
     * resulting bytes as {@code newCharset}.
     *
     * <p>This does not transcode properly decoded text; it only reinterprets the bytes,
     * which is what repairs a string that was originally decoded with the wrong charset.
     *
     * @param str        the text to re-decode, may be {@code null}
     * @param oldCharset name of the charset used to turn the text back into bytes
     * @param newCharset name of the charset used to decode those bytes
     * @return the re-decoded text, or {@code null} if {@code str} is {@code null}
     * @throws UnsupportedEncodingException if either charset is not supported
     */
    public static String changeCharset(String str, String oldCharset,
            String newCharset) throws UnsupportedEncodingException {
        if (str == null) {
            return null;
        }
        byte[] encoded = str.getBytes(oldCharset);
        return new String(encoded, newCharset);
    }

    /**
     * Transcodes bytes from {@code oldCharset} to {@code newCharset}.
     *
     * <p>When {@code oldCharset} is exactly {@code "gzip"} the bytes are first
     * decompressed with {@link #unGZip(byte[])} and the payload is decoded as UTF-8.
     *
     * @param bytes      the source bytes, may be {@code null}
     * @param oldCharset name of the charset of {@code bytes}, or {@code "gzip"}
     * @param newCharset name of the target charset
     * @return the transcoded bytes; {@code null} if {@code bytes} is {@code null} or the
     *         gzip payload could not be decompressed
     * @throws UnsupportedEncodingException if a charset is not supported
     */
    public static byte[] changeCharset(byte[] bytes, String oldCharset,
            String newCharset) throws UnsupportedEncodingException {
        if (bytes == null) {
            return null;
        }
        String text;
        if (GZIP.equals(oldCharset)) {
            byte[] inflated = unGZip(bytes);
            if (inflated == null) {
                // unGZip has already reported the failure; hand "no result" to the caller
                // instead of failing with a NullPointerException in new String(...).
                return null;
            }
            text = new String(inflated, GZIP_PAYLOAD_CHARSET);
        } else {
            text = new String(bytes, oldCharset);
        }
        return text.getBytes(newCharset);
    }

    /**
     * Transcodes bytes from the platform default charset to {@code newCharset}.
     *
     * @param bytes      the source bytes, may be {@code null}
     * @param newCharset name of the target charset
     * @return the transcoded bytes, or {@code null} if {@code bytes} is {@code null}
     * @throws UnsupportedEncodingException if {@code newCharset} is not supported
     */
    public static byte[] changeCharset(byte[] bytes,
            String newCharset) throws UnsupportedEncodingException {
        if (bytes == null) {
            return null;
        }
        String text = new String(bytes, Charset.defaultCharset());
        return text.getBytes(newCharset);
    }

    /**
     * Re-decodes {@code str} via {@link #changeCharset(String, String, String)} and
     * returns the result encoded as {@code newCharset}.
     *
     * @param str           the text to convert, may be {@code null}
     * @param stringCharset name of the charset used to turn the text into bytes
     * @param newCharset    name of the charset used to decode and re-encode
     * @return the resulting bytes, or {@code null} if {@code str} is {@code null}
     * @throws UnsupportedEncodingException if a charset is not supported
     */
    public static byte[] convertByCharset(String str, String stringCharset,
            String newCharset) throws UnsupportedEncodingException {
        if (str == null) {
            return null;
        }
        return changeCharset(str, stringCharset, newCharset).getBytes(newCharset);
    }

    /**
     * Decodes {@code bytes} (in {@code bytesCharset}, or gzip-compressed when that name
     * is {@code "gzip"}) into a string by way of {@code newCharset}.
     *
     * @param bytes        the source bytes, may be {@code null}
     * @param bytesCharset name of the charset of {@code bytes}, or {@code "gzip"}
     * @param newCharset   name of the intermediate/target charset
     * @return the decoded text; {@code null} if {@code bytes} is {@code null} or the gzip
     *         payload could not be decompressed
     * @throws UnsupportedEncodingException if a charset is not supported
     */
    public static String convertByCharset(byte[] bytes, String bytesCharset,
            String newCharset) throws UnsupportedEncodingException {
        byte[] converted = changeCharset(bytes, bytesCharset, newCharset);
        return converted == null ? null : new String(converted, newCharset);
    }

    /**
     * Decompresses gzip-formatted data.
     *
     * @param data the gzip-compressed bytes, may be {@code null}
     * @return the decompressed bytes, or {@code null} if {@code data} is {@code null} or
     *         is not valid gzip data (the I/O error is printed to standard error)
     */
    public static byte[] unGZip(byte[] data) {
        if (data == null) {
            return null;
        }
        // try-with-resources closes both streams even when decompression fails midway.
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(data));
                ByteArrayOutputStream inflated = new ByteArrayOutputStream()) {
            byte[] chunk = new byte[GZIP_BUFFER_SIZE];
            int read;
            while ((read = gzip.read(chunk, 0, chunk.length)) != -1) {
                inflated.write(chunk, 0, read);
            }
            return inflated.toByteArray();
        } catch (IOException ex) {
            ex.printStackTrace();
            return null;
        }
    }
}
创建一个发送类:
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import com.ning.http.client.AsyncCompletionHandler;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.AsyncHttpClient.BoundRequestBuilder;
import com.ning.http.client.AsyncHttpClientConfig;
import com.ning.http.client.Request;
import com.ning.http.client.AsyncHttpClientConfig.Builder;
import com.ning.http.client.providers.netty.NettyAsyncHttpProvider;
/**
 * Demo driver: builds an async-http-client instance on the Netty provider, sends a
 * single request and hands the outcome to {@link AsyncRequestResultHandler}.
 */
public class AsyncHttpClientTest {

    public static final String COMMON_UTF8 = "utf-8";

    /** Fully qualified so that this class needs no additional import. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /** Number of retries granted to the result handler. */
    private static final int MAX_RETRIES = 3;

    /**
     * Sends one request asynchronously. The body is only attached for POST, PUT and
     * DELETE; the response is processed by {@link AsyncRequestResultHandler}.
     *
     * @param requestUrl target URL
     * @param body       request body text, ignored for GET
     * @param method     HTTP method name, case-insensitive: GET, POST, PUT or DELETE
     * @throws IllegalArgumentException if {@code method} is {@code null} or not one of
     *         the supported methods
     */
    public static void request(String requestUrl, String body, String method) {
        // Reject unknown methods before any client resources are allocated; previously
        // this surfaced later as a NullPointerException on the request builder.
        if (!isSupportedMethod(method)) {
            throw new IllegalArgumentException("Unsupported HTTP method: " + method);
        }

        // Request charset and the charset the body is currently considered to be in.
        String requestEncoding = "gbk";
        String currentEncoding = "gbk";

        // Encode the body before the client exists so an encoding failure cannot leave
        // a started client behind.
        byte[] requestBody = null;
        boolean carriesBody = !"GET".equalsIgnoreCase(method);
        if (carriesBody) {
            try {
                requestBody = createBody(body, requestEncoding, currentEncoding);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
                return;
            }
        }

        // Initialise the client.
        // NOTE(review): setAcceptAnyCertificate(true) presumably disables TLS certificate
        // validation - acceptable for a local demo only; confirm before reusing.
        Builder builder = new AsyncHttpClientConfig.Builder()
                .setAllowPoolingConnections(true)
                .setAllowPoolingSslConnections(true)
                .setAcceptAnyCertificate(true);
        builder.setConnectTimeout(600000)
                .setReadTimeout(660000)
                .setRequestTimeout(660000)
                .setMaxRequestRetry(0)
                .setMaxConnections(500)
                .setMaxConnectionsPerHost(500);
        AsyncHttpClientConfig aconfig = builder.build();
        AsyncHttpClient httpClient = new AsyncHttpClient(new NettyAsyncHttpProvider(aconfig), aconfig);

        // Pick the request builder that matches the HTTP method.
        BoundRequestBuilder requestBuilder;
        if ("GET".equalsIgnoreCase(method)) {
            requestBuilder = httpClient.prepareGet(requestUrl);
        } else if ("POST".equalsIgnoreCase(method)) {
            requestBuilder = httpClient.preparePost(requestUrl);
        } else if ("PUT".equalsIgnoreCase(method)) {
            requestBuilder = httpClient.preparePut(requestUrl);
        } else {
            requestBuilder = httpClient.prepareDelete(requestUrl);
        }
        if (carriesBody) {
            requestBuilder.setBody(requestBody);
        }

        // Request headers.
        requestBuilder.addHeader("User-Agent", "PostmanRuntime/7.37.3");
        requestBuilder.addHeader("Accept", "*/*");
        requestBuilder.addHeader("Connection", "keep-alive");
        requestBuilder.addHeader("Cache-Control", "no-cache");
        // NOTE(review): Content-Encoding is meant for content codings such as gzip, not
        // for charsets; the charset already travels in Content-Type. Kept because the
        // target test endpoint may read it - confirm and remove if it does not.
        requestBuilder.addHeader("Content-Encoding", requestEncoding);
        requestBuilder.addHeader("Content-Type", "text/html; charset=" + requestEncoding);
        requestBuilder.addHeader("X-Gov-Signature", "4c22d25b44eb437f9c6358e927838a22");

        // Dispatch; the handler prints the response and terminates the JVM.
        Request request = requestBuilder.build();
        httpClient.executeRequest(request, new AsyncRequestResultHandler(httpClient, request, MAX_RETRIES));
    }

    /** Returns whether {@code method} names one of the HTTP methods this demo can send. */
    private static boolean isSupportedMethod(String method) {
        return "GET".equalsIgnoreCase(method)
                || "POST".equalsIgnoreCase(method)
                || "PUT".equalsIgnoreCase(method)
                || "DELETE".equalsIgnoreCase(method);
    }

    /**
     * Reads the request body from {@code test.txt} (decoded as GB2312, line breaks
     * dropped) and POSTs it to the test endpoint. If the file cannot be read, the
     * built-in sample text is sent instead.
     */
    public static void main(String[] args) {
        // Target URL of the request.
        String requestUrl = "http://192.168.17.15:10001/testencoding";
        String body = "测试";
        // try-with-resources closes the reader even when reading fails midway.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new FileInputStream("test.txt"), "GB2312"))) {
            StringBuilder content = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                content.append(line);
            }
            body = content.toString();
        } catch (IOException e) {
            e.printStackTrace();
        }
        request(requestUrl, body, "POST");
    }

    /**
     * Builds the request body bytes.
     *
     * <p>When both charsets match (ignoring case), a String is encoded with
     * {@code requestEncoding} and a byte array is returned unchanged. Otherwise the
     * conversion is delegated to {@code EncodingUtil}.
     *
     * @param body            a {@code String}, a {@code byte[]} or {@code null}
     * @param requestEncoding name of the charset the request must use
     * @param currentEncoding name of the charset the body is currently in; {@code null}
     *                        means {@link #COMMON_UTF8}
     * @return the body bytes, or {@code null} if {@code body} is {@code null}
     * @throws UnsupportedEncodingException if a charset name is not supported
     * @throws IllegalArgumentException if {@code body} is neither a String nor a byte array
     */
    public static byte[] createBody(Object body, String requestEncoding, String currentEncoding)
            throws UnsupportedEncodingException {
        if (body == null) {
            return null;
        }
        String oldEncoding = currentEncoding != null ? currentEncoding : COMMON_UTF8;
        boolean sameEncoding = requestEncoding.equalsIgnoreCase(oldEncoding);

        if (body instanceof String) {
            String text = (String) body;
            if (sameEncoding) {
                return text.getBytes(requestEncoding);
            }
            // NOTE(review): this round trip re-decodes the text rather than transcoding
            // it, so the bytes produced are effectively still in oldEncoding; verify that
            // this is what callers expect.
            return EncodingUtil.convertByCharset(text, oldEncoding, requestEncoding);
        }
        if (body instanceof byte[]) {
            byte[] raw = (byte[]) body;
            return sameEncoding ? raw : EncodingUtil.changeCharset(raw, oldEncoding, requestEncoding);
        }
        // Previously this ended in a blind (byte[]) cast and a ClassCastException.
        throw new IllegalArgumentException("Unsupported body type: " + body.getClass().getName());
    }

    /**
     * Round-trips a string through its UTF-8 byte representation.
     *
     * @param gbkStr the text to convert, must not be {@code null}
     * @return the text decoded from its UTF-8 bytes
     */
    public static String getUTF8StringFromGBKString(String gbkStr) {
        return new String(getUTF8BytesFromGBKString(gbkStr), UTF_8);
    }

    /**
     * Encodes a string as UTF-8.
     *
     * <p>Uses the JDK encoder; the earlier hand-written loop emitted three bytes for
     * every non-ASCII char, which is not valid UTF-8 for U+0080..U+07FF or for
     * surrogate pairs.
     *
     * @param gbkStr the text to encode, must not be {@code null}
     * @return the UTF-8 bytes of {@code gbkStr}
     */
    public static byte[] getUTF8BytesFromGBKString(String gbkStr) {
        return gbkStr.getBytes(UTF_8);
    }
}
下面是用于接收并处理请求返回结果的处理类:
import java.util.zip.GZIPInputStream;
import org.apache.commons.io.IOUtils;
import com.ning.http.client.AsyncCompletionHandler;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.Request;
import com.ning.http.client.Response;
/**
 * Result handler for an asynchronous request: re-submits the request on failure up to
 * a fixed number of times, prints the response body on completion, and terminates the
 * JVM once the request has finished or has finally failed.
 */
public class AsyncRequestResultHandler extends AsyncCompletionHandler<Object> {

    /** Currently unused. */
    private static final String TIMEOUT_STATUS = "408";

    /** Charset used to decode the response body for printing. */
    private static final String RESPONSE_CHARSET = "GBK";

    private final AsyncHttpClient asyncHttpClient;
    private final Request request;
    private final int retryTimes;
    private int currentRetryTimes = 0;

    /**
     * @param asyncHttpClient client used to re-submit the request on failure
     * @param request         the request this handler belongs to
     * @param retryTimes      maximum number of re-submissions after a failure
     */
    public AsyncRequestResultHandler(AsyncHttpClient asyncHttpClient, Request request, int retryTimes) {
        this.asyncHttpClient = asyncHttpClient;
        this.request = request;
        this.retryTimes = retryTimes;
    }

    /**
     * Failure callback. Re-submits the request while retries remain and the current
     * thread is not interrupted; otherwise terminates the JVM with exit status 1.
     *
     * @param t the failure reported for the request
     */
    public void onThrowable(Throwable t) {
        t.printStackTrace();
        if (!Thread.currentThread().isInterrupted() && currentRetryTimes < retryTimes) {
            currentRetryTimes++;
            System.out.println("current retry times: " + currentRetryTimes + ", max retry times: " + retryTimes);
            try {
                asyncHttpClient.executeRequest(request, this);
                return;
            } catch (Exception e) {
                // The retry could not even be submitted: report it and fall through to the
                // shutdown below instead of returning with nothing left to end the process.
                System.err.println("retry execute request error");
                e.printStackTrace();
            }
        }
        // Non-zero status: the request ultimately failed.
        System.exit(1);
    }

    /**
     * Completion callback. Reads the response body (gunzipping it when the response
     * declares a gzip content encoding), prints it decoded as GBK and terminates the JVM.
     *
     * @param response the completed response
     * @return the response body bytes
     * @throws Exception if the body cannot be read
     */
    public Object onCompleted(Response response) throws Exception {
        // Use the encoding the server actually declared; with the previous hard-coded
        // value the gzip branch below could never run.
        String contentEncoding = response.getHeader("Content-Encoding");
        byte[] responseBody;
        if (contentEncoding != null && contentEncoding.toLowerCase(java.util.Locale.ROOT).contains("gzip")) {
            try {
                responseBody = IOUtils.toByteArray(new GZIPInputStream(response.getResponseBodyAsStream()));
            } catch (Exception e) {
                // Body is not (or no longer) gzip data: fall back to the raw bytes.
                responseBody = IOUtils.toByteArray(response.getResponseBodyAsStream());
            }
        } else {
            responseBody = IOUtils.toByteArray(response.getResponseBodyAsStream());
        }
        String body = new String(responseBody, RESPONSE_CHARSET);
        System.out.println("返回内容:" + body);
        System.exit(0);
        return responseBody;
    }
}
小结
本文基于 async-http-client 实现了一个异步 HTTP 请求工具,展示了如何高效地发送 HTTP 请求并处理响应数据。其中,工具类 EncodingUtil 负责字符编码转换,异步请求结果处理器 AsyncRequestResultHandler 负责响应数据的接收与失败重试,二者共同解决了字符编码和响应数据处理的问题,提高了程序的健壮性和易用性。