Skip to content
Advertisement

Why multiprocessing.Pool and multiprocessing.Process perform so differently in Linux

I ran some test code as below to check the performance of using Pool and Process in Linux. I’m using Python 2.7. The source code of multiprocessing.Pool seems showing it’s using multiprocessing.Process. However, multiprocessing.Pool cost much time and mem than equal # of multiprocessing.Process, and I don’t get this.

Here is what I did:

  1. Create a large dict and then subprocesses.

  2. Pass the dict to each subprocess for read-only.

  3. Each subprocess do some computation and return a small result.

Below is the my testing code:

from multiprocessing import Pool, Process, Queue
import time, psutil, os, gc

gct = time.time
costTime = lambda ET: time.strftime('%H:%M:%S', time.gmtime(int(ET)))

def getMemConsumption():
    procId = os.getpid()
    proc = psutil.Process(procId)
    mem = proc.memory_info().rss
    return "process ID %d.nMemory usage: %.6f GB" % (procId, mem*1.0/1024**3)

def f_pool(l, n, jobID):
    try:
        result = {}
        # example of subprocess work
        for i in xrange(n):
            result[i] = l[i]
        # work done
        # gc.collect()
        print getMemConsumption()
        return 1, result, jobID
    except:
        return 0, {}, jobID

def f_proc(q, l, n, jobID):
    try:
        result = {}
        # example of subprocess work
        for i in xrange(n):
            result[i] = l[i]
        # work done
        print getMemConsumption()
        q.put([1, result, jobID])
    except:
        q.put([0, {}, jobID])

def initialSubProc(targetFunc, procArgs, jobID):
    outQueue = Queue()
    args = [outQueue]
    args.extend(procArgs)
    args.append(jobID)
    p = Process(target = targetFunc, args = tuple(args))
    p.start()
    return p, outQueue


def track_add_Proc(procList, outQueueList, maxProcN, jobCount, 
                   maxJobs, targetFunc, procArgs, joinFlag, all_result):
    if len(procList) < maxProcN:
        p, q = initialSubProc(targetFunc, procArgs, jobCount)
        outQueueList.append(q)
        procList.append(p)
        jobCount += 1
        joinFlag.append(0)
    else:
        for i in xrange(len(procList)):
            if not procList[i].is_alive() and joinFlag[i] == 0:
                procList[i].join()
                all_results.append(outQueueList[i].get())
                joinFlag[i] = 1 # in case of duplicating result of joined subprocess
                if jobCount < maxJobs:
                    p, q = initialSubProc(targetFunc, procArgs, jobCount)
                    procList[i] = p
                    outQueueList[i] = q
                    jobCount += 1
                    joinFlag[i] = 0
    return jobCount

if __name__ == '__main__':
    st = gct()
    d = {i:i**2 for i in xrange(10000000)}
    print "MainProcess create data dictn%s" % getMemConsumption()
    print 'Time to create dict: %snn' % costTime(gct()-st)

    nproc = 2
    jobs = 8
    subProcReturnDictLen = 1000
    procArgs = [d, subProcReturnDictLen]

    print "Use multiprocessing.Pool, max subprocess = %d, jobs = %dn" % (nproc, jobs)
    st = gct()
    pool = Pool(processes = nproc)
    for i in xrange(jobs):
        procArgs.append(i)
        sp = pool.apply_async(f_pool, tuple(procArgs))
        procArgs.pop(2)
        res = sp.get()
        if res[0] == 1:
            # do something with the result
            pass
        else:
            # do something with subprocess exception handle
            pass
    pool.close()
    pool.join()
    print "Total time used to finish all jobs: %s" % costTime(gct()-st)
    print "Main Processn", getMemConsumption(), 'n'

    print "Use multiprocessing.Process, max subprocess = %d, jobs = %dn" % (nproc, jobs)
    st = gct()
    procList = []
    outQueueList = []
    all_results = []
    jobCount = 0
    joinFlag = []
    while (jobCount < jobs):
        jobCount = track_add_Proc(procList, outQueueList, nproc, jobCount, 
                                  jobs, f_proc, procArgs, joinFlag, all_results)
    for i in xrange(nproc):
        if joinFlag[i] == 0:
            procList[i].join()
            all_results.append(outQueueList[i].get())
            joinFlag[i] = 1
    for i in xrange(jobs):
        res = all_results[i]
        if res[0] == 1:
            # do something with the result
            pass
        else:
            # do something with subprocess exception handle
            pass
    print "Total time used to finish all jobs: %s" % costTime(gct()-st)
    print "Main Processn", getMemConsumption()

Here is the result:

MainProcess create data dict
process ID 21256.
Memory usage: 0.841743 GB
Time to create dict: 00:00:02


Use multiprocessing.Pool, max subprocess = 2, jobs = 8

process ID 21266.
Memory usage: 1.673084 GB
process ID 21267.
Memory usage: 1.673088 GB
process ID 21266.
Memory usage: 2.131172 GB
process ID 21267.
Memory usage: 2.131172 GB
process ID 21266.
Memory usage: 2.176079 GB
process ID 21267.
Memory usage: 2.176083 GB
process ID 21266.
Memory usage: 2.176079 GB
process ID 21267.
Memory usage: 2.176083 GB

Total time used to finish all jobs: 00:00:49
Main Process
process ID 21256.
Memory usage: 0.843079 GB 


Use multiprocessing.Process, max subprocess = 2, jobs = 8

process ID 23405.
Memory usage: 0.840614 GB
process ID 23408.
Memory usage: 0.840618 GB
process ID 23410.
Memory usage: 0.840706 GB
process ID 23412.
Memory usage: 0.840805 GB
process ID 23415.
Memory usage: 0.840900 GB
process ID 23417.
Memory usage: 0.840973 GB
process ID 23419.
Memory usage: 0.841061 GB
process ID 23421.
Memory usage: 0.841152 GB

Total time used to finish all jobs: 00:00:00
Main Process
process ID 21256.
Memory usage: 0.843781 GB

I don’t know why subprocesses from multiprocessing.Pool need about 1.6GB in the beginning, but subprocess from multiprocessing.Process only needs 0.84 GBs which equals the memory cost of the main process. It seems to me that only multiprocessing.Process enjoys the “copy-on-write” benefit of linux, as the time for all jobs needed is less than 1s. I don’t know why multiprocessing.Pool does not enjoy this. From the source code, multiprocessing.Pool seems like a wrapper of multiprocessing.Process.

Advertisement

Answer

Question: I don’t know why subprocesses from multiprocessing.Pool need about 1.6GB in the beginning,
… Pool seems like a wrapper of multiprocessing.Process

This is, as Pool reserve memory for the results for all jobs.
Second, Pool uses two SimpleQueue() and three Threads.
Third, Pool duplicate all passed argv data before passing up to a process.

Your process example use only one Queue() for all, passing argv as they are.

Pool is far away to be only a wrapper.

User contributions licensed under: CC BY-SA
10 People found this is helpful
Advertisement