【Python使用】嘿马头条项目从到完整开发教程第10篇:APScheduler定时任务,1. 什么是RPC【附代码文档】
本教程的知识点为:简介 1. 内容 2. 目标 产品效果 ToutiaoWeb虚拟机使用说明 数据库 理解ORM 作用 思考: 使用ORM的方式选择 数据库 SQLAlchemy操作 1 新增 2 查询 all() 数据库 分布式ID 1 方案选择 2 头条 使用雪花算法 (代码 toutiao-backend/common/utils/snowflake) 数据库 Redis 1 Redis事务 基本事务指令 Python客户端操作 Git工用流 调试方法 JWT认证方案 JWT & JWS & JWE Json Web Token(JWT) OSS对象存储 存储 需求 方案 使用 缓存 缓存架构 多级缓存 头条项目的方案 缓存数据 缓存 缓存问题 1 缓存 2 缓存 头条项目缓存与存储设计 APScheduler定时任务 定时修正统计数据 RPC RPC简介 1. 什么是RPC RPC 编写客户端 头条首页新闻推荐接口编写 即时通讯 即时通讯简介 即时通讯 Socket.IO 1 简介 优点: 缺点: Elasticsearch 简介与原理 1 简介 属于面向文档的数据库 2 搜索的原理——倒排索引(反向索引)、分析、相关性排序 Elasticsearch 文档 索引文档(保存文档数据) 获取指定文档 判断文档是否存在 单元测试 为什么要测试 测试的分类 什么是单元测试 断言方法的使用:
完整笔记资料代码:https://gitee.com/yinuo112/Backend/tree/master/Python/嘿马头条项目从到完整开发教程/note.md
感兴趣的小伙伴可以自取哦~
全套教程部分目录:
部分文件图片:
APScheduler定时任务
定时修正统计数据
在toutiao-backend/toutiao/__init__.py
中添加APScheduler调度器对象
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor
def create_app(config, enable_config_file=False):
...
# 添加定时任务APScheduler
executors = {
'default': ThreadPoolExecutor(10)
}
app.scheduler = BackgroundScheduler(executors=executors)
from .schedule.statistic import fix_statistics
# 每天3点执行
app.scheduler.add_job(fix_statistics, 'cron', hour=3, args=[app])
# 立即执行,用于测试
# app.scheduler.add_job(fix_statistics, 'date', args=[app])
app.scheduler.start()
...
在toutiao-backend/toutiao
中新建schedule目录用于存放定时任务
toutiao-backend/toutiao/schedule/statistics.py
from cache import statistic as cache_statistic
def fix_process(count_storage_cls):
"""
修复处理方法
"""
# 进行数据库查询
ret = count_storage_cls.db_query()
# 设置redis数据
count_storage_cls.reset(ret)
def fix_statistics(flask_app):
"""
修正统计数据
"""
with flask_app.app_context():
fix_process(cache_statistic.UserArticlesCountStorage)
fix_process(cache_statistic.UserFollowingsCountStorage)
common/cache/statistic.py
class CountStorageBase(object):
"""
统计数量存储的父类
"""
...
@classmethod
def reset(cls, db_query_ret):
"""
由定时任务调用的重置数据方法
"""
# 设置redis的存储记录
pl = current_app.redis_master.pipeline()
pl.delete(cls.key)
# zadd(key, score1, val1, score2, val2, ...)
# 方式一
# for data_id, count in db_query_ret:
# pl.zadd(cls.key, count, data_id)
# 方式二
redis_data = []
for data_id, count in db_query_ret:
redis_data.append(count)
redis_data.append(data_id)
# redis_data = [count1, data_id1, count2, data_id2, ..]
pl.zadd(cls.key, *redis_data)
# pl.zadd(cls.key, count1, data_id1, count2, data_id2, ..]
pl.execute()
class UserArticlesCountStorage(CountStorageBase):
"""
用户文章数量
"""
key = 'count:user:arts'
@staticmethod
def db_query():
ret = db.session.query(Article.user_id, func.count(Article.id)) \
.filter(Article.status == Article.STATUS.APPROVED).group_by(Article.user_id).all()
return ret
class UserFollowingsCountStorage(CountStorageBase):
"""
用户关注数量
"""
key = 'count:user:followings'
@staticmethod
def db_query():
ret = db.session.query(Relation.user_id, func.count(Relation.target_user_id)) \
.filter(Relation.relation == Relation.RELATION.FOLLOW)\
.group_by(Relation.user_id).all()
return ret
RPC
RPC简介
1. 什么是RPC
远程过程调用(英语:Remote Procedure Call,缩写为 RPC,也叫远程程序调用)是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用。
2. 背景与用途
在单台计算机中,我们可以通过程序调用来传递控制和数据;或者说通过程序调用,我们可以将多个程序组成一个整体来实现某个功能。
如果将这种调用机制推广到多台彼此间可以进行网络通讯的计算机,由多台计算机中的多个程序组成一个整体来实现某个功能,这也是可以的。调用的一方(发起远程过程调用,然后调用这方的环境挂起,参数通过网络传递给被调用方,被调用的一方执行程序,当程序执行完成后,产生的结果再通过网络回传给调用的一方,调用的一方恢复继续执行。这样一种原型思想,就是我们所说的RPC远程过程调用。
RPC这种思想最早可以追溯到1976年,RPC的发展到今天已经40年有余了。
如今的计算机应用中,单机性能上很难承受住产品的压力,需要不断扩充多台机器来提升整体的性能。同时为了充分利用这些集群里的计算机,需要对其从架构上进行划分,以提供不同的服务,服务间相互调用完成整个产品的功能。RPC就能帮助我们解决这些服务间的信息传递和调用。
3. 概念说明
关于RPC的概念,我们可以从广义和狭义来分别进行理解。
广义
我们可以将所有通过网络来进行通讯调用的实现统称为RPC。
按照这样来理解的话,那我们发现HTTP其实也算是一种RPC实现。
狭义
区别于HTTP的实现方式,在传输的数据格式上和传输的控制上独立实现。比如在机器间通讯传输的数据不采用HTTP协议的方式(分为起始行、header、body三部份),而是使用自定义格式的二进制方式。
我们更多时候谈到的RPC都是指代这种狭义上的理解。
4. 优缺点
相比于传统HTTP的实现而言:
优点
- 效率高
- 发起RPC调用的一方,在编写代码时可忽略RPC的具体实现,如同编写本地函数调用一样
缺点
- 通用性不如HTTP好 因为传输的数据不是HTTP协议格式,所以调用双方需要专门实现的通信库,对于不同的编程开发语言,都要有相关实现。而HTTP作为一个标准协议,大部分的语言都已有相关的实现,通用性更好。
HTTP更多的面向用户与产品服务器的通讯。
RPC更多的面向产品内部服务器间的通讯。 thrift
RPC结构
RPC的设计思想是力图使远程调用中的通讯细节对于使用者透明,调用双方无需关心网络通讯的具体实现。因而实现RPC要进行一定的封装。
RPC原理上是按如下结构流程进行实现的。
流程:
- 调用者(Caller, 也叫客户端、Client)以本地调用的方式发起调用;
- Client stub(客户端存根,可理解为辅助助手)收到调用后,负责将被调用的方法名、参数等打包编码成特定格式的能进行网络传输的消息体;
- Client stub将消息体通过网络发送给对端(服务端)
- Server stub(服务端存根,同样可理解为辅助助手)收到通过网络接收到消息后按照相应格式进行拆包解码,获取方法名和参数;
- Server stub根据方法名和参数进行本地调用;
- 被调用者(Callee,也叫Server)本地调用执行后将结果返回给server stub;
- Server stub将返回值打包编码成消息,并通过网络发送给对端(客户端);
- Client stub收到消息后,进行拆包解码,返回给Client;
- Client得到本次RPC调用的最终结果。
gRPC
简介
-
gRPC是由Google公司开源的高性能RPC框架。
-
gRPC支持多语言
gRPC原生使用C、Java、Go进行了三种实现,而C语言实现的版本进行封装后又支持C++、C#、Node、ObjC、 Python、Ruby、PHP等开发语言
- gRPC支持多平台
支持的平台包括:Linux、Android、iOS、MacOS、Windows
-
gRPC的消息协议使用Google自家开源的Protocol Buffers协议机制(proto3) 序列化
-
gRPC的传输使用HTTP/2标准,支持双向流和连接多路复用
架构
C语言实现的gRPC支持多语言,其架构如下
使用方法
- 使用Protocol Buffers(proto3)的IDL接口定义语言定义接口服务,编写在文本文件(以
.proto
为后缀名)中。 - 使用protobuf编译器生成服务器和客户端使用的stub代码
- 编写补充服务器和客户端逻辑代码
RPC
Protocol Buffers
Protocol Buffers 是一种与语言无关,平台无关的可扩展机制,用于序列化结构化数据。使用Protocol Buffers 可以一次定义结构化的数据,然后可以使用特殊生成的源代码轻松地在各种数据流中使用各种语言编写和读取结构化数据。
现在有许多框架等在使用Protocol Buffers。gRPC也是基于Protocol Buffers。 Protocol Buffers 目前有2和3两个版本号。
在gRPC中推荐使用proto3版本。
1 文档结构
1) Protocol Buffers版本
Protocol Buffers文档的第一行非注释行,为版本申明,不填写的话默认为版本2。
syntax = "proto3";
或者
syntax = "proto2";
2)Package包
Protocol Buffers 可以声明package,来防止命名冲突。 Packages是可选的。
package foo.bar;
message Open { ... }
使用的时候,也要加上命名空间,
message Foo {
...
foo.bar.Open open = 1;
...
}
注意:对于Python而言,package
会被忽略处理,因为Python中的包是以文件目录来定义的。
3)导入
Protocol Buffers 中可以导入其它文件消息等,与Python的import类似。
import “myproject/other_protos.proto”;
4)定义各种消息和服务
消息messge是用来定义数据的,服务service是用来gRPC的方法的。
2 注释
Protocol Buffers 提供以下两种注释方式。
// 单行注释
//
//
//
/*
多行注释
*/
3 数据类型
3.1 基本数据类型
.proto | 说明 | Python |
---|---|---|
double | float | |
float | float | |
int32 | 使用变长编码,对负数编码效率低, 如果你的变量可能是负数,可以使用sint32 | int |
int64 | 使用变长编码,对负数编码效率低,如果你的变量可能是负数,可以使用sint64 | int/long |
uint32 | 使用变长编码 | int/long |
uint64 | 使用变长编码 | int/long |
sint32 | 使用变长编码,带符号的int类型,对负数编码比int32高效 | int |
sint64 | 使用变长编码,带符号的int类型,对负数编码比int64高效 | int/long |
fixed32 | 4字节编码, 如果变量经常大于2^{28} 的话,会比uint32高效 | int |
fixed64 | 8字节编码, 如果变量经常大于2^{56} 的话,会比uint64高效 | int/long |
sfixed32 | 4字节编码 | int |
sfixed64 | 8字节编码 | int/long |
bool | bool | |
string | 必须包含utf-8编码或者7-bit ASCII text | str |
bytes | 任意的字节序列 | str |
3.2 枚举
在 Proto Buffers 中,我们可以定义枚举和枚举类型,
enum Corpus {
UNIVERSAL = 0;
WEB = 1;
IMAGES = 2;
LOCAL = 3;
NEWS = 4;
PRODUCTS = 5;
VIDEO = 6;
}
Corpus corpus = 4;
枚举定义在一个消息内部或消息外部都是可以的,如果枚举是 定义在 message 内部,而其他 message 又想使用,那么可以通过 MessageType.EnumType 的方式引用。
定义枚举的时候,我们要保证第一个枚举值必须是0,枚举值不能重复,除非使用 option allow_alias = true 选项来开启别名。
enum EnumAllowingAlias {
option allow_alias = true;
UNKNOWN = 0;
STARTED = 1;
RUNNING = 1;
}
枚举值的范围是32-bit integer,但因为枚举值使用变长编码,所以不推荐使用负数作为枚举值,因为这会带来效率问题。
4 消息类型
Protocol Buffers使用message定义消息数据。在Protocol Buffers中使用的数据都是通过message消息数据封装基本类型数据或其他消息数据,对应Python中的类。
message SearchRequest {
string query = 1;
int32 page_number = 2;
int32 result_per_page = 3;
}
4.1 字段编号
消息定义中的每个字段都有唯一的编号。这些字段编号用于以消息二进制格式标识字段,并且在使用消息类型后不应更改。 请注意,1到15范围内的字段编号需要一个字节进行编码,包括字段编号和字段类型。16到2047范围内的字段编号占用两个字节。因此,您应该为非常频繁出现的消息元素保留数字1到15。请记住为将来可能添加的常用元素留出一些空间。
最小的标识号可以从1开始,最大到2^29 - 1,或 536,870,911。不可以使用其中的[19000-19999]的标识号, Protobuf协议实现中对这些进行了预留。如果非要在.proto文件中使用这些预留标识号,编译时就会报警。同样你也不能使用早期保留的标识号。
4.2 指定字段规则
消息字段可以是以下之一:
-
singular:格式良好的消息可以包含该字段中的零个或一个(但不超过一个)。
-
repeated:此字段可以在格式良好的消息中重复任意次数(包括零)。将保留重复值的顺序。对应Python的列表。
message Result {
string url = 1;
string title = 2;
repeated string snippets = 3;
}
4.3 添加更多消息类型
可以在单个.proto文件中定义多个消息类型。
message SearchRequest {
string query = 1;
int32 page_number = 2;
int32 result_per_page = 3;
}
message SearchResponse {
...
}
4.4 保留字段
保留变量不被使用
如果通过完全删除字段或将其注释来更新消息类型,则未来用户可以在对类型进行自己的更新时重用字段编号。如果以后加载相同的旧版本,这可能会导致严重问题,包括数据损坏,隐私错误等。确保不会发生这种情况的一种方法是指定已删除字段的字段编号(或名称)reserved。如果将来的任何用户尝试使用这些字段标识符,protobuf编译器将会报错。
message Foo {
reserved 2, 15, 9 to 11;
reserved "foo", "bar";
}
4.5 默认值
解析消息时,如果编码消息不包含特定的单数元素,则解析对象中的相应字段将设置为该字段的默认值。这些默认值是特定于类型的:
- 对于字符串,默认值为空字符串。
- 对于字节,默认值为空字节。
- 对于bools,默认值为false。
- 对于数字类型,默认值为零。
- 对于枚举,默认值是第一个定义的枚举值,该值必须为0。
- 对于消息字段,未设置该字段。它的确切值取决于语言。
- 重复字段的默认值为空(通常是相应语言的空列表)。
4.6 嵌套类型
你可以在其他消息类型中定义、使用消息类型,在下面的例子中,Result消息就定义在SearchResponse消息内,如:
message SearchResponse {
message Result {
string url = 1;
string title = 2;
repeated string snippets = 3;
}
repeated Result results = 1;
}
如果要在其父消息类型之外重用此消息类型,使用
SearchResponse.Result
5 map映射
如果要在数据定义中创建关联映射,Protocol Buffers提供了一种方便的语法:
map< key_type, value_type> map_field = N ;
其中key_type可以是任何整数或字符串类型。请注意,枚举不是有效的key_type。value_type可以是除map映射类型外的任何类型。
例如,如果要创建项目映射,其中每条Project消息都与字符串键相关联,则可以像下面这样定义它:
map<string, Project> projects = 3 ;
- map的字段可以是repeated。
- 序列化后的顺序和map迭代器的顺序是不确定的,所以你不要期望以固定顺序处理map
- 当为.proto文件产生生成文本格式的时候,map会按照key 的顺序排序,数值化的key会按照数值排序。
- 从序列化中解析或者融合时,如果有重复的key则后一个key不会被使用,当从文本格式中解析map时,如果存在重复的key,则解析可能会失败。
- 如果为映射字段提供键但没有值,则字段序列化时的行为取决于语言。在Python中,使用类型的默认值。
6 oneof
如果你的消息中有很多可选字段, 并且同时至多一个字段会被设置, 你可以加强这个行为,使用oneof特性节省内存。
为了在.proto定义oneof字段, 你需要在名字前面加上oneof关键字, 比如下面例子的test_oneof:
message SampleMessage {
oneof test_oneof {
string name = 4;
SubMessage sub_message = 9;
}
}
然后你可以增加oneof字段到 oneof 定义中. 你可以增加任意类型的字段, 但是不能使用repeated 关键字。
7 定义服务
Protocol Buffers使用service定义RPC服务。
message HelloRequest {
string greeting = 1;
}
message HelloResponse {
string reply = 1;
}
service HelloService {
rpc SayHello (HelloRequest) returns (HelloResponse) {}
}
注意:一个service中可定义多个方法。
推荐系统接口定义
接口原型
接口名称: user_recommend
调用参数:
UserRequest:
user_id # 用户id
channel_id # 频道id
article_num # 推荐的文章数量
time_stamp # 推荐的时间戳
返回数据:
ArticleResponse:
expousre # 曝光埋点数据
time_stamp # 推荐的时间戳
recommends: # 推荐结果
article_id # 文章id
track: # 关于文章的埋点数据
click # 用户点击行为的埋点参数
collect # 用户收藏的埋点参数
share # 用户分享的埋点参数
read # 用户进入文章详情的埋点参数
使用Protobuf 定义的接口如下
使用protobuf定义的接口文件通常以proto作为文件后缀名
在toutiao-backend/common/rpc目录下新建reco.proto文件
syntax = "proto3";
message UserRequest {
string user_id=1;
int32 channel_id=2;
int32 article_num=3;
int64 time_stamp=4;
}
message Track {
string click=1;
string collect=2;
string share=3;
string read=4;
}
message Article {
int64 article_id=1;
Track track=2;
}
message ArticleResponse {
string exposure=1;
int64 time_stamp=2;
repeated Article recommends=3;
}
service UserRecommend {
rpc user_recommend(UserRequest) returns(ArticleResponse) {}
}
代码生成
安装protobuf编译器和grpc库
pip install grpcio-tools
编译生成代码
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. reco.proto
-I
表示搜索proto文件中被导入文件的目录--python_out
表示保存生成Python文件的目录,生成的文件中包含接口定义中的数据类型--grpc_python_out
表示保存生成Python文件的目录,生成的文件中包含接口定义中的服务类型
在toutiao-backend/common/rpc目录下执行上述命令,会自动生成如下两个rpc调用辅助代码模块:
- reco_pb2.py 保存根据接口定义文件中的数据类型生成的python类
- reco_pb2_grpc.py 保存根据接口定义文件中的服务方法类型生成的python调用RPC方法
补全服务端
为了方便看到效果,我们编写补全服务端代码。
注意:此处实际推荐的代码在后续推荐系统中会涉及到
在toutiao-backend/common/rpc目录下新建server.py文件
import reco_pb2
import reco_pb2_grpc
import grpc
from concurrent.futures import ThreadPoolExecutor
import time
# rpc接口定义中服务对应成Python的类
class UserRecommendService(reco_pb2_grpc.UserRecommendServicer):
# 在接口定义的同名方法中补全,被调用时应该执行的逻辑
def user_recommend(self, request, context):
# request是调用的请求数据对象
user_id = request.user_id
channel_id = request.channel_id
article_num = request.article_num
time_stamp = request.time_stamp
response = reco_pb2.ArticleResponse()
response.exposure = 'exposure param'
response.time_stamp = round(time.time()*1000)
recommends = []
for i in range(article_num):
article = reco_pb2.Article()
article.track.click = 'click param {}'.format(i+1)
article.track.collect = 'collect param {}'.format(i+1)
article.track.share = 'share param {}'.format(i+1)
article.track.read = 'read param {}'.format(i+1)
article.article_id = i+1
recommends.append(article)
response.recommends.extend(recommends)
# 最终要返回一个调用结果
return response
def serve():
"""
rpc服务端启动方法
"""
# 创建一个rpc服务器
server = grpc.server(ThreadPoolExecutor(max_workers=10))
# 向服务器中添加被调用的服务方法
reco_pb2_grpc.add_UserRecommendServicer_to_server(UserRecommendService(), server)
# 微服务器绑定ip地址和端口
server.add_insecure_port('127.0.0.1:8888')
# 启动rpc服务
server.start()
# start()不会阻塞,此处需要加上循环睡眠 防止程序退出
while True:
time.sleep(10)
if __name__ == '__main__':
serve()