CTP_将C++封装为Python可调用接口
目录
写在前面:
前置准备:
step 1 与上期所原始代码对比分析源码
td源码
1 配置属性-》常规-》配置类型 要为 “动态库(.dll)”
2 VC++目录 -》包含目录
3 VC++目录 -》 库目录
4 链接器-》常规-》附加库目录
5 链接器-》输入-》附加依赖项
vnctp.h 的功能
vnctptd.h 分析
vnctptd.cpp 分析
md源码
step 2 在Visual Studio 中生成项目
step 3 通过setup.py生成pyd文件
在Python中使用
CtpMdApi类
CtpTdApi类
执行
写在前面:
1 我最开始是用swig直接对对上期技术的C++包进行处理,处理成python可调用的版本后,行情和交易可以正常连接和登录,但当我使用交易服务器查询合约,等待回调函数时就崩溃提示异常退出 -1073740791 (0xC0000409),而且没有任何错误信息。经过反复重编译,.i文件的修改都无法解决此问题,只能放弃使用swig的方法,转而借鉴VeihgNa Studio的接口。
2 VeihgNa Studio的CTP接口是使用的pybind11和setuptools封装成python可用的包
3 本文直接通过 VeihgNa Studio 的源码记录转换过程
前置准备:
1 安装Visual Studio 2022,安装前选择组件要包含C++动态链接库的模板
2 安装python3,本文安装的版本为python3.7.1
step 1 与上期所原始代码对比分析源码
下载 VeihgNa Studio 的CTP源码 GitHub - vnpy/vnpy_ctp: VeighNa框架的CTP交易接口
下载后解压,解压后文件目录如下:
在 vnpy_ctp/api/vnctp/ 目录下有C++项目
右键vnctp.sln , 在弹出菜单中选择打开方式为 Visual Studio 2022
直接按“确定”
td源码
vnctptd项目工程文件结构
上期提供的ThostFtdcMdApi.h、ThostFtdcTraderApi.h 、ThostFtdcUserApiDataType.h、ThostFtdcUserApiStruct.h 在【头文件】中
vnctp.h和vnctptd.h 是 VeihgNa Studio 作者写的,以下分析这两个头文件
双击vnctp.h, 会发现 const dict &d 提示错误
这个问题是由于我们只是拷贝了项目代码,项目代码的依赖项没有变更,以下处理这个问题:
右边选中 vnctptd 项目,右键-》属性
1 配置属性-》常规-》配置类型 要为 “动态库(.dll)”
2 VC++目录 -》包含目录
修改前
1) D:\veighna_studio\include 这个修改为python安装目录下的include目录,本文python3.7.1对应的目录为 D:\soft\python371\include
2)其他三项不用修改,我们下载的源码包里已经包含
修改后
3 VC++目录 -》 库目录
修改前
1) D:\veighna_studio\libs 修改为python安装目录下的libs文件夹目录
2)..\libs 不用变,下载的源码里有包含这个文件夹
修改后
4 链接器-》常规-》附加库目录
修改前
1) D:\veighna_studio\libs 修改为python安装目录下的libs文件夹路径
2) ..\libs 不用修改
修改后
5 链接器-》输入-》附加依赖项
这个一般不用修改,但最好检查下
6 C/C++ -》预编译投 -》预编译头 -》不使用预编译头
修改完成后,点击“应用”,“确定”
代码就没有提示错误了
现在继续分析源码,打开vnctp.h
vnctp.h 的功能
1)处理字符编码,将非UTF-8转UTF-8
2) 处理参数类型
3)定义一个任务队列,任务队列主要是为了避免在与交易所交互过程中我方耽搁太长时间导致我方程序崩溃
vnctptd.h 分析
1)分为两大部分,一部分是常量,将每个回调方法名分别定义一个常量值;一部分是继承 CThostFtdcTraderSpi的类TdApi
2) TdApi 实现Spi中所有回调函数方法(以On开头)、每个回调函数对应的任务方法(以process开头)、定义在python中调用的回调函数方法(以on开头)、实现Api中所有请求函数方法
vnctptd.cpp 分析
上面vnctptd.h分析中,TdApi分为四部分,在.cpp中四部分分别取其中一个函数为例解说
1)Spi中的回调函数,以OnRspUserLogin为例
创建一个任务Task
将方法名对应的常数 ONRSPUSERLOGIN复制给任务名称 task_name
检查返回结果,如果登录成功,将返回的数据放入 task_data
如果登录失败,将失败信息放入 task_error
将任务推进队列
2) 每个回调函数对应的任务方法,以 processRspUserLogin 为例
将返回的数据或错误信息转换成python能识别的字典dict
调用 onRspUserLogin
3) 定义在python中调用的回调函数方法
通过pybind11将onRspUserLogin封装为python可调用的方法
4)实现Api中所有请求函数方法
通过pybind11将每个Api函数封装为python可调用的方法
md源码
先处理错误提示,与td源码中的处理方法一致,这里不再赘述。
vnctpmd.h 和 vnctpmd.cpp 的逻辑和td源码的逻辑相同,这里也同样不再赘述
step 2 在Visual Studio 中生成项目
在Visual Studio生成项目的目的主要是看是否会有报错,有报错的话在VS中分析起来比较方便,如果生成项目没问题,就可以进行step 3
以 vnctptd 项目为例讲述
选中 vnctptd 项目,生成-》生成vnctptd
生成没有问题。
PS: 在最开始的时候,我生成是有报错的,这里罗列下,以便遇到同样问题的同学可以参考
1)pybind11 报错声明重复,通过更换项目里的 pybind11相关文件修正。https://github.com/pybind/pybind11
下载解压后在 include 文件夹下有一个pybind11文件夹,直接将这个文件夹替换项目中的 pybind11的文件夹
2)error C2039: "ReqOptionSelfCloseInsert": 不是 "CThostFtdcTraderApi" 的成员
检查了各种可能的情况,看到说可能和中文注释有关,我就在 /// 与 注释内容 之间加了一个空格,再生成,就OK了
step 3 通过setup.py生成pyd文件
在下载的源码跟目录下,有一个setup.py文件
打开电脑的 cmd , 将目录转到 setup.py 所在目录下
在该目录下输入命令 python setup.py build
注意:本人电脑中的python就是执行的python3,如果你的电脑同时有python2和python3又没做设置,那你应该是python3 setup.py build
这个命令执行过程大概一两分钟,过程会输出很多警告,只要不是error,其他都可以不管
pyd创建完毕了
在项目根目录会有一个build文件夹,文件夹里有一个lib.win-amd64-3.7的文件夹,如果使用的电脑环境和python版本不一样,这个文件夹名字会对应你电脑的环境。在lib.win-amd64-3.7里的vnpy_ctp就是我们要用的python包,将整个vnpy_ctp拷贝到要使用的Python项目就可以使用CTP接口
在Python中使用
直接使用里面的 TdApi和MdApi
1) 将gateway删除
2)将__init__.py清空
创建一个python项目ctp
将vnpy_ctp拷贝到项目ctp目录下
在ctp目录下创建一个test_api.py文件
在test_api.py文件中创建 CtpMdApi类和CtpTdApi(TdApi)类,这两个类分别继承MdApi和TdApi,这两个类基本上是借鉴了VeihgNa Studio里的代码
注意:这里导入TdApi和TdApi 提示错误,不用理会,程序能正常运行
CtpMdApi类
class CtpMdApi(MdApi):
def __init__(self)->None:
super().__init__()
self.reqid: int = 0
self.connect_status:bool = False
self.login_status:bool = False
self.subscribed: set = set()
self.userid: str = ""
self.password: str = ""
self.brokerid: str = ""
self.current_date: str = date_tools.res_today_str()
pass
def connect(self, address: str, userid: str, password: str, brokerid: str)->None:
self.userid = userid
self.password = password
self.brokerid = brokerid
if not self.connect_status:
path: Path = get_folder_path(self.gateway_name.lower())
self.createFtdcMdApi((str(path) + "\\Md").encode("GBK"))
self.registerFront(address)
self.init()
self.connect_status = True
pass
def login(self) -> None:
ctp_req:dict = {
"UserID": self.userid,
"Password": self.password,
"BrokerID": self.brokerid
}
self.reqid += 1
self.reqUserLogin(ctp_req,self.reqid)
pass
def subscribe(self,req:dict):
if self.login_status:
self.subscribeMarketData(req['symbol'])
self.subscribed.add(req['symbol'])
def close(self)->None:
if self.connect_status:
self.exit()
def update_date(self)->None:
self.current_date = date_tools.res_today_str()
def onFrontConnected(self)->None:
self.login()
pass
def onFrontDisconnected(self,reason:int)->None:
self.login_status = False
def onRspUserLogin(self,data:dict,error:dict,reqid:int,last:bool)->None:
if not error['ErrorID']:
self.login_status = True
for symbol in self.subscribed:
self.subscribeMarketData(symbol)
else:
print(f"行情服务器登录失败。{error['ErrorID']}.{error['ErrorMsg']}")
pass
def onRspError(self, error: dict, reqid: int, last: bool)->None:
print('行情接口报错。',error['ErrorID'],error['ErrorMsg'])
pass
def onRspSubMarketData(self, data: dict, error: dict, reqid: int, last: bool)->None:
if not error or not error['ErrorID']:
return
print('行情订阅失败。',error['ErrorID'],error['ErrorMsg'])
def onRtnDepthMarketData(self,data:dict)->None:
if not data['UpdateTime']:
return
print('tick返回',data['InstrumentID'],data['LastPrice'])
pass
CtpTdApi类
class CtpTdApi(TdApi):
def __init__(self)->None:
super().__init__()
self.reqid: int = 0
self.order_ref: int = 0
self.connect_status: bool = False
self.login_status: bool = False
self.auth_status: bool = False
self.login_failed: bool = False
self.auth_failed: bool = False
self.contract_inited: bool = False
self.userid: str = ""
self.password: str = ""
self.brokerid: str = ""
self.auth_code: str = ""
self.appid: str = ""
self.frontid: int = 0
self.sessionid: int = 0
pass
def connect(self,address:str,userid:str,password:str,brokerid:str,auth_code:str,appid:str)->None:
self.userid = userid
self.password = password
self.brokerid = brokerid
self.auth_code = auth_code
self.appid = appid
if not self.connect_status:
path: Path = get_folder_path(self.gateway_name.lower())
self.createFtdcTdApi((str(path) + "\\Td").encode("GBK"))
self.subscribePrivateTopic(0)
self.subscribePublicTopic(0)
self.registerFront(address)
self.init()
self.connect_status = True
else:
self.authenticate()
pass
def authenticate(self)->None:
if self.auth_failed:
return
ctp_req: dict = {
"UserID": self.userid,
"BrokerID": self.brokerid,
"AuthCode": self.auth_code,
"AppID": self.appid
}
self.reqid += 1
self.reqAuthenticate(ctp_req, self.reqid)
pass
def login(self)->None:
if self.login_failed:
return
ctp_req: dict = {
"UserID": self.userid,
"Password": self.password,
"BrokerID": self.brokerid,
"AppID": self.appid
}
self.reqid += 1
self.reqUserLogin(ctp_req, self.reqid)
pass
def close(self)->None:
if self.connect_status:
self.exit()
def onFrontConnected(self)->None:
print('onFrontConnected')
if self.auth_code:
self.authenticate()
else:
self.login()
def onFrontDisconnected(self,reason:int)->None:
self.login_status = False
print('onFrontDisconnected',reason)
def onRspAuthenticate(self, data: dict, error: dict, reqid: int, last: bool)->None:
print('onRspAuthenticate')
if not error['ErrorID']:
self.auth_status = True
self.login()
else:
self.auth_failed = True
print('交易服务器验证失败。',error['ErrorID'],error['ErrorMsg'])
pass
def onRspUserLogin(self, data: dict, error: dict, reqid: int, last: bool) -> None:
print('onRspUserLogin')
if not error["ErrorID"]:
self.frontid = data["FrontID"]
self.sessionid = data["SessionID"]
self.login_status = True
# 自动确认结算单
ctp_req: dict = {
"BrokerID": self.brokerid,
"InvestorID": self.userid
}
self.reqid += 1
self.reqSettlementInfoConfirm(ctp_req, self.reqid)
else:
self.login_failed = True
print("交易服务器登录失败", error['ErrorID'],error['ErrorMsg'])
pass
def onRspSettlementInfoConfirm(self, data: dict, error: dict, reqid: int, last: bool) -> None:
print('onRspSettlementInfoConfirm')
while True:
self.reqid += 1
n: int = self.reqQryInstrument({}, self.reqid)
if not n:
break
else:
time.sleep(1)
pass
def onRspQryInstrument(self, data: dict, error: dict, reqid: int, last: bool) -> None:
print(data['ProductClass'],data['InstrumentID'],data['ProductID'],reqid,last)
if last:
self.contract_inited = True
print('合约信息查询完毕')
pass
pass
执行
if __name__ == '__main__':
investorid = ""
brokerid="9999"
password= ""
appid= "simnow_client_test"
auth_code= "0000000000000000"
md_ip= "180.168.146.187:10211"
trader_ip= "180.168.146.187:10201"
temp_api = CtpTdApi()
address = f"tcp://{trader_ip}"
temp_api.connect(address,investorid,password,brokerid,auth_code,appid)
import keyboard
keyboard.wait('esc')
sys.exit()
pass
代码中执行的是请求所有合约,打印到控制台
结果
可以请求到合约,接口可用。
PS:我在刚开是执行时,address直接写入的ip,提示的 RuntimeError:Invalid location in line 45 of file ..\..\source\network\ServicName.cpp 错误,后来在ip前加上tcp://就可以正常运行