python 多进程,程序运行越来越慢踩坑
我是java系的,会点儿go,因为项目的原因,开始接触python,赶鸭子上架,也接触了python不少内容,其中就包括多线程和多进程,我介绍和记录一些我的实践,方便自己的同时,希望能帮助到其它的小伙伴。
我先介绍一下我的应用场景,我需要用python解析大量的excel数据,20-30万行数据,然后构造一个请求的地址,用http调用,返回结果,连同原始数据一起生成excel。在生成excel的时候,为了保持原始数据顺序性,我会先把所有的请求结果用map存储在内存中,map的key为excel顺序号,value为url的结果。然后我会再次解析excel,从map中获取结果,进而生成excel。
为什么要研究多线程或者多进程呢,因为数据比较多,我需要用多进程或者多线程并发访问url。
在研究python多线程的时候,我发现因为GIL的原因,不管机器有多少个核,python只能用其中一个,如果想用上多个核的话,需要用python多进程。python多线程适合用做io密集型的任务,多进程适合用做cpu密集型的任务。我的场景是请求url,是一个io密集型的操作,按推荐来讲,我应该使用多线程,但我不确定GIL会在多大程度上影响我,在我遇到的业务场景中,也很少使用多进程的方式(除了跨主机多进程) 所以我打算用把多进程尝尝鲜。
在进程间数据共享的方式上,我选择用共享内存的方式(后来证明这是一个不太好的方式)。
我是这样使用多进程的
# 多进程核心代码
# 读取excel
excel_data = pd.read_excel(path, sheet_name)
# 共享内存
smm = Manager()
response_map = smm.dict()
# 用来存储生成excel的数据
data = {}
# 并发调用http接口
http_pool = multiprocessing.Pool(processes=2)
for index, row in excel_data.iterrows():
doc_id = row['index']
address = row['address']
http_pool.apply_async(process_task, args=(doc_id, address, response_map))
http_pool.close()
http_pool.join()
# 多进程请求url
def process_task(doc_id, address, response_map):
# 请求地址
response = http_call(address)
if doc_id not in response_map.keys():
response_map[doc_id] = response
else:
pass
首先说多进程的好处
1. 进程之间天然是隔离的,不需要用锁来控制各个请求,我从来没发现请求有驴唇不对马嘴的情况。
其次就是多进程的坏处
1. 程序运行起来之后,我发现一个现象,随着时间的推移,程序运行的越来越慢。我仔细思考了一下,我觉得是因为进程间共享数据比较消耗资源,随着response_map中数据越来越多,程序肉眼可见的变慢了。
2. 程序长时间运行的时候报错MemoryError,但因为报错日志没有保存下来,具体错误原因不详,但我隐约感觉可能跟多进程共享大量数据有关系。
之后我果断转向多线程
我是这样使用多线程的
# 读取excel数据
excel_data = pd.read_excel(path, sheet_name).astype(str)
# 请求统计
counter = {
"total": 0,
"repeat_num": 0,
"empty_num": 0,
"error_num": 0
}
# 并发调用http接口
futures = {}
response_map = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=process) as executor:
for index, row in excel_data.iterrows():
doc_id = row['index']
address = row['address']
# 发送请求
future = executor.submit(process_task, doc_id, address, response_map, index, counter)
futures[future] = doc_id
for future in concurrent.futures.as_completed(futures):
pass
# 多线程执行代码
def process_task(doc_id, ec_addr, address, response_map, index, counter):
try:
with lock:
counter['total'] += 1
if ec_addr == '':
counter['empty_num'] += 1
return
if doc_id in response_map.keys():
counter['repeat_num'] += 1
return
response = http_call(address, index)
with lock:
response_map[doc_id] = response
except Exception:
with lock:
counter['error_num'] += 1
使用多线程之后,程序越运行越慢的问题解决了,MemoryError的错误在也没有见过。副作用是需要仔细lock好共享数据,不然会有请求和结果对不上的情况。
感谢python,让我对多进程以及进程之间通过内存共享数据有了直观的感受。