DPDK用户态协议栈-Tcp Posix API 1
和udp一样,我们需要实现一组与系统调用一致的接口来编写我们的tcp server。
先来看看我们之前写的unix_tcp使用了哪些接口,这里我额外加上两个系统调用,分别用于接收数据和发送数据。
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
/*
 * Minimal blocking TCP echo server built on the kernel socket API.
 *
 * Listens on 0.0.0.0:9999, accepts a single client, and echoes every chunk
 * it receives back to that client until the peer closes the connection or
 * an error occurs.
 *
 * Returns 0 after the client disconnects, 1 if any socket call fails.
 */
int main(int argc, char *argv[]) {
    (void)argc;
    (void)argv;

    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0) {
        perror("socket");
        return 1;
    }

    /* Zero the whole structure so padding (sin_zero) is well defined. */
    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(9999);

    if (bind(sock, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
        perror("bind");
        close(sock);
        return 1;
    }
    if (listen(sock, 10) < 0) {
        perror("listen");
        close(sock);
        return 1;
    }

    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);
    int fd = accept(sock, (struct sockaddr *)&clientaddr, &len);
    if (fd < 0) {
        perror("accept");
        close(sock);
        return 1;
    }

    char buffer[64] = {0};
    for (;;) {
        /* Leave one byte free so the data can always be NUL-terminated
         * before it is printed with %s. */
        ssize_t nb_recv = recv(fd, buffer, sizeof(buffer) - 1, 0);
        if (nb_recv <= 0) {
            /* 0 means the peer closed the connection; < 0 is an error.
             * Either way stop instead of spinning on a dead socket. */
            if (nb_recv < 0) {
                perror("recv");
            }
            break;
        }
        buffer[nb_recv] = '\0';
        printf("tcp recv : %s\n", buffer);
        if (send(fd, buffer, (size_t)nb_recv, 0) < 0) {
            perror("send");
            break;
        }
    }

    close(fd);
    close(sock);
    return 0;
}
- socket
- bind
- listen
- accept
- recv
- send
tcp的posix实现
bitmap
static struct localhost* get_host_fromfd(int sockfd) {
struct localhost* htp = get_lhost_instance();
struct localhost* host;
for(host = htp; host != NULL; host = host->next) {
if (host->fd == sockfd) {
return host;
}
}
struct ln_tcp_stream* stream;
struct ln_tcp_table* table = get_tcp_table_instance();
for(stream = table->streams; stream != NULL; stream = stream->next) {
if (stream->fd == sockfd) {
return stream;
}
}
return NULL;
}
根据sockfd搜索对应的控制块(tcp和udp共用同一个函数)。在之前实现udp api的过程中,我们实现了一个伪bitmap(后续会完善)。现在加上了tcp,因此在函数中增加了对tcp控制块链表的遍历。
tcp server的最后两个状态
/*
 * CLOSE_WAIT state handler.
 *
 * Currently a placeholder: it recognises a FIN segment arriving while the
 * stream is in CLOSE_WAIT but performs no action for it.
 *
 * Always returns 0.
 */
static int ln_tcp_handle_close_wait(struct ln_tcp_stream* stream, struct rte_tcp_hdr* tcphdr) {
    const int fin_seen = (tcphdr->tcp_flags & RTE_TCP_FIN_FLAG) != 0;

    if (fin_seen && stream->status == LN_TCP_STATUS_CLOSE_WAIT) {
        /* Nothing to do yet. */
    }

    return 0;
}
/*
 * LAST_ACK state handler.
 *
 * When a segment carrying ACK arrives for a stream in LAST_ACK, the stream
 * is marked CLOSED, unlinked from the TCP table, and its rings and control
 * block are released. Any other segment/state combination is ignored.
 *
 * Always returns 0.
 */
static int ln_tcp_handle_last_ack(struct ln_tcp_stream* stream, struct rte_tcp_hdr* tcphdr) {
    if (!(tcphdr->tcp_flags & RTE_TCP_ACK_FLAG)) {
        return 0;
    }
    if (stream->status != LN_TCP_STATUS_LAST_ACK) {
        return 0;
    }

    stream->status = LN_TCP_STATUS_CLOSED;

    /* Unlink before freeing so the table never references freed memory. */
    struct ln_tcp_table* tcp_table = get_tcp_table_instance();
    LL_REMOVE(stream, tcp_table->streams);
    tcp_table->count--;

    rte_ring_free(stream->recvbuf);
    rte_ring_free(stream->sendbuf);
    rte_free(stream);

    return 0;
}
修改一些之前的API
由于tcp和udp都有创建套接字、绑定等通用的部分,所以我们要在之前写的api上做一些增加和修改,让TCP和UDP都可以使用它们。
socket创建套接字
/*
 * Find the host control block bound to the given local address.
 *
 * Walks the host list and returns the first entry whose local IP, local
 * port and protocol all equal the arguments. The values are compared as
 * stored, with no byte-order conversion.
 *
 * Returns the matching entry, or NULL if none matches.
 */
static struct localhost* get_host_fromip_port(uint32_t ip, uint16_t port, uint8_t proto) {
    struct localhost* cur = get_lhost_instance();

    while (cur != NULL) {
        if (cur->localip == ip && cur->localport == port && cur->protocol == proto) {
            return cur;
        }
        cur = cur->next;
    }

    return NULL;
}
/*
 * Create a user-space socket and return its descriptor.
 *
 * domain   - ignored.
 * type     - SOCK_DGRAM creates a UDP host control block; SOCK_STREAM
 *            creates a TCP stream control block. Any other value is rejected.
 * protocol - ignored.
 *
 * The control block is zeroed, given a receive ring and a send ring, a
 * mutex/condition pair, and linked into the matching list (the TCP table's
 * count is incremented for streams).
 *
 * The descriptor is reserved only after every allocation has succeeded, so
 * an allocation failure cannot leave a descriptor permanently reserved.
 *
 * Returns the new descriptor, or -1 on failure.
 *
 * NOTE(review): every socket of a given type uses the same ring name.
 * rte_ring_create() documents EEXIST for a name that is already in use, so
 * creating a second socket of the same type will likely fail here —
 * consider deriving the name from the descriptor.
 */
int nsocket(__attribute__((unused)) int domain, int type, __attribute__((unused)) int protocol) {
    if (type == SOCK_DGRAM) {
        struct localhost* host = rte_malloc("localhost", sizeof(struct localhost), 0);
        if (host == NULL) {
            return -1;
        }
        memset(host, 0, sizeof(struct localhost));
        host->protocol = IPPROTO_UDP;

        host->recvbuf = rte_ring_create("recv buf", RING_SIZE, rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
        if (host->recvbuf == NULL) {
            rte_free(host);
            return -1;
        }
        host->sendbuf = rte_ring_create("send buf", RING_SIZE, rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
        if (host->sendbuf == NULL) {
            rte_ring_free(host->recvbuf);
            rte_free(host);
            return -1;
        }

        /* assumes a negative value signals "no descriptor available" — TODO confirm */
        int fd = get_fd_frombitmap();
        if (fd < 0) {
            rte_ring_free(host->sendbuf);
            rte_ring_free(host->recvbuf);
            rte_free(host);
            return -1;
        }
        host->fd = fd;

        /* Copy statically-initialised primitives into the heap object. */
        pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
        pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
        rte_memcpy(&host->mutex, &blank_mutex, sizeof(pthread_mutex_t));
        rte_memcpy(&host->cond, &blank_cond, sizeof(pthread_cond_t));

        /* NOTE(review): LL_ADD is given a local copy of the list head; if the
         * macro assigns to its second argument, the shared head is not
         * updated — confirm against the LL_ADD / get_lhost_instance
         * definitions. */
        struct localhost* lhp = get_lhost_instance();
        LL_ADD(host, lhp);

        return fd;
    }

    if (type == SOCK_STREAM) {
        struct ln_tcp_stream* stream = rte_malloc("ln_tcp_stream", sizeof(struct ln_tcp_stream), 0);
        if (stream == NULL) {
            return -1;
        }
        /* Zero everything so status, addresses and list links start from a
         * known state; the allocation itself is not cleared here. */
        memset(stream, 0, sizeof(struct ln_tcp_stream));
        stream->proto = IPPROTO_TCP;
        stream->next = stream->prev = NULL;

        stream->recvbuf = rte_ring_create("tcp recv buf", RING_SIZE, rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
        if (stream->recvbuf == NULL) {
            rte_free(stream);
            return -1;
        }
        stream->sendbuf = rte_ring_create("tcp send buf", RING_SIZE, rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
        if (stream->sendbuf == NULL) {
            /* Release the ring that WAS created, not the one that failed. */
            rte_ring_free(stream->recvbuf);
            rte_free(stream);
            return -1;
        }

        /* assumes a negative value signals "no descriptor available" — TODO confirm */
        int fd = get_fd_frombitmap();
        if (fd < 0) {
            rte_ring_free(stream->sendbuf);
            rte_ring_free(stream->recvbuf);
            rte_free(stream);
            return -1;
        }
        stream->fd = fd;

        pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
        pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
        rte_memcpy(&stream->cond, &blank_cond, sizeof(pthread_cond_t));
        rte_memcpy(&stream->mutex, &blank_mutex, sizeof(pthread_mutex_t));

        struct ln_tcp_table* table = get_tcp_table_instance();
        LL_ADD(stream, table->streams);
        table->count++;

        return fd;
    }

    /* Unsupported socket type: no control block is created, so do not hand
     * out a descriptor that nothing can look up. */
    return -1;
}
bind绑定
/*
 * Bind a socket created by nsocket() to a local IPv4 address and port.
 *
 * sockfd  - descriptor returned by nsocket().
 * addr    - must point to a struct sockaddr_in; its port and address are
 *           stored exactly as given (no byte-order conversion is done here).
 * addrlen - ignored.
 *
 * UDP sockets record the address in localip/localport. Any other socket is
 * handled as a TCP stream: the local endpoint is stored in the stream's
 * dip/dport fields and the stream status is set to CLOSED. Both kinds also
 * record the local MAC taken from nSrcMac.
 *
 * Returns 0 on success, -1 if `addr` is NULL or `sockfd` is unknown.
 */
int nbind(int sockfd, const struct sockaddr *addr, __attribute__((unused)) socklen_t addrlen) {
    if (addr == NULL) {
        return -1;
    }

    void* hostinfo = get_host_fromfd(sockfd);
    if (hostinfo == NULL) {
        return -1;
    }

    const struct sockaddr_in* laddr = (const struct sockaddr_in*)addr;

    /* The protocol field is read through the UDP view to decide which
     * control block type this really is. */
    struct localhost* host = (struct localhost*)hostinfo;
    if (host->protocol == IPPROTO_UDP) {
        host->localport = laddr->sin_port;
        rte_memcpy(&host->localip, &laddr->sin_addr.s_addr, sizeof(uint32_t));
        rte_memcpy(host->localmac, nSrcMac, RTE_ETHER_ADDR_LEN);
    }
    else {
        struct ln_tcp_stream* stream = (struct ln_tcp_stream*)hostinfo;
        stream->dport = laddr->sin_port;
        rte_memcpy(&stream->dip, &laddr->sin_addr.s_addr, sizeof(uint32_t));
        rte_memcpy(&stream->localmac, nSrcMac, RTE_ETHER_ADDR_LEN);
        stream->status = LN_TCP_STATUS_CLOSED;
    }

    return 0;
}
close
/*
 * Close a socket created by nsocket().
 *
 * UDP: the host block is unlinked, its rings and memory are freed, and the
 * descriptor is handed back via set_fd_frombitmap().
 *
 * TCP, not listening: a FIN|ACK fragment is queued on the stream's send
 * ring and the stream moves to LAST_ACK; the stream itself is not freed
 * here. The descriptor is handed back immediately.
 *
 * TCP, listening: the stream is unlinked from the TCP table, the table
 * count is decremented, the rings and the stream are freed, and the
 * descriptor is handed back.
 *
 * Returns 0 on success; -1 if `fd` is unknown, the FIN fragment cannot be
 * allocated, or it cannot be queued (the socket is left open in that case
 * so the caller may retry).
 */
int nclose(int fd) {
    void* hostinfo = get_host_fromfd(fd);
    if (hostinfo == NULL) {
        return -1;
    }

    struct localhost* host = (struct localhost*)hostinfo;
    if (host->protocol == IPPROTO_UDP) {
        /* NOTE(review): LL_REMOVE is given a local copy of the list head; if
         * the macro assigns to its second argument, the shared head is not
         * updated — confirm against the macro definition. */
        struct localhost* lhp = get_lhost_instance();
        LL_REMOVE(host, lhp);
        if (host->recvbuf) {
            rte_ring_free(host->recvbuf);
        }
        if (host->sendbuf) {
            rte_ring_free(host->sendbuf);
        }
        rte_free(host);
        set_fd_frombitmap(fd);
    }
    else if (host->protocol == IPPROTO_TCP) {
        struct ln_tcp_stream* stream = (struct ln_tcp_stream*)hostinfo;
        if (stream->status != LN_TCP_STATUS_LISTEN) {
            struct ln_tcp_fragment* fragment = rte_malloc("close frag", sizeof(struct ln_tcp_fragment), 0);
            if (fragment == NULL) {
                return -1;
            }
            /* Clear exactly the fragment; sizing this by the stream struct
             * would write past the end of the allocation. */
            memset(fragment, 0, sizeof(struct ln_tcp_fragment));

            /* The stream's dport holds the local port (set from the bind
             * address in nbind), so source/destination are swapped relative
             * to the stream. */
            fragment->sport = stream->dport;
            fragment->dport = stream->sport;
            fragment->acknum = stream->recv_next;
            /* NOTE(review): seqnum is taken from recv_next, the same value
             * as acknum; a FIN would normally carry the sender's own next
             * sequence number — confirm which field is intended. */
            fragment->seqnum = stream->recv_next;
            fragment->windows = LN_TCP_INITIAL_WINDOWS;
            fragment->hdr_off = 0x50;
            fragment->tcp_flags = RTE_TCP_FIN_FLAG | RTE_TCP_ACK_FLAG;

            /* A full ring must not silently drop the FIN: free the fragment
             * and report failure without changing the stream state. */
            if (rte_ring_mp_enqueue(stream->sendbuf, (void*)fragment) != 0) {
                rte_free(fragment);
                return -1;
            }

            stream->status = LN_TCP_STATUS_LAST_ACK;
            set_fd_frombitmap(fd);
        }
        else {
            struct ln_tcp_table* tb = get_tcp_table_instance();
            LL_REMOVE(stream, tb->streams);
            /* Mirror the count++ done when the stream was created. */
            tb->count--;
            if (stream->recvbuf) {
                rte_ring_free(stream->recvbuf);
            }
            if (stream->sendbuf) {
                rte_ring_free(stream->sendbuf);
            }
            rte_free(stream);
            set_fd_frombitmap(fd);
        }
    }

    return 0;
}
tcp的专有API
listen监听
/*
 * Put a TCP socket into the LISTEN state.
 *
 * sockfd  - descriptor returned by nsocket().
 * backlog - ignored.
 *
 * Returns 0 on success, -1 if `sockfd` is unknown or does not refer to a
 * TCP stream (listening is only meaningful for TCP, and naccept() rejects
 * non-TCP sockets the same way).
 */
int nlisten(int sockfd, __attribute__((unused))int backlog) {
    void* hostinfo = get_host_fromfd(sockfd);
    if (hostinfo == NULL) {
        return -1;
    }

    struct ln_tcp_stream* stream = (struct ln_tcp_stream*)hostinfo;
    if (stream->proto != IPPROTO_TCP) {
        return -1;
    }

    stream->status = LN_TCP_STATUS_LISTEN;
    return 0;
}
accept建立连接
/*
 * Wait for and return an established connection on a listening TCP socket.
 *
 * sockfd  - descriptor of the listening socket.
 * addr    - optional; when non-NULL it must have room for a
 *           struct sockaddr_in, which receives the peer's family, port and
 *           address (port/address copied as stored in the stream).
 * addrlen - optional; when non-NULL it is set to sizeof(struct sockaddr_in)
 *           whenever `addr` is filled in. Its incoming value is not read.
 *
 * Blocks on the listening stream's condition variable until
 * ln_get_accept_stream() yields a stream for the listening port.
 *
 * Returns the descriptor of the accepted stream, or -1 if `sockfd` is
 * unknown or is not a TCP socket.
 */
int naccept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
    void* hostinfo = get_host_fromfd(sockfd);
    if (hostinfo == NULL) {
        return -1;
    }

    struct ln_tcp_stream* stream = (struct ln_tcp_stream*)hostinfo;
    if (stream->proto != IPPROTO_TCP) {
        return -1;
    }

    struct ln_tcp_stream* apt = NULL;
    pthread_mutex_lock(&stream->mutex);
    /* Re-check after every wake-up: the wait can return without a stream
     * being available. */
    while ((apt = ln_get_accept_stream(stream->dport)) == NULL) {
        pthread_cond_wait(&stream->cond, &stream->mutex);
    }
    pthread_mutex_unlock(&stream->mutex);

    /* The peer address is optional, as with the system accept(). */
    if (addr != NULL) {
        struct sockaddr_in* addri = (struct sockaddr_in*)addr;
        addri->sin_family = AF_INET;
        addri->sin_port = apt->sport;
        rte_memcpy(&addri->sin_addr.s_addr, &apt->sip, sizeof(uint32_t));
        if (addrlen != NULL) {
            *addrlen = sizeof(struct sockaddr_in);
        }
    }

    return apt->fd;
}
参考资料:https://github.com/0voice