Using pool.join() in Python

pool.join() is a method often used in multiprocessing when running parallel processes in Python. In this article, we will see how to use pool.join() when running parallel processes with the multiprocessing.pool.Pool([processes, ...]) class.

In particular, we will cover the following:

  1. Use pool.join(),
  2. Discuss an alternative to pool.join(), and lastly,
  3. Work through an example demonstrating the power of parallel processing.

The processes argument in multiprocessing.pool.Pool([processes, ...]) is the number of worker processes to use. If this argument is not supplied, the value returned by os.cpu_count() is used, so one worker per available CPU core is created. A quick sketch of this is shown below, after which we will work through a fuller example.
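As a minimal sketch (the pool size of 2 is an arbitrary value chosen purely for illustration), you could create a default-sized pool and an explicitly limited one like this:

import os
from multiprocessing.pool import Pool

if __name__ == "__main__":
    # With no processes argument, the pool creates os.cpu_count() workers
    default_pool = Pool()
    print(f"os.cpu_count() reports {os.cpu_count()} cores")
    default_pool.close()

    # Explicitly limit the pool to 2 worker processes
    small_pool = Pool(processes=2)
    small_pool.close()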

import time

# defining the compute_selling_price function
def compute_selling_price(market_price, discount_rate, waiting_time):
    """
    Arguments: market_price, discount_rate, waiting_time (the waiting time is imposed
    explicitly for demonstration purposes only).
    Returns the selling price after applying the discount to the market_price.
    """
    # Impose the waiting time explicitly
    print(f"Waiting for {waiting_time} seconds")
    time.sleep(waiting_time)
    # Compute the discount
    discount = market_price*(discount_rate/100)
    # Compute the price after applying the discount
    selling_price = market_price - discount
    return selling_price

Next, we call the compute_selling_price() function in a Pool worker process and time the execution (we will keep doing this for all the examples).

Note: You need to put an "if __name__ == '__main__':" guard in the script you are executing to avoid creating subprocesses recursively. This mainly matters on Windows, where child processes re-import the main module.

from multiprocessing.pool import Pool
import time
if __name__ == "__main__":
    # Start timer
    start_time = time.time()
    # Initialize the Pool object. We did not specify the processes argument,
    # so one worker per available CPU core will be used.
    pool = Pool()
    # Submit a job to the pool. Here, we pass our function and the
    # 3 arguments required by the function
    pool.apply_async(func=compute_selling_price, args=[1200, 8, 4])
    # End time
    end_time = time.time()
    # Compute the time taken to execute our code in seconds rounded to 1 decimal place.
    time_taken = round(end_time-start_time, 1)
    print(f"Execution finished in {time_taken} seconds.")

Output:

Execution finished in 0.0 seconds.
Waiting for 4 seconds

Since we impose a sleep time of 4 seconds, we expect the execution of our code to take at least 4 seconds. But it took 0 seconds.

That is because the pool.apply_async() method does not wait for the worker processes to finish. Once a job is submitted, the code below the pool.apply_async() call runs immediately, even before the job completes. That is why our code appeared to take 0 seconds.
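A small sketch of this behaviour, reusing the compute_selling_price() function and the imports from above, and the ready() method of the AsyncResult object that pool.apply_async() returns:

if __name__ == "__main__":
    pool = Pool()
    result = pool.apply_async(func=compute_selling_price, args=[1200, 8, 4])
    # apply_async() returned immediately; the worker is still sleeping,
    # so this prints False
    print(result.ready())
    # Give the worker more than enough time to finish its 4-second sleep
    time.sleep(5)
    # The task has completed by now, so this prints True
    print(result.ready())
    pool.close()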

To wait for the execution of the processes to be completed, we need to use pool.join().

Using pool.join()

The pool.join() method waits for the worker processes to exit. The pool.close() or pool.terminate() method must be called before using pool.join().

pool.close() prevents any more jobs from being submitted to the pool; once all submitted tasks are complete, the worker processes exit. pool.terminate(), on the other hand, stops the worker processes immediately, without waiting for outstanding work to finish.
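For comparison, here is a rough sketch of pool.terminate() with the same task; because the workers are stopped immediately, the 4-second job is abandoned rather than waited for:

if __name__ == "__main__":
    pool = Pool()
    pool.apply_async(func=compute_selling_price, args=[1200, 8, 4])
    # terminate() stops the worker processes immediately, without letting
    # the submitted task finish its 4-second sleep
    pool.terminate()
    # join() still waits for the (now terminated) workers to exit
    pool.join()
    print("Pool terminated without waiting for the task to complete.")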

Let’s call the compute_selling_price() function we created above and use the pool.close() and pool.join() methods to close the pool and wait for the worker processes to finish.

if __name__ == "__main__":
    # Start timer
    start_time = time.time()
    # Initialize the Pool object. We did not specify the processes argument,
    # so one worker per available CPU core will be used.
    pool = Pool()
    # Submit the job to the pool. Here, we pass our function and the
    # 3 arguments required by the function
    pool.apply_async(func=compute_selling_price, args=[1200, 8, 4])
    # Stop more tasks from being added to the pool
    pool.close()
    # Use pool.join() to wait for the processes to finish
    pool.join()
    # End time
    end_time = time.time()
    # Compute time taken to execute our code in seconds rounded to 1 decimal place.
    time_taken = round(end_time-start_time, 1)
    print(f"Execution finished in {time_taken} seconds.")

Output:

Waiting for 4 seconds
Execution finished in 4.0 seconds.

This time, after using pool.close() and pool.join(), our code took the expected 4 seconds to execute.

Using the get() function

This is an alternative to pool.join(). The get(timeout) method is called on the AsyncResult object returned by pool.apply_async() and gives us access to the value(s) returned by the function we called.

get() blocks and returns the result as soon as it arrives. If timeout is not given, it waits indefinitely; if timeout is given and the result does not arrive within timeout seconds, multiprocessing.TimeoutError is raised.

You can use the get() method by placing the following lines between start_time = time.time() and end_time = time.time() in the previous example, in place of the pool-related lines that were there.

pool = Pool()
result = pool.apply_async(func=compute_selling_price, args=[1200, 8, 4])
# get() blocks until the result of this call is available
print(result.get())

Output:

Waiting for 4 seconds
1104.0
Execution finished in 4.0 seconds.
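If you pass a timeout that is shorter than the time the task needs, get() raises multiprocessing.TimeoutError instead of waiting indefinitely. A small sketch of that case, reusing compute_selling_price() from above (the 2-second timeout is an arbitrary value chosen only to trigger the error):

import multiprocessing
from multiprocessing.pool import Pool

if __name__ == "__main__":
    pool = Pool()
    result = pool.apply_async(func=compute_selling_price, args=[1200, 8, 4])
    try:
        # The task sleeps for 4 seconds, so a 2-second timeout is too short
        print(result.get(timeout=2))
    except multiprocessing.TimeoutError:
        print("Timed out waiting for the result.")
    pool.close()
    pool.join()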

Another Example – The Power of Parallel Processing

To appreciate the power of parallel processing using the multiprocessing module, let’s call compute_selling_price() 5 times with and without parallel processing.

Without parallel processing

import time

# Start timer
start_time = time.time()
# The (market_price, discount_rate, waiting_time) tuples to process one by one
amounts = [(1200, 8, 3), (1500, 9, 4), (2000, 10, 5), (1000, 7, 2), (1950, 9.2, 3.5)]
for amount in amounts:
    market_price, discount_rate, waiting_time = amount
    selling_price = compute_selling_price(market_price, discount_rate, waiting_time)
    print("Selling Price: ", selling_price)
# End time
end_time = time.time()
# Compute time taken to execute our code in seconds, rounded to 1 decimal place.
time_taken = round(end_time-start_time, 1)
print(f"Execution finished in {time_taken} seconds.")

Output (truncated):

Waiting for 3 seconds
Selling Price:  1104.0
…
Waiting for 3.5 seconds
Selling Price:  1770.6
Execution finished in 17.5 seconds.

Without parallel processing, it took 17.5 seconds to compute the five selling prices with compute_selling_price() (because of the 3+4+5+2+3.5 = 17.5 seconds of waiting time). In the next example, let's call the same function 5 times, distributing the tasks across all our processors.

With parallel processing

import time
from multiprocessing.pool import Pool

if __name__ == "__main__":
    # Start timer
    start_time = time.time()
    # Initialize the Pool object. We did not specify the processes argument,
    # so one worker per available CPU core will be used.
    pool = Pool()
    # The (market_price, discount_rate, waiting_time) tuples to process in parallel
    amounts = [(1200, 8, 3), (1500, 9, 4), (2000, 10, 5), (1000, 7, 2), (1950, 9.2, 3.5)]
    # Call compute_selling_price using pool.starmap_async, which takes an iterable
    # of argument tuples and unpacks each tuple into a separate call
    results = pool.starmap_async(compute_selling_price, amounts)
    # Print the results - a list; get() blocks until all tasks have finished
    print(results.get())
    end_time = time.time()
    # Compute the time taken to execute our code in seconds, rounded to 1 decimal place.
    time_taken = round(end_time-start_time, 1)
    print(f"Execution finished in {time_taken} seconds.")

Output (truncated):

Waiting for 3 seconds
…
Waiting for 3.5 seconds
[1104.0, 1365.0, 1800.0, 930.0, 1770.6]
Execution finished in 5.0 seconds.

With parallel processing, the execution time drops to 5 seconds (the longest single waiting time), because the five calls run concurrently. You can save a lot of computational time when you have many values to process.
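If you prefer to stick with pool.apply_async() from the earlier sections, the same parallel run can be sketched by submitting each tuple in a loop and then using pool.close() and pool.join() to wait for every worker to finish:

if __name__ == "__main__":
    start_time = time.time()
    pool = Pool()
    amounts = [(1200, 8, 3), (1500, 9, 4), (2000, 10, 5), (1000, 7, 2), (1950, 9.2, 3.5)]
    # Submit one task per tuple; each call returns an AsyncResult immediately
    async_results = [pool.apply_async(compute_selling_price, args=amount) for amount in amounts]
    # Stop accepting new tasks and wait for all workers to exit
    pool.close()
    pool.join()
    # Collect the selling prices in the order the tasks were submitted
    print([result.get() for result in async_results])
    time_taken = round(time.time() - start_time, 1)
    print(f"Execution finished in {time_taken} seconds.")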

References:

  1. multiprocessing.pool.Pool: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool
  2. pool.join(): https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.join