當前位置:成語大全網 - 新華字典 - python多進程中隊列不空時阻塞,求解為什麽

python多進程中隊列不空時阻塞,求解為什麽

最近接觸壹個項目,要在多個虛擬機中運行任務,參考別人之前項目的代碼,采用了多進程來處理,於是上網查了查python中的多進程

壹、先說說Queue(隊列對象)

Queue是python中的標準庫,可以直接import 引用,之前學習的時候有聽過著名的“先吃先拉”與“後吃先吐”,其實就是這裏說的隊列,隊列的構造的時候可以定義它的容量,別吃撐了,吃多了,就會報錯,構造的時候不寫或者寫個小於1的數則表示無限多

import Queue

q = Queue.Queue(10)

向隊列中放值(put)

q.put(‘yang')

q.put(4)

q.put([‘yan','xing'])

在隊列中取值get()

默認的隊列是先進先出的

>>> q.get()

‘yang'

>>> q.get()

4

>>> q.get()

[‘yan', ‘xing']

當壹個隊列為空的時候如果再用get取則會堵塞,所以取隊列的時候壹般是用到

get_nowait()方法,這種方法在向壹個空隊列取值的時候會拋壹個Empty異常

所以更常用的方法是先判斷壹個隊列是否為空,如果不為空則取值

隊列中常用的方法

Queue.qsize() 返回隊列的大小

Queue.empty() 如果隊列為空,返回True,反之False

Queue.full() 如果隊列滿了,返回True,反之False

Queue.get([block[, timeout]]) 獲取隊列,timeout等待時間

Queue.get_nowait() 相當Queue.get(False)

非阻塞 Queue.put(item) 寫入隊列,timeout等待時間

Queue.put_nowait(item) 相當Queue.put(item, False)

二、multiprocessing中使用子進程概念

from multiprocessing import Process

可以通過Process來構造壹個子進程

p = Process(target=fun,args=(args))

再通過p.start()來啟動子進程

再通過p.join()方法來使得子進程運行結束後再執行父進程

from multiprocessing import Process

import os

# 子進程要執行的代碼

def run_proc(name):

print 'Run child process %s (%s)...' % (name, os.getpid())

if __name__=='__main__':

print 'Parent process %s.' % os.getpid()

p = Process(target=run_proc, args=('test',))

print 'Process will start.'

p.start()

p.join()

print 'Process end.'

三、在multiprocessing中使用pool

如果需要多個子進程時可以考慮使用進程池(pool)來管理

from multiprocessing import Pool

from multiprocessing import Pool

import os, time

def long_time_task(name):

print 'Run task %s (%s)...' % (name, os.getpid())

start = time.time()

time.sleep(3)

end = time.time()

print 'Task %s runs %0.2f seconds.' % (name, (end - start))

if __name__=='__main__':

print 'Parent process %s.' % os.getpid()

p = Pool()

for i in range(5):

p.apply_async(long_time_task, args=(i,))

print 'Waiting for all subprocesses done...'

p.close()

p.join()

print 'All subprocesses done.'

pool創建子進程的方法與Process不同,是通過

p.apply_async(func,args=(args))實現,壹個池子裏能同時運行的任務是取決妳電腦的cpu數量,如我的電腦現在是有4個cpu,那會子進程task0,task1,task2,task3可以同時啟動,task4則在之前的壹個某個進程結束後才開始

上面的程序運行後的結果其實是按照上圖中1,2,3分開進行的,先打印1,3秒後打印2,再3秒後打印3

代碼中的p.close()是關掉進程池子,是不再向裏面添加進程了,對Pool對象調用join()方法會等待所有子進程執行完畢,調用join()之前必須先調用close(),調用close()之後就不能繼續添加新的Process了。

當時也可以是實例pool的時候給它定義壹個進程的多少

如果上面的代碼中p=Pool(5)那麽所有的子進程就可以同時進行

三、多個子進程間的通信

多個子進程間的通信就要采用第壹步中說到的Queue,比如有以下的需求,壹個子進程向隊列中寫數據,另外壹個進程從隊列中取數據,

#coding:gbk

from multiprocessing import Process, Queue

import os, time, random

# 寫數據進程執行的代碼:

def write(q):

for value in ['A', 'B', 'C']:

print 'Put %s to queue...' % value

q.put(value)

time.sleep(random.random())

# 讀數據進程執行的代碼:

def read(q):

while True:

if not q.empty():

value = q.get(True)

print 'Get %s from queue.' % value

time.sleep(random.random())

else:

break

if __name__=='__main__':

# 父進程創建Queue,並傳給各個子進程:

q = Queue()

pw = Process(target=write, args=(q,))

pr = Process(target=read, args=(q,))

# 啟動子進程pw,寫入:

pw.start()

# 等待pw結束:

pw.join()

# 啟動子進程pr,讀取:

pr.start()

pr.join()

# pr進程裏是死循環,無法等待其結束,只能強行終止:

print

print '所有數據都寫入並且讀完'

四、關於上面代碼的幾個有趣的問題

if __name__=='__main__':

# 父進程創建Queue,並傳給各個子進程:

q = Queue()

p = Pool()

pw = p.apply_async(write,args=(q,))

pr = p.apply_async(read,args=(q,))

p.close()

p.join()

print

print '所有數據都寫入並且讀完'

如果main函數寫成上面的樣本,本來我想要的是將會得到壹個隊列,將其作為參數傳入進程池子裏的每個子進程,但是卻得到

RuntimeError: Queue objects should only be shared between processes through inheritance

的錯誤,查了下,大意是隊列對象不能在父進程與子進程間通信,這個如果想要使用進程池中使用隊列則要使用multiprocess的Manager類

if __name__=='__main__':

manager = multiprocessing.Manager()

# 父進程創建Queue,並傳給各個子進程:

q = manager.Queue()

p = Pool()

pw = p.apply_async(write,args=(q,))

time.sleep(0.5)

pr = p.apply_async(read,args=(q,))

p.close()

p.join()

print

print '所有數據都寫入並且讀完'

這樣這個隊列對象就可以在父進程與子進程間通信,不用池則不需要Manager,以後再擴展multiprocess中的Manager類吧

關於鎖的應用,在不同程序間如果有同時對同壹個隊列操作的時候,為了避免錯誤,可以在某個函數操作隊列的時候給它加把鎖,這樣在同壹個時間內則只能有壹個子進程對隊列進行操作,鎖也要在manager對象中的鎖

#coding:gbk

from multiprocessing import Process,Queue,Pool

import multiprocessing

import os, time, random

# 寫數據進程執行的代碼:

def write(q,lock):

lock.acquire() #加上鎖

for value in ['A', 'B', 'C']:

print 'Put %s to queue...' % value

q.put(value)

lock.release() #釋放鎖

# 讀數據進程執行的代碼:

def read(q):

while True:

if not q.empty():

value = q.get(False)

print 'Get %s from queue.' % value

time.sleep(random.random())

else:

break

if __name__=='__main__':

manager = multiprocessing.Manager()

# 父進程創建Queue,並傳給各個子進程:

q = manager.Queue()

lock = manager.Lock() #初始化壹把鎖

p = Pool()

pw = p.apply_async(write,args=(q,lock))

pr = p.apply_async(read,args=(q,))

p.close()

p.join()

print

print '所有數據都寫入並且讀完'