I ran the test code below to compare the performance of Pool and Process on Linux. I'm using Python 2.7. The source code of multiprocessing.Pool seems to show that it is built on multiprocessing.Process. However, multiprocessing.Pool costs far more time and memory than an equal number of multiprocessing.Process instances, and I don't understand why.
Here is what I did:
Create a large dict, then spawn the subprocesses.
Pass the dict to each subprocess for read-only access.
Each subprocess does some computation and returns a small result.
Below is my testing code:
from multiprocessing import Pool, Process, Queue
import time, psutil, os, gc
gct = time.time
costTime = lambda ET: time.strftime('%H:%M:%S', time.gmtime(int(ET)))
def getMemConsumption():
    procId = os.getpid()
    proc = psutil.Process(procId)
    mem = proc.memory_info().rss
    return "process ID %d.\nMemory usage: %.6f GB" % (procId, mem*1.0/1024**3)

def f_pool(l, n, jobID):
    try:
        result = {}
        # example of subprocess work
        for i in xrange(n):
            result[i] = l[i]
        # work done
        # gc.collect()
        print getMemConsumption()
        return 1, result, jobID
    except:
        return 0, {}, jobID

def f_proc(q, l, n, jobID):
    try:
        result = {}
        # example of subprocess work
        for i in xrange(n):
            result[i] = l[i]
        # work done
        print getMemConsumption()
        q.put([1, result, jobID])
    except:
        q.put([0, {}, jobID])

def initialSubProc(targetFunc, procArgs, jobID):
    outQueue = Queue()
    args = [outQueue]
    args.extend(procArgs)
    args.append(jobID)
    p = Process(target=targetFunc, args=tuple(args))
    p.start()
    return p, outQueue

def track_add_Proc(procList, outQueueList, maxProcN, jobCount,
                   maxJobs, targetFunc, procArgs, joinFlag, all_results):
    if len(procList) < maxProcN:
        p, q = initialSubProc(targetFunc, procArgs, jobCount)
        outQueueList.append(q)
        procList.append(p)
        jobCount += 1
        joinFlag.append(0)
    else:
        for i in xrange(len(procList)):
            if not procList[i].is_alive() and joinFlag[i] == 0:
                procList[i].join()
                all_results.append(outQueueList[i].get())
                joinFlag[i] = 1  # avoid collecting the result of a joined subprocess twice
                if jobCount < maxJobs:
                    p, q = initialSubProc(targetFunc, procArgs, jobCount)
                    procList[i] = p
                    outQueueList[i] = q
                    jobCount += 1
                    joinFlag[i] = 0
    return jobCount

if __name__ == '__main__':
    st = gct()
    d = {i: i**2 for i in xrange(10000000)}
    print "MainProcess create data dict\n%s" % getMemConsumption()
    print 'Time to create dict: %s\n\n' % costTime(gct()-st)

    nproc = 2
    jobs = 8
    subProcReturnDictLen = 1000
    procArgs = [d, subProcReturnDictLen]

    print "Use multiprocessing.Pool, max subprocess = %d, jobs = %d\n" % (nproc, jobs)
    st = gct()
    pool = Pool(processes=nproc)
    for i in xrange(jobs):
        procArgs.append(i)
        sp = pool.apply_async(f_pool, tuple(procArgs))
        procArgs.pop(2)
        res = sp.get()
        if res[0] == 1:
            # do something with the result
            pass
        else:
            # do something with subprocess exception handling
            pass
    pool.close()
    pool.join()
    print "Total time used to finish all jobs: %s" % costTime(gct()-st)
    print "Main Process\n", getMemConsumption(), '\n'

    print "Use multiprocessing.Process, max subprocess = %d, jobs = %d\n" % (nproc, jobs)
    st = gct()
    procList = []
    outQueueList = []
    all_results = []
    jobCount = 0
    joinFlag = []
    while jobCount < jobs:
        jobCount = track_add_Proc(procList, outQueueList, nproc, jobCount,
                                  jobs, f_proc, procArgs, joinFlag, all_results)
    for i in xrange(nproc):
        if joinFlag[i] == 0:
            procList[i].join()
            all_results.append(outQueueList[i].get())
            joinFlag[i] = 1
    for i in xrange(jobs):
        res = all_results[i]
        if res[0] == 1:
            # do something with the result
            pass
        else:
            # do something with subprocess exception handling
            pass
    print "Total time used to finish all jobs: %s" % costTime(gct()-st)
    print "Main Process\n", getMemConsumption()
Here is the result:
MainProcess create data dict
process ID 21256. Memory usage: 0.841743 GB
Time to create dict: 00:00:02

Use multiprocessing.Pool, max subprocess = 2, jobs = 8
process ID 21266. Memory usage: 1.673084 GB
process ID 21267. Memory usage: 1.673088 GB
process ID 21266. Memory usage: 2.131172 GB
process ID 21267. Memory usage: 2.131172 GB
process ID 21266. Memory usage: 2.176079 GB
process ID 21267. Memory usage: 2.176083 GB
process ID 21266. Memory usage: 2.176079 GB
process ID 21267. Memory usage: 2.176083 GB
Total time used to finish all jobs: 00:00:49
Main Process
process ID 21256. Memory usage: 0.843079 GB

Use multiprocessing.Process, max subprocess = 2, jobs = 8
process ID 23405. Memory usage: 0.840614 GB
process ID 23408. Memory usage: 0.840618 GB
process ID 23410. Memory usage: 0.840706 GB
process ID 23412. Memory usage: 0.840805 GB
process ID 23415. Memory usage: 0.840900 GB
process ID 23417. Memory usage: 0.840973 GB
process ID 23419. Memory usage: 0.841061 GB
process ID 23421. Memory usage: 0.841152 GB
Total time used to finish all jobs: 00:00:00
Main Process
process ID 21256. Memory usage: 0.843781 GB
I don't understand why the subprocesses from multiprocessing.Pool need about 1.6 GB from the start, while the subprocesses from multiprocessing.Process only need 0.84 GB, which equals the memory usage of the main process. It seems to me that only multiprocessing.Process enjoys the copy-on-write benefit of Linux, since the time needed for all jobs is under 1 second. I don't know why multiprocessing.Pool does not benefit from this. Judging from the source code, multiprocessing.Pool looks like a wrapper around multiprocessing.Process.
Answer
Question: I don’t know why subprocesses from multiprocessing.Pool need about 1.6GB in the beginning,
… Pool seems like a wrapper of multiprocessing.Process
First, this is because Pool reserves memory for the results of all jobs.
Second, Pool uses two SimpleQueue() objects and three threads.
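You can see some of this machinery by inspecting a Pool instance. The attribute names below are private CPython 2.7 implementation details, shown purely for illustration:

from multiprocessing import Pool

if __name__ == '__main__':
    pool = Pool(processes=2)
    # two SimpleQueue pipes: one ships pickled tasks to the workers,
    # the other ships pickled results back to the parent
    print type(pool._inqueue).__name__, type(pool._outqueue).__name__
    # three helper threads in the parent: worker maintenance,
    # task feeding and result handling
    print pool._worker_handler, pool._task_handler, pool._result_handler
    pool.close()
    pool.join()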
Third, Pool duplicates all of the passed argument data: every task is pickled and sent to a worker process through the task queue, so each worker receives its own copy of the big dict.
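To get a feel for what that duplication costs with the dict from the question, here is a rough sketch (numbers will vary by machine) that times a single pickling pass; apply_async pays roughly this serialization cost, plus the matching unpickling in the worker, for every job:

import cPickle as pickle
import time

d = {i: i ** 2 for i in xrange(10000000)}   # same dict as in the question

st = time.time()
blob = pickle.dumps(d, pickle.HIGHEST_PROTOCOL)
print "pickled size: %.2f GB, pickling time: %.1f s" % (
    len(blob) / 1024.0 ** 3, time.time() - st)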
Your Process example uses only one plain Queue() per subprocess and passes the arguments as they are at fork time, so the children share the dict via copy-on-write.
Pool is far from being only a wrapper.
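If you want to keep the Pool API but still benefit from copy-on-write, one common workaround is to make the dict a module-level global before the Pool is created and to pass only the small per-job parameters to apply_async. Below is a minimal sketch assuming Linux's fork start method; shared_d and f_pool_shared are illustrative names, not from the original code:

from multiprocessing import Pool
import os, psutil

shared_d = None  # set in the parent before the Pool is forked

def f_pool_shared(n, jobID):
    # reads the dict inherited from the parent via fork (copy-on-write)
    # instead of receiving a pickled copy as an argument
    result = {}
    for i in xrange(n):
        result[i] = shared_d[i]
    proc = psutil.Process(os.getpid())
    print "process ID %d, RSS: %.3f GB" % (os.getpid(), proc.memory_info().rss / 1024.0 ** 3)
    return 1, result, jobID

if __name__ == '__main__':
    shared_d = {i: i ** 2 for i in xrange(10000000)}
    pool = Pool(processes=2)   # fork happens here, after shared_d exists
    handles = [pool.apply_async(f_pool_shared, (1000, i)) for i in xrange(8)]
    results = [h.get() for h in handles]
    pool.close()
    pool.join()

Only the 1000-entry result dict is pickled back per job, so the workers' RSS should stay close to the parent's instead of growing by the size of the big dict.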