當前位置:成語大全網 - 新華字典 - 如何在Python中編寫並發程序

如何在Python中編寫並發程序

GIL

在Python中,由於歷史原因(GIL),使得Python中多線程的效果非常不理想.GIL使得任何時刻Python只能利用壹個CPU核,並

且它的調度算法簡單粗暴:多線程中,讓每個線程運行壹段時間t,然後強行掛起該線程,繼而去運行其他線程,如此周而復始,直到所有線程結束.

這使得無法有效利用計算機系統中的"局部性",頻繁的線程切換也對緩存不是很友好,造成資源的浪費.

據說Python官方曾經實現了壹個去除GIL的Python解釋器,但是其效果還不如有GIL的解釋器,遂放棄.後來Python官方推出了"利

用多進程替代多線程"的方案,在Python3中也有concurrent.futures這樣的包,讓我們的程序編寫可以做到"簡單和性能兼得".

多進程/多線程+Queue

壹般來說,在Python中編寫並發程序的經驗是:計算密集型任務使用多進程,IO密集型任務使用多進程或者多線程.另外,因為涉及到資源***享,所

以需要同步鎖等壹系列麻煩的步驟,代碼編寫不直觀.另外壹種好的思路是利用多進程/多線程+Queue的方法,可以避免加鎖這樣麻煩低效的方式.

現在在Python2中利用Queue+多進程的方法來處理壹個IO密集型任務.

假設現在需要下載多個網頁內容並進行解析,單進程的方式效率很低,所以使用多進程/多線程勢在必行.

我們可以先初始化壹個tasks隊列,裏面將要存儲的是壹系列dest_url,同時開啟4個進程向tasks中取任務然後執行,處理結果存儲在壹個results隊列中,最後對results中的結果進行解析.最後關閉兩個隊列.

下面是壹些主要的邏輯代碼.

# -*- coding:utf-8 -*-

#IO密集型任務

#多個進程同時下載多個網頁

#利用Queue+多進程

#由於是IO密集型,所以同樣可以利用threading模塊

import multiprocessing

def main():

tasks = multiprocessing.JoinableQueue()

results = multiprocessing.Queue()

cpu_count = multiprocessing.cpu_count() #進程數目==CPU核數目

create_process(tasks, results, cpu_count) #主進程馬上創建壹系列進程,但是由於阻塞隊列tasks開始為空,副進程全部被阻塞

add_tasks(tasks) #開始往tasks中添加任務

parse(tasks, results) #最後主進程等待其他線程處理完成結果

def create_process(tasks, results, cpu_count):

for _ in range(cpu_count):

p = multiprocessing.Process(target=_worker, args=(tasks, results)) #根據_worker創建對應的進程

p.daemon = True #讓所有進程可以隨主進程結束而結束

p.start() #啟動

def _worker(tasks, results):

while True: #因為前面所有線程都設置了daemon=True,故不會無限循環

try:

task = tasks.get() #如果tasks中沒有任務,則阻塞

result = _download(task)

results.put(result) #some exceptions do not handled

finally:

tasks.task_done()

def add_tasks(tasks):

for url in get_urls(): #get_urls() return a urls_list

tasks.put(url)

def parse(tasks, results):

try:

tasks.join()

except KeyboardInterrupt as err:

print "Tasks has been stopped!"

print err

while not results.empty():

_parse(results)

if __name__ == '__main__':

main()

利用Python3中的concurrent.futures包

在Python3中可以利用concurrent.futures包,編寫更加簡單易用的多線程/多進程代碼.其使用感覺和Java的concurrent框架很相似(借鑒?)

比如下面的簡單代碼示例

def handler():

futures = set()

with concurrent.futures.ProcessPoolExecutor(max_workers=cpu_count) as executor:

for task in get_task(tasks):

future = executor.submit(task)

futures.add(future)

def wait_for(futures):

try:

for future in concurrent.futures.as_completed(futures):

err = futures.exception()

if not err:

result = future.result()

else:

raise err

except KeyboardInterrupt as e:

for future in futures:

future.cancel()

print "Task has been canceled!"

print e

return result

總結

要是壹些大型Python項目也這般編寫,那麽效率也太低了.在Python中有許多已有的框架使用,使用它們起來更加高效.