Java NIO 【处理消息边界】
方案一
客户端和服务端约定好传输的最大长度(缺点:容易造成资源浪费)
方案二
约定好分隔符,例如(\n),根据分隔符创建新的butebuffer
server
package com.yys.mes.demo3;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
@Slf4j
public class Server {
private static void split(ByteBuffer source) {
source.flip();
for (int i = 0; i < source.limit(); i++) {
if (source.get(i) == '\n'){
int length = i + 1 - source.position();
ByteBuffer target = ByteBuffer.allocate(length);
for (int j = 0; j < length; j++) {
target.put(source.get());
}
target.flip();
String s = Charset.defaultCharset().decode(target).toString();
log.info("target:{}", s);
}
}
source.compact();
}
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
SelectionKey sscKey = ssc.register(selector, 0, null);
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.info("sscKey:{}", sscKey);
ssc.bind(new InetSocketAddress(8080));
while (true) {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
log.info("key: {}", key);
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(4);
//attachment作为附件关联到SelectionKey
SelectionKey scKey = sc.register(selector, 0, buffer);
scKey.interestOps(SelectionKey.OP_READ);
log.info("{}", sc);
log.info("scKey:{}", scKey);
} else if (key.isReadable()) {
try {
SocketChannel channel = (SocketChannel) key.channel();
//拿到SelectionKey 关联的attachment附件
ByteBuffer buffer = (ByteBuffer)key.attachment();
int read = channel.read(buffer); // 如果是正常断开,read 的方法的返回值是 -1
if(read == -1) {
key.cancel();
} else {
split(buffer);
if (buffer.position() == buffer.limit()){
//扩容
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
buffer.flip();
newBuffer.put(buffer);
//替换附件
key.attach(newBuffer);
}
}
} catch (IOException e) {
e.printStackTrace();
key.cancel(); // 因为客户端断开了,因此需要将 key 取消(从 selector 的 keys 集合中真正删除 key)
}
}
}
}
}
}
client
package com.yys.mes.nio;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
@Slf4j
public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("127.0.0.1", 8080));
log.info("client start");
sc.write(Charset.defaultCharset().encode("hello123\n"));
log.info("client end");
}
}
方案三
根据长度分配bytebuffer
Todo: 明天写