Java 异步 UDP 通信:服务端
1. 使用NIO实现非阻塞UDP通信
通过 `DatagramChannel` 和 `Selector`,可以实现非阻塞的 UDP 通信,从而高效地处理多个客户端的请求。
示例代码:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
/**
 * Non-blocking UDP echo server built on {@link DatagramChannel} and {@link Selector}.
 *
 * <p>Listens on UDP port 9898 and answers every datagram with {@code "Echo: " + payload},
 * sent back to the address the datagram came from.
 */
public class AsyncUDPServer {
    /** UDP port the server binds to. */
    private static final int PORT = 9898;

    /** Capacity of the receive buffer; bytes of a datagram beyond this are discarded by the channel. */
    private static final int MAX_DATAGRAM_SIZE = 1024;

    /** Explicit charset for payloads; fully qualified so the snippet's import list is unchanged. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Runs the selector loop until the thread is interrupted or an I/O error occurs.
     *
     * @param args unused
     * @throws IOException if the channel or selector cannot be opened, bound, or used
     */
    public static void main(String[] args) throws IOException {
        // try-with-resources releases the port and the selector on every exit path.
        try (DatagramChannel channel = DatagramChannel.open();
                Selector selector = Selector.open()) {
            channel.configureBlocking(false);
            channel.bind(new InetSocketAddress(PORT));
            channel.register(selector, SelectionKey.OP_READ);

            ByteBuffer receiveBuffer = ByteBuffer.allocate(MAX_DATAGRAM_SIZE);
            // select() may legitimately return 0 (wakeup/interrupt), so it must not end the server;
            // the interrupt check prevents a busy loop once the thread has been interrupted.
            while (!Thread.currentThread().isInterrupted()) {
                selector.select();
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isValid() && key.isReadable()) {
                        echoOneDatagram(channel, receiveBuffer);
                    }
                }
                // Clear after the iteration instead of removing keys while iterating the set.
                selector.selectedKeys().clear();
            }
        }
    }

    /**
     * Receives at most one pending datagram and echoes it back to its sender.
     *
     * @param channel       the bound, non-blocking channel to read from and reply on
     * @param receiveBuffer reusable heap buffer; its previous content is overwritten
     * @throws IOException if receiving or sending fails
     */
    private static void echoOneDatagram(DatagramChannel channel, ByteBuffer receiveBuffer)
            throws IOException {
        receiveBuffer.clear();
        // receive() returns the datagram's source address; this is the only place it is available
        // on an unconnected channel.
        java.net.SocketAddress sender = channel.receive(receiveBuffer);
        if (sender == null) {
            // Non-blocking receive found nothing to read.
            return;
        }
        receiveBuffer.flip();
        String message = new String(
                receiveBuffer.array(),
                receiveBuffer.arrayOffset(),
                receiveBuffer.remaining(),
                UTF_8);
        System.out.println("Received: " + message);

        // The message can be processed here before the response is sent.
        String response = "Echo: " + message;
        // Wrap a fresh buffer: the reply is longer than the request and may not fit the receive buffer.
        channel.send(ByteBuffer.wrap(response.getBytes(UTF_8)), sender);
    }
}
2. 设置超时和缓冲区大小
为了优化性能,可以设置接收超时时间以及调整接收和发送缓冲区的大小。
示例代码:
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
/**
 * Blocking UDP echo server that configures a receive timeout and socket buffer sizes.
 *
 * <p>Listens on UDP port 9898 and answers every datagram with {@code "Echo: " + payload}.
 */
public class OptimizedUDPServer {
    /** UDP port the server binds to. */
    private static final int PORT = 9898;

    /** Receive timeout in milliseconds. */
    private static final int RECEIVE_TIMEOUT_MILLIS = 5000;

    /** Size requested for both the socket receive and send buffers, in bytes. */
    private static final int SOCKET_BUFFER_SIZE = 8192;

    /** Size of the application-level receive buffer, in bytes. */
    private static final int MAX_DATAGRAM_SIZE = 1024;

    /**
     * Receives datagrams in an endless loop and echoes each one back to its sender.
     *
     * @param args unused
     * @throws Exception if the socket cannot be created or configured
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources releases the port if the loop is ever left by an exception.
        try (DatagramSocket socket = new DatagramSocket(PORT)) {
            socket.setSoTimeout(RECEIVE_TIMEOUT_MILLIS);
            socket.setReceiveBufferSize(SOCKET_BUFFER_SIZE);
            socket.setSendBufferSize(SOCKET_BUFFER_SIZE);

            byte[] receiveBuffer = new byte[MAX_DATAGRAM_SIZE];
            DatagramPacket receivePacket = new DatagramPacket(receiveBuffer, receiveBuffer.length);
            while (true) {
                try {
                    socket.receive(receivePacket);
                    String message = new String(
                            receivePacket.getData(),
                            0,
                            receivePacket.getLength(),
                            StandardCharsets.UTF_8);
                    System.out.println("Received: " + message);

                    // Send the response; encode once and reuse the bytes for data and length.
                    byte[] responseBytes = ("Echo: " + message).getBytes(StandardCharsets.UTF_8);
                    DatagramPacket sendPacket = new DatagramPacket(
                            responseBytes,
                            responseBytes.length,
                            receivePacket.getAddress(),
                            receivePacket.getPort());
                    socket.send(sendPacket);
                } catch (SocketTimeoutException e) {
                    // Expected whenever no client sends within the timeout; not an error.
                    System.out.println(
                            "No datagram received within " + RECEIVE_TIMEOUT_MILLIS + " ms.");
                } catch (IOException e) {
                    // Best effort: report the failure and keep serving other clients.
                    System.out.println("Error: " + e.getMessage());
                }
            }
        }
    }
}
3. 使用线程池处理请求
通过线程池可以高效地处理多个客户端的请求,避免阻塞主线程。
示例代码:
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * UDP echo server that receives on the main thread and builds/sends replies on a thread pool.
 *
 * <p>Listens on UDP port 9898 and answers every datagram with {@code "Echo: " + payload}.
 */
public class ThreadedUDPServer {
    /** Fixed pool of 10 workers that process received datagrams. */
    private static final ExecutorService executor = Executors.newFixedThreadPool(10);

    /** Explicit charset for payloads; fully qualified so the snippet's import list is unchanged. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Receives datagrams in an endless loop and hands each one to the worker pool.
     *
     * @param args unused
     * @throws Exception if the socket cannot be created or a receive fails
     */
    public static void main(String[] args) throws Exception {
        try (DatagramSocket socket = new DatagramSocket(9898)) {
            while (true) {
                // A fresh buffer and packet per datagram: the worker reads them after the
                // main thread has already moved on to the next receive.
                byte[] receiveBuffer = new byte[1024];
                DatagramPacket receivePacket = new DatagramPacket(receiveBuffer, receiveBuffer.length);
                socket.receive(receivePacket);
                executor.submit(() -> echo(socket, receivePacket));
            }
        } finally {
            // Without this the pool's worker threads would keep the JVM alive after the
            // receive loop has died. The socket is already closed at this point, so any
            // echo still queued fails in send() and is reported by echo().
            executor.shutdown();
        }
    }

    /**
     * Prints the received payload and sends the echo reply to the datagram's sender.
     *
     * @param socket        socket to reply on
     * @param receivePacket the datagram that was received
     */
    private static void echo(DatagramSocket socket, DatagramPacket receivePacket) {
        try {
            String message = new String(
                    receivePacket.getData(), 0, receivePacket.getLength(), UTF_8);
            System.out.println("Received: " + message);

            // Send the response; encode once and reuse the bytes for data and length.
            byte[] responseBytes = ("Echo: " + message).getBytes(UTF_8);
            DatagramPacket sendPacket = new DatagramPacket(
                    responseBytes,
                    responseBytes.length,
                    receivePacket.getAddress(),
                    receivePacket.getPort());
            socket.send(sendPacket);
        } catch (Exception e) {
            // Task boundary: a failure for one client must not escape into the pool unreported.
            System.out.println("Error: " + e.getMessage());
        }
    }
}
4. 异步处理和超时机制
在异步处理中,可以为接收操作设置超时,以便在长时间未收到数据时进行相应处理。
示例代码:
import java.net.DatagramSocket;
import java.net.DatagramPacket;
import java.util.concurrent.*;
/**
 * UDP echo server that performs the blocking receive on a background thread and waits for it
 * with a 5-second timeout on the main thread.
 *
 * <p>Listens on UDP port 9898 and answers every datagram with {@code "Echo: " + payload}.
 */
public class AsyncUDPServerWithTimeout {
    /** Explicit charset for payloads; fully qualified so the snippet's import list is unchanged. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Runs the receive/echo loop until a receive or send fails.
     *
     * @param args unused
     * @throws Exception if the socket cannot be created, or a receive or send fails
     */
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try (DatagramSocket socket = new DatagramSocket(9898)) {
            // The receive currently in flight, or null when a new one has to be started.
            Future<DatagramPacket> pendingReceive = null;
            while (true) {
                if (pendingReceive == null) {
                    byte[] receiveBuffer = new byte[1024];
                    DatagramPacket packet = new DatagramPacket(receiveBuffer, receiveBuffer.length);
                    pendingReceive = executor.submit(() -> {
                        socket.receive(packet);
                        return packet;
                    });
                }
                try {
                    // Wait at most 5 seconds for the in-flight receive.
                    DatagramPacket receivePacket = pendingReceive.get(5, TimeUnit.SECONDS);
                    pendingReceive = null;

                    String message = new String(
                            receivePacket.getData(), 0, receivePacket.getLength(), UTF_8);
                    System.out.println("Received: " + message);

                    // Send the response; encode once and reuse the bytes for data and length.
                    byte[] responseBytes = ("Echo: " + message).getBytes(UTF_8);
                    DatagramPacket sendPacket = new DatagramPacket(
                            responseBytes,
                            responseBytes.length,
                            receivePacket.getAddress(),
                            receivePacket.getPort());
                    socket.send(sendPacket);
                } catch (TimeoutException e) {
                    // Keep waiting on the SAME receive. Submitting a new task here would queue it
                    // behind the still-blocked one on the single worker thread, and the next
                    // datagram would be consumed by the abandoned task and lost.
                    System.out.println("Timeout occurred. No data received within 5 seconds.");
                }
            }
        } finally {
            // The socket is closed before this runs, which unblocks a receive in progress;
            // shutting the executor down lets the JVM exit instead of hanging on its worker thread.
            executor.shutdownNow();
        }
    }
}