当前位置: 首页 > article >正文

基于 MDL 行情插件的中金所 L1 数据处理最佳实践

本文介绍了如何通过 DolphinDB 的 MDL 插件订阅并处理中金所 Level 1 实时数据。首先,文章简要介绍了 MDL 插件的功能和作用。它是基于 MDL 官方提供的行情数据服务 C++ SDK(即 TCP 版本 MDL )实现,提供了实时数据获取和处理的能力。接着文章详细阐述了数据的存储方案,以及实时数据与历史数据字段对齐的方法。此外,本文还提供了实时合成K线的模块,并讨论了数据处理中的关键细节,例如主力合约的选择和成交量加权平均价格(VWAP)的计算方法。值得注意的是,本文的方法不仅适用于中金所 L1 数据,还可以扩展到其他期货交易所的数据处理,因此具有一定的通用性。

本文中的实时数据处理功能依托于DolphinDB的流数据功能。在继续阅读本文之前,建议读者先了解DolphinDB的流数据订阅及流计算引擎的相关概念,以便更好地理解本文所述的实时行情处理流程及技术实现

本教程的代码基于 DolphinDB 2.00.12版本开发,建议用户使用2.00.12及3.00.1版本以上运行。具体 MDL 插件的接口说明,请参考 DolphinDB MDL 插件使用说明。

1. DolphinDB MDL 插件使用流程

MDL 是通联数据提供的高频行情数据服务, DolphinDB 提供了能够从 MDL 服务器获取高频行情数据的 DolphinDB MDL 插件,用户可以通过 DolphinScript 接入数据,实现高效的数据订阅与处理。

以下是接入 MDL 数据的基本流程,详情请参见《 MDL 行情插件最佳实践指南》:

1. 插件安装及加载安装 MDL 插件并加载到 DolphinDB 中。确保您的 DolphinDB 环境满足插件的依赖要求。通常需要下载插件包并在 DolphinDB 中加载,具体步骤请参考官方文档。

installPlugin("MDL") //安装 MDL 插件
loadPlugin("MDL") //加载 MDL 插件

2. 获取数据结构

在接入数据之前,需要通过 MDL 插件获取目标行情数据的结构。用户输入目标行情的数据服务 ID 和消息 ID 号,具体行情号请参考由通联数据提供的《通联数据 MDL 消息参考》。

cffexLevel1Schema = MDL::getSchema(" MDL SID_ MDL _CFFEX", 1) //中金所level1数据结构

该接口返回一张表,含 name、type 两列,共41个字段。

注:有关 MDL 插件对数据品类的支持情况,请联系 DolphinDB 小助手,微信号:dolphndb1。

3. 创建持久化流表

根据获得的数据结构,创建持久化流表以便存储接收到的行情数据。可以配置表的内存大小和持久化时间以控制数据大小。

enableTableShareAndPersistence(
        streamTable(1:0, cffexLevel1Schema["name"], cffexLevel1Schema["type"]),
        "streamCffexLevel1",
        cacheSize = 1000000, // 保留在系统的内存中的表的条数 1000000条
        retentionMinutes=1440,  // 持久化保留时间 1天
        preCache=1000 // 重建时预加载1000条记录
    )

4. 建立 MDL 连接句柄

通过指定 MDL 服务器的地址、端口、认证信息等,建立 MDL 连接句柄。确保连接的稳定性,并进行必要的配置,如获取接收时间戳和延迟时间设置。

handle =MDL::createHandle("Handle_CFFE_L1_Future", host, port, username)

5. 订阅行情数据

根据上述步骤建立的 MDL 连接句柄和输出的流表,用户可以指定 MDL 的数据服务 ID、数据服务版本号、消息 ID 来订阅需要的数据源。用户也可以通过fieldNamefieldValuesextraOrderLeve来过滤不要的字段、值或者指定部分数据源中的档位深度。

MDL::subscribe(handle, objByName("streamCffexLevel1"), 
                      " MDL SID_ MDL _CFFEX", " MDL VID_ MDL _CFFEX", 1)

6. 启动 MDL 连接

启动 MDL 连接并开始数据接收。

MDL::connect MDL (handle)

7. MDL 运行状态监控

通过接口用户可实时查看 MDL 连接的运行状态,确保数据传输正常。该接口会返回一张表,其中包含已经处理的消息数、处理失败的消息数、最后一条错误消息发生的时间等。

MDL::getStatus(handle)

2. 中国金融期货交易所简介

中国金融期货交易所(以下简称“中金所”)是经国务院同意,中国证监会批准设立的,专门从事金融期货、期权等金融衍生品交易与结算的公司制交易所。

中金所期货上市类产品包括:

  • 权益类
    • 沪深300股指期货,简称 IF
    • 中证500股指期货,简称 IC
    • 中证1000股指期货,简称 IM
    • 上证50股指期货,简称 IH
  • 利率类
    • 2年期国债期货,简称 TS
    • 5年期国债期货,简称 TF
    • 10年期国债期货,简称 T
    • 30年期国债期货,简称 TL

3. MDL 行情数据介绍

本节将介绍由通联数据公司提供的中金所期货Level1的行情数据结构。该数据分为历史数据和实时行情数据,通常操作是将历史数据落库后接入实时数据。由于两种数据的结构有所不同,因此需要对实时数据进行处理,将其与历史数据对齐后再接入。本文主要介绍字段对齐部分。

3.1 期货 Level 1 数据说明

期货市场的Level1和 Level 2 数据是指交易行情和订单委托行情(Trades and Quotes, TAQ)的快照数据,快照频率为 500ms 一个推送。 Level 1 和 Level 2 的区别在于Level1的订单委托行情只有一档,而 Level 2 则有五档,所以 Level 2 相比于 Level 1 提供了更深的订单委托行情数据。目前,期货市场的 Level 1 快照为500ms,Level 2 中,中金所和上期所是500ms,其他交易所是250ms。

3.2 中金所 Level 1 数据结构

通联数据提供的中金所 Level 1 分为实时数据及历史数据,以下是两种数据源的结构说明:

表 3-1 实时行情数据结构

列名类型含义
InstruIDSTRING合约代码
LastPriceDOUBLE最新价
PreSetPriceDOUBLE昨结算
OpenPriceDOUBLE今开盘
HighPriceDOUBLE最高价
LowPriceDOUBLE最低价
TurnoverDOUBLE成交金额,单位元,单边计算
OpenIntDOUBLE持仓量,单位手,单边计算
SetPriceDOUBLE今结算
ULimitPriceDOUBLE涨停板价
LLimitPriceDOUBLE跌停板价
TradDayDATE结算日
PreCloPriceDOUBLE昨收盘
VolumeINT成交量,单位手,单边计算
ClosePriceDOUBLE今收盘
PreDeltaDOUBLE昨虚实度
CurrDeltaDOUBLE今虚实度
UpdateTimeTIME最后修改时间
PreOpenIntDOUBLE昨持仓量,单位手,单边计算
BidPrice1DOUBLE申买价一
BidVolume1INT申买量一
AskPrice1DOUBLE申卖价一
AskVolume1INT申卖量一
BidPrice2DOUBLE申买价二
BidVolume2INT申买量二
AskPrice2DOUBLE申卖价二
AskVolume2INT申卖量二
BidPrice3DOUBLE申买价三
BidVolume3INT申买量三
AskPrice3DOUBLE申卖价三
AskVolume3INT申卖量三
BidPrice4DOUBLE申买价四
BidVolume4INT申买量四
AskPrice4DOUBLE申卖价四
AskVolume4INT申卖量四
BidPrice5DOUBLE申买价五
BidVolume5INT申买量五
AskPrice5DOUBLE申卖价五
AskVolume5INT申卖量五
AveragePriceDOUBLE均价
ActionDayDATE交易日

表 3-2 历史行情数据结构

列名类型含义
CONTRACTIDSYMBOL合约代码
LASTPXDOUBLE最新价
PRESETTLEDOUBLE昨结算
OPENPXDOUBLE开盘价
HIGHPXDOUBLE最高价
LOWPXDOUBLE最低价
OPENINTSDOUBLE持仓量
RISELIMITDOUBLE涨停板
FALLLIMITDOUBLE跌停板
PRECLOSEDOUBLE昨收盘
CLOSEPXDOUBLE收盘价
PREDELTADOUBLE昨虚实度
CURRDELTADOUBLE今虚实度
B1DOUBLE申买价一
B2DOUBLE申买价二
B3DOUBLE申买价三
B4DOUBLE申买价四
B5DOUBLE申买价五
BV1INT申买量一
BV2INT申买量二
BV3INT申买量三
BV4INT申买量四
BV5INT申买量五
S1DOUBLE申卖价一
S2DOUBLE申卖价二
S3DOUBLE申卖价三
S4DOUBLE申卖价四
S5DOUBLE申卖价五
SV1INT申卖量一
SV2INT申卖量二
SV3INT申卖量三
SV4INT申卖量四
SV5INT申卖量五
TDATEDATE交易日期
SETTLEMENTPXDOUBLE结算价
TMDOUBLE累计成交金额
CLEARINGDAYDATE清算日期
TTIME+UPDATEMILLISECTIME交易时间+交易时间毫秒
INITOPENINTSINT初始持仓量
AVGPXDOUBLE当日均价
TQINT累计成交量
TTIMETIME交易时间
ExchangeInstIDSYMBOL合约交易所编码
CMDOUBLE瞬时成交金额
OCSYMBOL开平仓性质
LASTQTYINT最新成交量
INTSCHGINT持仓量变化
LIFELOWDOUBLE历史最低价
LIFEHIGHDOUBLE历史最高价
AVGPXDOUBLE当日均价
BIDIMPLYQTYINT申买推导量
ASKIMPLYQTYINT申卖推导量
BSRATIODOUBLE买比
SIDESYMBOL买卖方向
MFLXIDSYMBOL连续合约代码
MFLXNAMESYMBOL连续合约名称
LOCALTMTIME本地时间戳
MARKETSYMBOL交易所
CHGDOUBLE价格涨跌
CHGPCTDOUBLE价格涨跌幅(%)
VARIETIESSYMBOL品种索引
SETTLEGROUPIDSYMBOL结算组代码
SETTLEIDINT结算编号
UNIXLONG交易时间对应的 UNIX 时间戳
MFLAGSYMBOL主力合约标记
SOURCESYMBOL数据源
OffsetINT更新时间毫秒级偏移量
TFLAGSYMBOL日夜盘标记
CONTRACTNAMESYMBOL合约名称
CONTRACTCODESYMBOL合约代码(英文)
CQINT瞬时成交量

3.3 数据存储方案

为实现数据库存储及查询的最优性能,我们需要针对不同数据的特点,设计不同的分区方案存储。本文提供存储期货 Level 1 数据的最佳存储方案,用户也可以根据自身需要进行调整。更多金融数据的存储方案可参考《存储金融数据的分区方案最佳实践》。

3.3.1 期货L1数据存储方案

存储引擎:TSDB

分区方式: VALUE,按天分区

分区字段: TDATE

 def createFutureL1(dbName,tbName){
	if(!existsDatabase(dbName)){
		db = database(dbName, VALUE, 2020.01.01..2020.01.02,,"TSDB")
	}else{
		db = database(dbName)
	}
	name = `TDATE`TTIME`UPDATEMILLISEC`CONTRACTID`ExchangeInstID`CONTRACTNAME`LASTPX`HIGHPX`LOWPX`CQ`CM`OC`TQ`TM`LASTQTY`INITOPENINTS`OPENINTS`INTSCHG`RISELIMIT`FALLLIMIT`PRESETTLE`PRECLOSE`BuyPrice`BuyVol`SellPrice`SellVol`OPENPX`CLOSEPX`SETTLEMENTPX`LIFELOW`LIFEHIGH`AVGPX`BIDIMPLYQTY`ASKIMPLYQTY`BSRATIO`SIDE`MFLXID`MFLXNAME`PREDELTA`CURRDELTA`LOCALTM`MARKET`CHG`CHGPCT`VARIETIES`SETTLEGROUPID`SETTLEID`UNIX`CLEARINGDAY`MFLAG`SOURCE`CONTRACTCODE`Offset`TFLAG
	type = [DATE, SECOND, INT, SYMBOL, SYMBOL, SYMBOL, DOUBLE, DOUBLE, DOUBLE, INT, DOUBLE, SYMBOL, INT, DOUBLE, INT, INT, INT, INT, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE[], INT[], DOUBLE[], INT[], DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, INT, DOUBLE, SYMBOL, SYMBOL, SYMBOL, DOUBLE, DOUBLE, TIME, SYMBOL, DOUBLE, DOUBLE, SYMBOL, SYMBOL, INT, LONG, DATE, SYMBOL, SYMBOL, SYMBOL, INT, SYMBOL]
	schemaTable = table(1:0, name, type)
	db.createPartitionedTable(
			table=schemaTable, tableName=tbName, partitionColumns=`TDATE, 
			compressMethods={TradeTime:"delta"}, sortColumns=`CONTRACTID`TTIME, 
			keepDuplicates=ALL)	
}

3.3.2 期货分钟线存储方案

存储引擎:TSDB

分区方式:RANGE,按年分区

分区字段:TDATE

 def createFutureL1_KMin(dbName,tbName){
	if(!existsDatabase(dbName)){
		db = database(dbName, RANGE, 1980.01M + (1..20) * 60,,"TSDB")
	}else{
		db = database(dbName)
	}
	name = `CLEARINGDAY`TDATE`CONTRACTID`MARKET`Offset`bartime`closeprice`openprice`highprice`lowprice`volume`value`vwap`OPENINTS
	type = `DATE`DATE`SYMBOL`SYMBOL`INT`MINUTE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`DOUBLE`DOUBLE`INT
	schemaTable = table(1:0, name, type)
	db.createPartitionedTable(table=schemaTable, tableName=tbName, 
	partitionColumns=`TDATE, compressMethods={TradeTime:"delta"}, 
	sortColumns=`CONTRACTID`TDATE`bartime, keepDuplicates=ALL, 
	sortKeyMappingFunction = [hashBucket{,10}, hashBucket{,50}])	
}

3.4 实时行情对齐历史行情

从表3-1和表3-2可以看出 MDL 提供的历史行情和实时行情字段名和字段数量不完全相同,历史行情提供了更多的信息,因此以下提供对齐方法,以及如何对缺失值处理。

表 3-3 实时行情对齐历史行情字段

实时数据字段实时数据字段含义历史数据字段历史数据字段含义缺失值处理
InstruID合约代码CONTRACTID合约代码
LastPrice最新价LASTPX最新价
PreSetPrice昨结算PRESETTLE昨结算
OpenPrice今开盘OPENPX开盘价
HighPrice最高价HIGHPX最高价
LowPrice最低价LOWPX最低价
OpenInt持仓量,单位手,单边计算OPENINTS持仓量
ULimitPrice涨停板价RISELIMIT涨停板
LLimitPrice跌停板价FALLLIMIT跌停板
PreCloPrice昨收盘PRECLOSE昨收盘
ClosePrice今收盘CLOSEPX收盘价
PreDelta昨虚实度PREDELTA昨虚实度
CurrDelta今虚实度CURRDELTA今虚实度
BidPrice1申买价一B1申买价一
BidPrice2申买价二B2申买价二
BidPrice3申买价三B3申买价三
BidPrice4申买价四B4申买价四
BidPrice5申买价五B5申买价五
BidVolume1申买量一BV1申买量一
BidVolume2申买量二BV2申买量二
BidVolume3申买量三BV3申买量三
BidVolume4申买量四BV4申买量四
BidVolume5申买量五BV5申买量五
AskPrice1申卖价一S1申卖价一
AskPrice2申卖价二S2申卖价二
AskPrice3申卖价三S3申卖价三
AskPrice4申卖价四S4申卖价四
AskPrice5申卖价五S5申卖价五
AskVolume1申卖量一SV1申卖量一
AskVolume2申卖量二SV2申卖量二
AskVolume3申卖量三SV3申卖量三
AskVolume4申卖量四SV4申卖量四
AskVolume5申卖量五SV5申卖量五
ActionDay交易日TDATE交易日期
SetPrice今结算SETTLEMENTPX结算价
Turnover成交金额,单位元,单边计算TM累计成交金额
TradDay结算日CLEARINGDAY清算日期
UpdateTime最后修改时间TTIME+UPDATEMILLISEC交易时间+交易时间毫秒
PreOpenInt昨持仓量,单位手,单边计算INITOPENINTS初始持仓量
AveragePrice均价AVGPX当日均价
Volume成交量,单位手,单边计算TQ累计成交量
缺失TTIME交易时间second(TTIME+UPDATEMILLISEC)
缺失ExchangeInstID合约交易所编码用NULL填充
缺失CM瞬时成交金额用0填充
缺失OC开平仓性质用0填充
缺失LASTQTY最新成交量用NULL填充
缺失INTSCHG持仓量变化通联计算,OPENINTS- INITOPENINTS
缺失LIFELOW历史最低价用NULL填充
缺失LIFEHIGH历史最高价用NULL填充
缺失AVGPX当日均价用NULL填充
缺失BIDIMPLYQTY申买推导量用NULL填充
缺失ASKIMPLYQTY申卖推导量用NULL填充
缺失BSRATIO买比用0填充
缺失SIDE买卖方向用NULL填充
缺失MFLXID连续合约代码用NULL填充
缺失MFLXNAME连续合约名称用NULL填充
LocalTime本地创建时间LOCALTM本地时间戳
缺失MARKET交易所用CFFEX填充
缺失CHG价格涨跌(LASTPX- PRESETTLE)
缺失CHGPCT价格涨跌幅(%)(LASTPX/ PRESETTLE -1)*100%
缺失VARIETIES品种索引用NULL填充
缺失SETTLEGROUPID结算组代码用NULL填充
缺失SETTLEID结算编号用0填充
缺失UNIX交易时间对应的 UNIX 时间戳用0填充
缺失MFLAG主力合约标记按照前一天的持仓量来确定主力合约
缺失SOURCE数据源用“ MDL ”填充
缺失Offset更新时间毫秒级偏移量用NULL填充
缺失TFLAG日夜盘标记用1填充
缺失CONTRACTNAME合约名称用NULL填充
缺失CONTRACTCODE合约代码(英文)去除合约代码的到期时间获得
缺失CQ瞬时成交量用0填充

上述表格中历史数据结构由通联公司提供,包括了多个期货交易所的数据字段或者历史数据字段,并非是实时数据缺少信息,所以我们将中金所没有的字段用空值或其他值填充。其中真正需要计算确定的有有:TTIME, INTSCHG, MARKET, CHG, CHGPCT, MFLAG, SOURCE, CONTRACTCODE

  • TTIME: 交易时间,对最后修改时间取 second 获取,即 second(TTIME+UPDATEMILLISEC)
  • INTSCHG: 持仓量变化,使用持仓量减去初始持仓量 (OPENINTS- INITOPENINTS) 计算
  • MARKET: 交易所,针对的是中金所数据处理,因此填充 CFFEX
  • CHG, CHGPCT: 价格涨跌,价格涨跌幅(%),分别表示价格变化和变化的百分比幅度,因此用LASTPX- PRESETTLE(LASTPX/ PRESETTLE -1)*100%计算
  • SOURCE: 数据源,由于只有一个数据源,所以就只填充 MDL
  • CONTRACTCODE: 通过去除合约代码的到期时间获得。
    • 由于存在 EFP(期货转现货交易,Exchange of Futrues for Physicals)的情况,简单的去除到期时间不一定能够获取到正确的合约代码,还需要进行一层替换,CONTRACTID.regexReplace("[0-9]", "").strReplace("(EFP)", "")
  • MFLAG: 主力合约标记,0表示不是当天的主力合约,1表示是当天的主力合约,主力合约的判断是在盘前通过前一个交易日的持仓量最大的合约来确定,并且在盘中不做更改。主力合约选择需要依据历史数据。

4. DolphinDB MDL 行情数据接入解决方案

DolphinDB 专门提供 MDL 插件,用于实时接入 MDL 行情数据,并提供流计算等功能帮助用户实时计算行情数据,以下为数据接入处理的流程图:

注:历史的行情数据导入请联系 DolphinDB 小助手(dolphindb1)。

4.1 实时行情落库

实时行情落库的步骤通常是接收数据、处理实时数据,最后将数据写入分布式库表。其中,实时数据的处理是指对齐历史结构与实时结构的字段。对齐方法已在上一节中说明。

4.1.1 创建接收行情的流数据表

创建一个名为 quoteData Level1 的持久化共享流表创建成功,用于存放订阅的行情数据。

// 设置流表名称
quoteData = "quoteDataLevel1" 

// 获取行情数据格式
tbSchema = MDL::getSchema(svrID, msgID)

// 创建持久化流数据表
enableTableShareAndPersistence(
    streamTable(1:0, tbSchema["name"], tbSchema["type"]),
    quoteData,
    cacheSize=1000000, // 保留在系统的内存中的表的条数 1000000条
    retentionMinutes=1440,  // 持久化保留时间 1天
    preCache=1000 // 预加载1000条记录
)

4.1.2 连接 MDL 并订阅行情

输入MDL软件的IP、端口及账户信息后,建立句柄,连接后开始接收数据。

// MDL 的 IP、端口及账户
host = 
port =
username = 

// 创建 MDL 句柄
handle = MDL::createHandle("Handle_CFFE_L1_Future", host, port, username)

// 获取流表对象
outputTable = objByName("quoteDataLevel1")

// 订阅行情数据
svrID, svrVersion, msgID = " MDL SID_ MDL _CFFEX", " MDL VID_ MDL _CFFEX", 1
MDL::subscribe(handle, outputTable, svrID, svrVersion, msgID)

// 开始订阅
MDL::connect MDL (handle)

svrID、svrVersion、msgID 分别表示通联插件的数据服务 ID、数据服务版本号、消息 ID,用于获取通联的中金所实时期货数据。

只有在用户执行了MDL::connect MDL (handle)后, MDL 插件才会正式开始接收行情数据。此时可以查看流表:

图 4-1 行情数据流表

4.1.3 实时行情转换落库代码

实时行情数据入库时,我们通常不推荐数据实时入库,因为会造成数据库后台频繁的刷盘,推荐方法两种:

  1. 建立定时任务:盘后取出流表数据,处理后统一入库,附录模块文件中展示该方法;
  2. 建立流订阅:每N小时入库一次,以下代码将展示该方法,设置最大间隔为24小时:

流订阅步骤一:获取主力合约对应的码表

// 根据持仓量获取主力合约码表
dbName = "dfs://tlFutL1"
tbName = "cffexL1"

// 获取上一个交易日
todayDate = date(now())
predate = getMarketCalendar("CFFEX",todayDate-10,todayDate-1)
predate = predate[predate.size()-1]

// 根据持仓量获取主力合约码表
main_contract_map = select *, 
    CONTRACTID.regexReplace("[0-9]", "").strReplace("(EFP)", "") as group 
    from (
      select sum(OPENINTS) as sum_int from loadTable(dbName, tbName
    ) 
    where TDATE=predate group by CONTRACTID) 
    context by CONTRACTID.regexReplace("[0-9]", "").strReplace("(EFP)", "") 
    csort sum_int desc 
    limit 1
share(main_contract_map, "mainContractMap")

流订阅步骤二:根据以上计算或填充规则处理缺失列数据

// L1数据处理函数
def L1_convert_handle(dbName, tbName, msg) {
    // 对应列的名称修改
    origin_name = ["InstruID", "LastPrice", "PreSetPrice", "OpenPrice", "HighPrice", "LowPrice", "OpenInt", "ULimitPrice", "LLimitPrice", "PreCloPrice", "ClosePrice", "PreDelta", "CurrDelta", "BidPrice1", "BidPrice2", "BidPrice3", "BidPrice4", "BidPrice5", "BidVolume1", "BidVolume2", "BidVolume3", "BidVolume4", "BidVolume5", "AskPrice1", "AskPrice2", "AskPrice3", "AskPrice4", "AskPrice5", "AskVolume1", "AskVolume2", "AskVolume3", "AskVolume4", "AskVolume5", "ActionDay", "SetPrice", "Turnover", "TradDay", "UpdateTime", "PreOpenInt", "AveragePrice", "Volume", "LocalTime"]
    new_name = ["CONTRACTID", "LASTPX", "PRESETTLE", "OPENPX", "HIGHPX", "LOWPX", "OPENINTS", "RISELIMIT", "FALLLIMIT", "PRECLOSE", "CLOSEPX", "PREDELTA", "CURRDELTA", "B1", "B2", "B3", "B4", "B5", "BV1", "BV2", "BV3", "BV4", "BV5", "S1", "S2", "S3", "S4", "S5", "SV1", "SV2", "SV3", "SV4", "SV5", "TDATE", "SETTLEMENTPX", "TM", "CLEARINGDAY", "TTIME_UPDATEMILLISEC", "INITOPENINTS", "AVGPX", "TQ", "LOCALTM"]
    tmp = sql(sqlColAlias(sqlCol(origin_name), new_name), msg)
    tmp =  MDL _7_1_0_add_columns(tmp)
    tmp =  MDL _7_1_0_add_MFLAG(tmp)

    // 导入数据库中
    n = count(tmp)
    tb = loadTable(dbName, tbName)
    rows = tb.tableInsert(tmp)
    
    // 判断是否导入成功
    if(n != rows){
      throw "导入失败,需要导入数据" + string(n) + "条,仅导入数据" + string(rows) + "条."
    }
}
// 订阅实时行情并处理落库
subscribeTable(
    name="L1DataConvert",
    tableName="quoteDataLevel1",
    actionName="L1DataConvertProcess",
    handler=L1_convert_handle{dbName, tbName},
    msgAsTable=true,
    batchSize=700000, // 每日数据量大概60w,统一盘后入库
    timeTrigger=true,
    throttle=24*60*60 // 设置为每24h触发一次
)

这里通过subscribeTable函数订阅上一步 quoteDataLevel1 的流表,处理后直接写入分布式库表内。

其中L1_convert_handle函数先将数据列名对齐后,通过 MDL _7_1_0_add_columns函数添加缺失的字段, MDL _7_1_0_add_MFLAG函数判断主力合约标记。最终将数据写入数据库中,如果导入不成功将会报错。具体字段处理方法可查看附录文件。

4.2 实时行情聚合分钟K线

由于数据存在时延,原始行情数据往往在盘后时间到达。所以在计算前,需要先对原始行情的数据时间进行规整处理。而后再将规整的数据进行实时聚合计算及指标计算。DolphinDB提供流计算引擎将行情实时计算,和实时计算需要的其他指标,例如成交量加权平均价格(VWAP)以及判断当前主力合约等。

4.2.1 原始行情数据处理

原始行情的数据处理涉及到对中午、下午收盘时间的规整处理以及对缺失值进行处理,方便K线聚合计算。

4.2.1.1 收盘时间规整处理

由于实时行情从交易所到用户的过程中存在一定的时延,因此临近收盘时的行情数据通常会在交易所收盘后才到达用户手中,导致用户的K线提前闭合,计算K线错误,所以在使用实时行情数据合成K线的时候需要先对行情数据进行时间规整处理。

第一,处理临近收盘时的数据。例如将 11:30:02 的数据规整到 11:30:00 ,避免数据丢失。核心处理逻辑如下,其中的end_time_map分别表示不同品种的中午收盘时间和下午收盘时间,需要用户提前根据合约规则指定。

def orgin_data_process(msg, Process_engine) {
	end_time_map1, start_time_map2 = objByName("edate1"), objByName("sdate2")
	end_time_map2 = objByName("edate2")

    tmp = select InstruID as unified_code, "CCFX" as market,
              concatDateTime(ActionDay, UpdateTime) as data_time, 
              TradDay as trade_date, 
              LastPrice as last_price, 
              PreCloPrice as pre_close_price,
              OpenPrice as open_price, 
              HighPrice as high_price, 
              LowPrice as low_price,
              ClosePrice as close_price,
              Volume as volume,
              Turnover as turnover, 
              long(NULL) as trades_count, 
              ULimitPrice as upper_limit_price, 
              LLimitPrice as lower_limit_price,
              double(NULL) as iopv,
              PreSetPrice as pre_settlement_price, 
              SetPrice as settlement_price,
              OpenInt as open_interest 
          from msg
	
	// 中午收盘时的处理
	update tmp set data_time = concatDateTime(date(data_time), 
		end_time_map1[unified_code.regexReplace("[0-9]", "").strReplace("(EFP)", "")]) 
		where time(data_time)>end_time_map1[unified_code.regexReplace("[0-9]", "").strReplace("(EFP)", "")] 
		and time(data_time)<start_time_map2[unified_code.regexReplace("[0-9]", "").strReplace("(EFP)", "")]

	// 下午收盘时的处理
	update tmp set data_time=concatDateTime(date(data_time), 
      end_time_map2[unified_code.regexReplace("[0-9]", "").strReplace("(EFP)", "")]) 
      where time(data_time)>end_time_map2[unified_code.regexReplace("[0-9]", "").strReplace("(EFP)", "")] 
      and time(data_time)<temporalAdd(end_time_map2[unified_code.regexReplace("[0-9]", "").strReplace("(EFP)", "")], 2,"H")

	tableInsert(getStreamEngine(Process_engine), tmp)
}

第二,处理空值和累计值。对原始数据的空值进行填充,并且将 volume 这样的累计值转化为差值,方便后续计算K线时加总求和。转化代码如下:

@state
def calDeltasDay(val, data_time, unified_code){
    timeMap = objByName(`sdate1)
    return iif(deltas(val)==NULL, iif(timeMap[unified_code.regexReplace("[0-9]", "").strReplace("(EFP)", "")]==NULL, val, 0.0), deltas(val)) 
}

def process_convert(){
    convert = [<market>,<data_time>, 
        <trade_date>,<last_price>, <pre_close_price>,<open_price>, 
        <cummax(iif(high_price==0.0, NULL, high_price))>, 
        <cummin(iif(low_price==0.0, NULL, low_price))>, 
        <close_price>, <volume>, <turnover>, <trades_count>, <upper_limit_price>, 
        <lower_limit_price>, <iopv>,
        <pre_settlement_price>, <settlement_price>, <open_interest>, 
        <iif(deltas(cummax(iif(high_price==0.0, NULL, high_price)))>0.000001, 1, 0) as deltas_high_price>, 
        <iif(abs(deltas(cummin(iif(low_price==0.0, NULL, low_price))))>0.000001, -1, 0) as deltas_low_price>,
        <calDeltasDay(volume, data_time, unified_code) as deltas_volume>, 
        <calDeltasDay(turnover, data_time, unified_code) as deltas_turnover>, 
        <iif(deltas(trades_count)==NULL, trades_count, deltas(trades_count)) as deltas_trades_count>]
    return convert
}

第三,将上述两个处理方法通过ReactiveState Engine流引擎以及流订阅关联,注意流计算的代码开发步骤与链路是相反的。

// 1. 创建中间过程流表
process_table = "processed_data"

colNames = `unified_code`market`data_time`trade_date`last_price`pre_close_price`open_price`high_price`low_price`close_price`volume`turnover`trades_count`upper_limit_price`lower_limit_price`iopv`pre_settlement_price`settlement_price`open_interest`deltas_high_price`deltas_low_price`deltas_volume`deltas_turnover`deltas_trades_count
colTypes = ["SYMBOL","SYMBOL","TIMESTAMP","DATE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","LONG","DOUBLE","LONG","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","LONG","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE"]

enableTableShareAndPersistence(
    streamTable(100000:0, colNames, colTypes),
    process_table,
    cacheSize = 1000000, // 保留在系统的内存中的表的条数 1000000条
    retentionMinutes=1440, // 持久化保留时间 1天
    preCache=0
)

// 2. 创建填充空值的流引擎
Process_engine = "L1_QuoteData_process"
createReactiveStateEngine(
        name=Process_engine, metrics=process_convert(), 
        dummyTable=process_input_schema(1), 
        outputTable=objByName(process_table), 
        keyColumn="unified_code", 
        keepOrder = true)

// 3. 订阅 QuoteData 
subscribeTable(
    tableName=QuoteData, actionName=QuoteData+"Process",
    handler=orgin_data_process{, Process_engine}, // 数据处理
    msgAsTable=true,
    batchSize=5000, throttle=0.001, reconnect=true
)

接下来就是通过生成的中间数据流表 processed_data 来合成K线。

4.2.2 K线聚合

K线聚合的过程同样分为两步,首先将高频数据通过时序聚合引擎 DailyTimeSeriesEngine 聚合为分钟频数据,再通过响应式状态引擎 ReactiveStateEngine 处理缺失值及计算成交量加权平均价格(VWAP)。

4.2.2.1 时序聚合引擎合成 K 线

1. 确定需要计算的 OHLC 等列,以及计算方法;

// K线计算方法
// priceFilter方法用于筛选出其他期货日夜盘价格,具体代码请参考模块
def process_to_kline(){
	convert = [<first(market) as market>,
			<first(trade_date) as trade_date>,
            <firstNot(priceFilter(last_price, volume, data_time, unified_code)) as open_price>,
			<max(priceFilter(last_price, volume, data_time, unified_code)) as high_price>,
			<min(priceFilter(last_price, volume, data_time, unified_code)) as low_price>,
			<lastNot(last_price) as close_price>,
			<sum(deltas_volume) as volume>,
			<sum(deltas_turnover) as turnover>,
			<sum(deltas_trades_count) as trades_count>,
			<firstNot(pre_close_price) as pre_close_price>,
			<firstNot(upper_limit_price) as upper_limit_price>,
			<firstNot(lower_limit_price) as lower_limit_price>,
			<last(settlement_price) as settlement_price>,
			<last(pre_settlement_price) as pre_settlement_price>,
			<last(open_interest) as open_interest>,
			<firstNot(iopv) as iopv>,
			<tseConstFill(00f) as day_session_open>,
			<tseConstFill(string(NULL)) as domainid>,
			<lastNot(open_price) as first_open_price>,
			<lastNot(low_price) as first_low_price>,
			<lastNot(high_price) as first_high_price>]
	fillList = ["ffill", "ffill", 'null', 'null', 'null', 'null', 0, 0, 
				'null', 'ffill', 'ffill', 'ffill', 'ffill', "ffill", "ffill", 'null', 
				'null', 'null', "ffill", 'ffill', "ffill"]
	return convert, fillList
}

2. 创建两个时序聚合引擎

  • 创建两个引擎的原因:中金所部分期货的下午收盘时间为 15:15:00 ,而其他的则是正常的 15:00:00 ,目前单个时序聚合引擎还无法做到将这两者区分。
  • 开盘第一根K线处理方法:当createDailyTimeSeriesEngine的 closed取值为“right”时,9:30-9:31 中的数据合并的K线被称为9:31,因此为了获取到9:30的 K 线,需要将区间的开始时间往前拉 n 分钟。
// 用于填充值
defg tseConstFill(fillVal){
	return fillVal
}

// n分钟的k线
nMinute = 1

// 创建日聚合引擎1
windowSize = 60000*nMinute
convert, fillList = process_to_kline()
sessionBegin = time([temporalAdd(09:30:00,-nMinute,"m"),13:00:00])
sessionEnd = time(11:30:00 15:00:00)

createDailyTimeSeriesEngine(
    name="Kline_engine00", 
    windowSize=windowSize, step=windowSize, metrics=convert,
    dummyTable=objByName(process_table),
    outputTable=getStreamEngine(cal_Kline_engine), 
    timeColumn=`data_time, 
    keyColumn="unified_code", 
    closed="right",
    useWindowStartTime=false, 
    forceTriggerTime=1, fill=fillList, 
    sessionBegin=sessionBegin, sessionEnd=sessionEnd, 
    mergeSessionEnd=true, forceTriggerSessionEndTime=windowSize
)

// 创建日聚合引擎2
windowSize = 60000*nMinute 
convert, fillList = process_to_kline()
sessionBegin = time([temporalAdd(09:30:00,-nMinute,"m"),13:00:00])
sessionEnd = time(11:30:00 15:15:00)
createDailyTimeSeriesEngine(
    name="Kline_engine15", 
    windowSize=windowSize, step=windowSize, metrics=convert,
    dummyTable=objByName(process_table),
    outputTable=getStreamEngine(cal_Kline_engine), 
    timeColumn=`data_time, 
    keyColumn="unified_code", 
    closed="right",
    useWindowStartTime=false, 
    forceTriggerTime=1, fill=fillList, 
    sessionBegin=sessionBegin, sessionEnd=sessionEnd, 
    mergeSessionEnd=true, forceTriggerSessionEndTime=windowSize
)

4.2.2.2 响应式状态引擎 ReactiveStateEngine处理缺失值及计算VWAP

我们用响应式状态引擎 ReactiveStateEngine对缺失的数据进行填充。其中,VWAP的计算公式为 value/volume,表示一手合约的平均价值,成交额数据的单位是元,成交量的单位是手。但期货的报价并不等于一手合约的价值,例如利率期货合约的报价是百元净价报价,权益类期货合约的报价是指数点。因此在计算 VWAP 时需要一个转换系数,将一手合约的平均价值转换为报价相匹配的单位。如2年期国债期货合约,其报价方式为百元净价报价,合约的标的是面值为200万元人民币、票面利率为3%的名义中短期国债,因此需要将一手合约的平均价值除以20000从而和报价相匹配,其余的期货合约同理。因此,本模块维护了一个转换系数的字典 volmap 记录中金所所有期货合约的转换系数。

// 计算vwap函数,下有详细说明
def calVwap(volume, turnover, unified_code){
	volMap = objByName(`vwap_map) //转换系数字典,参考模块内代码
	vol = nullFill(volMap[unified_code.regexReplace("[0-9]", "").strReplace("(EFP)", "")], 1.0)
	return turnover\volume\vol
}

// k线处理
def cal_kline_convert(){
    convert = [<data_time>,<trade_date>,
            <nullFill(open_price, cumlastNot(close_price).nullFill(pre_close_price))>,
            <nullFill(high_price, cumlastNot(close_price).nullFill(pre_close_price))>,
            <nullFill(low_price, cumlastNot(close_price).nullFill(pre_close_price))>,
            <nullFill(close_price, cumlastNot(close_price).nullFill(pre_close_price))>,
            <nullFill(volume, 0)>,
            <nullFill(turnover, 0.0)>,
            <calVwap(volume, turnover, unified_code).nullFill(cumlastNot(close_price).nullFill(pre_close_price))>,
            <trades_count>,
            <pre_close_price>,
            <upper_limit_price>,
            <lower_limit_price>,
            <settlement_price>,
            <pre_settlement_price>,
            <open_interest>,
            <iopv>,
            <day_session_open>,
            <domainid>
            ]
    return convert
}

// 建立rse引擎
engine = createReactiveStateEngine(name=cal_Kline_engine, metrics=cal_kline_convert(), 
    dummyTable=cal_kline_input(1),
    outputTable=objByName(KlineData),
    keyColumn=["unified_code", "market"], 
    keepOrder = true
)
// 共享该引擎上锁
share(engine ,cal_Kline_engine)

5. 总结

本文详细介绍了 DolphinDB MDL 插件针对中金所 Level1 期货行情数据的实时接入流程。首先,我们介绍了中国金融期货交易所的期货产品以及 MDL 提供的 Level 1 数据结构。针对此,我们提供了中金所 Level1数据及分钟K线的存储方案。在做好准备工作后,我们详细说明了实时接入数据的步骤与细节。通过创建流数据表、连接MDL并订阅行情,本文展示了如何高效地接入、存储并处理实时行情数据。此外,分钟K线的合成过程也得到了详细说明。

通过本文的介绍,读者能够全面了解 DolphinDB MDL 插件的使用方法及在其期货市场中的实际应用场景,为金融数据的实时处理、存储与分析提供了可行的解决方案。

6. 附录

MDLCFFEX.zip


http://www.kler.cn/a/513982.html

相关文章:

  • 如何使用CRM数据分析优化销售和客户关系?
  • 基于python+Django+mysql鲜花水果销售商城网站系统设计与实现
  • leetcode 面试经典 150 题:合并区间
  • 【数学建模美赛速成系列】O奖论文绘图复现代码
  • 云原生作业(四)
  • 简述mysql 主从复制原理及其工作过程,配置一主两从并验证。
  • 【Linux】多线程(二)
  • Linux:常用命令--文件与目录操作
  • docker构建Java项目镜像常用的Java版本,国内私有仓库公网快速下载,解决从docker.io无法下载的问题
  • 【Elasticsearch】腾讯云安装Elasticsearch
  • C#集合操作优化:高效实现批量添加与删除
  • vue3+uniapp开发鸿蒙初体验
  • 【图像处理】——掩码
  • C#防止重复提交
  • Unity中两个UGUI物体的锚点和中心点设置成不一样的,然后怎么使两个物体的位置一样?
  • vsftpd虚拟用户部署
  • MATLAB中characterListPattern函数用法
  • 【爱上C++】vector用法详解
  • 案例分析一
  • MySQL新建和删除普通用户
  • Mac苹果电脑 怎么用word文档和Excel表格?
  • 如何使用Python爬虫获取微店商品详情:代码示例与实践指南
  • 《C++ primer plus》第六版课后编程题-第04章
  • 【王树森搜素引擎技术】概要03:搜索引擎的评价指标
  • 【ESP32】ESP32连接JY61P并通过WIFI发送给电脑
  • 软件测试 —— Postman(全局变量和环境变量,请求前置脚本,关联)