KV260 进阶开发(PYNQ驱动开发+Pixel Pack)
目录
1. 简介
2. PixelPacker HLS 实现
2.1 PixelPacker HLS 源码
2.2 PixelPacker 功能简介
2.3 头文件介绍
2.4 启动间隔 II
2.5 Case V24 片段解释
3. PixelPacker Py 驱动
3.1 PixelPacker Py 源码
3.2 PixelPacker 类详解
3.3 property 装饰器
3.4 操作寄存器
3.5 DefaultIP 类源码
3.6 DefaultIP 类详解
4. Takeaways
1. 简介
- 本文讨论在 PYNQ 框架下,使用 Python 驱动 Vitis HLS Kernel。
- 使用 Registermap 也可以完成驱动,即直接操作所有寄存器。
- 通过 Python 方式编写的驱动更直观,可以理解为对 Registermap 的封装。
- Registermap 和 Python 驱动,底层都是调用 MMIO 的读写。
2. PixelPacker HLS 实现
2.1 PixelPacker HLS 源码
Pixel Packerhttps://github.com/Xilinx/PYNQ/tree/master/boards/ip/hls/pixel_pack
#include <ap_fixed.h>
#include <ap_int.h>
#include "hls_stream.h"
#include <ap_axi_sdata.h>
typedef ap_axiu<24,1,0,0> narrow_pixel;
typedef ap_axiu<32,1,0,0> wide_pixel;
typedef hls::stream<narrow_pixel> narrow_stream;
typedef hls::stream<wide_pixel> wide_stream;
#define V_24 0
#define V_32 1
#define V_8 2
#define V_16 3
#define V_16C 4
void pixel_pack(narrow_stream& stream_in_24 ,
wide_stream& stream_out_32,
int mode,
ap_uint<8> alpha) {
#pragma HLS INTERFACE mode=axis register_mode=both depth=24 port=stream_in_24 register
#pragma HLS INTERFACE mode=axis register_mode=both depth=24 port=stream_out_32 register
#pragma HLS INTERFACE mode=s_axilite port=mode register
#pragma HLS INTERFACE mode=s_axilite port=alpha register
#pragma HLS INTERFACE mode=ap_ctrl_none port=return
bool last = false;
bool delayed_last = false;
narrow_pixel in_pixel;
wide_pixel out_pixel;
switch (mode) {
case V_24:
while (!delayed_last) {
#pragma HLS pipeline II=4
delayed_last = last;
ap_uint<96> buffer;
ap_uint<4> has_last;
ap_uint<4> has_user;
for (int j = 0; j < 4; ++j) {
if (!last) {
stream_in_24.read(in_pixel);
buffer(j*24 + 23, j*24) = in_pixel.data;
has_user[j] = in_pixel.user;
has_last[j] = in_pixel.last;
last = in_pixel.last;
}
}
if (!delayed_last) {
for (int i = 0; i < 3; ++i) {
out_pixel.data = buffer(i*32 + 31, i*32);
out_pixel.user = has_user[i];
out_pixel.last = has_last[i+1];
stream_out_32.write(out_pixel);
}
}
}
break;
case V_32:
while (!last) {
#pragma HLS pipeline II=1
ap_uint<32> data;
stream_in_24.read(in_pixel);
data(23, 0) = in_pixel.data;
data(31, 24) = alpha;
out_pixel.data = data;
out_pixel.last = in_pixel.last;
out_pixel.user = in_pixel.user;
last = in_pixel.last;
stream_out_32.write(out_pixel);
}
break;
case V_8:
while (!delayed_last) {
#pragma HLS pipeline II=4
delayed_last = last;
bool user = false;
ap_uint<32> data;
for (int i = 0; i < 4; ++i) {
if (!last) {
stream_in_24.read(in_pixel);
user |= in_pixel.user;
last = in_pixel.last;
data(i*8 + 7, i * 8) = in_pixel.data(7,0);
}
}
if (!delayed_last) {
out_pixel.user = user;
out_pixel.last = last;
out_pixel.data = data;
stream_out_32.write(out_pixel);
}
}
break;
case V_16:
while (!last) {
#pragma HLS pipeline II=2
bool user = false;
ap_uint<32> data;
for (int i = 0; i < 2; ++i) {
stream_in_24.read(in_pixel);
user |= in_pixel.user;
last = in_pixel.last;
data(i*16 + 15, i*16) = in_pixel.data(16,0);
}
out_pixel.user = user;
out_pixel.last = last;
out_pixel.data = data;
stream_out_32.write(out_pixel);
}
break;
case V_16C:
while (!last) {
#pragma HLS pipeline II=2
bool user = false;
ap_uint<48> data;
for (int i = 0; i < 2; ++i) {
stream_in_24.read(in_pixel);
user |= in_pixel.user;
last = in_pixel.last;
data(i*24 + 23, i*24) = in_pixel.data;
}
ap_uint<32> out_data;
ap_uint<9> out_c1 = ap_uint<9>(data(15,8)) + ap_uint<9>(data(39,32));
ap_uint<9> out_c2 = ap_uint<9>(data(23,16)) + ap_uint<9>(data(47,40));
out_data(7,0) = data(7,0);
out_data(15,8) = out_c1(8,1);
out_data(23,16) = data(31,24);
out_data(31,24) = out_c2(8,1);
out_pixel.user = user;
out_pixel.last = last;
out_pixel.data = out_data;
stream_out_32.write(out_pixel);
}
break;
}
}
2.2 PixelPacker 功能简介
pixel_pack kernel,从一个输入流 stream_in_24 读取24位像素数据,并根据给定的模式 mode 将这些数据转换和打包成32位的数据,然后将转换后的数据写入到输出流 stream_out_32 中。
Kernel 中定义了五种模式:V_24, V_32, V_8, V_16, V_16C,每种模式都有其特定的处理逻辑。
1). V_24: 这个模式将读取4个24位的像素,将它们合并成一个96位的缓冲区,然后从这个缓冲区中提取出3个32位的像素并写入到输出流中。
2). V_32: 在这个模式中,每个24位的输入像素被扩展到32位,通过在最高的8位加上一个 alpha 值(透明度信息)。然后将这32位的数据写入到输出流中。
3). V_8: 此模式将4个24位的输入像素的最低8位提取出来,并将这些8位数据打包成一个32位的像素。
4). V_16: 在这个模式下,每两个24位的输入像素被组合成一个32位的输出像素,只包含输入像素的最低16位。
5). V_16C: 此模式处理两个24位的输入像素,并对其中的某些通道进行加法运算,最终生成一个32位的输出像素。它将第一个和第二个输入像素的第二个通道和第三个通道的值分别相加,然后将结果的高8位作为输出像素的相应通道值。
2.3 头文件介绍
1). #include <ap_fixed.h> 和 #include <ap_int.h>:
- 这两个头文件提供了定点数和整数的数据类型。
- ap_int.h 定义了 ap_int 和 ap_uint 数据类型,这些类型用于表示固定宽度的有符号和无符号整数。
- ap_fixed.h 提供了定点数的实现,设计者可以指定整数部分和小数部分的位宽。
2). #include "hls_stream.h":
- 这个头文件包含了 hls::stream 类的定义,用于在 HLS 设计中创建和管理流接口。
- stream 一种 FIFO(先进先出)缓冲区,适用于数据流处理。
3). #include <ap_axi_sdata.h>:
- 这个头文件定义了与 AXI4-Stream 协议兼容的数据结构。
- 其中,ap_axiu 是一种带有用户信号(user)和最后信号(last)的 AXI4-Stream 数据类型。
2.4 启动间隔 II
源码中,#pragma HLS pipeline 有三种不同的 II(Initiation Interval)值:II=1、II=2 和 II=4。这些值决定了流水线的启动间隔,即每个循环迭代之间的时钟周期数。不同的 II 值用于优化不同的处理模式。以下是每种模式的解释:
- II=1:
这种模式下,流水线每个时钟周期启动一次新的迭代。它用于 V_32 模式,因为每次处理一个像素,且没有复杂的操作或数据依赖。
- II=2:
这种模式下,流水线每两个时钟周期启动一次新的迭代。它用于 V_16 和 V_16C 模式,因为每次处理两个像素,且需要一些额外的计算(如 V_16C 模式下的颜色通道计算)。
- II=4:
这种模式下,流水线每四个时钟周期启动一次新的迭代。它用于 V_24 和 V_8 模式,因为每次处理四个像素,且需要更多的时间来读取和处理数据。
不同的 II 值是为了在不同的处理模式下达到最佳的性能和资源利用率。
2.5 Case V24 片段解释
case V_24:
while (!delayed_last)
{
#pragma HLS pipeline II=4
ap_uint<96> buffer;
ap_uint<4> tmp_last;
ap_uint<4> tmp_user;
delayed_last = last;
for (int j = 0; j < 4; ++j) {
if (!last) {
stream_in_24.read(in_pixel);
buffer(j*24 + 23, j*24) = in_pixel.data;
tmp_user[j] = in_pixel.user;
tmp_last[j] = in_pixel.last;
last = in_pixel.last;
}
}
if (!delayed_last) {
for (int i = 0; i < 3; ++i) {
out_pixel.data = buffer(i*32 + 31, i*32);
out_pixel.user = tmp_user[i];
out_pixel.last = tmp_last[i+1];
stream_out_32.write(out_pixel);
}
}
}
break;
主循环
- while (!delayed_last):这个循环持续运行,直到 delayed_last 为 true。delayed_last 是一个延迟的 last 信号,用于确保在处理完所有数据之前不会退出循环。
- #pragma HLS pipeline II=4:这个指令告诉 HLS 工具将这个循环进行流水线化,启动间隔为 4 个时钟周期。
变量定义
- delayed_last:延迟一拍的 last 信号,用于控制循环退出。
- buffer:一个 96 位的缓冲区,用于存储 4 个 24 位的像素数据。
- tmp_last 和 tmp_user:分别用于存储 last 和 user 信号。
内部循环
- for (int j = 0; j < 4; ++j):这个循环读取 4 个 24 位的像素数据,并将它们存储在 buffer 中。
- stream_in_24.read(in_pixel):从输入流中读取一个像素。
- buffer(j*24 + 23, j*24) = in_pixel.data:将读取的像素数据存储在 buffer 中。
- tmp_user[j] = in_pixel.user 和 tmp_last[j] = in_pixel.last:存储每个像素的 user 和 last 信号。
- last = in_pixel.last:更新 last 信号。
输出逻辑
- if (!delayed_last):如果 delayed_last 为 false,则处理输出。
- for (int i = 0; i < 3; ++i):这个循环将 buffer 中的 96 位数据分成 3 个 32 位的数据,并写入输出流。
- out_pixel.data = buffer(i*32 + 31, i*32):从 buffer 中提取 32 位数据。
- out_pixel.user = tmp_user[i] 和 out_pixel.last = tmp_last[i+1]:设置输出像素的 user 和 last 信号。
- stream_out_32.write(out_pixel):将输出像素写入输出流。
- for (int i = 0; i < 3; ++i):这个循环将 buffer 中的 96 位数据分成 3 个 32 位的数据,并写入输出流。
3. PixelPacker Py 驱动
3.1 PixelPacker Py 源码
文件位置:/usr/local/share/pynq-venv/lib/python3.10/site-packages/pynq/lib/video/pipeline.py
from pynq import DefaultIP
from .common import *
class PixelPacker(DefaultIP):
def __init__(self, description):
super().__init__(description)
self._bpp = 24
self.write(0x10, 0)
self._resample = False
bindto = [
"xilinx.com:hls:pixel_pack:1.0",
"xilinx.com:hls:pixel_unpack:1.0",
"xilinx.com:hls:pixel_pack_2:1.0",
"xilinx.com:hls:pixel_unpack_2:1.0",
]
@property
def bits_per_pixel(self):
mode = self.read(0x10)
if mode == 0:
return 24
elif mode == 1:
return 32
elif mode == 2:
return 8
elif mode <= 4:
return 16
@bits_per_pixel.setter
def bits_per_pixel(self, value):
if value == 24:
mode = 0
elif value == 32:
mode = 1
elif value == 8:
mode = 2
elif value == 16:
if self._resample:
mode = 4
else:
mode = 3
else:
raise ValueError("Bits per pixel must be 8, 16, 24 or 32")
self._bpp = value
self.write(0x10, mode)
@property
def resample(self):
return self._resample
@resample.setter
def resample(self, value):
self._resample = value
# Make sure the mode is updated
if self.bits_per_pixel == 16:
self.bits_per_pixel = 16
3.2 PixelPacker 类详解
PixelPacker 类是像素格式转换驱动程序。它继承自 DefaultIP 类。
该用于更改视频流中每像素的位数。在更改宽度之前,应暂停流。这可以针对像素打包或像素解包IP核心。对于打包器,输入始终为每像素24位;而对于解包器,输出则为每像素24位。
类的属性和方法:
1). 构造函数 (__init__)
- 初始化父类 DefaultIP。
- 设置初始的每像素位数为24位。
- 设置初始模式寄存器(位于0x10地址)为0。
- _resample 属性被设置为 False,这个属性用于决定在 16bpp 模式下的采样行为。
2). bindto 属性
- 一个包含硬件 IP 核标识的列表,表明这个类可以绑定到哪些特定的硬件实现上。
3). bits_per_pixel 属性
- 一个属性装饰器,提供对每像素位数(_bpp)的读取和设置功能。
- 读取功能通过读取模式寄存器(0x10)的值来确定当前的每像素位数。
- 设置功能允许设置每像素位数为8、16、24或32位,并根据这个值更新模式寄存器(0x10)。
mode _bpp pack unpack
-------------------------------------------------------
0 24 bpp 不改变像素格式 不改变像素格式
1 32 bpp 用0填充第四个通道 丢弃第四个通道
2 8 bpp 只保留第一个通道 用0填充其他通道
3/4 16 bpp 取决于重采样 取决于重采样
4). resample 属性
- 一个布尔类型的属性装饰器,指示是否在 16bpp 模式下执行色度重采样。
- 设置器在更改值之后会更新模式寄存器,以确保在 16bpp 模式下采样行为与设置的值一致。
3.3 property 装饰器
在 Python 中,@property 装饰器可以被用来将一个方法变成一个属性,允许你用属性的方式访问方法,同时保持方法的功能。
常用于实现对属性的访问控制,以及在访问属性时执行额外的逻辑,比如参数检查或者转换。
示例:
class Celsius:
def __init__(self, temperature=0):
self._temperature = temperature
@property
def temperature(self):
print("Getting temperature")
return self._temperature
@temperature.setter
def temperature(self, value):
if value < -273.15:
raise ValueError("Temperature below -273.15 is not possible")
print("Setting temperature to", value)
self._temperature = value
# 创建 Celsius 类的实例
temp = Celsius()
# 设置温度
temp.temperature = 30
# 获取温度
print(temp.temperature)
# 尝试设置不合理的温度值
try:
temp.temperature = -300
except ValueError as e:
print(e)
此例中:
- 定义了一个 Celsius 类,它有一个私有属性 _temperature。
- 使用 @property 装饰了一个名为 temperature 的方法,使之成为一个属性的 getter。
- 定义了 temperature 的 setter 方法,允许在设置温度值时加入额外的逻辑(例如检查温度是否低于物理极限)。
- 当尝试获取或设置 temperature 属性时,会自动调用对应的方法。
3.4 操作寄存器
在 HLS 中我们可以看到 mode 寄存器对应的地址是 0x10:
* S_AXILITE Registers
+---------------+----------+--------+-------+--------+----------------------+
| Interface | Register | Offset | Width | Access | Description |
+---------------+----------+--------+-------+--------+----------------------+
| s_axi_control | mode | 0x10 | 32 | W | Data signal of mode |
| s_axi_control | alpha | 0x18 | 32 | W | Data signal of alpha |
+---------------+----------+--------+-------+--------+----------------------+
在 bits_per_pixel 的 getter 和 setter 方法中,是通过读取或者写入 0x10 来实现的:
@property
def bits_per_pixel(self):
mode = self.read(0x10)
if mode == 0:
return 24
elif mode == 1:
return 32
elif mode == 2:
return 8
elif mode <= 4:
return 16
@bits_per_pixel.setter
def bits_per_pixel(self, value):
if value == 24:
mode = 0
elif value == 32:
mode = 1
elif value == 8:
mode = 2
elif value == 16:
if self._resample:
mode = 4
else:
mode = 3
else:
raise ValueError("Bits per pixel must be 8, 16, 24 or 32")
self._bpp = value
self.write(0x10, mode)
3.5 DefaultIP 类源码
DefaultIP 类是一个复杂的基础设施,支持多种高级功能,如硬件中断处理、内存映射I/O操作、寄存器映射和硬件加速器的调度执行。
它为开发针对特定硬件IP的定制驱动提供了框架和工具,使得硬件功能的集成和利用更加高效和系统化。
class DefaultIP(metaclass=RegisterIP):
def __init__(self, description):
if "device" in description:
self.device = description["device"]
else:
from .pl_server.device import Device
self.device = Device.active_device
self.mmio = MMIO(
description["phys_addr"], description["addr_range"], device=self.device
)
if "interrupts" in description:
self._interrupts = description["interrupts"]
else:
self._interrupts = {}
if "gpio" in description:
self._gpio = description["gpio"]
else:
self._gpio = {}
for interrupt, details in self._interrupts.items():
try:
setattr(self, interrupt, Interrupt(details["fullpath"]))
except ValueError as e:
warnings.warn("Interrupt {} not created: {}".format(interrupt, str(e)))
setattr(self, interrupt, None)
for gpio, entry in self._gpio.items():
gpio_number = GPIO.get_gpio_pin(entry["index"])
setattr(self, gpio, GPIO(gpio_number, "out"))
if "registers" in description:
self._registers = description["registers"]
self._fullpath = description["fullpath"]
self._register_name = description["fullpath"].rpartition("/")[2]
if "CTRL" in self._registers and self.device.has_capability("CALLABLE"):
(
self._signature,
struct_string,
self._ptr_list,
self.args,
) = _create_call(self._registers)
self._call_struct = struct.Struct(struct_string)
self._ctrl_reg = True
self.start_ert = self._start_ert
self.start_sw = self._start_sw
self.call = self._call
if self.device.has_capability("ERT"):
self.start = self._start_ert
else:
self.start = self._start_sw
else:
self._registers = None
if "index" in description:
cu_index = self.device.open_context(description)
self.cu_mask = 1 << cu_index
self._setup_packet_prototype()
if "streams" in description:
self.streams = {}
for k, v in description["streams"].items():
stream = self.device.get_memory_by_name(v["stream_id"])
self.streams[k] = stream
if v["direction"] == "output":
stream.source_ip = self
elif v["direction"] == "input":
stream.sink_ip = self
def _setup_packet_prototype(self):
self._packet = ert.ert_start_kernel_cmd()
self._packet.m_uert.m_start_cmd_struct.state = (
ert.ert_cmd_state.ERT_CMD_STATE_NEW
)
self._packet.m_uert.m_start_cmd_struct.unused = 0
self._packet.m_uert.m_start_cmd_struct.extra_cu_masks = 0
if hasattr(self, "_ctrl_reg"):
self._packet.m_uert.m_start_cmd_struct.count = (
self._call_struct.size // 4
) + 1
self._packet.m_uert.m_start_cmd_struct.opcode = ert.ert_cmd_opcode.ERT_START_CU
self._packet.m_uert.m_start_cmd_struct.type = ert.ert_cmd_type.ERT_DEFAULT
self._packet.cu_mask = self.cu_mask
@property
def register_map(self):
if not hasattr(self, "_register_map"):
if self._registers:
self._register_map = RegisterMap.create_subclass(
self._register_name, self._registers
)(self.mmio.array)
else:
raise AttributeError(
"register_map only available if the .hwh is provided"
)
return self._register_map
@property
def signature(self):
"""The signature of the `call` method"""
if hasattr(self, "_signature"):
return self._signature
else:
return None
def _call(self, *args, **kwargs):
self.start(*args, **kwargs).wait()
def _start_sw(self, *args, ap_ctrl=1, waitfor=None, **kwargs):
if not self._signature:
raise RuntimeError("Only HLS IP can be called with the wrapper")
if waitfor is not None:
raise RuntimeError("waitfor only supported on newer versions of XRT")
if kwargs:
# Resolve any kwargs to make a single args tuple
args = self._signature.bind(*args, **kwargs).args
# Resolve and pointers that need .device_address taken
args = [
a.device_address if p else a
for a, p in itertools.zip_longest(args, self._ptr_list)
]
self.mmio.write(0, self._call_struct.pack(0, *args))
self.mmio.write(0, ap_ctrl)
return WaitHandle(self)
def _start_ert(self, *args, waitfor=(), **kwargs):
if not self._signature:
raise RuntimeError("Only HLS IP can be called with the wrapper")
if kwargs:
# Resolve any kwargs to make a single args tuple
args = self._signature.bind(*args, **kwargs).args
args = [a.device_address if p else a for a, p in zip(args, self._ptr_list)]
arg_data = self._call_struct.pack(0, *args)
bo = self.device.get_exec_bo()
exec_packet = bo.as_packet(ert.ert_start_kernel_cmd)
exec_packet.m_uert.header = self._packet.m_uert.header
exec_packet.cu_mask = self.cu_mask
ctypes.memmove(exec_packet.data, arg_data, len(arg_data))
wait_bos = tuple(w._bo for w in waitfor if w is not None and w._has_bo)
if wait_bos:
return self.device.execute_bo_with_waitlist(bo, wait_bos)
else:
return self.device.execute_bo(bo)
def read(self, offset=0):
return self.mmio.read(offset)
def write(self, offset, value):
self.mmio.write(offset, value)
3.6 DefaultIP 类详解
DefaultIP 类概述:
- DefaultIP 类是一个用于操作硬件 IP 的 Python 类。
- 它是一个基类,用于更具体的驱动程序继承和扩展。
- 它使用元类 RegisterIP 来自动注册继承自 DefaultIP 的子类,以便可以自动识别和绑定到特定的IP。
主要特点和功能:
1). 初始化与属性设置:
- 在初始化过程中,该类首先尝试从传入的 description 字典中获取设备信息。如果没有提供设备信息,它会使用默认的活动设备。
- 使用 MMIO(Memory-Mapped I/O)对象来提供对硬件设备内存映射的访问。
- 根据 description 字典中的信息,设置与中断 (_interrupts) 和通用输入输出 (_gpio) 相关的属性。
- 为每个中断和GPIO创建相应的 Interrupt 或 GPIO 对象,并将其作为类属性。
2). 寄存器映射:
- 如果提供了寄存器描述,类将创建一个寄存器映射,允许以属性的方式访问和控制这些寄存器。
3). 支持ERT和软件启动:
- 如果设备支持ERT(Embedded Runtime),则使用ERT调度器来管理和调度IP核心的执行。
- 否则,使用软件控制来启动和管理IP核心。
4). 数据流支持:
- 类可以配置和管理与IP相关的数据流(例如DMA通道),包括输入和输出流。
5)执行和等待处理:
- 提供了 _start_sw 和 _start_ert 方法来启动硬件加速器,并支持等待执行完成。
- _call 方法用于直接调用硬件加速器,等待其完成。
6). 读写操作:
- 提供 read 和 write 方法以直接从MMIO地址读取数据或向其写入数据。
4. Takeaways
- ap_axiu<24,1,0,0> 是一个数据结构。
- hls::stream<ap_axiu<24,1,0,0>> 是定义一个FIFO。
- #pragma HLS INTERFACE mode=axis 是定义一个顶层接口 AXI-Stream。
- delayed_last = false 延迟一拍。
- while (!delayed_last) 在 AXI-Stream 比较常见。
- @property 装饰器在 PYNQ 框架下经常使用。
- PixelPacker 类继承自 DefaultIP 类,后者提供 register_map 方法。