当前位置: 首页 > article >正文

CTP_将C++封装为Python可调用接口

目录

写在前面:

前置准备:

step 1 与上期所原始代码对比分析源码

 td源码

1 配置属性-》常规-》配置类型 要为 “动态库(.dll)”

 2 VC++目录 -》包含目录

 3 VC++目录 -》 库目录

 4 链接器-》常规-》附加库目录

 5 链接器-》输入-》附加依赖项

vnctp.h 的功能

vnctptd.h 分析

vnctptd.cpp 分析

md源码

step 2 在Visual Studio 中生成项目

step 3 通过setup.py生成pyd文件

在Python中使用

 CtpMdApi类

 CtpTdApi类

执行


写在前面:

1 我最开始是用swig直接对对上期技术的C++包进行处理,处理成python可调用的版本后,行情和交易可以正常连接和登录,但当我使用交易服务器查询合约,等待回调函数时就崩溃提示异常退出 -1073740791 (0xC0000409),而且没有任何错误信息。经过反复重编译,.i文件的修改都无法解决此问题,只能放弃使用swig的方法,转而借鉴VeihgNa Studio的接口。

2 VeihgNa Studio的CTP接口是使用的pybind11和setuptools封装成python可用的包

3 本文直接通过 VeihgNa Studio 的源码记录转换过程

前置准备:

1 安装Visual Studio 2022,安装前选择组件要包含C++动态链接库的模板

2 安装python3,本文安装的版本为python3.7.1

step 1 与上期所原始代码对比分析源码

下载 VeihgNa Studio 的CTP源码 GitHub - vnpy/vnpy_ctp: VeighNa框架的CTP交易接口

下载后解压,解压后文件目录如下:

 在 vnpy_ctp/api/vnctp/ 目录下有C++项目

 右键vnctp.sln , 在弹出菜单中选择打开方式为 Visual Studio 2022

直接按“确定” 

 td源码

vnctptd项目工程文件结构

上期提供的ThostFtdcMdApi.h、ThostFtdcTraderApi.h 、ThostFtdcUserApiDataType.h、ThostFtdcUserApiStruct.h 在【头文件】中

vnctp.h和vnctptd.h 是 VeihgNa Studio 作者写的,以下分析这两个头文件

双击vnctp.h, 会发现 const dict &d 提示错误

这个问题是由于我们只是拷贝了项目代码,项目代码的依赖项没有变更,以下处理这个问题:

右边选中 vnctptd 项目,右键-》属性

1 配置属性-》常规-》配置类型 要为 “动态库(.dll)”

 2 VC++目录 -》包含目录

修改前

 1) D:\veighna_studio\include   这个修改为python安装目录下的include目录,本文python3.7.1对应的目录为 D:\soft\python371\include

2)其他三项不用修改,我们下载的源码包里已经包含

 修改后

 3 VC++目录 -》 库目录

修改前

1)  D:\veighna_studio\libs  修改为python安装目录下的libs文件夹目录

2)..\libs 不用变,下载的源码里有包含这个文件夹

修改后

 4 链接器-》常规-》附加库目录

修改前

 1) D:\veighna_studio\libs  修改为python安装目录下的libs文件夹路径

2) ..\libs 不用修改

修改后

 5 链接器-》输入-》附加依赖项

这个一般不用修改,但最好检查下

6 C/C++ -》预编译投 -》预编译头 -》不使用预编译头

 修改完成后,点击“应用”,“确定”

 代码就没有提示错误了

现在继续分析源码,打开vnctp.h

vnctp.h 的功能

1)处理字符编码,将非UTF-8转UTF-8

2)  处理参数类型

3)定义一个任务队列,任务队列主要是为了避免在与交易所交互过程中我方耽搁太长时间导致我方程序崩溃

vnctptd.h 分析

1)分为两大部分,一部分是常量,将每个回调方法名分别定义一个常量值;一部分是继承 CThostFtdcTraderSpi的类TdApi

2) TdApi 实现Spi中所有回调函数方法(以On开头)、每个回调函数对应的任务方法(以process开头)、定义在python中调用的回调函数方法(以on开头)、实现Api中所有请求函数方法

vnctptd.cpp 分析

上面vnctptd.h分析中,TdApi分为四部分,在.cpp中四部分分别取其中一个函数为例解说

1)Spi中的回调函数,以OnRspUserLogin为例

创建一个任务Task

将方法名对应的常数 ONRSPUSERLOGIN复制给任务名称 task_name

检查返回结果,如果登录成功,将返回的数据放入 task_data

如果登录失败,将失败信息放入 task_error

将任务推进队列

2) 每个回调函数对应的任务方法,以 processRspUserLogin 为例

将返回的数据或错误信息转换成python能识别的字典dict

调用 onRspUserLogin 

3) 定义在python中调用的回调函数方法

 通过pybind11将onRspUserLogin封装为python可调用的方法

4)实现Api中所有请求函数方法

通过pybind11将每个Api函数封装为python可调用的方法

md源码

先处理错误提示,与td源码中的处理方法一致,这里不再赘述。

vnctpmd.h 和 vnctpmd.cpp 的逻辑和td源码的逻辑相同,这里也同样不再赘述

step 2 在Visual Studio 中生成项目

在Visual Studio生成项目的目的主要是看是否会有报错,有报错的话在VS中分析起来比较方便,如果生成项目没问题,就可以进行step 3

以 vnctptd 项目为例讲述

选中 vnctptd 项目,生成-》生成vnctptd

 生成没有问题。

PS: 在最开始的时候,我生成是有报错的,这里罗列下,以便遇到同样问题的同学可以参考

1)pybind11 报错声明重复,通过更换项目里的 pybind11相关文件修正。https://github.com/pybind/pybind11

 下载解压后在 include 文件夹下有一个pybind11文件夹,直接将这个文件夹替换项目中的 pybind11的文件夹

2)error C2039: "ReqOptionSelfCloseInsert": 不是 "CThostFtdcTraderApi" 的成员

检查了各种可能的情况,看到说可能和中文注释有关,我就在 /// 与 注释内容 之间加了一个空格,再生成,就OK了

step 3 通过setup.py生成pyd文件

在下载的源码跟目录下,有一个setup.py文件

打开电脑的 cmd , 将目录转到 setup.py 所在目录下

在该目录下输入命令  python setup.py build

注意:本人电脑中的python就是执行的python3,如果你的电脑同时有python2和python3又没做设置,那你应该是python3 setup.py build

这个命令执行过程大概一两分钟,过程会输出很多警告,只要不是error,其他都可以不管

 pyd创建完毕了

在项目根目录会有一个build文件夹,文件夹里有一个lib.win-amd64-3.7的文件夹,如果使用的电脑环境和python版本不一样,这个文件夹名字会对应你电脑的环境。在lib.win-amd64-3.7里的vnpy_ctp就是我们要用的python包,将整个vnpy_ctp拷贝到要使用的Python项目就可以使用CTP接口

在Python中使用

直接使用里面的 TdApi和MdApi

1) 将gateway删除

2)将__init__.py清空

创建一个python项目ctp

将vnpy_ctp拷贝到项目ctp目录下

在ctp目录下创建一个test_api.py文件

在test_api.py文件中创建 CtpMdApi类和CtpTdApi(TdApi)类,这两个类分别继承MdApi和TdApi,这两个类基本上是借鉴了VeihgNa Studio里的代码

注意:这里导入TdApi和TdApi 提示错误,不用理会,程序能正常运行

 CtpMdApi类

class CtpMdApi(MdApi):
    def __init__(self)->None:
        super().__init__()

        self.reqid: int = 0
        self.connect_status:bool = False
        self.login_status:bool = False
        self.subscribed: set = set()

        self.userid: str = ""
        self.password: str = ""
        self.brokerid: str = ""

        self.current_date: str = date_tools.res_today_str()
        pass
    def connect(self, address: str, userid: str, password: str, brokerid: str)->None:
        self.userid = userid
        self.password = password
        self.brokerid = brokerid

        if not self.connect_status:
            path: Path = get_folder_path(self.gateway_name.lower())
            self.createFtdcMdApi((str(path) + "\\Md").encode("GBK"))
            self.registerFront(address)
            self.init()

            self.connect_status = True
            pass

    def login(self) -> None:
        ctp_req:dict = {
            "UserID": self.userid,
            "Password": self.password,
            "BrokerID": self.brokerid
        }

        self.reqid += 1
        self.reqUserLogin(ctp_req,self.reqid)
        pass

    def subscribe(self,req:dict):
        if self.login_status:
            self.subscribeMarketData(req['symbol'])
        self.subscribed.add(req['symbol'])

    def close(self)->None:
        if self.connect_status:
            self.exit()

    def update_date(self)->None:
        self.current_date = date_tools.res_today_str()

    def onFrontConnected(self)->None:
        self.login()
        pass
    def onFrontDisconnected(self,reason:int)->None:
        self.login_status = False

    def onRspUserLogin(self,data:dict,error:dict,reqid:int,last:bool)->None:
        if not error['ErrorID']:
            self.login_status = True
            for symbol in self.subscribed:
                self.subscribeMarketData(symbol)
        else:
            print(f"行情服务器登录失败。{error['ErrorID']}.{error['ErrorMsg']}")
        pass
    def onRspError(self, error: dict, reqid: int, last: bool)->None:
        print('行情接口报错。',error['ErrorID'],error['ErrorMsg'])
        pass
    def onRspSubMarketData(self, data: dict, error: dict, reqid: int, last: bool)->None:
        if not error or not error['ErrorID']:
            return
        print('行情订阅失败。',error['ErrorID'],error['ErrorMsg'])
    def onRtnDepthMarketData(self,data:dict)->None:
        if not data['UpdateTime']:
            return
        print('tick返回',data['InstrumentID'],data['LastPrice'])
        pass

 CtpTdApi类

class CtpTdApi(TdApi):
    def __init__(self)->None:
        super().__init__()

        self.reqid: int = 0
        self.order_ref: int = 0

        self.connect_status: bool = False
        self.login_status: bool = False
        self.auth_status: bool = False
        self.login_failed: bool = False
        self.auth_failed: bool = False
        self.contract_inited: bool = False

        self.userid: str = ""
        self.password: str = ""
        self.brokerid: str = ""
        self.auth_code: str = ""
        self.appid: str = ""

        self.frontid: int = 0
        self.sessionid: int = 0
        pass
    def connect(self,address:str,userid:str,password:str,brokerid:str,auth_code:str,appid:str)->None:
        self.userid = userid
        self.password = password
        self.brokerid = brokerid
        self.auth_code = auth_code
        self.appid = appid

        if not self.connect_status:
            path: Path = get_folder_path(self.gateway_name.lower())
            self.createFtdcTdApi((str(path) + "\\Td").encode("GBK"))
            self.subscribePrivateTopic(0)
            self.subscribePublicTopic(0)
            self.registerFront(address)
            self.init()

            self.connect_status = True
        else:
            self.authenticate()
        pass
    def authenticate(self)->None:
        if self.auth_failed:
            return
        ctp_req: dict = {
            "UserID": self.userid,
            "BrokerID": self.brokerid,
            "AuthCode": self.auth_code,
            "AppID": self.appid
        }

        self.reqid += 1
        self.reqAuthenticate(ctp_req, self.reqid)
        pass
    def login(self)->None:
        if self.login_failed:
            return
        ctp_req: dict = {
            "UserID": self.userid,
            "Password": self.password,
            "BrokerID": self.brokerid,
            "AppID": self.appid
        }

        self.reqid += 1
        self.reqUserLogin(ctp_req, self.reqid)
        pass
    def close(self)->None:
        if self.connect_status:
            self.exit()

    def onFrontConnected(self)->None:
        print('onFrontConnected')
        if self.auth_code:
            self.authenticate()
        else:
            self.login()
    def onFrontDisconnected(self,reason:int)->None:
        self.login_status = False
        print('onFrontDisconnected',reason)
    def onRspAuthenticate(self, data: dict, error: dict, reqid: int, last: bool)->None:
        print('onRspAuthenticate')
        if not error['ErrorID']:
            self.auth_status = True
            self.login()
        else:
            self.auth_failed = True
            print('交易服务器验证失败。',error['ErrorID'],error['ErrorMsg'])
        pass

    def onRspUserLogin(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        print('onRspUserLogin')
        if not error["ErrorID"]:
            self.frontid = data["FrontID"]
            self.sessionid = data["SessionID"]
            self.login_status = True

            # 自动确认结算单
            ctp_req: dict = {
                "BrokerID": self.brokerid,
                "InvestorID": self.userid
            }
            self.reqid += 1
            self.reqSettlementInfoConfirm(ctp_req, self.reqid)
        else:
            self.login_failed = True
            print("交易服务器登录失败", error['ErrorID'],error['ErrorMsg'])
        pass

    def onRspSettlementInfoConfirm(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        print('onRspSettlementInfoConfirm')
        while True:
            self.reqid += 1
            n: int = self.reqQryInstrument({}, self.reqid)
            if not n:
                break
            else:
                time.sleep(1)
        pass

    def onRspQryInstrument(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        print(data['ProductClass'],data['InstrumentID'],data['ProductID'],reqid,last)
        if last:
            self.contract_inited = True
            print('合约信息查询完毕')
        pass
    pass

执行

if __name__ == '__main__':
    investorid = ""
    brokerid="9999"
    password= ""
    appid= "simnow_client_test"
    auth_code= "0000000000000000"
    md_ip= "180.168.146.187:10211"
    trader_ip= "180.168.146.187:10201"

    temp_api = CtpTdApi()
    address = f"tcp://{trader_ip}"
    temp_api.connect(address,investorid,password,brokerid,auth_code,appid)

    import keyboard
    keyboard.wait('esc')
    sys.exit()
    pass

代码中执行的是请求所有合约,打印到控制台

结果

 可以请求到合约,接口可用。

PS:我在刚开是执行时,address直接写入的ip,提示的 RuntimeError:Invalid location in line 45 of file ..\..\source\network\ServicName.cpp 错误,后来在ip前加上tcp://就可以正常运行


http://www.kler.cn/a/8223.html

相关文章:

  • AI大模型:重塑软件开发流程的优势、挑战及应对策略
  • ExecStart=/usr/bin/mongod --config /etc/mongod.conf (code=exited, status=2)
  • 深度学习-图像评分实验(TensorFlow框架运用、读取处理图片、模型建构)
  • day60 图论章节刷题Part10(Floyd 算法、A * 算法)
  • 2024最新版JavaScript逆向爬虫教程-------基础篇之Chrome开发者工具学习
  • linux,自定义Yum仓库、网络Yum仓库、DNS服务基础
  • Excel快捷键
  • CTF杂项提纲
  • leetcode每日一题:数组篇(1/2)
  • 乘法逆元讲解
  • 1004[递归]母牛的故事
  • cmake 常用方法自我总结
  • 通过阿里云函数计算解决ChatGPT API的调用问题
  • 算法训练第四十九天 | 121.买卖股票的最佳时机、122.买卖股票的最佳时机II
  • python教程requests详解
  • entos7系统部署网站项目教程【超详细教程】
  • 实践分享:如何在自己的App 中引入AI 画图
  • Web前端如何防止被恶意调式?
  • JS 数组排序方法 - sortFun
  • Kotlin 面向对象(二)
  • Redis —缓存常见异常
  • 父子组件传值问题
  • Ludwig Otto Hölder
  • php企业公司员工考勤加班系统
  • 面试被问到:测试计划和测试方案有什么区别?
  • 派盘为您的个人数据安家