dpdk的基础使用-抓包
目录
1.运行dpdk必要条件
1.1.内核版本要求
1.2.glibc版本要求
1.3.大页内存要求
1.4. 网卡驱动绑定要求
2.简单的示例程序
2.1.hello world示例程序
2.2.关键的接口
2.2.1.rte_eal_init()
2.2.2.rte_eal_mp_remote_launch()
2.2.3.rte_eal_wait_lcore()
2.2.4.rte_eal_cleanup()
2.3.运行效果
3.dpdk抓包程序
3.1.关键的接口
3.1.1.rte_eth_dev_count_avail()
3.1.2.rte_pktmbuf_pool_create()
3.1.3.rte_eth_dev_configure()
3.1.4.rte_eth_dev_adjust_nb_rx_tx_desc()
3.1.5.rte_eth_rx_queue_setup()
3.1.6.rte_eth_tx_queue_setup()
3.1.7.rte_eth_dev_start()
3.1.8.rte_eth_promiscuous_enable()
3.1.9.rte_eth_rx_burst()
3.2.步骤总结
3.3.运行结果
1.运行dpdk必要条件
在dpdk官网手册中已经明确了运行dpdk程序所需要的条件,链接如下:
System Requirements — Data Plane Development Kit 25.03.0-rc0 documentationhttp://doc.dpdk.org/guides/linux_gsg/sys_reqs.html
1.1.内核版本要求
内核版本至少 >= 4.19,内核版本可以使用 uname -r 命令查看。
1.2.glibc版本要求
glibc版本至少 >= 2.7,具体版本可以使用 ldd --version 命令查看。
1.3.大页内存要求
关于大页内存的配置可以使用dpdk官网提供的dpdk-hugepages.py脚本进行设置,也可以使用命令设置大页内存,设置完成后使用命令 cat /proc/meminfo | grep Huge 应该可以看到具体的大页内存信息。
1.4. 网卡驱动绑定要求
如果需要使用dpdk抓包,则还需要将对应的网卡pcie地址绑定到igb_uio驱动上,使用命令 lspci | grep Eth 可以看到当前设备所有的pcie地址,然后使用dpdk官网提供的dpdk-devbind.py脚本来绑定网卡。
注:在前面的章节提到了dpdk模式和内核模式的区别,网口正常情况下是出于内核态模式的,此时我们使用ifconfig命令可以看到网口;但是当我们需要使用dpdk抓包时,将其绑定到igb_uio驱动,此时网口被dpdk接管,无法使用ifconfig命令看到网口,需要使用dpdk-devbind.py脚本查看,命令:./dpdk-devbind.py -s。(mlnx网卡较为特殊,不需要执行此步骤)
可以看到,有6个网口被dpdk接管,有1个网口mgmt还是使用的内核驱动。
2.简单的示例程序
其实dpdk本身在examples目录实现了很多dpdk的示例程序,方便大家学习参考。
2.1.hello world示例程序
这里简单写了一个示例程序,供大家学习参考:
源码如下:
#include <stdio.h>
#include <unistd.h>
#include <syslog.h>
#include <signal.h>
#include <pthread.h>
#include <bits/pthreadtypes.h>
#include "rte_lcore.h"
#include "rte_eal.h"
#include "rte_ethdev.h"
#include "rte_malloc.h"
void lcore_main_loop() {
while (1) {
printf("Hello world!, This is main lcore:%u!\n", rte_lcore_id());
sleep(1);
}
}
void lcore_rx_loop() {
while (1) {
printf("Hello world!, This is rx lcore:%u!\n", rte_lcore_id());
sleep(1);
}
}
/* Launch a function on lcore. 8< */
static int every_lcore_loop(void *arg) {
uint16_t lcore_id = rte_lcore_id();
if (lcore_id == rte_get_main_lcore()) {
lcore_main_loop();
}else {
lcore_rx_loop();
}
return 0;
}
/* >8 End of launching function on lcore. */
/* Initialization of Environment Abstraction Layer (EAL). 8< */
int main(int argc, char **argv) {
int ret;
ret = rte_eal_init(argc, argv);
if (ret < 0)
rte_panic("Cannot init EAL\n");
printf("Total core num:%u\n", rte_lcore_count());
/* Launches the function on each lcore. 8< */
rte_eal_mp_remote_launch(every_lcore_loop, NULL, CALL_MAIN);
/* >8 End of launching the function on each lcore. */
unsigned char lcore_id = 0;
RTE_LCORE_FOREACH_WORKER(lcore_id) {
if (rte_eal_wait_lcore(lcore_id) < 0) {
break;
}
}
/* clean up the EAL */
rte_eal_cleanup();
return 0;
}
2.2.关键的接口
2.2.1.rte_eal_init()
初始化平台的接口,主要作用是负责解析运行参数、设置大页内存、设置cpu亲和性、为每一个核心创建独立的线程、扫描pcie网卡设备等。
注:有兴趣的同学可以去看源码文件在 eal.c 中。
2.2.2.rte_eal_mp_remote_launch()
这个函数是dpdk中比较关键的部分,它为每一个核心提供函数入口,每一个逻辑核心都会回调此接口,可以类似的理解成创建线程的概念pthread_create()函数。
通过这个接口我们便可以根据传入的运行参数,来分配每一个核心需要做的事情,例如代码中main-lcore打印主逻辑核的相关信息,而其他的核心打印自己的核id。
2.2.3.rte_eal_wait_lcore()
可以理解为等待每一个核心做完自己的事情然后结束,类似线程中pthread_join()的概念。
2.2.4.rte_eal_cleanup()
主要用于清理eal初始化平台所占用的资源等。
2.3.运行效果
运行命令:./main~ -l 0,1,2 --main-lcore 0
注:更多的运行参数可以使用 --help 进行查看
3.dpdk抓包程序
代码:
#include <stdio.h>
#include <unistd.h>
#include <syslog.h>
#include <signal.h>
#include <pthread.h>
#include <bits/pthreadtypes.h>
#include <mylist.h>
#include <mynet.h>
#include <hs.h>
#include "rte_lcore.h"
#include "rte_eal.h"
#include "rte_ethdev.h"
#include "rte_malloc.h"
/* Per-port statistics struct */
struct dpdk_port_statistics {
uint64_t tx;
uint64_t rx;
uint64_t dropped;
} __rte_cache_aligned;
static struct rte_eth_conf port_conf = {
.rxmode = {
.split_hdr_size = 0,
},
.txmode = {
.mq_mode = ETH_MQ_TX_NONE,
},
};
#define false 0
#define true 1
#define MAX_PKT_BURST 32
#define MAX_DEV_PORTID 8
#define MEMPOOL_CACHE_SIZE 256
#define RX_QUEUE_NUM 1
#define TX_QUEUE_NUM 1
#define RX_QUEUE_ID 0
static uint16 nb_rxd = 1024;
static uint16 nb_txd = 1024;
struct rte_mempool *rx_pktmbuf_pool = NULL;
/* 网口流量统计信息 */
struct dpdk_port_statistics port_statistics[RTE_MAX_ETHPORTS];
/* 网口状态信息 */
uint8 link_port[RTE_MAX_ETHPORTS];
/* 网口running数量 */
uint8 link_port_num;
/* 运行状态 信号使用 */
static uint8 run_mode = true;
void clear_screen() {
const char clr[] = { 27, '[', '2', 'J', '\0' };
const char topLeft[] = { 27, '[', '1', ';', '1', 'H','\0' };
/* Clear screen and move to top left */
printf("%s%s", clr, topLeft);
}
void print_nic_statistics() {
if (!run_mode) {
return;
}
uint64 total_packets_dropped, total_packets_tx, total_packets_rx;
total_packets_dropped = 0;
total_packets_tx = 0;
total_packets_rx = 0;
printf("\nPort statistics ====================================");
for (uint16 port_index = 0; port_index < link_port_num; port_index++) {
if (link_port[port_index] == 0xff) {
continue;
}
char eth_name[PAYLOAD_LEN] = {0};
rte_eth_dev_get_name_by_port(link_port[port_index], eth_name);
printf("\nStatistics for port %u ----%s------------------------"
"\nPackets received: %lu"
"\nPackets dropped: %lu",
link_port[port_index],
eth_name,
port_statistics[link_port[port_index]].rx,
port_statistics[link_port[port_index]].dropped);
total_packets_dropped += port_statistics[link_port[port_index]].dropped;
total_packets_rx += port_statistics[link_port[port_index]].rx;
}
printf("\nAggregate statistics ==============================="
"\nTotal packets received: %llu"
"\nTotal packets dropped: %llu",
total_packets_rx,
total_packets_dropped);
printf("\n====================================================\n");
}
static void lcore_main_loop() {
uint64 prev_tsc = 0;
while (run_mode) {
uint64 cur_tsc = rte_rdtsc();
if ((cur_tsc - prev_tsc) > (uint64)10000000000) {
clear_screen();
print_nic_statistics();
prev_tsc = cur_tsc;
}
}
}
static void do_pkt_on_dpi(struct rte_mbuf *mbuf) {
//暂时不考虑带隧道的情况,也只考虑ipv4的报文
uint8 *pkt = NULL;
pkt = rte_pktmbuf_mtod(mbuf, uint8 *);
if (!pkt) {
return;
}
return;
}
static void lcore_rx_loop() {
struct rte_mbuf *pkts_burst[MAX_PKT_BURST] = {NULL};
struct rte_mbuf *m = NULL;
uint16 nb_rx = 0;
uint16 j = 0;
while (run_mode) {
for (uint16 port_index = 0; port_index < link_port_num; port_index++) {
nb_rx = rte_eth_rx_burst(link_port[port_index], RX_QUEUE_ID, pkts_burst, MAX_PKT_BURST);
port_statistics[link_port[port_index]].rx += nb_rx;
for (j = 0; j < nb_rx; j++) {
if (likely(j < nb_rx - 1)) {
rte_prefetch0(rte_pktmbuf_mtod(pkts_burst[j + 1], void *));
}
m = pkts_burst[j];
/* 现在已经收到网卡的报文可以解析了---业务开始 */
if (m) {
do_pkt_on_dpi(m);
}
rte_pktmbuf_free(m);
}
}
}
}
static int every_lcore_loop(void *arg) {
uint16 lcore_id = rte_lcore_id();
/**
*目前只有两个核心,0核作为控制核负责打印网口流量等统计信息 1核负责抓包解析
*
**/
if (lcore_id == rte_get_main_lcore()) {
lcore_main_loop();
}else {
lcore_rx_loop();
}
return true;
}
uint8 find_eth_port() {
if (!rte_eth_dev_count_avail()) {
syslog(LOG_ERR, "No support eth device.");
return false;
}else {
syslog(LOG_INFO, "Find eth device num: %u.\n", rte_eth_dev_count_avail());
return true;
}
}
static void signal_handler(int signum) {
if (signum == SIGINT || signum == SIGTERM || signum == SIGTTIN || signum == SIGTTOU) {
printf("\n\nSignal %d received, preparing to exit...\n", signum);
syslog(LOG_INFO, "Signal %d received, preparing to exit...", signum);
run_mode = false;
}
}
void check_port_link_status() {
if (!run_mode) {
return;
}
syslog(LOG_INFO, "Check port link status.");
uint8 portid = 0;
uint8 index = 0;
struct rte_eth_link link;
RTE_ETH_FOREACH_DEV(portid) {
if (portid > MAX_DEV_PORTID) {
break;
}
memset(&link, 0, sizeof(struct rte_eth_link));
rte_eth_link_get_nowait(portid, &link);
if (link.link_status) {
link_port_num++;
link_port[index++] = portid;
syslog(LOG_INFO, "Port %d Link Up. Speed %u Mbps - %s\n", portid, link.link_speed, (link.link_duplex == ETH_LINK_FULL_DUPLEX) ? ("full-duplex") : ("half-duplex\n"));
}else {
syslog(LOG_INFO, "Port %d Link Down\n", portid);
}
}
}
void init_log_module() {
openlog("app_dpdk", LOG_PID | LOG_CONS, LOG_USER);
syslog(LOG_INFO, "The program app_dpdk is running.");
return;
}
int main(int argc, char *argv[]) {
/* 初始化平台 */
int ret = rte_eal_init(argc, argv);
if (ret < 0)
rte_exit(EXIT_FAILURE, "Invalid EAL arguments\n");
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
/* 注册日志模块 日志文件名/data/log/rsyslog/syslog.log */
init_log_module();
/* 检查当前网口数 */
if (!find_eth_port()) {
rte_exit(EXIT_FAILURE, "No support eth device.\n");
}
uint32 nb_mbufs = rte_eth_dev_count_avail() * (nb_rxd + nb_txd + MAX_PKT_BURST + rte_lcore_count() * MEMPOOL_CACHE_SIZE);
rx_pktmbuf_pool = rte_pktmbuf_pool_create("mbuf_pool", nb_mbufs, MEMPOOL_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, SOCKET_ID_ANY);
if (rx_pktmbuf_pool == NULL) {
syslog(LOG_ERR, "Cannot init mbuf pool.");
rte_exit(EXIT_FAILURE, "Cannot init mbuf pool\n");
}
/* 显示当前运行核数 */
syslog(LOG_INFO, "The program app_dpdk running core nums: %u, main-lcore-id:%u", rte_lcore_count(), rte_get_main_lcore());
/* 目前的想法是单核抓所有的网口 单核处理 流水线模型 */
uint8 portid = 0;
RTE_ETH_FOREACH_DEV(portid) {
struct rte_eth_rxconf rxq_conf;
struct rte_eth_txconf txq_conf;
struct rte_eth_conf local_port_conf = port_conf;
struct rte_eth_dev_info dev_info;
if (portid > MAX_DEV_PORTID) {
break;
}
/* init port */
syslog(LOG_INFO, "Initializing port %u... ", portid);
fflush(stdout);
rte_eth_dev_info_get(portid, &dev_info);
if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
local_port_conf.txmode.offloads |=
DEV_TX_OFFLOAD_MBUF_FAST_FREE;
ret = rte_eth_dev_configure(portid, RX_QUEUE_NUM, TX_QUEUE_NUM, &local_port_conf);
if (ret < 0) {
syslog(LOG_ERR, "Cannot configure device: err=%d, port=%u\n", ret, portid);
rte_exit(EXIT_FAILURE, "Cannot configure device: err=%d, port=%u\n", ret, portid);
}
ret = rte_eth_dev_adjust_nb_rx_tx_desc(portid, &nb_rxd, &nb_txd);
if (ret < 0) {
syslog(LOG_ERR, "Cannot adjust number of descriptors: err=%d, port=%u\n", ret, portid);
rte_exit(EXIT_FAILURE, "Cannot adjust number of descriptors: err=%d, port=%u\n", ret, portid);
}
/* init one RX queue */
fflush(stdout);
rxq_conf = dev_info.default_rxconf;
rxq_conf.offloads = local_port_conf.rxmode.offloads;
ret = rte_eth_rx_queue_setup(portid, RX_QUEUE_ID, nb_rxd,
SOCKET_ID_ANY,
&rxq_conf,
rx_pktmbuf_pool);
if (ret < 0) {
syslog(LOG_ERR, "rte_eth_rx_queue_setup:err=%d, port=%u\n", ret, portid);
rte_exit(EXIT_FAILURE, "rte_eth_rx_queue_setup:err=%d, port=%u\n", ret, portid);
}
/* init one TX queue on each port */
fflush(stdout);
txq_conf = dev_info.default_txconf;
txq_conf.offloads = local_port_conf.txmode.offloads;
ret = rte_eth_tx_queue_setup(portid, RX_QUEUE_ID, nb_txd, SOCKET_ID_ANY, &txq_conf);
if (ret < 0)
rte_exit(EXIT_FAILURE, "rte_eth_tx_queue_setup:err=%d, port=%u\n",
ret, portid);
/* Start device */
ret = rte_eth_dev_start(portid);
if (ret < 0)
rte_exit(EXIT_FAILURE, "rte_eth_dev_start:err=%d, port=%u\n", ret, portid);
syslog(LOG_INFO, "Init port: %u success.", portid);
rte_eth_promiscuous_enable(portid);
/* initialize port stats */
memset(&port_statistics, 0, sizeof(port_statistics));
}
/* 睡眠一会等待网口running */
sleep(3);
/* 检查当前running的网口 后续只抓running的 */
memset(link_port, 0xff, sizeof(link_port));
check_port_link_status();
for (uint8 i = 0; i < RTE_MAX_ETHPORTS; i++) {
if (link_port[i] != 0xff) {
syslog(LOG_INFO, "Running port, portid: %u.", link_port[i]);
}
}
syslog(LOG_INFO, "The program app_dpdk is running......");
/* 主函数回调 */
rte_eal_mp_remote_launch(every_lcore_loop, NULL, CALL_MAIN);
uint8 lcore_id = 0;
RTE_LCORE_FOREACH_WORKER(lcore_id) {
if (rte_eal_wait_lcore(lcore_id) < 0) {
break;
}
}
RTE_ETH_FOREACH_DEV(portid) {
syslog(LOG_INFO, "Closing port %u...", portid);
rte_eth_dev_stop(portid);
rte_eth_dev_close(portid);
}
rte_eal_cleanup();
syslog(LOG_INFO, "The program app_dpdk is stopped.");
closelog();
return 0;
}
代码中控制主核负责打印网口流量统计信息,其余核心负责接收和处理网卡队列上的报文。目前示例程序为每一个网卡只设置了一个接收队列,因此在抓包时,只需要轮询抓一个队列上的报文即可,实际上一般不止一个队列。
3.1.关键的接口
具体用法也可以参考dpdk提供的api手册,链接如下:
DPDK: APIhttp://doc.dpdk.org/api/
3.1.1.rte_eth_dev_count_avail()
用于获取当前设备被dpdk驱动接管的网口数量,很明显网口数量小于1则无法抓包。
3.1.2.rte_pktmbuf_pool_create()
主要用于创建报文缓存池,在网络数据包处理应用中,报文缓存池用于存储和管理接收到的报文。该函数主要涉及 网卡与 CPU 内存之间的 DMA(Direct Memory Access)传输,从而提高数据处理效率。
其申请节点数量计算方式为,抓包网口数量×(每一个网口队列所需要分配的文件描述符(mbuf指针) × 网口队列数量 + 每个核心的报文缓存大小)。
3.1.3.rte_eth_dev_configure()
设置网口队列信息,包括设置网口的队列数量(最大支持的队列数取决于硬件特性),网口的rss哈希算法等。
3.1.4.rte_eth_dev_adjust_nb_rx_tx_desc()
校验当前网口是否支持这么多队列数量,如果超出硬件最大支持的数量则报错。
3.1.5.rte_eth_rx_queue_setup()
为网口设置每一个接收队列,有多少队列则需要调用多少次,主要是为每一个队列分配报文缓冲池内存。
3.1.6.rte_eth_tx_queue_setup()
为网口设置每一个发送队列,有多少队列则需要调用多少次,主要是为每一个队列分配报文缓冲池内存。
3.1.7.rte_eth_dev_start()
启动该网口,
3.1.8.rte_eth_promiscuous_enable()
开启混杂模式。
3.1.9.rte_eth_rx_burst()
将网卡队列中的接收报文,并填充到mbuf指针中。此时获取到报文之后就可以对报文进行处理了。
很明显这是一个流水线模型,抓包之后立马处理,然后在处理下一个报文,每一个处理核都是同样的任务,实际上可以根据需要调整。
3.2.步骤总结
设置报文缓存池---->设置网卡信息---->校验网卡队列数是否合法---->分配收发队列---->开启混杂模式---->启动网卡---->收包处理
3.3.运行结果
运行命令:./main~ -l 0,1 --main-lcore 0
4.总结
通过上述例子,大家也可以对其进行改造从而实现自己的业务和功能。