Custom Development and Integration for Xena Network Testing: XOA (Xena Open-Source Automation)
Contents
What Is XOA
XOA CLI
XOA Python API
XOA Python Test Suite
XOA Converter
Source Code
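What Is XOA
XOA (Xena Open-Source Automation) is an open-source test automation framework that aims to be efficient, easy to use, and flexible across operating systems. It works seamlessly with existing Xena solutions: with XOA you can drive Xena equipment (Z-series traffic generators and E-series impairment emulators) to carry out automated test tasks. It also provides interfaces for bringing other instruments and devices into a combined test setup and for consolidating the results into a single test report.
XOA comprises:
XOA CLI, XOA Python API, XOA Python Test Suite, and XOA Converter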
XOA CLI
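XOA CLI provides a concise, intuitive set of standalone text-based commands for controlling and integrating Xena tester hardware and automating a wide range of test tasks.
Any client platform or programming language (for example Python, Tcl, or Bash) can be used with XOA CLI.
The CLI can be used from a remote login terminal to send commands directly to a Xena tester.
Through test port configuration files (.xpc), configurations can move seamlessly between ValkyrieManager and an XOA CLI environment.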
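Because the CLI is plain text, any language with a TCP socket can drive it. The following is a minimal Python sketch, not an official client. It assumes the tester's scripting interface listens on TCP port 22611, that the password is the default "xena", and that each command returns one reply line; the helper names `build_session_commands` and `send_commands` are hypothetical. Check the command names against the CLI reference for your chassis before use.

```python
import socket
from typing import List

XENA_CLI_PORT = 22611  # assumption: default scripting port of the tester


def build_session_commands(password: str, owner: str, module: int, port: int) -> List[str]:
    """Return the text commands that log on, set the owner, and reserve and reset one port."""
    return [
        f'C_LOGON "{password}"',
        f'C_OWNER "{owner}"',
        f"{module}/{port} P_RESERVATION RESERVE",
        f"{module}/{port} P_RESET",
    ]


def send_commands(host: str, commands: List[str],
                  port: int = XENA_CLI_PORT, timeout: float = 5.0) -> List[str]:
    """Send each command on one TCP connection and collect one reply line per command."""
    replies = []
    with socket.create_connection((host, port), timeout=timeout) as sock:
        reader = sock.makefile("r", encoding="ascii", newline="\n")
        for command in commands:
            sock.sendall((command + "\r\n").encode("ascii"))
            replies.append(reader.readline().strip())
    return replies


# Building the commands needs no hardware; sending them does.
commands = build_session_commands("xena", "XOA", module=4, port=0)
for line in commands:
    print(line)
```

To run the commands against a real chassis, call `send_commands("10.165.16.70", commands)` with your own chassis address.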
XOA Python API
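XOA Python API integrates seamlessly with XOA CLI and XenaManager.
- High-level, object-oriented abstraction: XOA Python API takes an object-oriented approach and offers a higher level of abstraction, which speeds up the development of automation scripts.
- IDE auto-completion and built-in documentation: auto-completion in the IDE, together with built-in documentation for classes, functions, and APIs, noticeably improves development efficiency.
- Command grouping and automatic response matching: commands can be sent as a group and each response is matched to its command automatically, which makes test execution more efficient.
- Server-to-client push notification subscription: client code can subscribe to notifications pushed by the server, which reduces the complexity of user code.
- Python 3.8 and later: XOA Python API supports Python 3.8 and later versions, so it fits into current Python environments.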
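The idea behind command grouping and response matching can be pictured with plain asyncio. The sketch below is an illustration only and does not use xoa_driver: `fake_command` and `apply_group` are hypothetical stand-ins, and the command names are made up. It shows that commands issued together can complete in any order while the replies still come back in the order the commands were given.

```python
import asyncio
from typing import Awaitable, List


async def fake_command(name: str, delay: float) -> str:
    """Hypothetical stand-in for one request/response exchange with a tester."""
    await asyncio.sleep(delay)
    return f"{name}: <OK>"


async def apply_group(*commands: Awaitable[str]) -> List[str]:
    """Run the commands concurrently; gather returns results in argument order."""
    return list(await asyncio.gather(*commands))


async def demo() -> List[str]:
    # The first command is the slowest, yet its reply is still first in the result.
    return await apply_group(
        fake_command("set_comment", 0.02),
        fake_command("enable_tx", 0.01),
        fake_command("set_loopback", 0.0),
    )


replies = asyncio.run(demo())
print(replies)
```

In the example script in this section, the call that plays this role is `utils.apply(...)`, which receives several port or stream commands at once.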
Example: configuring a TX port to send a RoCEv2 frame sequence (first, middle, and last packets) with XOA Python API.

import asyncio
from contextlib import suppress
from xoa_driver import testers
from xoa_driver import modules
from xoa_driver import ports
from xoa_driver import utils
from xoa_driver import enums
from xoa_driver import exceptions
from ipaddress import IPv4Address, IPv6Address
from binascii import hexlify
from xoa_driver.misc import Hex

#---------------------------
# Global parameters
#---------------------------
CHASSIS_IP = "10.165.16.70"     # Chassis IP address or hostname
USERNAME = "XOA"                # Username
MODULE_INDEX = 4                # Module index
TX_PORT_INDEX = 0               # TX Port index

FRAME_SIZE_BYTES = 4178         # Frame size on wire including the FCS.
FRAME_COUNT = 20                # The number of frames including the first, the middle, and the last.
REPETITION = 1                  # The number of repetitions of the frame sequence, set to 0 if you want the port to repeat over and over
TRAFFIC_RATE_FPS = 100          # Traffic rate in frames per second
TRAFFIC_RATE_PERCENT = int(4/10 * 1000000)  # Traffic rate in ppm of line rate: 400,000 = 40%

SHOULD_BURST = False            # Whether the middle frames should be bursty
BURST_SIZE_FRAMES = 9           # Burst size in frames for the middle frames
INTER_BURST_GAP_BYTES = 3000    # The inter-burst gap in bytes
INTRA_BURST_GAP_BYTES = 1000    # The inter-frame gap within a burst, aka. intra-burst gap, in bytes

#---------------------------
# Header content for streams
#---------------------------
class Ethernet:
    def __init__(self):
        self.dst_mac = "0000.0000.0000"
        self.src_mac = "0000.0000.0000"
        self.ethertype = "86DD"

    def __str__(self):
        _dst_mac = self.dst_mac.replace(".", "")
        _src_mac = self.src_mac.replace(".", "")
        _ethertype = self.ethertype
        return f"{_dst_mac}{_src_mac}{_ethertype}".upper()

class IPV4:
    def __init__(self):
        self.version = 4
        self.header_length = 5
        self.dscp = 0
        self.ecn = 0
        self.total_length = 42
        self.identification = "0000"
        self.flags = 0
        self.offset = 0
        self.ttl = 255
        self.proto = 255
        self.checksum = "0000"
        self.src = "0.0.0.0"
        self.dst = "0.0.0.0"

    def __str__(self):
        _ver = '{:01X}'.format(self.version)
        _header_length = '{:01X}'.format(self.header_length)
        _dscp_ecn = '{:02X}'.format((self.dscp<<2)+self.ecn)
        _total_len = '{:04X}'.format(self.total_length)
        _ident = self.identification
        _flag_offset = '{:04X}'.format((self.flags<<13)+self.offset)
        _ttl = '{:02X}'.format(self.ttl)
        _proto = '{:02X}'.format(self.proto)
        _check = self.checksum
        _src = hexlify(IPv4Address(self.src).packed).decode()
        _dst = hexlify(IPv4Address(self.dst).packed).decode()
        return f"{_ver}{_header_length}{_dscp_ecn}{_total_len}{_ident}{_flag_offset}{_ttl}{_proto}{_check}{_src}{_dst}".upper()

class IPV6:
    def __init__(self):
        self.version = 6
        self.traff_class = 8
        self.flow_label = 0
        self.payload_length = 0
        self.next_header = "11"
        self.hop_limit = 1
        self.src = "2000::2"
        self.dst = "2000::100"

    def __str__(self):
        _ver = '{:01X}'.format(self.version)
        _traff_class = '{:01X}'.format(self.traff_class)
        _flow_label = '{:06X}'.format(self.flow_label)
        _payload_len = '{:04X}'.format(self.payload_length)
        _next_header = self.next_header
        _hop_limit = '{:02X}'.format(self.hop_limit)
        _src = hexlify(IPv6Address(self.src).packed).decode()
        _dst = hexlify(IPv6Address(self.dst).packed).decode()
        return f"{_ver}{_traff_class}{_flow_label}{_payload_len}{_next_header}{_hop_limit}{_src}{_dst}".upper()

class UDP:
    def __init__(self):
        self.src_port = 0
        self.dst_port = 0
        self.length = 0
        self.checksum = 0

    def __str__(self):
        _src_port = '{:04X}'.format(self.src_port)
        _dst_port = '{:04X}'.format(self.dst_port)
        _length = '{:04X}'.format(self.length)
        _checksum = '{:04X}'.format(self.checksum)
        return f"{_src_port}{_dst_port}{_length}{_checksum}".upper()

class ROCEV2:
    def __init__(self):
        self.opcode = 0
        self.solicited_event = 0
        self.mig_req = 0
        self.pad_count = 1
        self.header_version = 0
        self.partition_key = 65535
        self.reserved = 7
        self.dest_queue_pair = 2
        self.ack_request = 0
        self.reserved_7bits = 0
        self.packet_seq_number = 0

    def __str__(self):
        _opcode = '{:02X}'.format(self.opcode)
        _combo_1 = '{:02X}'.format((self.solicited_event<<7)+(self.mig_req<<6)+(self.pad_count<<4)+self.header_version)
        _pk = '{:04X}'.format(self.partition_key)
        _reserved = '{:02X}'.format(self.reserved)
        _qp = '{:06X}'.format(self.dest_queue_pair)
        _combo_2 = '{:02X}'.format((self.ack_request<<7)+self.reserved_7bits)
        _ps = '{:06X}'.format(self.packet_seq_number)
        return f"{_opcode}{_combo_1}{_pk}{_reserved}{_qp}{_combo_2}{_ps}".upper()

#------------------------------
# def my_awesome_func()
#------------------------------
async def my_awesome_func(stop_event: asyncio.Event, should_burst: bool) -> None:
    """This Python function uses XOA Python API to configure the TX port

    :param stop_event:
    :type stop_event: asyncio.Event
    :param should_burst: Whether the middle frames should be bursty.
    :type should_burst: bool
    """
    # create tester instance and establish connection
    tester = await testers.L23Tester(CHASSIS_IP, USERNAME, enable_logging=False)

    # access the module on the tester
    module = tester.modules.obtain(MODULE_INDEX)

    # this script configures traffic streams, so stop if the module is a Chimera (impairment) module
    if isinstance(module, modules.ModuleChimera):
        return None

    # access the txport on the module
    txport = module.ports.obtain(TX_PORT_INDEX)

    #---------------------------
    # Port reservation
    #---------------------------
    print(f"#---------------------------")
    print(f"# Port reservation")
    print(f"#---------------------------")
    if txport.is_released():
        print(f"The txport is released (not owned by anyone). Will reserve the txport to continue txport configuration.")
        await txport.reservation.set_reserve()     # reserve the txport so that it is controlled by our session
    elif not txport.is_reserved_by_me():
        print(f"The txport is reserved by others. Will relinquish and reserve the txport to continue txport configuration.")
        await txport.reservation.set_relinquish()  # force the current owner to give up the txport
        await txport.reservation.set_reserve()     # reserve the txport so that it is controlled by our session

    #---------------------------
    # Start port configuration
    #---------------------------
    print(f"#---------------------------")
    print(f"# Start port configuration")
    print(f"#---------------------------")

    print(f"Reset the txport")
    await txport.reset.set()

    print(f"Configure the txport")
    await utils.apply(
        # txport.speed.mode.selection.set(mode=enums.PortSpeedMode.F100G),
        txport.comment.set(comment="RoCE2 on Loki"),
        txport.tx_config.enable.set_on(),
        txport.latency_config.offset.set(offset=0),
        txport.latency_config.mode.set(mode=enums.LatencyMode.LAST2LAST),
        txport.tx_config.burst_period.set(burst_period=0),
        txport.tx_config.packet_limit.set(packet_count_limit=FRAME_COUNT*REPETITION),
        txport.max_header_length.set(max_header_length=128),
        txport.autotrain.set(interval=0),
        txport.loop_back.set_none(),    # If you want to loop the port TX back to its own RX, change it to set_txoff2rx()
        txport.checksum.set(offset=0),
        txport.tx_config.delay.set(delay_val=0),
        txport.tpld_mode.set_normal(),
        txport.payload_mode.set_normal(),
        #txport.rate.pps.set(port_rate_pps=TRAFFIC_RATE_FPS),  # To control the traffic rate in FPS, uncomment this and comment out the next line.
        txport.rate.fraction.set(TRAFFIC_RATE_PERCENT),         # Traffic rate as a fraction of line rate in ppm. 1,000,000 = 100%
    )
    if should_burst:
        await txport.tx_config.mode.set_burst()
    else:
        await txport.tx_config.mode.set_sequential()

    #--------------------------------------
    # Configure stream_0 on the txport
    #--------------------------------------
    print(f"   Configure first-packet stream on the txport")

    stream_0 = await txport.streams.create()
    eth = Ethernet()
    eth.src_mac = "aaaa.aaaa.0005"
    eth.dst_mac = "bbbb.bbbb.0005"
    ipv4 = IPV4()
    ipv4.src = "1.1.1.5"
    ipv4.dst = "2.2.2.5"
    ipv6 = IPV6()
    ipv6.src = "2001::5"
    ipv6.dst = "2002::5"
    udp = UDP()
    udp.src_port = 4791
    udp.dst_port = 4791
    rocev2 = ROCEV2()
    rocev2.opcode = 0
    rocev2.dest_queue_pair = 2
    rocev2.packet_seq_number = 0

    await utils.apply(
        stream_0.enable.set_on(),
        stream_0.packet.limit.set(packet_count=1),
        stream_0.comment.set(f"First packet"),
        stream_0.rate.fraction.set(stream_rate_ppm=10000),
        stream_0.packet.header.protocol.set(segments=[
            enums.ProtocolOption.ETHERNET,
            enums.ProtocolOption.IPV6,
            enums.ProtocolOption.UDP,
            enums.ProtocolOption.RAW_12,
        ]),
        stream_0.packet.header.data.set(hex_data=Hex(str(eth)+str(ipv6)+str(udp)+str(rocev2))),
        stream_0.packet.length.set(length_type=enums.LengthType.FIXED, min_val=FRAME_SIZE_BYTES, max_val=FRAME_SIZE_BYTES),
        stream_0.payload.content.set(
            payload_type=enums.PayloadType.PATTERN,
            hex_data=Hex("AABBCCDD")
        ),
        stream_0.tpld_id.set(test_payload_identifier = 0),
        stream_0.insert_packets_checksum.set_on()
    )
    if should_burst:
        await stream_0.burst.burstiness.set(size=1, density=100)
        await stream_0.burst.gap.set(inter_packet_gap=0, inter_burst_gap=0)

    #--------------------------------------
    # Configure stream_1 on the txport
    #--------------------------------------
    print(f"   Configure middle-packets stream on the txport")

    stream_1 = await txport.streams.create()
    rocev2.opcode = 1
    rocev2.dest_queue_pair = 2
    rocev2.packet_seq_number = 1

    await utils.apply(
        stream_1.enable.set_on(),
        stream_1.packet.limit.set(packet_count=FRAME_COUNT-2),
        stream_1.comment.set(f"Middle packets"),
        stream_1.rate.fraction.set(stream_rate_ppm=10000),
        stream_1.packet.header.protocol.set(segments=[
            enums.ProtocolOption.ETHERNET,
            enums.ProtocolOption.IPV6,
            enums.ProtocolOption.UDP,
            enums.ProtocolOption.RAW_12,
        ]),
        stream_1.packet.header.data.set(hex_data=Hex(str(eth)+str(ipv6)+str(udp)+str(rocev2))),
        stream_1.packet.length.set(length_type=enums.LengthType.FIXED, min_val=FRAME_SIZE_BYTES, max_val=FRAME_SIZE_BYTES),
        stream_1.payload.content.set(
            payload_type=enums.PayloadType.PATTERN,
            hex_data=Hex("AABBCCDD")
        ),
        stream_1.tpld_id.set(test_payload_identifier = 1),
        stream_1.insert_packets_checksum.set_on()
    )
    if should_burst:
        await stream_1.burst.burstiness.set(size=BURST_SIZE_FRAMES, density=100)
        await stream_1.burst.gap.set(inter_packet_gap=INTRA_BURST_GAP_BYTES, inter_burst_gap=INTER_BURST_GAP_BYTES)

    # Configure a modifier on the stream_1
    await stream_1.packet.header.modifiers.configure(1)

    # Modifier on the SQN
    modifier = stream_1.packet.header.modifiers.obtain(0)
    await modifier.specification.set(position=72, mask="FFFF0000", action=enums.ModifierAction.INC, repetition=1)
    await modifier.range.set(min_val=1, step=1, max_val=FRAME_COUNT-2)

    #--------------------------------------
    # Configure stream_2 on the txport
    #--------------------------------------
    print(f"   Configure last-packet stream on the txport")

    stream_2 = await txport.streams.create()
    rocev2.opcode = 2
    rocev2.dest_queue_pair = 2
    rocev2.packet_seq_number = FRAME_COUNT-1

    await utils.apply(
        stream_2.enable.set_on(),
        stream_2.packet.limit.set(packet_count=1),
        stream_2.comment.set(f"Last packet"),
        stream_2.rate.fraction.set(stream_rate_ppm=10000),
        stream_2.packet.header.protocol.set(segments=[
            enums.ProtocolOption.ETHERNET,
            enums.ProtocolOption.IPV6,
            enums.ProtocolOption.UDP,
            enums.ProtocolOption.RAW_12,
        ]),
        stream_2.packet.header.data.set(hex_data=Hex(str(eth)+str(ipv6)+str(udp)+str(rocev2))),
        stream_2.packet.length.set(length_type=enums.LengthType.FIXED, min_val=FRAME_SIZE_BYTES, max_val=FRAME_SIZE_BYTES),
        stream_2.payload.content.set(
            payload_type=enums.PayloadType.PATTERN,
            hex_data=Hex("AABBCCDD")
        ),
        stream_2.tpld_id.set(test_payload_identifier = 2),
        stream_2.insert_packets_checksum.set_on()
    )
    if should_burst:
        await stream_2.burst.burstiness.set(size=1, density=100)
        await stream_2.burst.gap.set(inter_packet_gap=0, inter_burst_gap=0)

async def main():
    stop_event = asyncio.Event()
    try:
        await my_awesome_func(stop_event, should_burst=SHOULD_BURST)
    except KeyboardInterrupt:
        stop_event.set()

if __name__ == "__main__":
    asyncio.run(main())
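The script places its modifier at position=72 with mask "FFFF0000". A short calculation shows where that lands in the header. The sketch below assumes that the modifier position is a byte offset from the start of the frame and that the mask selects the two bytes at that offset; both are assumptions about modifier semantics, not something the script states. The field sizes follow the ROCEV2 class in the script, and `bth_field_offset` is a hypothetical helper.

```python
# Header lengths in bytes for the stack used by the script.
ETHERNET_LEN = 14  # destination MAC (6) + source MAC (6) + EtherType (2)
IPV6_LEN = 40      # fixed IPv6 header
UDP_LEN = 8

# Fields of the 12-byte RoCEv2 header in wire order, sized as the ROCEV2 class formats them.
BTH_FIELDS = [
    ("opcode", 1),
    ("flags", 1),             # solicited event, migration request, pad count, version
    ("partition_key", 2),
    ("reserved", 1),
    ("dest_queue_pair", 3),
    ("ack_and_reserved", 1),
    ("packet_seq_number", 3),
]


def bth_field_offset(field_name: str) -> int:
    """Return the byte offset of a RoCEv2 header field from the start of the frame."""
    offset = ETHERNET_LEN + IPV6_LEN + UDP_LEN
    for name, size in BTH_FIELDS:
        if name == field_name:
            return offset
        offset += size
    raise KeyError(field_name)


bth_length = sum(size for _, size in BTH_FIELDS)
psn_offset = bth_field_offset("packet_seq_number")
psn_low16_offset = psn_offset + 1  # skip the most significant of the 3 sequence-number bytes
total_header_length = ETHERNET_LEN + IPV6_LEN + UDP_LEN + bth_length

print(bth_length, psn_offset, psn_low16_offset, total_header_length)
```

Under these assumptions the 3-byte packet sequence number starts at byte 71, so position 72 covers its low 16 bits, which is enough for the 18 middle frames. The full header is 74 bytes, within the max_header_length of 128 set on the port.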
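XOA Python Test Suite
XOA Python Test Suite is a test framework that gives developers and test specialists a well-defined API for running and integrating Xena test suites.
The framework automates tasks such as test resource management, test execution, and publishing test results.
Each RFC test suite is designed as an independent "plugin" that can be integrated into a project selectively, as needed.
XOA Python Test Suite currently includes: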
- RFC2544
- RFC2889
- RFC3918
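One way to picture the plugin design is a registry that maps a suite name to the code that runs it, so a project loads only the suites it needs. The sketch below is a generic illustration and not the XOA core API: `register_suite`, `run_suite`, `available_suites`, and the config keys are all hypothetical.

```python
from typing import Callable, Dict, List

SuiteRunner = Callable[[dict], str]

# Hypothetical registry: suite name -> function that runs the suite.
_REGISTRY: Dict[str, SuiteRunner] = {}


def register_suite(name: str) -> Callable[[SuiteRunner], SuiteRunner]:
    """Decorator that makes a suite available under the given name."""
    def decorator(func: SuiteRunner) -> SuiteRunner:
        _REGISTRY[name] = func
        return func
    return decorator


def available_suites() -> List[str]:
    """List the names of the suites that have been registered."""
    return sorted(_REGISTRY)


def run_suite(name: str, config: dict) -> str:
    """Look up a registered suite by name and run it with the given configuration."""
    if name not in _REGISTRY:
        raise KeyError(f"Test suite {name!r} is not registered")
    return _REGISTRY[name](config)


@register_suite("RFC2544")
def run_rfc2544(config: dict) -> str:
    # A real plugin would drive the tester here; this one only echoes its input.
    return f"RFC2544 ran with frame sizes {config['frame_sizes']}"


result = run_suite("RFC2544", {"frame_sizes": [64, 128]})
print(available_suites(), result)
```

Only RFC2544 is registered here, so asking for RFC2889 raises a KeyError until that plugin is added, which mirrors the idea of integrating suites selectively.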
XOA Converter
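If you want to migrate your current Xena test suite configurations to XOA quickly, the XOA Converter tool makes this straightforward.
Previously, Xena's test suite applications ran only on Windows. Going forward, all existing and future test suites will be incorporated into XOA Python Test Suite, which removes the Windows restriction.
To ease the transition, Xena provides XOA Converter. The tool migrates existing Xena test suite configurations (Xena2544, Xena2889, and Xena3918) from the Xena Windows desktop applications to XOA Python Test Suite.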
Source Code
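The XOA source code is hosted on GitHub, chosen for its version control and collaboration capabilities. It provides a good environment for managing code changes and keeps the project history complete and easy to access. The project is open: everyone is encouraged to use, share, contribute to, and give feedback on the source code. Input and contributions from the community are valued because they improve the overall quality of the code and bring in new ideas.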
- XOA Python API Source Code
- XOA Python Test Suite – Core Source Code
- XOA Python Test Suite – Plugin Source Code
- XOA ANLT Utility Source Code
- XOA Converter Source Code