This is on Linux, Python 3.8. I use ProcessPoolExecutor to speed up the processing of a list of large dataframes, but because they all get copied into each worker process, I run out of memory. How do I solve this problem?
My code looks like this:
import concurrent.futures

def some_func(df):
    # do some work on a single pandas DataFrame
    return single_df  # returns a single pandas DataFrame

# dfs is a list of 100 dataframes
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
    dfs = list(executor.map(some_func, dfs))  # here entire dfs gets copied 4 times?
    executor.shutdown(wait=True)
I want to minimize the unnecessary copying of data, i.e. minimize my memory footprint. What’s a good solution?
Answer
It appears that your memory problem comes from needing enough storage to hold roughly double, i.e. 200, dataframes, assuming that some_func does indeed implicitly return None. That is, your main process is storing the initial 100 dataframes and then copying these dataframes to a task queue. As a dataframe is pulled off the task queue, it temporarily occupies storage in the address space of a pool process until that process is through with it and pulls the next dataframe off the queue. So memory utilization decreases as the pool depletes the task queue until we are back to where we started (more or less), but the high-water mark will be around 200 dataframes, assuming the dataframes can be added to the queue much faster than your worker function, some_func, can process one. If, however, some_func really does return a dataframe, then memory usage will not decrease as tasks complete.
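(If you want to check where that high-water mark actually lands on your machine, one way, not part of the answer above, is to sample the parent process's resident set size while the pool is running. The sketch below assumes the third-party psutil package is installed, and it only measures the parent process, not the workers.)

import threading
import time

import psutil  # third-party package; assumed to be installed


def watch_rss(stop_event, interval=0.5):
    """Track the parent process's peak resident set size until stop_event is set."""
    proc = psutil.Process()  # the current (parent) process
    peak = 0
    while not stop_event.is_set():
        peak = max(peak, proc.memory_info().rss)
        time.sleep(interval)
    print(f"peak RSS of parent: {peak / 1e9:.2f} GB")


# usage sketch: start the watcher before executor.map(...) and stop it afterwards
stop = threading.Event()
watcher = threading.Thread(target=watch_rss, args=(stop,), daemon=True)
watcher.start()
# ... run the ProcessPoolExecutor work here ...
stop.set()
watcher.join()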
The simplest thing to try is not to submit all 100 dataframes to your pool at once, but rather to split your list into chunks of 4 (your pool size), so that your high-water mark of memory utilization should be around 104 dataframes, again assuming that some_func is not returning a dataframe (and is instead, for example, just writing it out).
import concurrent.futures

def some_func(df):
    # do some work on a single pandas DataFrame
    pass

# dfs is a list of 100 dataframes
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
    start = 0
    cnt = len(dfs)
    while start < cnt:
        # Do chunks of 4:
        list(executor.map(some_func, dfs[start:start+4]))
        start += 4
    # executor.shutdown(wait=True) is done implicitly at the end of the above block
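As a variation on the chunking idea (my own sketch, not something the answer above proposes), you can instead keep a bounded window of in-flight tasks with executor.submit plus concurrent.futures.wait. This avoids the slight barrier effect of chunking, where a whole chunk must finish before the next one is submitted, and makes the same assumption that some_func does not return a dataframe.

import concurrent.futures

MAX_IN_FLIGHT = 4  # matches max_workers

with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
    in_flight = set()
    for df in dfs:
        # If the window is full, wait until at least one task finishes
        # before submitting the next dataframe.
        if len(in_flight) >= MAX_IN_FLIGHT:
            done, in_flight = concurrent.futures.wait(
                in_flight, return_when=concurrent.futures.FIRST_COMPLETED
            )
        in_flight.add(executor.submit(some_func, df))
    # Wait for whatever is still running before the pool shuts down.
    concurrent.futures.wait(in_flight)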
Update
Since we now know that some_func actually does return a dataframe, I am assuming that you do not need the original list of dataframes when you are all through. If that assumption is wrong, then I see no way that you would not need storage for 200 dataframes, right?
So we still submit our tasks in chunks of 4, but now we replace our input dataframes with the result dataframes:
import concurrent.futures

def some_func(df):
    # do some work on a single pandas DataFrame
    return single_df  # returns a single pandas DataFrame

# dfs is a list of 100 dataframes
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
    start = 0
    cnt = len(dfs)
    while start < cnt:
        # Do chunks of 4, overwriting the inputs with the results:
        dfs[start:start+4] = list(executor.map(some_func, dfs[start:start+4]))
        start += 4
    # executor.shutdown(wait=True) is done implicitly at the end of the above block
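If even the roughly 104-dataframe high-water mark is too much, a further option (again a sketch of my own, not something the question requires) is to have the worker persist each result to disk and return just a file path, so the parent never holds any result dataframe in memory. The helper some_func_to_disk and the out_dir directory below are hypothetical names; the example uses pandas' built-in to_pickle/read_pickle.

import concurrent.futures
import os

out_dir = "results"  # hypothetical output directory
os.makedirs(out_dir, exist_ok=True)

def some_func_to_disk(args):
    idx, df = args
    result = some_func(df)                           # the original per-dataframe work
    path = os.path.join(out_dir, f"result_{idx}.pkl")
    result.to_pickle(path)                           # persist inside the worker process
    return path                                      # only a small string crosses back

# dfs is a list of 100 dataframes
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
    result_paths = list(executor.map(some_func_to_disk, enumerate(dfs)))

# later, load the results one at a time, e.g. pd.read_pickle(result_paths[0])

Note that some_func_to_disk must be defined at module level so it can be sent to the worker processes, and this trades memory for disk space and I/O time.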