基于PCIE接口的高速大容量数据采集——软件篇

 2022-5-7     作者:Emtronix         

  高速大容量数据采集方案是一个可实现是连续采集数据率超过10MB/s的应用方案,方案的硬件由支持PCIE高速接口的Linux工控主板(ESM7100、ESM8100等)和基于FPGA的扩展模块组成,软件则包括符合Linux DMA Engine架构的DMA Controller驱动、DMA Client驱动及应用程序三个部分。方案的总体介绍请参考《基于PCIE接口的高速大容量数据采集—总体方案》一文。本文将重点介绍针对Xilinx的< DMA/Bridge Subsystem for PCI Express v4.1> FPGA IP核的特性,实现Linux DMA Engine架构驱动程序,并为应用程序提供简洁方便的API函数,来控制数据采集过程并适时处理已在系统内存中的实时采集数据流。


  目前Xilinx公司为其IP核DMA/Bridge Subsystem for PCI Express v4.1,仅提供基于x86体系的驱动,而没有在Linux DMA Engine架构上做工作。而事实上,DMA Engine架构已成为ARM嵌入式Linux平台的DMA应用的事实标准(de facto),因此本方案针对高速大容量数据采集的需求,构建了DMA Engine架构的驱动程序,包括通用DMA Controller驱动和面向应用的DMA Client驱动,应用程序通过标准的字符型设备节点,操作DMA Client驱动,从而实现所需的数据采集。图1是从软件开发角度来看的总体功能框图。


基于PCIE接口的高速大容量数据采集.png

图1 方案总体功能框图


  Linux DMA Engine架构的基本思路是把不同的DMA控制器封装为统一的API接口,供面向应用的DMA Client调用。在本方案中,DMA Client驱动一方面通过DMA Engine的API函数操作PCIE端点的DMA功能,另一方面还要通过PCIE接口控制前端的数据采集逻辑。DMA Client驱动对上层的用户应用程序,开放了一组标准的字符设备(char dev)API,供User App操作,在本方案中字符设备为“/dev/pcie0”。为了减少数据的搬动,采集数据缓冲区通常以ping-pong buffer的结构配置在Linux的保留存储器(reserved memory)中,且用户应用程序可通过mmap获得ping-pong buffer指针,直接处理DMA传上来的数据。Ping-pong buffer让应用程序和DMA交替操作数据buffer,如应用程序处理buffer 0#的数据时,DMA把新数据传送到buffer 1#,然后按固定时间周期ping-pong切换buffer,从而实现连续的实时数据采集和处理。字符设备“/dev/pcie0”仅传递采集硬件的控制与状态信息。


DMA Engine API


  DMA Engine架构为不同的DMA模式提供不同的API函数,其中最主要的是单次DMA和周期DMA两种,其API函数分别为:


struct dma_async_tx_descriptor *dmaengine_prep_slave_sg(
           struct dma_chan *chan, struct scatterlist *sgl,
           unsigned int sg_len, enum dma_data_direction direction,
           unsigned long flags);
 
struct dma_async_tx_descriptor *dmaengine_prep_dma_cyclic(
           struct dma_chan *chan, dma_addr_t buf_addr, size_t buf_len,
           size_t period_len, enum dma_data_direction direction);


  DMA Controller驱动要求DMA支持Scatter-gather结构的非连续数据Buffer,但在本方案的应用中,对单次DMA情形,采用单个Buffer是最常见的应用方式,这时可采用DMAEngine的简化函数:


struct dma_async_tx_descriptor *dmaengine_prep_slave_singl(
           struct dma_chan *chan, dma_addr_t buf, size_t len, 
           enum dma_data_direction direction, unsigned long flags);


  Cyclic DMA模式,是把多个DMA Buffer通过其描述符(dma descriptor)表连接成环状,当一个buffer的DMA传送结束后,驱动程序的中断线程将自动启动面向下一个描述符的DMA Buffer。由DMA descriptor表描述的逻辑流程如图2所示:


基于PCIE接口的高速大容量数据采集.png

图2 Cyclic DMA逻辑流程


  本方案的DMA Controller驱动实现了上述两种DMA传输方式,即单次DMA传输和周期DMA传输。DMA Controller驱动本质上讲,是一种通用的DMA服务器,如何使用DMA的传输功能,实现具体的数据传输任务,则是由DMA Client来决定的。Linux把DMA服务与具体应用分成两个部分,有利于DMA Controller驱动面向不同的应用场景。


DMA Client驱动


  DMA Client驱动是一个面向应用的驱动,如图1所示,它需要与User Space的上层应用程序配合运行,来完成所需的数据采集与处理。


  单次DMA的操作如下所示。


/* prepare a single buffer dma */
desc = dmaengine_prep_slave_single(dchan, edev->dma_phys,
              edev->total_len, DMA_DEV_TO_MEM, 
              DMA_PREP_INTERRUPT | DMA_CTRL_ACK);
if (!desc) {
    dev_err(edev->dev, "dmaengine_prep_slave_single(..) failed\n");
    ret = -ENODEV;
    goto error_out;
}
 
/* setup dtaker hardware */
eta750_dtaker_setup(edev);
 
/* put callback, and submit dma */
desc->callback = dma_callback;
desc->callback_param = edev;
edev->cookie = dmaengine_submit(desc);
ret = dma_submit_error(edev->cookie);
if (ret) {
dev_err(edev->dev, "DMA submit failed %d\n", ret);
goto error_submit;
}
 
/* init complete, and fire */
reinit_completion(&edev->xdma_chan_complete);
dma_async_issue_pending(dchan);
 
/* simulate input data */
eta750_dtaker_run(edev);
 
/* wait dma complete */
count = wait_for_completion_timeout(&edev->xdma_chan_complete, msecs_to_jiffies(DMA_TIMEOUT));
if (count == 0) {
dev_err(edev->dev, "wait_for_completion_timeout timeout\n");
ret = -ETIMEDOUT;
eta750_dtaker_end(edev);
goto error_submit;
}
 
/* error processing */
eta750_dtaker_error_pro(edev);
 
/* stop front-end daq unit */
count = eta750_dtaker_end(edev);
 
/* dump data */
eta750_dtaker_dump_data(edev);
return edev->total_len;
 
error_submit:
dmaengine_terminate_all(dchan);
 
error_out: 
return ret;


  只有周期DMA方式才能实现连续数据采集,在DMA Client中采用双DMA Buffer的乒乓结构来实现连续采集,应用程序处理0# Buffer数据时,DMA传输数据至1# Buffer,传输结束时,进行切换,应用程序处理1# Buffer数据,DMA传输新数据至0# Buffer。周期DMA需要指定每个buffer的长度period_len,同时需指定由2个buffer构成的ping-pong buffer的总长度total_len。其DMA流程如下所示。


/* prepare cyclic buffer dma */
desc = dmaengine_prep_dma_cyclic(dchan, edev->dma_phys, edev->total_len, 
edev->period_len, DMA_DEV_TO_MEM, DMA_PREP_INTERRUPT);
if (!desc) {
dev_err(edev->dev, "%s: prep dma cyclic failed!\n", __func__);
ret = -EINVAL;
goto error_out;
}
 
/* in cyclic mode */
edev->cyclic = true;
 
/* setup dtaker hardware */
eta750_dtaker_setup(edev);
 
/* put callback, and submit dma */
desc->callback = dma_callback;
desc->callback_param = edev;
edev->cookie = dmaengine_submit(desc);
ret = dma_submit_error(edev->cookie);
if (ret) {
dev_err(edev->dev, "cyclic dma submit failed %d\n", ret);
goto error_submit;
}
 
/* init complete, and fire */
reinit_completion(&edev->xdma_chan_complete);
dma_async_issue_pending(dchan);
edev->running = true; 
 
/* simulate input data */
eta750_dtaker_run(edev);
edev->data_seed += ((edev->period_len / sizeof(u16)) * edev->data_incr);
 
while(!kthread_should_stop()) {
/* wait dma complete */
count = wait_for_completion_timeout(&edev->xdma_chan_complete, msecs_to_jiffies(DMA_TIMEOUT));
if (count == 0) {
dev_err(edev->dev, "wait_for_completion timeout, transfer %d\n",
edev->transfer_count);
ret = -ETIMEDOUT;
break;
}
 
/* data processing */
eta750_dtaker_error_pro(edev);
edev->transfer_count++;
reinit_completion(&edev->xdma_chan_complete);
 
/* fill more data */
eta750_dtaker_run(edev);
edev->data_seed += ((edev->period_len / sizeof(u16)) * edev->data_incr);
}
 
/* stop front-end daq unit */
count = eta750_dtaker_end(edev);
edev->running = false; 
 
error_submit:
dmaengine_terminate_all(dchan);
edev->cyclic = false;
dev_info(edev->dev, "%s: dma stopped, cyclic %d, running %d\n", __func__,
edev->cyclic, edev->running);
 
error_out:
return ret;


  从上面代码可见,传送过程是一个无限循环,DMA Controller驱动会自动进行ping-pong buffer的切换。并通过回调函数通知上层应用程序,新数据已准备就绪。应用程序可通过命令来终止采集传输过程。


DTaker前端采集逻辑


  前端数据采集单元DTaker,包括在FPGA芯片XC7A50T-2CSG325中,主要是实现对AD芯片AD7606C的操作,并把数据写入FPGA中的FIFO缓冲区中。从图3可看到作为User Logic的DTaker与FIFO的相互关系。


基于PCIE接口的高速大容量数据采集.png

图3 DTaker功能框图


  从programming角度看,DTaker就是一组寄存器。代码通过操作实现:(1)启动AD转换;(2)读取AD数据,并按照一定格式把packed的数据写入FIFO。具体说,DTaker是由8个32-bit寄存器及相关数字逻辑构成的可编程控制单元,寄存器的定义如下。


Offset寄存器名称功能简述
0x00CONTROL控制寄存器
0x04STATUS状态寄存器
0x08PKG_SIZE采集数据长度寄存器
0x0CDAQ_CFG前端采集控制寄存器
0x10EVENT_CFG前端采集触发事件配置寄存器
0x14-未定义
0x18CPU_DAT2CPU仿真测试数据
0x1CCPU_DAT3CPU仿真测试数据


  各个寄存器功能定义如下:


  DTaker控制寄存器 CONTROL各bit定义如下:

  D[3 : 0] = LEDOUT[3 : 0]

  D4 = EVENT_IRQEN,置1使能DTAKER触发中断,事件=某种触发条件

  D5 = OVERFLOW_IRQEN,置1使能FIFO写溢出中断

  D6 = TIMEOUT_IRQEN,置1使能C2H接口超时中断,暂不用

  D7 = DMAEN,置1使能AXIS transfer

  D8 = DAQEN,= 0:选择CPU仿真数据;= 1: 选择前端AD数据

  D9 = FIFO_RESET,置1清FIFO,软件清零控制位

  D[31 : 10] = 暂时未用


  状态寄存器STATUS,各bit状态标志RO/W1C

  D[1 : 0] = KEY_IN[1:0]

  D2 = EVENT_FLAG,事件标志,W1C

  D3 = OVERFLOW_ERR,FIFO写溢出错误标志,W1C

  D4 = EVENT_IRQ,事件中断标志,W1C

  D5 = OVERFLOW_IRQ,FIFO写溢出错误中断标志,W1C

  D6 = TIMEOUT_IRQ,C2H接口超时中断标志,W1C

  D7 = DEBUG_BIT,调试用,根据情况临时定义具体内容

  D[17 : 8] = 10’b0000000000,暂时未用

  D18 = FIFO_FULL,=1: FIFO 已满

  D19 = FIFO_EMPTY,=1: FIFO 已空

  D[22 : 20] = EVENT_CODE[2 : 0],事件编码信息

  D23 = 1’b0,暂时未用

  D[31 : 24] = Version Info, RO


  DMA传送长度寄存器PKG_SIZE,字节为单位,需16字节整倍数。每完成一次transfer,PKG_SIZE -= 8字节,直至PKG_SIZE = 0;PKG_SIZE = 8时,表示后续是最后一次transfer,TLAST = 1。


  数据采集配置寄存器DAQ_CFG,用于定义采集的通道、采样率等参数。

  DAQ_CFG.D0 = RUN,置1启动前端硬件数据采集。


  触发事件配置寄存器EVENT_CFG,用于定义采集单元的触发事件条件,如触发通道、电平、触发沿,触发采集长度等参数。

  

  仿真数据寄存器CPU_DAT2。


  仿真数据寄存器CPU_DAT3,与CPU_DAT2构成一次64-bit的raw_data。由fifo_writer状态机写入XPM_FIFO_ASYNC。


应用程序


  从应用程序的角度看,程序的基本架构与《精简ISA总线实现高速大容量数据采集》一文的描述是一样的,其要点包括:

  1、使用预先划定的Linux保留存储区域作为采集数据buffer,应用程序通过mmap获得buffer指针。

  2、打开设备文件(节点名称:“/dev/pcie0”),通过write函数设置启动数据采集过程,在接收线程通过poll函数等待驱动通知数据就绪信息。

  3、Write函数写入pcie_dma_info数据结构,定义如下:


struct pcie_dma_info {
    dma_addr_t phys;        /* dma buffer start addr, */
                             /* = 0: internal buffer for testing */
    unsigned long size;     /* total buffer size in byte */
    int period;             /* = 1: single dma, > 1: cyclic dma */
    int count;              /* only for cyclic dma, = 0: free run, */
                            /* > 0: number of period buffer after */
                             /* event happened */
    u32  daq_config;        /* data acquisition config register in */
                             /* dtaker IP */
    u32  event_config;      /* event config register in dtaker IP */
};


  4、由数据就绪信息触发对采集数据buffer的相应处理。


  对本方案有实际应用需求的客户,可与英创公司技术支持联系,以了解进一步的技术细节,进行实际的评估应用。