当前位置: 首页 > article >正文

dpdk的基础使用-抓包

目录

1.运行dpdk必要条件

1.1.内核版本要求

1.2.glibc版本要求

1.3.大页内存要求

1.4. 网卡驱动绑定要求

2.简单的示例程序

2.1.hello world示例程序

2.2.关键的接口

2.2.1.rte_eal_init()

2.2.2.rte_eal_mp_remote_launch()

2.2.3.rte_eal_wait_lcore()

2.2.4.rte_eal_cleanup()

2.3.运行效果

3.dpdk抓包程序

3.1.关键的接口

3.1.1.rte_eth_dev_count_avail()

3.1.2.rte_pktmbuf_pool_create()

3.1.3.rte_eth_dev_configure()

3.1.4.rte_eth_dev_adjust_nb_rx_tx_desc()

3.1.5.rte_eth_rx_queue_setup()

3.1.6.rte_eth_tx_queue_setup()

3.1.7.rte_eth_dev_start()

3.1.8.rte_eth_promiscuous_enable()

3.1.9.rte_eth_rx_burst()

3.2.步骤总结

3.3.运行结果

1.运行dpdk必要条件

        在dpdk官网手册中已经明确了运行dpdk程序所需要的条件,链接如下:
System Requirements — Data Plane Development Kit 25.03.0-rc0 documentationicon-default.png?t=O83Ahttp://doc.dpdk.org/guides/linux_gsg/sys_reqs.html

1.1.内核版本要求

        内核版本至少 >= 4.19,内核版本可以使用 uname -r 命令查看。

1.2.glibc版本要求

        glibc版本至少 >= 2.7,具体版本可以使用 ldd --version 命令查看。

1.3.大页内存要求

        关于大页内存的配置可以使用dpdk官网提供的dpdk-hugepages.py脚本进行设置,也可以使用命令设置大页内存,设置完成后使用命令 cat /proc/meminfo  | grep Huge 应该可以看到具体的大页内存信息。

1.4. 网卡驱动绑定要求

        如果需要使用dpdk抓包,则还需要将对应的网卡pcie地址绑定到igb_uio驱动上,使用命令 lspci | grep Eth 可以看到当前设备所有的pcie地址,然后使用dpdk官网提供的dpdk-devbind.py脚本来绑定网卡。

        注:在前面的章节提到了dpdk模式和内核模式的区别,网口正常情况下是出于内核态模式的,此时我们使用ifconfig命令可以看到网口;但是当我们需要使用dpdk抓包时,将其绑定到igb_uio驱动,此时网口被dpdk接管,无法使用ifconfig命令看到网口,需要使用dpdk-devbind.py脚本查看,命令:./dpdk-devbind.py -s。(mlnx网卡较为特殊,不需要执行此步骤)

        可以看到,有6个网口被dpdk接管,有1个网口mgmt还是使用的内核驱动。

2.简单的示例程序

        其实dpdk本身在examples目录实现了很多dpdk的示例程序,方便大家学习参考。

2.1.hello world示例程序

        这里简单写了一个示例程序,供大家学习参考:

源码如下:

#include <stdio.h>
#include <unistd.h>
#include <syslog.h>
#include <signal.h>
#include <pthread.h>
#include <bits/pthreadtypes.h>

#include "rte_lcore.h"
#include "rte_eal.h"
#include "rte_ethdev.h"
#include "rte_malloc.h"


void lcore_main_loop() {
    while (1) {
        printf("Hello world!, This is main lcore:%u!\n", rte_lcore_id());
        sleep(1);
    }
}

void lcore_rx_loop() {
    while (1) {
        printf("Hello world!, This is rx lcore:%u!\n", rte_lcore_id());
        sleep(1);
    }
}

/* Launch a function on lcore. 8< */
static int every_lcore_loop(void *arg) {
    uint16_t lcore_id = rte_lcore_id();
    if (lcore_id == rte_get_main_lcore()) {
        lcore_main_loop();
    }else {
        lcore_rx_loop();
    }
    return 0;
}

/* >8 End of launching function on lcore. */

/* Initialization of Environment Abstraction Layer (EAL). 8< */
int main(int argc, char **argv) {
    int ret;

    ret = rte_eal_init(argc, argv);
    if (ret < 0)
        rte_panic("Cannot init EAL\n");
    
    printf("Total core num:%u\n", rte_lcore_count());
    /* Launches the function on each lcore. 8< */
    
    rte_eal_mp_remote_launch(every_lcore_loop, NULL, CALL_MAIN);

    /* >8 End of launching the function on each lcore. */
    unsigned char lcore_id = 0;
    RTE_LCORE_FOREACH_WORKER(lcore_id) {
        if (rte_eal_wait_lcore(lcore_id) < 0) {
            break;
        }
    }

    /* clean up the EAL */
    rte_eal_cleanup();

    return 0;
}

2.2.关键的接口

2.2.1.rte_eal_init()

        初始化平台的接口,主要作用是负责解析运行参数、设置大页内存、设置cpu亲和性、为每一个核心创建独立的线程、扫描pcie网卡设备等。

        注:有兴趣的同学可以去看源码文件在 eal.c 中。

2.2.2.rte_eal_mp_remote_launch()

        这个函数是dpdk中比较关键的部分,它为每一个核心提供函数入口,每一个逻辑核心都会回调此接口,可以类似的理解成创建线程的概念pthread_create()函数。

        通过这个接口我们便可以根据传入的运行参数,来分配每一个核心需要做的事情,例如代码中main-lcore打印主逻辑核的相关信息,而其他的核心打印自己的核id。

2.2.3.rte_eal_wait_lcore()

        可以理解为等待每一个核心做完自己的事情然后结束,类似线程中pthread_join()的概念。

2.2.4.rte_eal_cleanup()

        主要用于清理eal初始化平台所占用的资源等。

2.3.运行效果

运行命令:./main~ -l 0,1,2 --main-lcore 0

注:更多的运行参数可以使用 --help 进行查看

3.dpdk抓包程序

代码:

#include <stdio.h>
#include <unistd.h>
#include <syslog.h>
#include <signal.h>
#include <pthread.h>
#include <bits/pthreadtypes.h>

#include <mylist.h>
#include <mynet.h>
#include <hs.h>

#include "rte_lcore.h"
#include "rte_eal.h"
#include "rte_ethdev.h"
#include "rte_malloc.h"

/* Per-port statistics struct */
struct dpdk_port_statistics {
    uint64_t tx;
    uint64_t rx;
    uint64_t dropped;
} __rte_cache_aligned;

static struct rte_eth_conf port_conf = {
    .rxmode = {
        .split_hdr_size = 0,
    },
    .txmode = {
        .mq_mode = ETH_MQ_TX_NONE,
    },
};

#define false   0
#define true    1

#define MAX_PKT_BURST       32
#define MAX_DEV_PORTID      8
#define MEMPOOL_CACHE_SIZE  256
#define RX_QUEUE_NUM        1
#define TX_QUEUE_NUM        1
#define RX_QUEUE_ID         0

static uint16 nb_rxd = 1024;
static uint16 nb_txd = 1024;

struct rte_mempool *rx_pktmbuf_pool = NULL;

/* 网口流量统计信息 */
struct dpdk_port_statistics port_statistics[RTE_MAX_ETHPORTS];
/* 网口状态信息 */
uint8 link_port[RTE_MAX_ETHPORTS];
/* 网口running数量 */
uint8 link_port_num;
/* 运行状态 信号使用 */
static uint8 run_mode = true;

void clear_screen() {
    const char clr[] = { 27, '[', '2', 'J', '\0' };
    const char topLeft[] = { 27, '[', '1', ';', '1', 'H','\0' };

    /* Clear screen and move to top left */
    printf("%s%s", clr, topLeft);
}

void print_nic_statistics() {
    if (!run_mode) {
        return;
    }
    uint64 total_packets_dropped, total_packets_tx, total_packets_rx;
    total_packets_dropped = 0;
    total_packets_tx = 0;
    total_packets_rx = 0;

    printf("\nPort statistics ====================================");

    for (uint16 port_index = 0; port_index < link_port_num; port_index++) {
        if (link_port[port_index] == 0xff) {
            continue;
        }
        char eth_name[PAYLOAD_LEN] = {0};
        rte_eth_dev_get_name_by_port(link_port[port_index], eth_name);
        printf("\nStatistics for port %u ----%s------------------------"
               "\nPackets received: %lu"
               "\nPackets dropped: %lu",
               link_port[port_index],
               eth_name,
               port_statistics[link_port[port_index]].rx,
               port_statistics[link_port[port_index]].dropped);

        total_packets_dropped += port_statistics[link_port[port_index]].dropped;
        total_packets_rx += port_statistics[link_port[port_index]].rx;
    }
    printf("\nAggregate statistics ==============================="
           "\nTotal packets received: %llu"
           "\nTotal packets dropped: %llu",
           total_packets_rx,
           total_packets_dropped);
    printf("\n====================================================\n");
}

static void lcore_main_loop() {
    uint64 prev_tsc = 0;
    while (run_mode) {
        uint64 cur_tsc = rte_rdtsc();
        if ((cur_tsc - prev_tsc) > (uint64)10000000000) {
            clear_screen();
            print_nic_statistics();
            prev_tsc = cur_tsc;
        }
    }
}

static void do_pkt_on_dpi(struct rte_mbuf *mbuf) {
    //暂时不考虑带隧道的情况,也只考虑ipv4的报文
    uint8 *pkt = NULL;
    pkt = rte_pktmbuf_mtod(mbuf, uint8 *);
    if (!pkt) {
        return;
    }
    return;
}

static void lcore_rx_loop() {
    struct rte_mbuf *pkts_burst[MAX_PKT_BURST] = {NULL};
    struct rte_mbuf *m = NULL;
    uint16 nb_rx = 0;
    uint16 j = 0;
    while (run_mode) {
        for (uint16 port_index = 0; port_index < link_port_num; port_index++) {
            nb_rx = rte_eth_rx_burst(link_port[port_index], RX_QUEUE_ID, pkts_burst, MAX_PKT_BURST);
            port_statistics[link_port[port_index]].rx += nb_rx;
            for (j = 0; j < nb_rx; j++) {
                if (likely(j < nb_rx - 1)) {
                    rte_prefetch0(rte_pktmbuf_mtod(pkts_burst[j + 1], void *));
                }
                m = pkts_burst[j];
                
                /* 现在已经收到网卡的报文可以解析了---业务开始 */
                if (m) {
                    do_pkt_on_dpi(m);
                }
                
                rte_pktmbuf_free(m);
            }
        }
    } 
}

static int every_lcore_loop(void *arg) {
    uint16 lcore_id = rte_lcore_id();
    /**
     *目前只有两个核心,0核作为控制核负责打印网口流量等统计信息  1核负责抓包解析
     *  
     **/
    if (lcore_id == rte_get_main_lcore()) {
        lcore_main_loop();
    }else {
        lcore_rx_loop();
    }
    return true;
}


uint8 find_eth_port() {
    if (!rte_eth_dev_count_avail()) {
        syslog(LOG_ERR, "No support eth device.");
        return false;
    }else {
        syslog(LOG_INFO, "Find eth device num: %u.\n", rte_eth_dev_count_avail());
        return true;
    }
}

static void signal_handler(int signum) {
    if (signum == SIGINT || signum == SIGTERM || signum == SIGTTIN || signum == SIGTTOU) {
        printf("\n\nSignal %d received, preparing to exit...\n", signum);
        syslog(LOG_INFO, "Signal %d received, preparing to exit...", signum);
        run_mode = false;
    }
}

void check_port_link_status() {
    if (!run_mode) {
        return;
    }

    syslog(LOG_INFO, "Check port link status.");

    uint8 portid = 0;
    uint8 index = 0;
    struct rte_eth_link link;
    RTE_ETH_FOREACH_DEV(portid) {
        if (portid > MAX_DEV_PORTID) {
            break;
        }
        memset(&link, 0, sizeof(struct rte_eth_link));
        rte_eth_link_get_nowait(portid, &link);
        if (link.link_status) {
            link_port_num++;
            link_port[index++] = portid;
            syslog(LOG_INFO, "Port %d Link Up. Speed %u Mbps - %s\n", portid, link.link_speed, (link.link_duplex == ETH_LINK_FULL_DUPLEX) ? ("full-duplex") : ("half-duplex\n"));   
        }else {
            syslog(LOG_INFO, "Port %d Link Down\n", portid);
        }
    }
}

void init_log_module() {
    openlog("app_dpdk", LOG_PID | LOG_CONS, LOG_USER);
    syslog(LOG_INFO, "The program app_dpdk is running.");
    return;
}

int main(int argc, char *argv[]) {
    /* 初始化平台 */
    int ret = rte_eal_init(argc, argv);
    if (ret < 0)
        rte_exit(EXIT_FAILURE, "Invalid EAL arguments\n");

    signal(SIGINT, signal_handler);
    signal(SIGTERM, signal_handler);

    /* 注册日志模块 日志文件名/data/log/rsyslog/syslog.log */
    init_log_module();

    /* 检查当前网口数 */
    if (!find_eth_port()) {
        rte_exit(EXIT_FAILURE, "No support eth device.\n");
    }

    uint32 nb_mbufs = rte_eth_dev_count_avail() * (nb_rxd + nb_txd + MAX_PKT_BURST + rte_lcore_count() * MEMPOOL_CACHE_SIZE);
    rx_pktmbuf_pool = rte_pktmbuf_pool_create("mbuf_pool", nb_mbufs, MEMPOOL_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, SOCKET_ID_ANY);
    if (rx_pktmbuf_pool == NULL) {
        syslog(LOG_ERR, "Cannot init mbuf pool.");
        rte_exit(EXIT_FAILURE, "Cannot init mbuf pool\n");
    }

    /* 显示当前运行核数 */
    syslog(LOG_INFO, "The program app_dpdk running core nums: %u, main-lcore-id:%u", rte_lcore_count(), rte_get_main_lcore());

    /* 目前的想法是单核抓所有的网口 单核处理 流水线模型 */
    uint8 portid = 0;
    RTE_ETH_FOREACH_DEV(portid) {
        struct rte_eth_rxconf rxq_conf;
        struct rte_eth_txconf txq_conf;
        struct rte_eth_conf local_port_conf = port_conf;
        struct rte_eth_dev_info dev_info;

        if (portid > MAX_DEV_PORTID) {
            break;
        }

        /* init port */
        syslog(LOG_INFO, "Initializing port %u... ", portid);
        fflush(stdout);

        rte_eth_dev_info_get(portid, &dev_info);
        if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
            local_port_conf.txmode.offloads |=
                DEV_TX_OFFLOAD_MBUF_FAST_FREE;
        ret = rte_eth_dev_configure(portid, RX_QUEUE_NUM, TX_QUEUE_NUM, &local_port_conf);
        if (ret < 0) {
            syslog(LOG_ERR,  "Cannot configure device: err=%d, port=%u\n", ret, portid);
            rte_exit(EXIT_FAILURE, "Cannot configure device: err=%d, port=%u\n", ret, portid);
        }
            

        ret = rte_eth_dev_adjust_nb_rx_tx_desc(portid, &nb_rxd, &nb_txd);
        if (ret < 0) {
            syslog(LOG_ERR, "Cannot adjust number of descriptors: err=%d, port=%u\n", ret, portid);
            rte_exit(EXIT_FAILURE, "Cannot adjust number of descriptors: err=%d, port=%u\n", ret, portid);
        }

        /* init one RX queue */
        fflush(stdout);
        rxq_conf = dev_info.default_rxconf;
        rxq_conf.offloads = local_port_conf.rxmode.offloads;
        ret = rte_eth_rx_queue_setup(portid, RX_QUEUE_ID, nb_rxd,
                         SOCKET_ID_ANY,
                         &rxq_conf,
                         rx_pktmbuf_pool);
        if (ret < 0) {
            syslog(LOG_ERR, "rte_eth_rx_queue_setup:err=%d, port=%u\n", ret, portid);
            rte_exit(EXIT_FAILURE, "rte_eth_rx_queue_setup:err=%d, port=%u\n", ret, portid);
        }
            
        /* init one TX queue on each port */
        fflush(stdout);
        txq_conf = dev_info.default_txconf;
        txq_conf.offloads = local_port_conf.txmode.offloads;
        ret = rte_eth_tx_queue_setup(portid, RX_QUEUE_ID, nb_txd, SOCKET_ID_ANY, &txq_conf);
        if (ret < 0)
            rte_exit(EXIT_FAILURE, "rte_eth_tx_queue_setup:err=%d, port=%u\n",
                ret, portid);

        /* Start device */
        ret = rte_eth_dev_start(portid);
        if (ret < 0)
            rte_exit(EXIT_FAILURE, "rte_eth_dev_start:err=%d, port=%u\n", ret, portid);

        syslog(LOG_INFO, "Init port: %u success.", portid);

        rte_eth_promiscuous_enable(portid);
        /* initialize port stats */
        memset(&port_statistics, 0, sizeof(port_statistics));
    }

    /* 睡眠一会等待网口running */
    sleep(3);

    /* 检查当前running的网口 后续只抓running的 */
    memset(link_port, 0xff, sizeof(link_port));
    check_port_link_status();

    for (uint8 i = 0; i < RTE_MAX_ETHPORTS; i++) {
        if (link_port[i] != 0xff) {
            syslog(LOG_INFO, "Running port, portid: %u.", link_port[i]);
        }
    }

    syslog(LOG_INFO, "The program app_dpdk is running......");

    /* 主函数回调 */
    rte_eal_mp_remote_launch(every_lcore_loop, NULL, CALL_MAIN);

    uint8 lcore_id = 0;
    RTE_LCORE_FOREACH_WORKER(lcore_id) {
        if (rte_eal_wait_lcore(lcore_id) < 0) {
            break;
        }
    }

    RTE_ETH_FOREACH_DEV(portid) {
        syslog(LOG_INFO, "Closing port %u...", portid);
        rte_eth_dev_stop(portid);
        rte_eth_dev_close(portid);
    }
    rte_eal_cleanup();
    syslog(LOG_INFO, "The program app_dpdk is stopped.");
    closelog();

    return 0;
}

        代码中控制主核负责打印网口流量统计信息,其余核心负责接收和处理网卡队列上的报文。目前示例程序为每一个网卡只设置了一个接收队列,因此在抓包时,只需要轮询抓一个队列上的报文即可,实际上一般不止一个队列。

3.1.关键的接口

具体用法也可以参考dpdk提供的api手册,链接如下:

DPDK: APIicon-default.png?t=O83Ahttp://doc.dpdk.org/api/

3.1.1.rte_eth_dev_count_avail()

        用于获取当前设备被dpdk驱动接管的网口数量,很明显网口数量小于1则无法抓包。

3.1.2.rte_pktmbuf_pool_create()

        主要用于创建报文缓存池,在网络数据包处理应用中,报文缓存池用于存储和管理接收到的报文。该函数主要涉及 网卡与 CPU 内存之间的 DMA(Direct Memory Access)传输,从而提高数据处理效率。

        其申请节点数量计算方式为,抓包网口数量×(每一个网口队列所需要分配的文件描述符(mbuf指针) × 网口队列数量 + 每个核心的报文缓存大小)。

3.1.3.rte_eth_dev_configure()

        设置网口队列信息,包括设置网口的队列数量(最大支持的队列数取决于硬件特性),网口的rss哈希算法等。

3.1.4.rte_eth_dev_adjust_nb_rx_tx_desc()

        校验当前网口是否支持这么多队列数量,如果超出硬件最大支持的数量则报错。

3.1.5.rte_eth_rx_queue_setup()

        为网口设置每一个接收队列,有多少队列则需要调用多少次,主要是为每一个队列分配报文缓冲池内存。

3.1.6.rte_eth_tx_queue_setup()

        为网口设置每一个发送队列,有多少队列则需要调用多少次,主要是为每一个队列分配报文缓冲池内存。

3.1.7.rte_eth_dev_start()

        启动该网口,

3.1.8.rte_eth_promiscuous_enable()

        开启混杂模式。

3.1.9.rte_eth_rx_burst()

        将网卡队列中的接收报文,并填充到mbuf指针中。此时获取到报文之后就可以对报文进行处理了。

        很明显这是一个流水线模型,抓包之后立马处理,然后在处理下一个报文,每一个处理核都是同样的任务,实际上可以根据需要调整。

3.2.步骤总结

        设置报文缓存池---->设置网卡信息---->校验网卡队列数是否合法---->分配收发队列---->开启混杂模式---->启动网卡---->收包处理

3.3.运行结果

运行命令:./main~ -l 0,1 --main-lcore 0

4.总结

        通过上述例子,大家也可以对其进行改造从而实现自己的业务和功能。


http://www.kler.cn/a/541851.html

相关文章:

  • SQL Server安装流程
  • C++17 新特性解析
  • 数据集成实例分享:金蝶云星空对接旺店通实现库存管理自动化
  • Expo运行模拟器失败错误解决(xcrun simctl )
  • Axure原型图怎么通过链接共享
  • Kafka系列之:定位topic只能保存最新数据的原因
  • RESTful开发中对象的合理使用探究
  • 分布式服务框架 如何设计一个更合理的协议
  • 爬取彩票网站数据
  • rpx和px混用方案
  • 【2024最新Java面试宝典】—— SpringBoot面试题(44道含答案)_java spingboot 面试题
  • el-table多列勾选
  • Vue2生命周期面试题
  • Access数据库教案(Excel+VBA+Access数据库SQL Server编程)
  • (3/100)每日小游戏平台系列
  • Visual Studio 2022环境下Miracl Lib库报错“无法解析的外部命令”
  • 数字孪生平台 v5.2 发布
  • Vulnhub empire-lupinone靶机攻击实战(一)
  • 【Elasticsearch】Elasticsearch检索方式全解析:从基础到实战(一)
  • 系统开发:大文件下载报错问题
  • 【自然语言处理】TextRank 算法提取关键词、短语、句(Python源码实现)
  • 【算法-动态规划】、魔法卷轴: 两次清零机会整个数组最大累加和
  • 代发考试战报:2月5号最近考过的思科和华为考试战报
  • 请求响应-请求-日期参数json参数路径参数
  • 【SpringBoot苍穹外卖】debugDay02
  • SpringBoot中为什么要引入消息队列依赖项