SOME/IP:用Python实现协议订阅、Offer、订阅ACK与报文接收
文章目录
- 前言
- 一、代码层次
- 二、详细代码
- 1. eth_scapy_sd.py
- 2、eth_scapy_someip.py
- 3、network_define.py
- 4、packet_define.py
- 5、unpack_define.py
- 6、someip_controller.py
前言
1、需要pip安装scapy库
2、需要根据实际情况修改network_define.py中的网络配置
3、执行someip_controller.py运行本案例
4、本文参考github: eth-scapy-someip
注:最近没啥时间写文章,有兴趣自己研究下<~~>
一、代码层次
someip/
└── module/
├── eth_scapy_sd.py # SOME/IP SD服务发现结构体定义
├── eth_scapy_someip.py # SOME/IP标准结构体定义
├── network_define.py # 网络配置(IP/端口/多播组)
├── packet_define.py # 协议字段定义(Offer/Subscribe/ACK)
├── unpack_define.py # 接收报文解析器
└── someip_controller.py # 主控制逻辑(订阅/OFFER/ACK流程管理)
二、详细代码
1. eth_scapy_sd.py
import ctypes
import collections
from eth_scapy_someip import SOMEIP
from scapy.fields import *
from scapy.packet import *
from scapy.layers.inet6 import IP6Field
class _SDPacketBase(Packet):
    """Common base class for all SD packet definitions.

    Subclasses may override ``_defaults`` with a ``{field_name: value}``
    mapping. Every listed name that is a field of the packet is set to the
    given value right after scapy has initialised the packet's fields.

    Example::

        _defaults = {"field_1_name": field_1_value, "field_2_name": field_2_value}
    """

    _defaults = {}

    def _set_defaults(self):
        """Apply ``_defaults``; names that are not fields of this packet are skipped."""
        for field_name, value in self._defaults.items():
            try:
                self.get_field(field_name)
            except KeyError:
                # Not a field of this packet class, nothing to preset.
                continue
            self.setfieldval(field_name, value)

    def init_fields(self, *args, **kwargs):
        """Initialise the packet fields, then apply ``_defaults``.

        Any arguments are forwarded unchanged to ``Packet.init_fields``.
        NOTE(review): newer scapy releases appear to call this hook with an
        extra argument (``for_dissect_only``); accepting and forwarding
        arbitrary arguments keeps the override working either way.
        """
        Packet.init_fields(self, *args, **kwargs)
        self._set_defaults()
# SD ENTRY
# - Service
# - EventGroup
class _SDEntry(_SDPacketBase):
    """Base class for the SDEntry_* packets: the fields shared by all entries."""

    # struct format / payload offset used to read the entry-type byte
    TYPE_FMT = ">B"
    TYPE_PAYLOAD_I = 0
    # Entry types: service
    TYPE_SRV_FINDSERVICE = 0x00
    TYPE_SRV_OFFERSERVICE = 0x01
    TYPE_SRV = (TYPE_SRV_FINDSERVICE, TYPE_SRV_OFFERSERVICE)
    # Entry types: eventgroup
    TYPE_EVTGRP_SUBSCRIBE = 0x06
    TYPE_EVTGRP_SUBSCRIBE_ACK = 0x07
    TYPE_EVTGRP = (TYPE_EVTGRP_SUBSCRIBE, TYPE_EVTGRP_SUBSCRIBE_ACK)
    # Overall entry length in bytes (unit-test usage)
    OVERALL_LEN = 16
    # Layout of the common part of an element of the entries array
    fields_desc = [
        ByteField("type", 0),
        ByteField("index_1", 0),
        ByteField("index_2", 0),
        BitField("n_opt_1", 0, 4),
        BitField("n_opt_2", 0, 4),
        ShortField("srv_id", 0),
        ShortField("inst_id", 0),
        ByteField("major_ver", 0),
        X3BytesField("ttl", 0),
    ]

    def guess_payload_class(self, payload):
        """Pick the SDEntry class matching the type byte found in ``payload``.

        :param payload: raw bytes following this packet
        :return: ``SDEntry_Service`` or ``SDEntry_EventGroup`` for the known
                 type values; otherwise whatever ``Packet.guess_payload_class``
                 selects, so the caller never receives ``None``.
        """
        type_byte = payload[_SDEntry.TYPE_PAYLOAD_I:_SDEntry.TYPE_PAYLOAD_I + 1]
        if type_byte:
            entry_type = struct.unpack(_SDEntry.TYPE_FMT, type_byte)[0]
            if entry_type in _SDEntry.TYPE_SRV:
                return SDEntry_Service
            if entry_type in _SDEntry.TYPE_EVTGRP:
                return SDEntry_EventGroup
        # Unknown type (or no type byte at all): defer to scapy's default.
        return Packet.guess_payload_class(self, payload)
class SDEntry_Service(_SDEntry):
    """SD service entry: the common entry fields followed by ``minor_ver``."""

    name = "Service Entry"
    # The entry type is preset to FindService.
    _defaults = {"type": _SDEntry.TYPE_SRV_FINDSERVICE}
    fields_desc = [
        _SDEntry,
        IntField("minor_ver", 0),
    ]
class SDEntry_EventGroup(_SDEntry):
    """SD eventgroup entry: common entry fields, reserved bits, counter, eventgroup ID."""

    name = "Eventgroup Entry"
    # The entry type is preset to Subscribe.
    _defaults = {"type": _SDEntry.TYPE_EVTGRP_SUBSCRIBE}
    fields_desc = [
        _SDEntry,
        BitField("res", 0, 12),  # 12 reserved bits
        BitField("cnt", 0, 4),   # 4-bit counter
        ShortField("eventgroup_id", 0),
    ]
# SD Option
# - Configuration
# - LoadBalancing
# - IPv4 EndPoint
# - IPv6 EndPoint
# - IPv4 MultiCast
# - IPv6 MultiCast
# - IPv4 SD EndPoint
# - IPv6 SD EndPoint
class _SDOption(_SDPacketBase):
    """Base class for the SDOption_* packets: type/length constants and dispatch."""

    # struct format / payload offset used to read the option-type byte
    TYPE_FMT = ">B"
    TYPE_PAYLOAD_I = 2
    CFG_TYPE = 0x01
    # Overall length of a config option with an empty 'cfg_str' (unit-test usage)
    CFG_OVERALL_LEN = 4
    LOADBALANCE_TYPE = 0x02
    LOADBALANCE_LEN = 0x05
    LOADBALANCE_OVERALL_LEN = 8  # overall length of a load-balancing option (unit-test usage)
    IP4_ENDPOINT_TYPE = 0x04
    IP4_ENDPOINT_LEN = 0x0009
    IP4_MCAST_TYPE = 0x14
    IP4_MCAST_LEN = 0x0009
    IP4_SDENDPOINT_TYPE = 0x24
    IP4_SDENDPOINT_LEN = 0x0009
    IP4_OVERALL_LEN = 12  # overall length of an IPv4 option (unit-test usage)
    IP6_ENDPOINT_TYPE = 0x06
    IP6_ENDPOINT_LEN = 0x0015
    IP6_MCAST_TYPE = 0x16
    IP6_MCAST_LEN = 0x0015
    IP6_SDENDPOINT_TYPE = 0x26
    IP6_SDENDPOINT_LEN = 0x0015
    IP6_OVERALL_LEN = 24  # overall length of an IPv6 option (unit-test usage)

    def guess_payload_class(self, payload):
        """Pick the SDOption class matching the type byte found in ``payload``.

        :param payload: raw bytes following this packet
        :return: the matching SDOption_* class; for an unknown type, or a
                 payload too short to contain the type byte, whatever
                 ``Packet.guess_payload_class`` selects.
        """
        type_byte = payload[_SDOption.TYPE_PAYLOAD_I:_SDOption.TYPE_PAYLOAD_I + 1]
        if type_byte:
            option_type = struct.unpack(_SDOption.TYPE_FMT, type_byte)[0]
            # Built on each call because the option classes are defined
            # after this base class.
            option_classes = {
                self.CFG_TYPE: SDOption_Config,
                self.LOADBALANCE_TYPE: SDOption_LoadBalance,
                self.IP4_ENDPOINT_TYPE: SDOption_IP4_EndPoint,
                self.IP4_MCAST_TYPE: SDOption_IP4_Multicast,
                self.IP4_SDENDPOINT_TYPE: SDOption_IP4_SD_EndPoint,
                self.IP6_ENDPOINT_TYPE: SDOption_IP6_EndPoint,
                self.IP6_MCAST_TYPE: SDOption_IP6_Multicast,
                self.IP6_SDENDPOINT_TYPE: SDOption_IP6_SD_EndPoint,
            }
            option_cls = option_classes.get(option_type)
            if option_cls is not None:
                return option_cls
        # Unknown type or truncated payload: defer to scapy's default.
        return Packet.guess_payload_class(self, payload)
class _SDOption_Header(_SDOption):
    """Leading fields of an SD option: length, type and one reserved byte."""

    fields_desc = [ShortField("len", None), ByteField("type", 0), ByteField("res_hdr", 0)]
class _SDOption_Tail(_SDOption):
    """Trailing fields of an IP option: reserved byte, L4 protocol and port."""

    fields_desc = [
        ByteField("res_tail", 0),
        # Defaults to TCP (0x06); 0x11 selects UDP.
        ByteEnumField(
            "l4_proto",
            0x06,
            {
                0x06: "TCP",
                0x11: "UDP",
            },
        ),
        ShortField("port", 0),
    ]
class _SDOption_IP4(_SDOption):
    """Layout shared by the IPv4 options: header, IPv4 address, tail."""

    fields_desc = [
        _SDOption_Header,
        IPField("addr", "0.0.0.0"),
        _SDOption_Tail,
    ]
class _SDOption_IP6(_SDOption):
    """Layout shared by the IPv6 options: header, IPv6 address, tail."""

    fields_desc = [_SDOption_Header, IP6Field("addr", "2001:cdba:0000:0000:0000:0000:3257:9652"), _SDOption_Tail]
class SDOption_Config(_SDOption):
    """SD configuration option: option header followed by a free-form string."""

    # Offset added when computing the length (the header's "Reserved" byte)
    LEN_OFFSET = 0x01
    name = "Config Option"
    # Default field values
    _defaults = {"type": _SDOption.CFG_TYPE}
    # Packet field definition
    fields_desc = [_SDOption_Header, StrField("cfg_str", "")]

    def post_build(self, p, pay):
        """Fill in the ``len`` field when the caller left it unset.

        The length excludes the 16-bit length and 8-bit type fields, so it
        covers the reserved header byte plus the configuration string.

        :param p: the built bytes of this option (4 header bytes + string)
        :param pay: the built bytes of the payload
        :return: this option's bytes, with ``len`` patched in if needed,
                 followed by the payload
        """
        if self.len is None:
            # Measure the string as built on the wire rather than len(cfg_str):
            # a non-ASCII str value occupies more bytes than it has characters.
            header_size = 4
            length = (len(p) - header_size) + self.LEN_OFFSET
            p = struct.pack("!H", length) + p[2:]
        return p + pay
class SDOption_LoadBalance(_SDOption):
    """SD load-balancing option: option header, priority and weight."""

    name = "LoadBalance Option"
    # Default field values
    _defaults = {
        "type": _SDOption.LOADBALANCE_TYPE,
        "len": _SDOption.LOADBALANCE_LEN,
    }
    # Packet field definition
    fields_desc = [
        _SDOption_Header,
        ShortField("priority", 0),
        ShortField("weight", 0),
    ]
# SDOPTIONS : IPv4-specific
class SDOption_IP4_EndPoint(_SDOption_IP4):
    """IPv4 endpoint option; presets the option type and length."""

    name = "IP4 EndPoint Option"
    # Default field values
    _defaults = {
        "type": _SDOption.IP4_ENDPOINT_TYPE,
        "len": _SDOption.IP4_ENDPOINT_LEN,
    }
class SDOption_IP4_Multicast(_SDOption_IP4):
    """IPv4 multicast option; presets the option type and length."""

    name = "IP4 Multicast Option"
    # Default field values
    _defaults = {
        "type": _SDOption.IP4_MCAST_TYPE,
        "len": _SDOption.IP4_MCAST_LEN,
    }
class SDOption_IP4_SD_EndPoint(_SDOption_IP4):
    """IPv4 SD endpoint option; presets the option type and length."""

    name = "IP4 SDEndPoint Option"
    # Default field values
    _defaults = {"type": _SDOption.IP4_SDENDPOINT_TYPE, "len": _SDOption.IP4_SDENDPOINT_LEN}
# SDOPTIONS : IPv6-specific
class SDOption_IP6_EndPoint(_SDOption_IP6):
    """IPv6 endpoint option; presets the option type and length."""

    name = "IP6 EndPoint Option"
    # Default field values
    _defaults = {
        "type": _SDOption.IP6_ENDPOINT_TYPE,
        "len": _SDOption.IP6_ENDPOINT_LEN,
    }
class SDOption_IP6_Multicast(_SDOption_IP6):
    """IPv6 multicast option; presets the option type and length."""

    name = "IP6 Multicast Option"
    # Default field values
    _defaults = {
        "type": _SDOption.IP6_MCAST_TYPE,
        "len": _SDOption.IP6_MCAST_LEN,
    }
class SDOption_IP6_SD_EndPoint(_SDOption_IP6):
    """IPv6 SD endpoint option; presets the option type and length."""

    name = "IP6 SDEndPoint Option"
    # Default field values
    _defaults = {"type": _SDOption.IP6_SDENDPOINT_TYPE, "len": _SDOption.IP6_SDENDPOINT_LEN}
#
# SD PACKAGE DEFINITION
#
class SD(_SDPacketBase):
    """SOME/IP-SD packet: flags, an array of entries and an array of options.

    NOTE: when filling ``entry_array`` or ``option_array`` assign a new list
    instead of calling ``list.append()``, e.g.::

        p = SD()
        p.option_array = [SDOption_Config(), SDOption_IP6_EndPoint()]
    """

    # Values written into the SOME/IP header that carries an SD packet
    SOMEIP_MSGID_SRVID = 0xFFFF
    SOMEIP_MSGID_SUBID = 0x1
    SOMEIP_MSGID_EVENTID = 0x100
    SOMEIP_PROTO_VER = 0x01
    SOMEIP_IFACE_VER = 0x01
    SOMEIP_MSG_TYPE = SOMEIP.TYPE_NOTIFICATION
    name = "SD"
    # Flag definitions: {"NAME": (mask, offset)}
    _sdFlag = collections.namedtuple("Flag", "mask offset")
    FLAGSDEF = {
        "REBOOT": _sdFlag(mask=0x80, offset=7),   # reboot flag
        "UNICAST": _sdFlag(mask=0x40, offset=6),  # unicast flag
    }
    fields_desc = [
        ByteField("flags", 0),
        X3BytesField("res", 0),
        FieldLenField(
            name="len_entry_array",
            default=None,
            length_of="entry_array",
            fmt="!I",
        ),
        PacketListField(
            name="entry_array",
            default=None,
            pkt_cls=_SDEntry,
            length_from=lambda sd_pkt: sd_pkt.len_entry_array,
        ),
        FieldLenField(
            name="len_option_array",
            default=None,
            length_of="option_array",
            fmt="!I",
        ),
        PacketListField(
            name="option_array",
            default=None,
            pkt_cls=_SDOption,
            length_from=lambda sd_pkt: sd_pkt.len_option_array,
        ),
    ]

    def __init__(self, *args, **kwargs):
        super(SD, self).__init__(*args, **kwargs)
        self.explicit = 1

    def getFlag(self, name):
        """Return the value of the named flag, or ``None`` for an unknown name.

        :param str name: flag name, case-insensitive (see ``SD.FLAGSDEF``)
        """
        flag = self.FLAGSDEF.get(name.upper())
        if flag is None:
            return None
        return (self.flags & flag.mask) >> flag.offset

    def setFlag(self, name, value):
        """Set the named flag in the ``flags`` byte; unknown names are ignored.

        :param str name: flag name, case-insensitive (see ``SD.FLAGSDEF``)
        :param int value: 0x0 or 0x1 (the value is ANDed with 0x01)
        """
        flag = self.FLAGSDEF.get(name.upper())
        if flag is None:
            return
        # Clear the flag's bit within the 8-bit field, then OR in the new bit.
        cleared = self.flags & (~flag.mask & 0xFF)
        self.flags = cleared | ((value & 0x01) << flag.offset)

    def setEntryArray(self, entry_list):
        """Replace ``entry_array``.

        :param entry_list: list of entries; a single entry object is wrapped in a list
        """
        self.entry_array = entry_list if isinstance(entry_list, list) else [entry_list]

    def setOptionArray(self, option_list):
        """Replace ``option_array``.

        :param option_list: list of options; a single option object is wrapped in a list
        """
        self.option_array = option_list if isinstance(option_list, list) else [option_list]

    def getSomeip(self, stacked=False):
        """Return a SOME/IP packet initialised with the SD header values.

        :param stacked: when true, return the SOME/IP packet stacked over this SD packet
        """
        header = SOMEIP()
        message_id = header.msg_id
        message_id.srv_id = SD.SOMEIP_MSGID_SRVID
        message_id.sub_id = SD.SOMEIP_MSGID_SUBID
        message_id.event_id = SD.SOMEIP_MSGID_EVENTID
        header.proto_ver = SD.SOMEIP_PROTO_VER
        header.iface_ver = SD.SOMEIP_IFACE_VER
        header.msg_type = SD.SOMEIP_MSG_TYPE
        if not stacked:
            return header
        return header / self

    def get_someip_with_session_id(self, session_id, stacked=False):
        """Same as :meth:`getSomeip`, additionally setting the request's session ID.

        :param session_id: value written to ``req_id.session_id``
        :param stacked: when true, return the SOME/IP packet stacked over this SD packet
        """
        header = self.getSomeip(stacked=False)
        header.req_id.session_id = session_id
        return header / self if stacked else header
2、eth_scapy_someip.py
from scapy.layers.inet import *
from scapy.fields import *
from scapy.packet import *
"""SOMEIP PACKAGE DEFINITION"""
class _SOMEIP_MessageId(Packet):
    """Message ID sub-packet: service ID, 1-bit selector, 15-bit method or event ID."""

    name = "MessageId"
    fields_desc = [
        ShortField("srv_id", 0),
        BitEnumField("sub_id", 0, 1, {0: "METHOD_ID", 1: "EVENT_ID"}),
        # Exactly one of the two 15-bit fields is present, selected by sub_id.
        ConditionalField(
            BitField("method_id", 0, 15),
            lambda msg_id: msg_id.sub_id == 0,
        ),
        ConditionalField(
            BitField("event_id", 0, 15),
            lambda msg_id: msg_id.sub_id == 1,
        ),
    ]

    def extract_padding(self, p):
        """Treat all remaining bytes as padding, so none become this sub-packet's payload."""
        remaining = p
        return "", remaining
class _SOMEIP_RequestId(Packet):
    """Request ID sub-packet: client ID followed by session ID."""

    name = "RequestId"
    fields_desc = [
        ShortField("client_id", 0),
        ShortField("session_id", 0),
    ]

    def extract_padding(self, p):
        """Treat all remaining bytes as padding, so none become this sub-packet's payload."""
        remaining = p
        return "", remaining
class SOMEIP(Packet):
    """SOME/IP packet header, including the optional SOME/IP-TP segment fields."""

    # Default values
    PROTOCOL_VERSION = 0x01
    INTERFACE_VERSION = 0x01
    # Length offset (without payload and without SOME/IP-TP fields)
    LEN_OFFSET = 0x08
    # SOME/IP type values
    TYPE_REQUEST = 0x00
    TYPE_REQUEST_NO_RET = 0x01
    TYPE_NOTIFICATION = 0x02
    TYPE_REQUEST_ACK = 0x40
    TYPE_REQUEST_NORET_ACK = 0x41
    TYPE_NOTIFICATION_ACK = 0x42
    TYPE_RESPONSE = 0x80
    TYPE_ERROR = 0x81
    TYPE_RESPONSE_ACK = 0xc0
    TYPE_ERROR_ACK = 0xc1
    # SOME/IP-TP type values
    TYPE_REQUEST_SEGMENT = 0x20
    TYPE_REQUEST_NO_RET_SEGMENT = 0x21
    TYPE_NOTIFICATION_SEGMENT = 0x22
    TYPE_REQUEST_ACK_SEGMENT = 0x60
    TYPE_REQUEST_NORET_ACK_SEGMENT = 0x61
    TYPE_NOTIFICATION_ACK_SEGMENT = 0x62
    TYPE_RESPONSE_SEGMENT = 0xa0
    TYPE_ERROR_SEGMENT = 0xa1
    TYPE_RESPONSE_ACK_SEGMENT = 0xe0
    TYPE_ERROR_ACK_SEGMENT = 0xe1
    SOMEIP_TP_TYPES = frozenset({TYPE_REQUEST_SEGMENT, TYPE_REQUEST_NO_RET_SEGMENT, TYPE_NOTIFICATION_SEGMENT,
                                 TYPE_REQUEST_ACK_SEGMENT, TYPE_REQUEST_NORET_ACK_SEGMENT,
                                 TYPE_NOTIFICATION_ACK_SEGMENT, TYPE_RESPONSE_SEGMENT, TYPE_ERROR_SEGMENT,
                                 TYPE_RESPONSE_ACK_SEGMENT, TYPE_ERROR_ACK_SEGMENT})
    SOMEIP_TP_TYPE_BIT_MASK = 0x20
    # SOME/IP return codes
    RET_E_OK = 0x00
    RET_E_NOT_OK = 0x01
    RET_E_UNKNOWN_SERVICE = 0x02
    RET_E_UNKNOWN_METHOD = 0x03
    RET_E_NOT_READY = 0x04
    RET_E_NOT_REACHABLE = 0x05
    RET_E_TIMEOUT = 0x06
    RET_E_WRONG_PROTOCOL_V = 0x07
    RET_E_WRONG_INTERFACE_V = 0x08
    RET_E_MALFORMED_MSG = 0x09
    RET_E_WRONG_MESSAGE_TYPE = 0x0a
    # SOME/IP-TP "more segments" flag
    SOMEIP_TP_LAST_SEGMENT = 0
    SOMEIP_TP_MORE_SEGMENTS = 1
    _OVERALL_LEN_NOPAYLOAD = 16  # unit-test usage
    name = 'SOME/IP'
    fields_desc = [
        PacketField('msg_id', _SOMEIP_MessageId(), _SOMEIP_MessageId),  # Message ID
        IntField('len', None),  # Length (filled in by post_build when None)
        PacketField('req_id', _SOMEIP_RequestId(), _SOMEIP_RequestId),  # Request ID
        ByteField('proto_ver', PROTOCOL_VERSION),  # Protocol version
        ByteField('iface_ver', INTERFACE_VERSION),  # Interface version
        ByteEnumField('msg_type', TYPE_REQUEST, {  # -- Message type --
            TYPE_REQUEST: 'REQUEST',  # 0x00
            TYPE_REQUEST_NO_RET: 'REQUEST_NO_RETURN',  # 0x01
            TYPE_NOTIFICATION: 'NOTIFICATION',  # 0x02
            TYPE_REQUEST_ACK: 'REQUEST_ACK',  # 0x40
            TYPE_REQUEST_NORET_ACK: 'REQUEST_NO_RETURN_ACK',  # 0x41
            TYPE_NOTIFICATION_ACK: 'NOTIFICATION_ACK',  # 0x42
            TYPE_RESPONSE: 'RESPONSE',  # 0x80
            TYPE_ERROR: 'ERROR',  # 0x81
            TYPE_RESPONSE_ACK: 'RESPONSE_ACK',  # 0xc0
            TYPE_ERROR_ACK: 'ERROR_ACK',  # 0xc1
        }),
        ByteEnumField('retcode', 0, {  # -- Return code --
            RET_E_OK: 'E_OK',  # 0x00
            RET_E_NOT_OK: 'E_NOT_OK',  # 0x01
            RET_E_UNKNOWN_SERVICE: 'E_UNKNOWN_SERVICE',  # 0x02
            RET_E_UNKNOWN_METHOD: 'E_UNKNOWN_METHOD',  # 0x03
            RET_E_NOT_READY: 'E_NOT_READY',  # 0x04
            RET_E_NOT_REACHABLE: 'E_NOT_REACHABLE',  # 0x05
            RET_E_TIMEOUT: 'E_TIMEOUT',  # 0x06
            RET_E_WRONG_PROTOCOL_V: 'E_WRONG_PROTOCOL_VERSION',  # 0x07
            RET_E_WRONG_INTERFACE_V: 'E_WRONG_INTERFACE_VERSION',  # 0x08
            RET_E_MALFORMED_MSG: 'E_MALFORMED_MESSAGE',  # 0x09
            RET_E_WRONG_MESSAGE_TYPE: 'E_WRONG_MESSAGE_TYPE',  # 0x0a
        }),
        # The three fields below exist only for SOME/IP-TP message types.
        ConditionalField(BitField('offset', 0, 28), lambda pkt: pkt.msg_type in SOMEIP.SOMEIP_TP_TYPES),
        ConditionalField(BitField('reserved', 0, 3), lambda pkt: pkt.msg_type in SOMEIP.SOMEIP_TP_TYPES),
        ConditionalField(BitEnumField('more_segments', 0, 1, {SOMEIP_TP_LAST_SEGMENT: 'Last_Segment',
                                                              SOMEIP_TP_MORE_SEGMENTS: 'More_Segments'
                                                              }), lambda pkt: pkt.msg_type in SOMEIP.SOMEIP_TP_TYPES)
    ]

    def post_build(self, p, pay):
        """Fill in the ``len`` field when the caller left it unset.

        The length covers everything after the Length field itself: Request
        ID, protocol/interface version, message type, return code, the
        SOME/IP-TP fields when present, and the payload.

        :param p: the built bytes of this header
        :param pay: the built bytes of the payload
        :return: header bytes, with ``len`` patched in if needed, followed by the payload
        """
        if self.len is None:
            # Message ID (4 bytes) and Length (4 bytes) are not counted.
            # Measuring the built header instead of using the fixed
            # LEN_OFFSET also accounts for the 4 SOME/IP-TP bytes.
            length = len(p) - 8 + len(pay)
            p = p[:4] + struct.pack('!I', length) + p[8:]
        return p + pay
# Dissect UDP and TCP payloads as SOME/IP for source ports 30490 through 30504.
for i in range(15):
    for _transport in (UDP, TCP):
        bind_layers(_transport, SOMEIP, sport=30490 + i)
3、network_define.py
class EthParameter:
    """Network settings used by the packet builders and the controller.

    Adjust the interface names, addresses and ports to the actual setup.
    """

    # Service-discovery group address and the interface used to reach it
    sd_network_card = "eth0.62"
    sd_ip = "239.0.0.255"
    # Local host
    server_network_card = "Realtek PCIe GbE Family Controller #2"
    server_ip = "192.168.62.31"
    # Domain controller
    client_network_card = "eth0.62"
    client_ip = "192.168.62.11"
    # Ports
    sd_port = 30490
    producer_port = 30500
    consumer_port = 30501
    # Misspelled original name, kept as an alias so existing callers keep working.
    consumer_prot = consumer_port
4、packet_define.py
import eth_scapy_sd as sd
from network_define import EthParameter
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
class SomeipPacker:
    """Builds complete Ethernet frames carrying SOME/IP-SD messages.

    Two session counters are kept: ``sd_session_id`` for the Offer message
    and ``client_session_id`` for the Subscribe / SubscribeAck messages.
    Each ``packet_*`` call uses the current counter value and then advances it.
    """

    # Session IDs are 16-bit values; 0 is skipped when the counter wraps
    # (see the SOME/IP protocol specification on session handling).
    _SESSION_ID_MIN = 0x0001
    _SESSION_ID_MAX = 0xFFFF

    def __init__(self):
        self.eth_para = EthParameter()
        self.sd_session_id = 1
        self.client_session_id = 1

    @classmethod
    def _next_session_id(cls, current):
        """Return the session ID following ``current``, wrapping 0xFFFF back to 1."""
        if current >= cls._SESSION_ID_MAX:
            return cls._SESSION_ID_MIN
        return current + 1

    def update_sd_session_id(self):
        """Advance ``sd_session_id`` (1 ... 65535, then back to 1)."""
        self.sd_session_id = self._next_session_id(self.sd_session_id)

    def update_client_session_id(self):
        """Advance ``client_session_id`` (1 ... 65535, then back to 1)."""
        self.client_session_id = self._next_session_id(self.client_session_id)

    def _new_sd(self):
        """Return an empty SD packet with the REBOOT and UNICAST flags set."""
        sd_packet = sd.SD()
        sd_packet.setFlag("REBOOT", 1)
        sd_packet.setFlag("UNICAST", 1)
        return sd_packet

    def _wrap(self, sd_packet, dst_ip, session_id):
        """Stack Ether / IP / UDP / SOME/IP on top of ``sd_packet``.

        The frame is sent from ``server_ip`` to ``dst_ip`` with the SD port as
        both source and destination port.
        """
        return (
            Ether()
            / IP(src=self.eth_para.server_ip, dst=dst_ip)
            / UDP(sport=self.eth_para.sd_port, dport=self.eth_para.sd_port)
            / sd_packet.get_someip_with_session_id(session_id, True)
        )

    # ===========================#
    #        Serialisation       #
    # ===========================#
    def packet_subscribe(self, service_id: list, ttl: int):
        """Build a SubscribeEventgroup frame addressed to the client IP.

        :param service_id: service IDs to subscribe to, one entry per ID
        :param ttl: TTL written into every entry
        :return: the complete Ether/IP/UDP/SOME-IP/SD frame
        """
        subscribe_packager = self._new_sd()
        # Assign whole lists rather than appending to the packet's field
        # (the SD class documents that append must not be used).
        subscribe_packager.entry_array = [
            sd.SDEntry_EventGroup(
                type=sd.SDEntry_EventGroup.TYPE_EVTGRP_SUBSCRIBE,
                srv_id=srv_id,
                inst_id=0x1,
                n_opt_1=2,
                major_ver=0x1,
                ttl=ttl,
                eventgroup_id=0x1,
            )
            for srv_id in service_id
        ]
        subscribe_packager.option_array = [
            # UDP endpoint
            sd.SDOption_IP4_EndPoint(
                addr=self.eth_para.server_ip,
                l4_proto=0x11,
                port=self.eth_para.consumer_prot,
            ),
            # TCP endpoint
            sd.SDOption_IP4_EndPoint(
                addr=self.eth_para.server_ip,
                l4_proto=0x06,
                port=self.eth_para.consumer_prot,
            ),
        ]
        subscribe_packager.len_entry_array = (
            sd.SDEntry_EventGroup.OVERALL_LEN * len(service_id)
        )
        subscribe_packager.len_option_array = (
            sd.SDOption_IP4_EndPoint.IP4_OVERALL_LEN
            * len(subscribe_packager.option_array)
        )
        # show() prints the packet itself; wrapping it in print() only
        # added a stray "None" line.
        subscribe_packager.show()
        subscribe_package = self._wrap(
            subscribe_packager, self.eth_para.client_ip, self.client_session_id
        )
        self.update_client_session_id()
        return subscribe_package

    def packet_subscribe_ack(self, service_id: list, ttl: int):
        """Build a SubscribeEventgroupAck frame addressed to the client IP.

        :param service_id: service IDs to acknowledge, one entry per ID
        :param ttl: TTL written into every entry
        :return: the complete Ether/IP/UDP/SOME-IP/SD frame
        """
        ack_packager = self._new_sd()
        ack_packager.entry_array = [
            sd.SDEntry_EventGroup(
                type=sd.SDEntry_EventGroup.TYPE_EVTGRP_SUBSCRIBE_ACK,
                srv_id=srv_id,
                inst_id=0x1,
                major_ver=0x1,
                ttl=ttl,
                eventgroup_id=0x1,
            )
            for srv_id in service_id
        ]
        ack_packager.len_entry_array = (
            sd.SDEntry_EventGroup.OVERALL_LEN * len(service_id)
        )
        ack_packager.len_option_array = 0  # the ack carries no options
        ack_packager.show()
        ack_package = self._wrap(
            ack_packager, self.eth_para.client_ip, self.client_session_id
        )
        self.update_client_session_id()
        return ack_package

    def packet_offer(self, service_id: list, ttl: int):
        """Build an OfferService frame addressed to the SD group address.

        :param service_id: service IDs to offer, one entry per ID
        :param ttl: TTL written into every entry
        :return: the complete Ether/IP/UDP/SOME-IP/SD frame
        """
        offer_packager = self._new_sd()
        offer_packager.entry_array = [
            sd.SDEntry_Service(
                type=sd.SDEntry_Service.TYPE_SRV_OFFERSERVICE,
                srv_id=srv_id,
                inst_id=0x1,
                n_opt_1=1,
                major_ver=0x1,
                minor_ver=0x01,
                ttl=ttl,
            )
            for srv_id in service_id
        ]
        offer_packager.option_array = [
            sd.SDOption_IP4_EndPoint(
                addr=self.eth_para.server_ip,
                l4_proto=0x11,
                port=self.eth_para.producer_port,
            )
        ]
        offer_packager.len_entry_array = (
            sd.SDEntry_Service.OVERALL_LEN * len(service_id)
        )
        offer_packager.len_option_array = (
            sd.SDOption_IP4_EndPoint.IP4_OVERALL_LEN
            * len(offer_packager.option_array)
        )
        offer_packager.show()
        offer_package = self._wrap(
            offer_packager, self.eth_para.sd_ip, self.sd_session_id
        )
        self.update_sd_session_id()
        return offer_package
5、unpack_define.py
from scapy.packet import Raw
from typing import Optional
from dataclasses import dataclass
@dataclass
class SomeIPHeaderParams:
    """Values read from the SOME/IP header of a received packet."""

    service_id: int
    # Full 16-bit method/event identifier: sub_id in the top bit, followed by
    # the 15-bit method_id (sub_id == 0) or event_id (sub_id == 1).
    event_id: int
    client_id: int
    session_id: int
    msg_type: int
    return_code: int
    length: int
    protocol_version: int
    interface_version: int


class SomeipUnpacker:
    """Helpers that extract SOME/IP header values and payload from a packet."""

    @staticmethod
    def get_someip_header_params(receive_packet) -> Optional[SomeIPHeaderParams]:
        """Read the SOME/IP header of ``receive_packet``.

        :param receive_packet: packet exposing ``haslayer`` and layer lookup by name
        :return: the header values, or ``None`` when the packet has no
                 "SOME/IP" layer or reading a header field fails
        """
        try:
            if not receive_packet.haslayer("SOME/IP"):
                return None
            someip_layer = receive_packet["SOME/IP"]
            msg_id = someip_layer.msg_id
            sub_id = msg_id.sub_id
            # Only one of the two 15-bit fields is present, selected by
            # sub_id; always reading event_id dropped the method ID.
            low_bits = msg_id.event_id if sub_id else msg_id.method_id
            # Request ID (client ID and session ID)
            req_id = someip_layer.req_id
            return SomeIPHeaderParams(
                service_id=msg_id.srv_id,
                event_id=sub_id << 15 | low_bits,
                client_id=req_id.client_id,
                session_id=req_id.session_id,
                msg_type=someip_layer.msg_type,
                return_code=someip_layer.retcode,
                length=someip_layer.len,
                protocol_version=someip_layer.proto_ver,
                interface_version=someip_layer.iface_ver,
            )
        except (AttributeError, ValueError) as e:
            print(f"解析失败: {e}")
            return None

    @staticmethod
    def get_someip_payload(receive_packet):
        """Return the bytes of the packet's ``Raw`` layer, or ``None`` if unavailable."""
        try:
            return receive_packet[Raw].load
        except (AttributeError, IndexError):
            # scapy reports a missing layer with IndexError on item lookup.
            return None
6、someip_controller.py
import threading
import socket
import time
from module.packet_define import *
from module.unpack_define import *
from scapy.sendrecv import sendp, sniff
class SomeipController(threading.Thread):
    """Main control thread: starts the periodic senders and the receiver.

    ``run`` builds the Offer, SubscribeAck and Subscribe frames, hands each
    one to a ``SomeipSendLoop``, starts a ``SomeIpReceiver`` and then waits
    until ``stop`` is called, at which point the workers are stopped too.
    """

    def __init__(self, someip_packer: SomeipPacker, someip_unpacker: SomeipUnpacker):
        super().__init__()
        self.someip_packer = someip_packer
        self.someip_unpacker = someip_unpacker
        self._stop_event = threading.Event()
        self.running = True

    def run(self):
        """Start all workers, block until stop is requested, then stop the workers."""
        workers = []
        try:
            # Offer and SubscribeAck (an incoming Subscribe is not awaited;
            # the ack is simply sent periodically).
            service_id_list = [0xA994, 0xA995]
            _offer = self.someip_packer.packet_offer(
                service_id=service_id_list,
                ttl=3
            )
            workers.append(SomeipSendLoop(
                network_card=EthParameter.sd_network_card,
                someip_packer=_offer,
                cycle_time_ms=2000,
            ))
            _subscribe_ack = self.someip_packer.packet_subscribe_ack(
                service_id=service_id_list,
                ttl=3
            )
            workers.append(SomeipSendLoop(
                network_card=EthParameter.sd_network_card,
                someip_packer=_subscribe_ack,
                cycle_time_ms=2000,
            ))
            # Subscribe to services 0xAB03 and 0xAB04
            _subscribe = self.someip_packer.packet_subscribe(
                service_id=[0xAB03, 0xAB04], ttl=0x03
            )
            workers.append(SomeipSendLoop(
                network_card=EthParameter.server_network_card,
                someip_packer=_subscribe,
                cycle_time_ms=2000,
            ))
            # Receiver: needs the unpacker to decode what it sniffs.
            workers.append(SomeIpReceiver(
                EthParameter.server_network_card,
                self.someip_unpacker,
            ))
            # An Event object is always truthy, so it must be waited on
            # (or queried with is_set()) rather than tested directly.
            self._stop_event.wait()
        finally:
            for worker in reversed(workers):
                worker.stop()

    def stop(self):
        """Request termination and wait for the controller thread to finish."""
        self.running = False
        self._stop_event.set()
        self.join()
class SomeipSendLoop(threading.Thread):
    """Thread that sends one prepared packet periodically until stopped.

    The thread starts itself on construction.

    :param network_card: interface name passed to ``sendp``
    :param someip_packer: the packet to send; nothing is sent while it is falsy
    :param cycle_time_ms: pause between two transmissions, in milliseconds
    """

    def __init__(self, network_card, someip_packer, cycle_time_ms):
        super().__init__()
        self.network_card = network_card
        self.someip_packer = someip_packer
        self.cycle_time_ms = cycle_time_ms
        self._stop_event = threading.Event()
        self.start()

    def run(self):
        """Send the packet every ``cycle_time_ms`` until ``stop`` is called."""
        # An Event object is always truthy; is_set() is what reports a stop request.
        while not self._stop_event.is_set():
            if self.someip_packer:
                sendp(self.someip_packer, iface=self.network_card, verbose=0)
            # Waiting on the event instead of sleeping lets stop() interrupt the pause.
            self._stop_event.wait(self.cycle_time_ms / 1000)

    def stop(self):
        """Request termination and wait for the thread to finish."""
        self._stop_event.set()
        self.join()
class SomeIpReceiver(threading.Thread):
    """Thread that sniffs UDP traffic on one interface and reports SOME/IP headers.

    The thread starts itself on construction.

    :param eth_desc: interface name passed to ``sniff``
    :param someip_unpacker: object providing ``get_someip_header_params``
    """

    def __init__(
        self,
        eth_desc: str,
        someip_unpacker
    ):
        super().__init__()
        self._eth_desc = eth_desc
        self._unpacker = someip_unpacker
        self._terminated = False
        self._stop_event = threading.Event()
        self.start()

    def packet_callback(self, packet):
        """Print the message type of ``packet`` when a SOME/IP header was decoded."""
        if not packet:
            return
        header_param = self._unpacker.get_someip_header_params(
            receive_packet=packet
        )
        # The unpacker returns None for packets without a usable SOME/IP
        # header; the filter is plain "udp", so such packets are expected.
        if header_param is None:
            return
        print(f"msg_type: {header_param.msg_type}")

    def run(self):
        """Keep sniffing until ``stop`` is called.

        ``sniff`` is re-armed if it returns before a stop was requested.
        The stop filter is evaluated per captured packet, so the thread
        ends with the first packet seen after ``stop``.
        """
        while not self._stop_event.is_set():
            sniff(
                iface=self._eth_desc,
                prn=self.packet_callback,
                filter="udp",
                stop_filter=lambda _pkt: self._terminated,
            )

    def stop(self):
        """Request termination and wait for the thread to finish."""
        self._terminated = True
        self._stop_event.set()
        self.join()
# 如果协议有TCP
class SomeipTcpSocket(threading.Thread):
    """Thread that opens a TCP connection and keeps it open until stopped.

    The thread starts itself on construction.
    """

    def __init__(self):
        super().__init__()
        self.tcp_sock = None  # socket.socket once create_tcp_socket() succeeded
        self._stop_event = threading.Event()
        self.start()

    def create_tcp_socket(self):
        """Bind to the local consumer port and connect to the peer's producer port.

        Uses the addresses that ``EthParameter`` defines: ``server_ip`` (local
        host) for the bind and ``client_ip`` (domain controller) for the
        connect. NOTE(review): the original referenced ``icc_android_ip`` and
        ``adcc_ip``, which ``EthParameter`` does not define; confirm this
        mapping against the real setup.

        :raises OSError: when binding or connecting fails (the socket is closed first)
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((EthParameter.server_ip, EthParameter.consumer_prot))
            sock.connect((EthParameter.client_ip, EthParameter.producer_port))
        except OSError:
            sock.close()
            raise
        self.tcp_sock = sock

    def run(self):
        """Open the connection, hold it until ``stop`` is called, then close it."""
        self.create_tcp_socket()
        try:
            # An Event object is always truthy; wait() blocks until stop() sets it.
            self._stop_event.wait()
        finally:
            if self.tcp_sock is not None:
                self.tcp_sock.close()
                self.tcp_sock = None

    def stop(self):
        """Request termination and wait for the thread to finish."""
        self._stop_event.set()
        self.join()
if __name__ == "__main__":
    # The TCP helper thread starts itself on construction.
    s_socket = SomeipTcpSocket()
    # Wire the packet builder and the parser into the controller, then run it.
    sim_server = SomeipController(SomeipPacker(), SomeipUnpacker())
    sim_server.start()