Agent Skills: rust-dpdk

用户态网络专家。处理 DPDK, 用户态驱动, 高性能网络, packet processing, 零拷贝, RSS 负载均衡--- # 用户态网络 (DPDK) ## 核心问题 **如何实现百万级 PPS 的高性能网络数据包处理?** 传统内核网络栈有太多上下文切换和内存拷贝开销。

UncategorizedID: huiali/rust-skills/rust-dpdk

Install this agent skill to your local

pnpm dlx add-skill https://github.com/huiali/rust-skills/tree/HEAD/.codex/skills/rust-dpdk

Skill Files

Browse the full folder contents for rust-dpdk.

Download Skill

Loading file tree…

.codex/skills/rust-dpdk/SKILL.md

Skill Metadata

Name
rust-dpdk
Description
用户态网络专家。处理 DPDK, 用户态驱动, 高性能网络, packet processing, 零拷贝, RSS 负载均衡--- # 用户态网络 (DPDK) ## 核心问题 **如何实现百万级 PPS 的高性能网络数据包处理?** 传统内核网络栈有太多上下文切换和内存拷贝开销。

DPDK vs 内核网络栈

| 特性 | 内核网络栈 | DPDK | |-----|----------|------| | 上下文切换 | 每次包都切换 | 轮询模式,无切换 | | 内存拷贝 | 多次拷贝 | 零拷贝 | | 中断 | 频繁中断 | 轮询 (poll mode driver) | | 延迟 | 较高 | 微秒级 | | 吞吐量 | 万级 PPS | 百万级 PPS | | CPU 利用率 | 较低但有开销 | 高但高效 |

核心组件

// DPDK 核心结构
struct DpdkContext {
    memory_pool: Mempool,        // 内存池
    ports: Vec<Port>,            // 网卡端口
    rx_queues: Vec<RxQueue>,     // 接收队列
    tx_queues: Vec<TxQueue>,     // 发送队列
    cpu_cores: Vec<Core>,        // CPU 核心分配
}

struct Port {
    port_id: u16,
    mac_addr: [u8; 6],
    link_speed: u32,
    max_queues: u16,
}

struct Mempool {
    name: String,
    buffer_size: usize,
    cache_size: usize,
    total_buffers: u32,
}

内存池管理

// 创建 DPDK 内存池
fn create_mempool() -> Result<Mempool, DpdkError> {
    let mempool = unsafe {
        rte_mempool_create(
            b"packet_pool\0".as_ptr() as *const c_char,
            NUM_BUFFERS as u32,        // 缓冲区数量
            BUFFER_SIZE as u16,        // 每个缓冲区大小
            CACHE_SIZE as u32,         // CPU 缓存大小
            0,                         // 私有数据大小
            Some(rte_pktmbuf_pool_init), // 初始化函数
            std::ptr::null(),          // 初始化参数
            Some(rte_pktmbuf_init),    // 对象初始化函数
            std::ptr::null(),          // 对象参数
            rte_socket_id() as i32,    // 内存所在 Socket
            0,                         // 标志位
        )
    };
    
    if mempool.is_null() {
        Err(DpdkError::MempoolCreateFailed)
    } else {
        Ok(Mempool { inner: mempool })
    }
}

// 分配缓冲区
fn alloc_mbuf(mempool: &Mempool) -> Option<*mut rte_mbuf> {
    unsafe {
        let mbuf = rte_pktmbuf_alloc(mempool.inner);
        if mbuf.is_null() {
            None
        } else {
            Some(mbuf)
        }
    }
}

零拷贝接收

// 零拷贝接收数据包
fn process_packets(
    port_id: u16,
    queue_id: u16,
    bufs: &mut [*mut rte_mbuf; MAX_BURST_SIZE],
) -> usize {
    let num_received = unsafe {
        rte_eth_rx_burst(
            port_id,
            queue_id,
            bufs.as_mut_ptr(),
            MAX_BURST_SIZE as u16,
        )
    };
    
    // 直接处理 mbuf,不需要拷贝
    for i in 0..num_received {
        let mbuf = bufs[i];
        
        // 访问数据(零拷贝)
        let data_ptr = unsafe {
            rte_pktmbuf_mtod(mbuf, *const u8)
        };
        let data_len = unsafe {
            rte_pktmbuf_pkt_len(mbuf)
        };
        
        // 处理数据包
        process_packet(data_ptr, data_len);
        
        // 释放 mbuf 回内存池
        unsafe {
            rte_pktmbuf_free(mbuf);
        }
    }
    
    num_received
}

批量发送

// 批量发送数据包
fn transmit_packets(
    port_id: u16,
    queue_id: u16,
    packets: &[Packet],
) -> usize {
    let mut mbufs: Vec<*mut rte_mbuf> = packets
        .iter()
        .map(|p| p.to_mbuf())
        .collect();
    
    let sent = unsafe {
        rte_eth_tx_burst(
            port_id,
            queue_id,
            mbufs.as_mut_ptr(),
            mbufs.len() as u16,
        )
    };
    
    // 释放未发送的 mbuf
    for i in sent..mbufs.len() {
        unsafe {
            rte_pktmbuf_free(mbufs[i]);
        }
    }
    
    sent
}

RSS 负载均衡

// 配置 RSS (Receive Side Scaling)
fn configure_rss(port_id: u16) -> Result<(), DpdkError> {
    let mut port_info: rte_eth_dev_info = unsafe { std::mem::zeroed() };
    unsafe {
        rte_eth_dev_info_get(port_id, &mut port_info);
    }
    
    // 配置 RSS 哈希
    let mut rss_conf: rte_eth_rss_conf = unsafe { std::mem::zeroed() };
    rss_conf.rss_key_len = 40;
    rss_conf.rss_hf = RTE_ETH_RSS_TCP | RTE_ETH_RSS_UDP | RTE_ETH_RSS_IPV4;
    
    unsafe {
        let ret = rte_eth_dev_rss_hash_conf_update(
            port_id,
            &rss_conf,
        );
        if ret < 0 {
            return Err(DpdkError::RssConfigFailed);
        }
    }
    
    Ok(())
}

// 根据哈希值分配队列
fn get_queue_by_hash(hash: u32, num_queues: u16) -> u16 {
    // 使用简单的取模分发
    (hash % num_queues as u32) as u16
}

多队列配置

// 配置多队列
fn configure_multi_queue(port_id: u16, num_queues: u16) -> Result<(), DpdkError> {
    let mut port_conf: rte_eth_conf = unsafe { std::mem::zeroed() };
    port_conf.rxmode.split_hdr_size = 0;
    port_conf.rxmode.mq_mode = rte_eth_mq_mode::ETH_MQ_RX_RSS;
    port_conf.txmode.mq_mode = rte_eth_mq_mode::ETH_MQ_TX_NONE;
    
    // 配置 RX 队列
    let mut rx_conf: rte_eth_rxconf = unsafe { std::mem::zeroed() };
    rx_conf.rx_free_thresh = 32;
    rx_conf.rx_drop_en = 0;
    
    // 配置 TX 队列
    let mut tx_conf: rte_eth_txconf = unsafe { std::mem::zeroed() };
    tx_conf.tx_free_thresh = 32;
    
    // 分配 RX 队列
    for queue in 0..num_queues {
        unsafe {
            let ret = rte_eth_rx_queue_setup(
                port_id,
                queue,
                1024, // 队列深度
                rte_socket_id() as u32,
                &rx_conf,
                mempool.inner,
            );
            if ret < 0 {
                return Err(DpdkError::QueueSetupFailed);
            }
        }
    }
    
    // 分配 TX 队列
    for queue in 0..num_queues {
        unsafe {
            let ret = rte_eth_tx_queue_setup(
                port_id,
                queue,
                1024,
                rte_socket_id() as u32,
                &tx_conf,
            );
            if ret < 0 {
                return Err(DpdkError::QueueSetupFailed);
            }
        }
    }
    
    Ok(())
}

CPU 亲和性

use std::os::raw::c_int;
use std::thread;

fn set_cpu_affinity(core_id: u32) -> Result<(), DpdkError> {
    let mut cpuset: cpu_set_t = unsafe { std::mem::zeroed() };
    
    unsafe {
        CPU_SET(core_id as usize, &mut cpuset);
        
        let ret = pthread_setaffinity_np(
            pthread_self(),
            std::mem::size_of::<cpu_set_t>(),
            &cpuset,
        );
        
        if ret != 0 {
            return Err(DpdkError::AffinitySetFailed);
        }
    }
    
    Ok(())
}

// 为每个 RX 队列分配专用核心
fn allocate_cores_for_queues(num_queues: u16) {
    for queue in 0..num_queues {
        thread::spawn(move || {
            set_cpu_affinity(queue as u32).unwrap();
            process_queue(queue);
        });
    }
}

性能优化

| 优化点 | 方法 | |-------|------| | 内存对齐 | 缓存行对齐 (64 字节) | | 无锁队列 | 使用 SPSC 队列 | | 批处理 | 批量收发减少系统调用 | | CPU 亲和性 | 核心绑定减少上下文切换 | | Hugepages | 2MB/1GB 大页减少 TLB miss |

与其他技能关联

rust-dpdk
    │
    ├─► rust-performance → 性能优化
    ├─► rust-embedded → no_std 环境
    └─► rust-concurrency → 并发模型