当前位置: 首页 > article >正文

SOME/IP:用Python实现协议订阅、Offer、订阅ACK与报文接收

文章目录

  • 前言
  • 一、代码层次
  • 二、详细代码
    • 1. eth_scapy_sd.py
    • 2、eth_scapy_someip.py
    • 3、network_define.py
    • 4、packet_define.py
    • 5、unpack_define.py
    • 6、someip_controller.py


前言

1、需要pip安装scapy库
2、需要根据实际情况修改network_define.py中的网络配置(网卡、IP、端口)
3、执行someip_controller.py运行本案例
4、本文参考github: eth-scapy-someip

注:最近没啥时间写文章,有兴趣自己研究下<~~>


一、代码层次

someip/
└── module/
    ├── eth_scapy_sd.py          # SOME/IP SD服务发现结构体定义
    ├── eth_scapy_someip.py      # SOME/IP标准结构体定义
    ├── network_define.py        # 网络配置(IP/端口/多播组)
    ├── packet_define.py         # 协议字段定义(Offer/Subscribe/ACK)
    ├── unpack_define.py         # 接收报文解析器
    └── someip_controller.py     # 主控制逻辑(订阅/OFFER/ACK流程管理)

二、详细代码

1. eth_scapy_sd.py

import ctypes
import collections

from eth_scapy_someip import SOMEIP

from scapy.fields import *
from scapy.packet import *
from scapy.layers.inet6 import IP6Field



class _SDPacketBase(Packet):
    """Base class shared by all SD packet definitions.

    Subclasses may populate ``_defaults`` with ``{field_name: value}`` pairs.
    Those values are written into the packet right after scapy has initialised
    its fields; names that are not fields of the concrete packet are skipped.

    Example: ``_defaults = {'field_1_name': field_1_value}``
    """

    _defaults = {}

    def _set_defaults(self):
        """Apply every entry of ``_defaults`` that names an existing field."""
        for field_name, value in self._defaults.items():
            try:
                self.get_field(field_name)
            except KeyError:
                # Not a field of this packet class: nothing to set.
                continue
            self.setfieldval(field_name, value)

    def init_fields(self, *args, **kwargs):
        """Initialise the packet fields, then apply ``_defaults``.

        Arguments are forwarded untouched to ``Packet.init_fields`` so the
        override keeps working whether or not the installed scapy passes
        extra parameters to this hook.
        """
        Packet.init_fields(self, *args, **kwargs)
        self._set_defaults()


# SD ENTRY
#  - Service
#  - EventGroup
class _SDEntry(_SDPacketBase):
    """Base class for SDEntry_* packets.

    Declares the fields common to service and eventgroup entries and selects
    the concrete entry class from the entry type byte while dissecting.
    """

    # struct format / byte index used to read the entry type from raw bytes
    TYPE_FMT = ">B"
    TYPE_PAYLOAD_I = 0
    # ENTRY TYPES : SERVICE
    TYPE_SRV_FINDSERVICE = 0x00
    TYPE_SRV_OFFERSERVICE = 0x01
    TYPE_SRV = (TYPE_SRV_FINDSERVICE, TYPE_SRV_OFFERSERVICE)
    # ENTRY TYPES : EVENTGROUP
    TYPE_EVTGRP_SUBSCRIBE = 0x06
    TYPE_EVTGRP_SUBSCRIBE_ACK = 0x07
    TYPE_EVTGRP = (TYPE_EVTGRP_SUBSCRIBE, TYPE_EVTGRP_SUBSCRIBE_ACK)
    # overall len (UT usage)
    OVERALL_LEN = 16

    # Layout of the common part of an entry in the entries array
    fields_desc = [
        ByteField("type", 0),
        ByteField("index_1", 0),
        ByteField("index_2", 0),
        BitField("n_opt_1", 0, 4),
        BitField("n_opt_2", 0, 4),
        ShortField("srv_id", 0),
        ShortField("inst_id", 0),
        ByteField("major_ver", 0),
        X3BytesField("ttl", 0),
    ]

    def guess_payload_class(self, payload):
        """Return the entry class matching the type byte found in ``payload``.

        :param payload: raw bytes whose byte at ``TYPE_PAYLOAD_I`` is the entry type
        :return: ``SDEntry_Service`` or ``SDEntry_EventGroup`` for known types;
                 otherwise whatever scapy's default guess yields, instead of
                 implicitly returning ``None``
        """
        pl_type = struct.unpack(
            _SDEntry.TYPE_FMT,
            payload[_SDEntry.TYPE_PAYLOAD_I : _SDEntry.TYPE_PAYLOAD_I + 1],
        )[0]
        if pl_type in _SDEntry.TYPE_SRV:
            return SDEntry_Service
        if pl_type in _SDEntry.TYPE_EVTGRP:
            return SDEntry_EventGroup
        # Unknown entry type: defer to scapy's generic payload guessing.
        return Packet.guess_payload_class(self, payload)


class SDEntry_Service(_SDEntry):
    """Service Entry.

    Combines the common ``_SDEntry`` fields with a 32-bit ``minor_ver`` field.
    The ``type`` field defaults to ``TYPE_SRV_FINDSERVICE``.
    """

    # default applied through _SDPacketBase._set_defaults
    _defaults = {"type": _SDEntry.TYPE_SRV_FINDSERVICE}

    name = "Service Entry"
    # _SDEntry is listed first so its fields precede minor_ver
    # (presumably expanded by scapy into the base class's fields_desc)
    fields_desc = [_SDEntry, IntField("minor_ver", 0)]


class SDEntry_EventGroup(_SDEntry):
    """EventGroup Entry.

    Combines the common ``_SDEntry`` fields with a 12-bit reserved field, a
    4-bit counter and a 16-bit ``eventgroup_id``.  The ``type`` field defaults
    to ``TYPE_EVTGRP_SUBSCRIBE``.
    """

    # default applied through _SDPacketBase._set_defaults
    _defaults = {"type": _SDEntry.TYPE_EVTGRP_SUBSCRIBE}

    name = "Eventgroup Entry"
    # _SDEntry is listed first so its fields precede the eventgroup-specific ones
    fields_desc = [
        _SDEntry,
        BitField("res", 0, 12),
        BitField("cnt", 0, 4),
        ShortField("eventgroup_id", 0),
    ]


# SD Option
#  - Configuration
#  - LoadBalancing
#  - IPv4 EndPoint
#  - IPv6 EndPoint
#  - IPv4 MultiCast
#  - IPv6 MultiCast
#  - IPv4 SD EndPoint
#  - IPv6 SD EndPoint
class _SDOption(_SDPacketBase):
    """Base class for SDOption_* packets.

    Holds the option type / length constants and selects the concrete option
    class from the option type byte while dissecting.
    """

    # struct format / byte index used to read the option type from raw bytes
    # (the type byte follows the 16-bit length field)
    TYPE_FMT = ">B"
    TYPE_PAYLOAD_I = 2

    CFG_TYPE = 0x01
    CFG_OVERALL_LEN = (
        4  # overall length of CFG SDOption,empty 'cfg_str' (to be used from UT)
    )
    LOADBALANCE_TYPE = 0x02
    LOADBALANCE_LEN = 0x05
    LOADBALANCE_OVERALL_LEN = 8  # overall length of LB SDOption (to be used from UT)
    IP4_ENDPOINT_TYPE = 0x04
    IP4_ENDPOINT_LEN = 0x0009
    IP4_MCAST_TYPE = 0x14
    IP4_MCAST_LEN = 0x0009
    IP4_SDENDPOINT_TYPE = 0x24
    IP4_SDENDPOINT_LEN = 0x0009
    IP4_OVERALL_LEN = 12  # overall length of IP4 SDOption (to be used from UT)
    IP6_ENDPOINT_TYPE = 0x06
    IP6_ENDPOINT_LEN = 0x0015
    IP6_MCAST_TYPE = 0x16
    IP6_MCAST_LEN = 0x0015
    IP6_SDENDPOINT_TYPE = 0x26
    IP6_SDENDPOINT_LEN = 0x0015
    IP6_OVERALL_LEN = 24  # overall length of IP6 SDOption (to be used from UT)

    def guess_payload_class(self, payload):
        """Return the option class matching the type byte found in ``payload``.

        :param payload: raw bytes whose byte at ``TYPE_PAYLOAD_I`` is the option type
        :return: the matching ``SDOption_*`` class for known types; otherwise
                 whatever scapy's default guess yields, instead of implicitly
                 returning ``None``
        """
        pl_type = struct.unpack(
            _SDOption.TYPE_FMT,
            payload[_SDOption.TYPE_PAYLOAD_I : _SDOption.TYPE_PAYLOAD_I + 1],
        )[0]

        # The option classes are defined further down the module, so the
        # table is built at call time rather than at class-definition time.
        option_classes = {
            _SDOption.CFG_TYPE: SDOption_Config,
            _SDOption.LOADBALANCE_TYPE: SDOption_LoadBalance,
            _SDOption.IP4_ENDPOINT_TYPE: SDOption_IP4_EndPoint,
            _SDOption.IP4_MCAST_TYPE: SDOption_IP4_Multicast,
            _SDOption.IP4_SDENDPOINT_TYPE: SDOption_IP4_SD_EndPoint,
            _SDOption.IP6_ENDPOINT_TYPE: SDOption_IP6_EndPoint,
            _SDOption.IP6_MCAST_TYPE: SDOption_IP6_Multicast,
            _SDOption.IP6_SDENDPOINT_TYPE: SDOption_IP6_SD_EndPoint,
        }
        option_cls = option_classes.get(pl_type)
        if option_cls is not None:
            return option_cls
        # Unknown option type: defer to scapy's generic payload guessing.
        return Packet.guess_payload_class(self, payload)


class _SDOption_Header(_SDOption):
    """Field fragment placed at the start of every option: length, type, reserved byte."""

    fields_desc = [
        # left as None by default; concrete options supply it via _defaults or post_build
        ShortField("len", None),
        ByteField("type", 0),
        ByteField("res_hdr", 0),
    ]


class _SDOption_Tail(_SDOption):
    """Field fragment closing the IP options: reserved byte, L4 protocol, port."""

    fields_desc = [
        ByteField("res_tail", 0),
        # 0x06 -> TCP (the default), 0x11 -> UDP
        ByteEnumField("l4_proto", 0x06, {0x06: "TCP", 0x11: "UDP"}),
        ShortField("port", 0),
    ]


class _SDOption_IP4(_SDOption):
    """Common layout of the IPv4 options: header fragment, IPv4 address, tail fragment."""

    fields_desc = [_SDOption_Header, IPField("addr", "0.0.0.0"), _SDOption_Tail]


class _SDOption_IP6(_SDOption):
    """Common layout of the IPv6 options: header fragment, IPv6 address, tail fragment."""

    fields_desc = [
        _SDOption_Header,
        IP6Field("addr", "2001:cdba:0000:0000:0000:0000:3257:9652"),
        _SDOption_Tail,
    ]


class SDOption_Config(_SDOption):
    """Configuration option: option header followed by a free-form ``cfg_str``.

    When ``len`` is left unset it is filled in at build time.
    """

    # offset to be added upon length calculation (corresponding to header's "Reserved" field)
    LEN_OFFSET = 0x01

    name = "Config Option"
    # default values specification
    _defaults = {"type": _SDOption.CFG_TYPE}
    # packet fields definition
    fields_desc = [_SDOption_Header, StrField("cfg_str", "")]

    def post_build(self, p, pay):
        """Fill in the ``len`` field when the caller did not set it.

        The length excludes the 16-bit length and the 8-bit type, i.e. it
        covers the reserved header byte plus the configuration string.  It is
        derived from the built bytes ``p`` rather than from ``len(cfg_str)``
        so that a non-ASCII ``str`` value is counted in bytes, not characters.

        :param p: built bytes of this option
        :param pay: built bytes of the payload
        :return: option bytes (with the length patched in if needed) + payload
        """
        if self.len is None:
            # CFG_OVERALL_LEN is the size of the option with an empty cfg_str,
            # so the difference is the encoded size of cfg_str.
            cfg_size = len(p) - _SDOption.CFG_OVERALL_LEN
            p = struct.pack("!H", cfg_size + self.LEN_OFFSET) + p[2:]
        return p + pay


class SDOption_LoadBalance(_SDOption):
    """Load-balancing option: option header followed by 16-bit priority and weight."""

    name = "LoadBalance Option"
    # default values specification
    _defaults = {"type": _SDOption.LOADBALANCE_TYPE, "len": _SDOption.LOADBALANCE_LEN}
    # packet fields definition
    fields_desc = [_SDOption_Header, ShortField("priority", 0), ShortField("weight", 0)]


# SDOPTIONS : IPv4-specific
class SDOption_IP4_EndPoint(_SDOption_IP4):
    """IPv4 endpoint option; only the default ``type`` and ``len`` differ from the base."""

    name = "IP4 EndPoint Option"
    # default values specification
    _defaults = {"type": _SDOption.IP4_ENDPOINT_TYPE, "len": _SDOption.IP4_ENDPOINT_LEN}


class SDOption_IP4_Multicast(_SDOption_IP4):
    """IPv4 multicast option; only the default ``type`` and ``len`` differ from the base."""

    name = "IP4 Multicast Option"
    # default values specification
    _defaults = {"type": _SDOption.IP4_MCAST_TYPE, "len": _SDOption.IP4_MCAST_LEN}


class SDOption_IP4_SD_EndPoint(_SDOption_IP4):
    """IPv4 SD endpoint option; only the default ``type`` and ``len`` differ from the base."""

    name = "IP4 SDEndPoint Option"
    # default values specification
    _defaults = {
        "type": _SDOption.IP4_SDENDPOINT_TYPE,
        "len": _SDOption.IP4_SDENDPOINT_LEN,
    }


# SDOPTIONS : IPv6-specific
class SDOption_IP6_EndPoint(_SDOption_IP6):
    """IPv6 endpoint option; only the default ``type`` and ``len`` differ from the base."""

    name = "IP6 EndPoint Option"
    # default values specification
    _defaults = {"type": _SDOption.IP6_ENDPOINT_TYPE, "len": _SDOption.IP6_ENDPOINT_LEN}


class SDOption_IP6_Multicast(_SDOption_IP6):
    """IPv6 multicast option; only the default ``type`` and ``len`` differ from the base."""

    name = "IP6 Multicast Option"
    # default values specification
    _defaults = {"type": _SDOption.IP6_MCAST_TYPE, "len": _SDOption.IP6_MCAST_LEN}


class SDOption_IP6_SD_EndPoint(_SDOption_IP6):
    """IPv6 SD endpoint option; only the default ``type`` and ``len`` differ from the base."""

    name = "IP6 SDEndPoint Option"
    # default values specification
    _defaults = {
        "type": _SDOption.IP6_SDENDPOINT_TYPE,
        "len": _SDOption.IP6_SDENDPOINT_LEN,
    }


#
# SD PACKAGE DEFINITION
#
class SD(_SDPacketBase):
    """
    SOME/IP Service Discovery packet: flags, reserved bytes, a length-prefixed
    entry array and a length-prefixed option array.

    NOTE : assign a whole new list to 'entry_array' / 'option_array' instead of
    calling list.append() on the existing value
    e.g. :  p = SD()
            p.option_array = [SDOption_Config(),SDOption_IP6_EndPoint()]
    """

    # Values written into the SOME/IP header that carries an SD message
    SOMEIP_MSGID_SRVID = 0xFFFF
    SOMEIP_MSGID_SUBID = 0x1
    SOMEIP_MSGID_EVENTID = 0x100
    SOMEIP_PROTO_VER = 0x01
    SOMEIP_IFACE_VER = 0x01
    SOMEIP_MSG_TYPE = SOMEIP.TYPE_NOTIFICATION

    name = "SD"
    # Flags definition: {"name":(mask,offset)}
    _sdFlag = collections.namedtuple("Flag", "mask offset")
    FLAGSDEF = {
        "REBOOT": _sdFlag(mask=0x80, offset=7),  # ReBoot flag
        "UNICAST": _sdFlag(mask=0x40, offset=6),  # UniCast flag
    }

    fields_desc = [
        ByteField("flags", 0),
        X3BytesField("res", 0),
        FieldLenField(
            name="len_entry_array", default=None, length_of="entry_array", fmt="!I"
        ),
        PacketListField(
            name="entry_array",
            default=None,
            pkt_cls=_SDEntry,
            length_from=lambda pkt: pkt.len_entry_array,
        ),
        FieldLenField(
            name="len_option_array", default=None, length_of="option_array", fmt="!I"
        ),
        PacketListField(
            name="option_array",
            default=None,
            pkt_cls=_SDOption,
            length_from=lambda pkt: pkt.len_option_array,
        ),
    ]

    def __init__(self, *args, **kwargs):
        super(SD, self).__init__(*args, **kwargs)
        self.explicit = 1

    def getFlag(self, name):
        """
        Read one flag out of the 'flags' byte.
         :param str name : flag name, case-insensitive (see SD.FLAGSDEF)
         :return: 0 or 1, or None when the name is not a known flag
        """
        flag = self.FLAGSDEF.get(name.upper())
        if flag is None:
            return None
        return (self.flags & flag.mask) >> flag.offset

    def setFlag(self, name, value):
        """
        Write one flag into the 'flags' byte; unknown names are ignored.
         :param str name : flag name, case-insensitive (see SD.FLAGSDEF)
         :param int value : only the lowest bit of the given int is used
        """
        flag = self.FLAGSDEF.get(name.upper())
        if flag is not None:
            # clear the flag's bit inside the 8-bit field, then OR in the new bit
            cleared = self.flags & (~flag.mask & 0xFF)
            self.flags = cleared | ((value & 0x01) << flag.offset)

    def setEntryArray(self, entry_list):
        """
        Replace entry_array.
        :param entry_list: list of entries, or a single entry object (wrapped into a list)
        """
        self.entry_array = entry_list if isinstance(entry_list, list) else [entry_list]

    def setOptionArray(self, option_list):
        """
        Replace option_array.
        :param option_list: list of options, or a single option object (wrapped into a list)
        """
        self.option_array = (
            option_list if isinstance(option_list, list) else [option_list]
        )

    def _new_someip_header(self):
        """build a SOME/IP header preset with the SD message id, versions and type."""
        header = SOMEIP()
        header.msg_id.srv_id = SD.SOMEIP_MSGID_SRVID
        header.msg_id.sub_id = SD.SOMEIP_MSGID_SUBID
        header.msg_id.event_id = SD.SOMEIP_MSGID_EVENTID
        header.proto_ver = SD.SOMEIP_PROTO_VER
        header.iface_ver = SD.SOMEIP_IFACE_VER
        header.msg_type = SD.SOMEIP_MSG_TYPE
        return header

    def getSomeip(self, stacked=False):
        """
        return SD-initialized SOME/IP packet
        :param stacked: boolean. When true the SOME/IP header is stacked over this SD packet
        """
        header = self._new_someip_header()
        return header / self if stacked else header

    def get_someip_with_session_id(self, session_id, stacked=False):
        """
        same as getSomeip, with the request id's session_id set
        :param session_id: value stored in the SOME/IP header's req_id.session_id
        :param stacked: boolean. When true the SOME/IP header is stacked over this SD packet
        """
        header = self._new_someip_header()
        header.req_id.session_id = session_id
        return header / self if stacked else header

2、eth_scapy_someip.py

from scapy.layers.inet import *
from scapy.fields import *
from scapy.packet import *

"""SOMEIP PACKAGE DEFINITION"""


class _SOMEIP_MessageId(Packet):
    """MessageId subpacket.

    A 16-bit service id followed by a 1-bit ``sub_id`` selector and a 15-bit
    identifier exposed as ``method_id`` (sub_id == 0) or ``event_id``
    (sub_id == 1).
    """
    name = 'MessageId'
    fields_desc = [
        ShortField('srv_id', 0),
        BitEnumField('sub_id', 0, 1, {0: 'METHOD_ID', 1: 'EVENT_ID'}),
        # exactly one of the two conditional fields is active, depending on sub_id
        ConditionalField(BitField('method_id', 0, 15), lambda pkt: pkt.sub_id == 0),
        ConditionalField(BitField('event_id', 0, 15), lambda pkt: pkt.sub_id == 1)
    ]

    def extract_padding(self, p):
        # no payload for this subpacket: hand all remaining bytes back as padding
        return '', p


class _SOMEIP_RequestId(Packet):
    """RequestId subpacket: 16-bit client id followed by 16-bit session id."""
    name = 'RequestId'
    fields_desc = [
        ShortField('client_id', 0),
        ShortField('session_id', 0)]

    def extract_padding(self, p):
        # no payload for this subpacket: hand all remaining bytes back as padding
        return '', p


class SOMEIP(Packet):
    """SOME/IP packet.

    Header layout: MessageID, Length, RequestID, protocol version, interface
    version, message type and return code.  For the SOME/IP-TP message types
    (see ``SOMEIP_TP_TYPES``) a further 32 bits follow: offset, reserved bits
    and the more-segments flag.  ``len`` is computed at build time when left
    unset.
    """
    # Default values
    PROTOCOL_VERSION = 0x01
    INTERFACE_VERSION = 0x01

    # Length offset (without payload): header bytes that follow the Length field
    LEN_OFFSET = 0x08
    # Same, for SOME/IP-TP messages which carry 4 additional header bytes
    LEN_OFFSET_TP = 0x0c

    # SOME/IP TYPE VALUES
    TYPE_REQUEST = 0x00
    TYPE_REQUEST_NO_RET = 0x01
    TYPE_NOTIFICATION = 0x02
    TYPE_REQUEST_ACK = 0x40
    TYPE_REQUEST_NORET_ACK = 0x41
    TYPE_NOTIFICATION_ACK = 0x42
    TYPE_RESPONSE = 0x80
    TYPE_ERROR = 0x81
    TYPE_RESPONSE_ACK = 0xc0
    TYPE_ERROR_ACK = 0xc1

    # SOME/IP-TP TYPE VALUES
    TYPE_REQUEST_SEGMENT = 0x20
    TYPE_REQUEST_NO_RET_SEGMENT = 0x21
    TYPE_NOTIFICATION_SEGMENT = 0x22
    TYPE_REQUEST_ACK_SEGMENT = 0x60
    TYPE_REQUEST_NORET_ACK_SEGMENT = 0x61
    TYPE_NOTIFICATION_ACK_SEGMENT = 0x62
    TYPE_RESPONSE_SEGMENT = 0xa0
    TYPE_ERROR_SEGMENT = 0xa1
    TYPE_RESPONSE_ACK_SEGMENT = 0xe0
    TYPE_ERROR_ACK_SEGMENT = 0xe1
    SOMEIP_TP_TYPES = frozenset({TYPE_REQUEST_SEGMENT, TYPE_REQUEST_NO_RET_SEGMENT, TYPE_NOTIFICATION_SEGMENT,
                                 TYPE_REQUEST_ACK_SEGMENT, TYPE_REQUEST_NORET_ACK_SEGMENT,
                                 TYPE_NOTIFICATION_ACK_SEGMENT, TYPE_RESPONSE_SEGMENT, TYPE_ERROR_SEGMENT,
                                 TYPE_RESPONSE_ACK_SEGMENT, TYPE_ERROR_ACK_SEGMENT})
    SOMEIP_TP_TYPE_BIT_MASK = 0x20

    # SOME/IP RETURN CODES
    RET_E_OK = 0x00
    RET_E_NOT_OK = 0x01
    RET_E_UNKNOWN_SERVICE = 0x02
    RET_E_UNKNOWN_METHOD = 0x03
    RET_E_NOT_READY = 0x04
    RET_E_NOT_REACHABLE = 0x05
    RET_E_TIMEOUT = 0x06
    RET_E_WRONG_PROTOCOL_V = 0x07
    RET_E_WRONG_INTERFACE_V = 0x08
    RET_E_MALFORMED_MSG = 0x09
    RET_E_WRONG_MESSAGE_TYPE = 0x0a

    # SOME/IP-TP More Segments Flag
    SOMEIP_TP_LAST_SEGMENT = 0
    SOMEIP_TP_MORE_SEGMENTS = 1

    _OVERALL_LEN_NOPAYLOAD = 16  # UT

    name = 'SOME/IP'

    fields_desc = [
        PacketField('msg_id', _SOMEIP_MessageId(), _SOMEIP_MessageId),  # MessageID
        IntField('len', None),  # Length
        PacketField('req_id', _SOMEIP_RequestId(), _SOMEIP_RequestId),  # RequestID
        ByteField('proto_ver', PROTOCOL_VERSION),  # Protocol version
        ByteField('iface_ver', INTERFACE_VERSION),  # Interface version
        ByteEnumField('msg_type', TYPE_REQUEST, {  # -- Message type --
            TYPE_REQUEST: 'REQUEST',  # 0x00
            TYPE_REQUEST_NO_RET: 'REQUEST_NO_RETURN',  # 0x01
            TYPE_NOTIFICATION: 'NOTIFICATION',  # 0x02
            TYPE_REQUEST_ACK: 'REQUEST_ACK',  # 0x40
            TYPE_REQUEST_NORET_ACK: 'REQUEST_NO_RETURN_ACK',  # 0x41
            TYPE_NOTIFICATION_ACK: 'NOTIFICATION_ACK',  # 0x42
            TYPE_RESPONSE: 'RESPONSE',  # 0x80
            TYPE_ERROR: 'ERROR',  # 0x81
            TYPE_RESPONSE_ACK: 'RESPONSE_ACK',  # 0xc0
            TYPE_ERROR_ACK: 'ERROR_ACK',  # 0xc1
        }),
        ByteEnumField('retcode', 0, {  # -- Return code --
            RET_E_OK: 'E_OK',  # 0x00
            RET_E_NOT_OK: 'E_NOT_OK',  # 0x01
            RET_E_UNKNOWN_SERVICE: 'E_UNKNOWN_SERVICE',  # 0x02
            RET_E_UNKNOWN_METHOD: 'E_UNKNOWN_METHOD',  # 0x03
            RET_E_NOT_READY: 'E_NOT_READY',  # 0x04
            RET_E_NOT_REACHABLE: 'E_NOT_REACHABLE',  # 0x05
            RET_E_TIMEOUT: 'E_TIMEOUT',  # 0x06
            RET_E_WRONG_PROTOCOL_V: 'E_WRONG_PROTOCOL_VERSION',  # 0x07
            RET_E_WRONG_INTERFACE_V: 'E_WRONG_INTERFACE_VERSION',  # 0x08
            RET_E_MALFORMED_MSG: 'E_MALFORMED_MESSAGE',  # 0x09
            RET_E_WRONG_MESSAGE_TYPE: 'E_WRONG_MESSAGE_TYPE',  # 0x0a
        }),
        # SOME/IP-TP header, present only for the segment message types
        ConditionalField(BitField('offset', 0, 28), lambda pkt: pkt.msg_type in SOMEIP.SOMEIP_TP_TYPES),
        ConditionalField(BitField('reserved', 0, 3), lambda pkt: pkt.msg_type in SOMEIP.SOMEIP_TP_TYPES),
        ConditionalField(BitEnumField('more_segments', 0, 1, {SOMEIP_TP_LAST_SEGMENT: 'Last_Segment',
                                                              SOMEIP_TP_MORE_SEGMENTS: 'More_Segments'
                                                              }), lambda pkt: pkt.msg_type in SOMEIP.SOMEIP_TP_TYPES)
    ]

    def post_build(self, p, pay):
        """Fill in the ``len`` field when the caller did not set it.

        The length covers everything after the Length field: RequestID,
        protocol/interface version, message type, return code, the 4-byte
        SOME/IP-TP header when the message type is a segment type, and the
        payload.

        :param p: built bytes of this header
        :param pay: built bytes of the payload
        :return: header bytes (with the length patched in if needed) + payload
        """
        length = self.len
        if length is None:
            if self.msg_type in self.SOMEIP_TP_TYPES:
                header_part = self.LEN_OFFSET_TP
            else:
                header_part = self.LEN_OFFSET
            length = header_part + len(pay)
            # bytes 4..7 of the header hold the Length field
            p = p[:4] + struct.pack('!I', length) + p[8:]
        return p + pay


# Dissect UDP and TCP payloads as SOME/IP for source ports 30490 through 30504.
for i in range(30490, 30490 + 15):
    for transport in (UDP, TCP):
        bind_layers(transport, SOMEIP, sport=i)


3、network_define.py

class EthParameter:
    """Network settings used by the packet builder and the controller.

    Adjust the interface names, IP addresses and ports to the actual setup.
    """

    # Service-discovery group (labelled "broadcast" in the original notes)
    sd_network_card = "eth0.62"
    sd_ip = "239.0.0.255"

    # Local machine
    server_network_card = "Realtek PCIe GbE Family Controller #2"
    server_ip = "192.168.62.31"

    # Domain controller
    client_network_card = "eth0.62"
    client_ip = "192.168.62.11"

    sd_port = 30490
    producer_port = 30500
    # 'consumer_prot' is a misspelling kept so existing callers keep working;
    # 'consumer_port' is the correctly spelled alias for new code.
    consumer_prot = 30501
    consumer_port = consumer_prot

4、packet_define.py

import eth_scapy_sd as sd

from network_define import EthParameter

from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP


class SomeipPacker:
    """Builds ready-to-send Ethernet frames carrying SOME/IP-SD messages.

    Two independent session counters are kept: ``sd_session_id`` for offers
    sent to the SD group address and ``client_session_id`` for subscribe /
    subscribe-ack messages sent to the peer.  Each builder uses the current
    counter value and then advances it.
    """

    # Session IDs are 16 bit wide.  0 is skipped on wrap-around because the
    # SOME/IP-SD rules require the counter to go from 0xFFFF back to 1.
    _SESSION_ID_FIRST = 0x0001
    _SESSION_ID_LAST = 0xFFFF

    def __init__(self):
        self.eth_para = EthParameter()
        self.sd_session_id = 1
        self.client_session_id = 1

    @classmethod
    def _next_session_id(cls, current: int) -> int:
        """Return the session id following ``current`` (0xFFFF wraps to 1)."""
        if current >= cls._SESSION_ID_LAST:
            return cls._SESSION_ID_FIRST
        return current + 1

    def update_sd_session_id(self):
        """Advance ``sd_session_id``: counts 1..65535, then wraps back to 1."""
        self.sd_session_id = self._next_session_id(self.sd_session_id)

    def update_client_session_id(self):
        """Advance ``client_session_id``: counts 1..65535, then wraps back to 1."""
        self.client_session_id = self._next_session_id(self.client_session_id)

    # ===========================#
    #       serialization         #
    # ===========================#
    @staticmethod
    def _new_sd_message():
        """Create an SD packet with the REBOOT and UNICAST flags set."""
        message = sd.SD()
        message.setFlag("REBOOT", 1)
        message.setFlag("UNICAST", 1)
        return message

    def _build_frame(self, sd_message, dst_ip: str, session_id: int):
        """Wrap ``sd_message`` into Ether / IP / UDP / SOME/IP using the SD port."""
        return (
            Ether()
            / IP(src=self.eth_para.server_ip, dst=dst_ip)
            / UDP(sport=self.eth_para.sd_port, dport=self.eth_para.sd_port)
            / sd_message.get_someip_with_session_id(session_id, True)
        )

    def packet_subscribe(self, service_id: list, ttl: int):
        """Build a SubscribeEventgroup frame addressed to the peer.

        :param service_id: service ids; one eventgroup entry is created per id
        :param ttl: TTL written into every entry
        :return: the complete frame
        """
        subscribe_packager = self._new_sd_message()
        # The arrays are assigned as whole lists (see the SD class note) and
        # their length fields are left unset so they are derived from the
        # actual array contents at build time.
        subscribe_packager.setEntryArray(
            [
                sd.SDEntry_EventGroup(
                    type=sd.SDEntry_EventGroup.TYPE_EVTGRP_SUBSCRIBE,
                    srv_id=srv_id,
                    inst_id=0x1,
                    n_opt_1=2,
                    major_ver=0x1,
                    ttl=ttl,
                    eventgroup_id=0x1,
                )
                for srv_id in service_id
            ]
        )
        subscribe_packager.setOptionArray(
            [
                # UDP EndPoint
                sd.SDOption_IP4_EndPoint(
                    addr=self.eth_para.server_ip,
                    l4_proto=0x11,
                    port=self.eth_para.consumer_prot,
                ),
                # TCP EndPoint
                sd.SDOption_IP4_EndPoint(
                    addr=self.eth_para.server_ip,
                    l4_proto=0x06,
                    port=self.eth_para.consumer_prot,
                ),
            ]
        )
        # show() prints by itself; wrapping it in print() only added "None"
        subscribe_packager.show()
        subscribe_package = self._build_frame(
            subscribe_packager, self.eth_para.client_ip, self.client_session_id
        )
        self.update_client_session_id()
        return subscribe_package

    def packet_subscribe_ack(self, service_id: list, ttl: int):
        """Build a SubscribeEventgroupAck frame addressed to the peer.

        :param service_id: service ids; one eventgroup entry is created per id
        :param ttl: TTL written into every entry
        :return: the complete frame (no options attached)
        """
        ack_packager = self._new_sd_message()
        ack_packager.setEntryArray(
            [
                sd.SDEntry_EventGroup(
                    type=sd.SDEntry_EventGroup.TYPE_EVTGRP_SUBSCRIBE_ACK,
                    srv_id=srv_id,
                    inst_id=0x1,
                    major_ver=0x1,
                    ttl=ttl,
                    eventgroup_id=0x1,
                )
                for srv_id in service_id
            ]
        )
        ack_packager.show()
        ack_package = self._build_frame(
            ack_packager, self.eth_para.client_ip, self.client_session_id
        )
        self.update_client_session_id()
        return ack_package

    def packet_offer(self, service_id: list, ttl: int):
        """Build an OfferService frame addressed to the SD group address.

        :param service_id: service ids; one service entry is created per id
        :param ttl: TTL written into every entry
        :return: the complete frame
        """
        offer_packager = self._new_sd_message()
        offer_packager.setEntryArray(
            [
                sd.SDEntry_Service(
                    type=sd.SDEntry_Service.TYPE_SRV_OFFERSERVICE,
                    srv_id=srv_id,
                    inst_id=0x1,
                    n_opt_1=1,
                    major_ver=0x1,
                    minor_ver=0x01,
                    ttl=ttl,
                )
                for srv_id in service_id
            ]
        )
        offer_packager.setOptionArray(
            [
                sd.SDOption_IP4_EndPoint(
                    addr=self.eth_para.server_ip,
                    l4_proto=0x11,
                    port=self.eth_para.producer_port,
                )
            ]
        )
        offer_packager.show()
        offer_package = self._build_frame(
            offer_packager, self.eth_para.sd_ip, self.sd_session_id
        )
        self.update_sd_session_id()
        return offer_package


5、unpack_define.py

from scapy.packet import Raw
from typing import Optional
from dataclasses import dataclass

@dataclass
class SomeIPHeaderParams:
    """Plain container for the SOME/IP header values extracted from a received packet."""

    service_id: int
    event_id: int      # method id or event id, depending on the message type
    client_id: int
    session_id: int
    msg_type: int
    return_code: int
    length: int
    protocol_version: int
    interface_version: int


class SomeipUnpacker:
    """Helpers that pull SOME/IP header values and payload out of received packets."""

    @staticmethod
    def get_someip_header_params(receive_packet) -> Optional[SomeIPHeaderParams]:
        """Extract the SOME/IP header of ``receive_packet``.

        :param receive_packet: packet as delivered by scapy
        :return: the header values, or ``None`` when the packet has no SOME/IP
                 layer or the header could not be read
        """
        try:
            if not receive_packet.haslayer("SOME/IP"):
                return None
            someip_layer = receive_packet["SOME/IP"]

            msg_id = someip_layer.msg_id
            sub_id = msg_id.sub_id
            # Only one of method_id / event_id exists on the wire, selected by
            # sub_id; the inactive one may be None, so read the active one.
            low_bits = msg_id.event_id if sub_id else msg_id.method_id
            event_id = (sub_id << 15) | (low_bits or 0)

            # Request ID (client id and session id)
            req_id = someip_layer.req_id

            return SomeIPHeaderParams(
                service_id=msg_id.srv_id,
                event_id=event_id,
                client_id=req_id.client_id,
                session_id=req_id.session_id,
                msg_type=someip_layer.msg_type,
                return_code=someip_layer.retcode,
                length=someip_layer.len,
                protocol_version=someip_layer.proto_ver,
                interface_version=someip_layer.iface_ver,
            )
        except (AttributeError, ValueError) as e:
            print(f"解析失败: {e}")
            return None

    @staticmethod
    def get_someip_payload(receive_packet):
        """Return the raw payload bytes of ``receive_packet``.

        :param receive_packet: packet as delivered by scapy
        :return: the ``load`` of the packet's Raw layer, or ``None`` when the
                 packet has no Raw layer
        """
        try:
            return receive_packet[Raw].load
        except (AttributeError, IndexError):
            # Indexing a packet by a layer it does not contain raises
            # IndexError, which previously escaped to the caller.
            return None

6、someip_controller.py

import threading
import socket
import time
from module.packet_define import *
from module.unpack_define import *

from scapy.sendrecv import sendp, sniff

class SomeipController(threading.Thread):
    """Top-level thread that starts the cyclic senders and the receiver.

    ``run`` builds the offer, subscribe-ack and subscribe frames, starts one
    ``SomeipSendLoop`` per frame plus a ``SomeIpReceiver``, and then blocks
    until ``stop`` is called.
    """

    def __init__(self, someip_packer: SomeipPacker, someip_unpacker: SomeipUnpacker):
        super().__init__()
        self.someip_packer = someip_packer
        self.someip_unpacker = someip_unpacker
        self._stop_event = threading.Event()
        self.running = True

    def run(self):
        """Start all workers, wait for the stop request, then stop the workers."""
        workers = []
        try:
            # Offer plus subscribe-ack for the provided services (incoming
            # subscribes are not evaluated; the ack is simply sent cyclically).
            service_id_list = [0xA994, 0xA995]
            _offer = self.someip_packer.packet_offer(
                service_id=service_id_list,
                ttl=3
            )
            workers.append(
                SomeipSendLoop(
                    network_card=EthParameter.sd_network_card,
                    someip_packer=_offer,
                    cycle_time_ms=2000,
                )
            )

            _subscribe_ack = self.someip_packer.packet_subscribe_ack(
                service_id=service_id_list,
                ttl=3
            )
            workers.append(
                SomeipSendLoop(
                    network_card=EthParameter.sd_network_card,
                    someip_packer=_subscribe_ack,
                    cycle_time_ms=2000,
                )
            )

            # Subscribe to services 0xAB03 and 0xAB04.
            _subscribe = self.someip_packer.packet_subscribe(
                service_id=[0xAB03, 0xAB04], ttl=0x03
            )
            workers.append(
                SomeipSendLoop(
                    network_card=EthParameter.server_network_card,
                    someip_packer=_subscribe,
                    cycle_time_ms=2000,
                )
            )

            # Receiver; it needs the unpacker to decode what it captures.
            workers.append(
                SomeIpReceiver(EthParameter.server_network_card, self.someip_unpacker)
            )

            # An Event object is always truthy, so it must be waited on rather
            # than used directly as a loop condition.
            self._stop_event.wait()
        finally:
            # Shut the workers down in reverse start order.
            for worker in reversed(workers):
                worker.stop()

    def stop(self):
        """Request termination and wait until ``run`` has finished."""
        self.running = False
        self._stop_event.set()
        self.join()


class SomeipSendLoop(threading.Thread):
    """Thread that sends one prepared frame cyclically until stopped.

    The thread starts itself from the constructor.

    :param network_card: interface name handed to ``sendp``
    :param someip_packer: the frame to send; nothing is sent while it is falsy
    :param cycle_time_ms: pause between two transmissions, in milliseconds
    """

    def __init__(self, network_card, someip_packer, cycle_time_ms):
        super().__init__()
        self.network_card = network_card
        self.someip_packer = someip_packer
        self.cycle_time_ms = cycle_time_ms
        self._stop_event = threading.Event()
        self.start()

    def run(self):
        """Send the frame every ``cycle_time_ms`` until ``stop`` is called."""
        period_s = self.cycle_time_ms / 1000
        # An Event object is always truthy; test its state explicitly.
        while not self._stop_event.is_set():
            if self.someip_packer:
                sendp(self.someip_packer, iface=self.network_card, verbose=0)
            # Waiting on the event (instead of sleeping) ends the pause as
            # soon as stop() is called, and also paces the loop when there is
            # nothing to send.
            self._stop_event.wait(period_s)

    def stop(self):
        """Request termination and wait for the thread to finish."""
        self._stop_event.set()
        self.join()


class SomeIpReceiver(threading.Thread):
    """Thread that sniffs UDP traffic on one interface and decodes SOME/IP headers.

    The thread starts itself from the constructor.

    :param eth_desc: interface name handed to ``sniff``
    :param someip_unpacker: object providing ``get_someip_header_params``
    """

    def __init__(
        self,
        eth_desc: str,
        someip_unpacker
    ):
        super().__init__()
        self._eth_desc = eth_desc
        self._unpacker = someip_unpacker
        self._terminated = False
        self._stop_event = threading.Event()
        self.start()

    def packet_callback(self, packet):
        """Print the message type of ``packet`` if it carries a SOME/IP header."""
        if not packet:
            return
        header_param = self._unpacker.get_someip_header_params(
            receive_packet=packet
        )
        # The unpacker yields None for packets without a usable SOME/IP
        # header; the capture filter is plain "udp", so those must be skipped.
        if header_param is None:
            return
        print(f"msg_type: {header_param.msg_type}")

    def run(self):
        """Capture until ``stop`` is called.

        The stop condition is evaluated by ``sniff`` for each captured packet,
        so the capture ends with the first packet seen after ``stop``.
        """
        sniff(
            iface=self._eth_desc,
            prn=self.packet_callback,
            filter="udp",
            # packets are handled in the callback; do not accumulate them
            store=False,
            stop_filter=lambda x: self._terminated,
        )

    def stop(self):
        """Request termination and wait for the capture to end."""
        self._terminated = True
        self._stop_event.set()
        self.join()

# 如果协议有TCP
class SomeipTcpSocket(threading.Thread):
    """Thread that opens a TCP connection to the peer and holds it until stopped.

    The thread starts itself from the constructor.
    """

    def __init__(self):
        super().__init__()
        self.tcp_sock = None  # connected socket.socket while the thread runs
        self._stop_event = threading.Event()
        self.start()

    def create_tcp_socket(self):
        """Bind to the local consumer port and connect to the peer's producer port.

        ``EthParameter`` defines no ``icc_android_ip`` / ``adcc_ip``; the local
        machine and the domain controller are ``server_ip`` and ``client_ip``.

        :raises OSError: when binding or connecting fails (the socket is closed first)
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((EthParameter.server_ip, EthParameter.consumer_prot))
            sock.connect((EthParameter.client_ip, EthParameter.producer_port))
        except OSError:
            sock.close()
            raise
        self.tcp_sock = sock

    def run(self):
        """Open the connection, keep it until ``stop`` is called, then close it."""
        self.create_tcp_socket()
        try:
            # An Event object is always truthy, so it must be waited on rather
            # than used directly as a loop condition.
            self._stop_event.wait()
        finally:
            self.tcp_sock.close()
            self.tcp_sock = None

    def stop(self):
        """Request termination and wait for the thread to finish."""
        self._stop_event.set()
        self.join()


if __name__ == "__main__":
    # Open the TCP link first, then hand a packer and an unpacker to the
    # controller thread and start it.
    tcp_link = SomeipTcpSocket()
    frame_builder = SomeipPacker()
    frame_parser = SomeipUnpacker()
    controller = SomeipController(frame_builder, frame_parser)
    controller.start()



http://www.kler.cn/a/586713.html

相关文章:

  • 【Unity网络同步框架 - Nakama研究(二)】
  • Spark 中创建 DataFrame 的2种方式对比
  • 【最后203篇系列】015 几种消息队列的思考
  • docker后台运行,便于后期用命令行进入它的终端
  • 【vscode-03】AUTOSAR CP 插件配置
  • DataWhale 速通AI编程开发:(进阶篇)第3章 提示词(Prompts)配置项
  • AI与人的智能,改变一生的思维模型【7】易得性偏差
  • 空调acwing二进制差分
  • C++移动语义与右值引用:从理论到实践的深度解析引言
  • python:数据类构建器
  • 《DeepSeek深度使用教程:开启智能交互新体验》Deepseek深度使用教程
  • 【大模型基础_毛玉仁】2.4 基于 Encoder-Decoder 架构的大语言模型
  • AtCoder Beginner Contest 003(A - 社の給料、B -トランプ、C -プログラミング講座、D - 社の冬 )题目讲解
  • 【PHP】新版本特性记录(持续更新)
  • java 的标记接口RandomAccess使用方法
  • vulnhub靶场之stapler靶机
  • MIDI,AI 3D场景生成技术
  • Audacity 技术浅析(一)
  • [CISSP] [3] 人员安全与社会工程
  • 原生微信小程序实现导航漫游(Tour)