I ran the test code below to compare the performance of Pool and Process on Linux, using Python 2.7. The source code of multiprocessing.Pool appears to use multiprocessing.Process under the hood, yet multiprocessing.Pool costs far more time and memory than an equal number of multiprocessing.Process instances, and I don’t understand why.
Here is what I did:
Create a large dict and then spawn subprocesses.
Pass the dict to each subprocess as read-only data.
Each subprocess does some computation and returns a small result.
Below is my testing code:
from multiprocessing import Pool, Process, Queue
import time, psutil, os, gc

gct = time.time
costTime = lambda ET: time.strftime('%H:%M:%S', time.gmtime(int(ET)))

def getMemConsumption():
    procId = os.getpid()
    proc = psutil.Process(procId)
    mem = proc.memory_info().rss
    return "process ID %d.\nMemory usage: %.6f GB" % (procId, mem*1.0/1024**3)

def f_pool(l, n, jobID):
    try:
        result = {}
        # example of subprocess work
        for i in xrange(n):
            result[i] = l[i]
        # work done
        # gc.collect()
        print getMemConsumption()
        return 1, result, jobID
    except:
        return 0, {}, jobID

def f_proc(q, l, n, jobID):
    try:
        result = {}
        # example of subprocess work
        for i in xrange(n):
            result[i] = l[i]
        # work done
        print getMemConsumption()
        q.put([1, result, jobID])
    except:
        q.put([0, {}, jobID])

def initialSubProc(targetFunc, procArgs, jobID):
    outQueue = Queue()
    args = [outQueue]
    args.extend(procArgs)
    args.append(jobID)
    p = Process(target=targetFunc, args=tuple(args))
    p.start()
    return p, outQueue

def track_add_Proc(procList, outQueueList, maxProcN, jobCount,
                   maxJobs, targetFunc, procArgs, joinFlag, all_results):
    if len(procList) < maxProcN:
        # still room for a new worker slot
        p, q = initialSubProc(targetFunc, procArgs, jobCount)
        outQueueList.append(q)
        procList.append(p)
        jobCount += 1
        joinFlag.append(0)
    else:
        # reuse a slot whose process has finished
        for i in xrange(len(procList)):
            if not procList[i].is_alive() and joinFlag[i] == 0:
                procList[i].join()
                all_results.append(outQueueList[i].get())
                joinFlag[i] = 1  # in case of duplicating result of joined subprocess
                if jobCount < maxJobs:
                    p, q = initialSubProc(targetFunc, procArgs, jobCount)
                    procList[i] = p
                    outQueueList[i] = q
                    jobCount += 1
                    joinFlag[i] = 0
    return jobCount

if __name__ == '__main__':
    st = gct()
    d = {i: i**2 for i in xrange(10000000)}
    print "MainProcess create data dict\n%s" % getMemConsumption()
    print 'Time to create dict: %s\n\n' % costTime(gct()-st)

    nproc = 2
    jobs = 8
    subProcReturnDictLen = 1000
    procArgs = [d, subProcReturnDictLen]

    print "Use multiprocessing.Pool, max subprocess = %d, jobs = %d\n" % (nproc, jobs)
    st = gct()
    pool = Pool(processes=nproc)
    for i in xrange(jobs):
        procArgs.append(i)
        sp = pool.apply_async(f_pool, tuple(procArgs))
        procArgs.pop(2)
        res = sp.get()
        if res[0] == 1:
            # do something with the result
            pass
        else:
            # do something with subprocess exception handling
            pass
    pool.close()
    pool.join()
    print "Total time used to finish all jobs: %s" % costTime(gct()-st)
    print "Main Process\n", getMemConsumption(), '\n'

    print "Use multiprocessing.Process, max subprocess = %d, jobs = %d\n" % (nproc, jobs)
    st = gct()
    procList = []
    outQueueList = []
    all_results = []
    jobCount = 0
    joinFlag = []
    while jobCount < jobs:
        jobCount = track_add_Proc(procList, outQueueList, nproc, jobCount,
                                  jobs, f_proc, procArgs, joinFlag, all_results)
    for i in xrange(nproc):
        if joinFlag[i] == 0:
            procList[i].join()
            all_results.append(outQueueList[i].get())
            joinFlag[i] = 1
    for i in xrange(jobs):
        res = all_results[i]
        if res[0] == 1:
            # do something with the result
            pass
        else:
            # do something with subprocess exception handling
            pass
    print "Total time used to finish all jobs: %s" % costTime(gct()-st)
    print "Main Process\n", getMemConsumption()
Here is the result:
MainProcess create data dict
process ID 21256. Memory usage: 0.841743 GB
Time to create dict: 00:00:02

Use multiprocessing.Pool, max subprocess = 2, jobs = 8
process ID 21266. Memory usage: 1.673084 GB
process ID 21267. Memory usage: 1.673088 GB
process ID 21266. Memory usage: 2.131172 GB
process ID 21267. Memory usage: 2.131172 GB
process ID 21266. Memory usage: 2.176079 GB
process ID 21267. Memory usage: 2.176083 GB
process ID 21266. Memory usage: 2.176079 GB
process ID 21267. Memory usage: 2.176083 GB
Total time used to finish all jobs: 00:00:49
Main Process
process ID 21256. Memory usage: 0.843079 GB

Use multiprocessing.Process, max subprocess = 2, jobs = 8
process ID 23405. Memory usage: 0.840614 GB
process ID 23408. Memory usage: 0.840618 GB
process ID 23410. Memory usage: 0.840706 GB
process ID 23412. Memory usage: 0.840805 GB
process ID 23415. Memory usage: 0.840900 GB
process ID 23417. Memory usage: 0.840973 GB
process ID 23419. Memory usage: 0.841061 GB
process ID 23421. Memory usage: 0.841152 GB
Total time used to finish all jobs: 00:00:00
Main Process
process ID 21256. Memory usage: 0.843781 GB
I don’t understand why the subprocesses from multiprocessing.Pool need about 1.6 GB right from the start, while the subprocesses from multiprocessing.Process need only 0.84 GB, which equals the memory footprint of the main process. It seems to me that only multiprocessing.Process enjoys the copy-on-write benefit of Linux, since the time needed to finish all jobs is under one second. I don’t see why multiprocessing.Pool does not benefit from this as well. Judging from the source code, multiprocessing.Pool looks like just a wrapper around multiprocessing.Process.
Answer
Question: I don’t know why subprocesses from multiprocessing.Pool need about 1.6 GB in the beginning … Pool seems like a wrapper of multiprocessing.Process
This is because, first, Pool reserves memory to hold the results of all jobs.
Second, Pool internally uses two SimpleQueue() objects and three Threads.
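You can see these pieces by inspecting a freshly created pool. The attribute names below are private details of the CPython 2.7 implementation of multiprocessing.pool and may differ in other versions, so treat this as a rough diagnostic sketch rather than a supported API:

from multiprocessing import Pool

pool = Pool(processes=2)
# _inqueue and _outqueue are the two SimpleQueue objects used for tasks and results
print type(pool._inqueue), type(pool._outqueue)
# _worker_handler, _task_handler and _result_handler are the three helper threads
print pool._worker_handler, pool._task_handler, pool._result_handler
pool.close()
pool.join()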
Third, Pool duplicates all of the passed argument data (it pickles the arguments) before handing them to a worker process.
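In other words, every apply_async call serializes the full argument tuple, 10-million-entry dict included, and ships it through the task queue to a worker. A rough way to gauge how much data that is per job is to pickle the same tuple yourself; this is only a diagnostic sketch using the standard cPickle module, and the exact size will vary:

import cPickle as pickle

d = {i: i**2 for i in xrange(10000000)}
# roughly what Pool has to serialize for a single apply_async(f_pool, (d, 1000, 0)) call
payload = pickle.dumps((d, 1000, 0), protocol=2)
print "pickled argument size: %.3f GB" % (len(payload) * 1.0 / 1024**3)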
Your Process example uses only one Queue() and passes the arguments as they are, so the forked children simply inherit the dict. Pool is far from being only a wrapper.
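If you want Pool to get the same copy-on-write benefit, one common approach on Linux (where Pool workers are created by fork) is to let the workers inherit the big dict as a module-level global instead of passing it as an argument, so only the small per-job parameters are pickled. Below is a minimal sketch along those lines; shared_dict and f_pool_shared are illustrative names, not part of your original code:

from multiprocessing import Pool

shared_dict = {}   # filled in before the Pool is created; forked workers inherit it

def f_pool_shared(n, jobID):
    # reads the inherited dict instead of a pickled copy
    result = {}
    for i in xrange(n):
        result[i] = shared_dict[i]
    return 1, result, jobID

if __name__ == '__main__':
    shared_dict.update((i, i**2) for i in xrange(10000000))
    pool = Pool(processes=2)           # fork happens here, after the dict exists
    jobs = [pool.apply_async(f_pool_shared, (1000, j)) for j in xrange(8)]
    for job in jobs:
        ok, result, jobID = job.get()  # only the small result dict is pickled back
    pool.close()
    pool.join()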