pool.join() is a method often used in multiprocessing when running parallel processes in Python. In this article, we will see how to use pool.join() when running parallel processes with the class multiprocessing.pool.Pool([processes, ...]).
In particular, we will cover the following:
- Using pool.join(),
- Discussing an alternative to pool.join(), and lastly,
- Working through an example demonstrating how to implement parallel processing.
The processes argument in multiprocessing.pool.Pool([processes, ...]) is the number of worker processes to use. If this argument is not supplied, one worker process per CPU core available in your system will be created. A minimal sketch of this argument is shown right below; after that, let’s work on a fuller example.
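Here is a minimal sketch of the processes argument (the pool sizes and variable names are only illustrative and are not part of the examples that follow):

from multiprocessing.pool import Pool
import os

if __name__ == "__main__":
    # Explicitly request 2 worker processes
    small_pool = Pool(processes=2)
    # Leave processes unset: one worker per available CPU core is created
    default_pool = Pool()
    print(f"CPU cores available: {os.cpu_count()}")
    for pool in (small_pool, default_pool):
        pool.close()
        pool.join()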
import time

# Define the compute_selling_price function
def compute_selling_price(market_price, discount_rate, waiting_time):
    """
    Arguments: market_price, discount_rate, waiting_time (the waiting time is
    imposed explicitly for demonstration purposes only).
    Returns the selling price after applying a discount on the market_price.
    """
    # Impose the waiting time explicitly
    print(f"Waiting for {waiting_time} seconds")
    time.sleep(waiting_time)
    # Compute the discount
    discount = market_price * (discount_rate / 100)
    # Compute the price after applying the discount
    selling_price = market_price - discount
    return selling_price
Next, we call the compute_selling_price() function in a Pool worker process and time the execution (we will do this for all the examples).
Note: You need to insert an “if __name__ == ‘__main__’:” guard in the script you are executing to avoid creating subprocesses recursively. This happens mainly on Windows.
from multiprocessing.pool import Pool
import time

if __name__ == "__main__":
    # Start timer
    start_time = time.time()
    # Initialize the Pool object. We did not specify the processes argument,
    # so one worker per CPU core available on the computer will be used.
    pool = Pool()
    # Submit a job to the pool. Here, we pass our function and the
    # 3 arguments required by the function.
    pool.apply_async(func=compute_selling_price, args=[1200, 8, 4])
    # End time
    end_time = time.time()
    # Compute the time taken to execute our code in seconds, rounded to 1 decimal place.
    time_taken = round(end_time - start_time, 1)
    print(f"Execution finished in {time_taken} seconds.")
Output:
Execution finished in 0.0 seconds. Waiting for 4 seconds
Since we are imposing a sleeping time of 4 seconds, we expect the execution of our code to take at least 4 seconds. But it took 0 seconds.
That is because the pool.apply_async() method does not wait for the processes to exit. Once the job is submitted (even before its execution is complete), the rest of the code below the pool.apply_async() call is executed. That is why it took 0 seconds to run our code.
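The fact that apply_async() returns immediately can be seen on the AsyncResult object it gives back. A minimal sketch follows (slow_task and its two-second delay are placeholders for demonstration only); note that it already uses pool.close() and pool.join(), which are covered next:

from multiprocessing.pool import Pool
import time

def slow_task(seconds):
    # Placeholder task used only for this sketch
    time.sleep(seconds)
    return seconds

if __name__ == "__main__":
    pool = Pool()
    async_result = pool.apply_async(slow_task, args=[2])
    # apply_async() returned immediately: the task has not finished yet
    print(async_result.ready())   # False
    pool.close()
    pool.join()                   # block until the worker finishes
    print(async_result.ready())   # True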
To wait for the execution of the processes to be completed, we need to use pool.join().
Using pool.join()
The pool.join() method waits for the worker processes to exit. The pool.close() or pool.terminate() method must be called before using pool.join().
pool.close() prevents any more jobs from being submitted to the pool; the worker processes exit once all submitted tasks are completed. pool.terminate(), on the other hand, stops the worker processes immediately without completing outstanding work.
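To make the difference concrete, here is a minimal sketch (slow_square and its one-second delay are placeholders for demonstration only): close() lets the submitted task finish before the workers exit, while terminate() stops the workers straight away.

from multiprocessing.pool import Pool
import time

def slow_square(x):
    # Placeholder task used only for this sketch
    time.sleep(1)
    return x * x

if __name__ == "__main__":
    # close(): no new tasks can be submitted; workers finish the pending ones
    pool = Pool()
    async_result = pool.apply_async(slow_square, args=[3])
    pool.close()
    pool.join()                   # waits roughly 1 second for the task
    print(async_result.ready())   # True: the task was allowed to complete

    # terminate(): workers are stopped immediately, abandoning pending work
    pool = Pool()
    pool.apply_async(slow_square, args=[4])
    pool.terminate()
    pool.join()                   # returns almost immediately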
Let’s call the compute_selling_price() function we created above and use the pool.close() and pool.join() methods to close the pool and wait for the worker processes to finish.
from multiprocessing.pool import Pool
import time

if __name__ == "__main__":
    # Start timer
    start_time = time.time()
    # Initialize the Pool object. We did not specify the processes argument,
    # so one worker per CPU core available on the computer will be used.
    pool = Pool()
    # Submit the job to the pool. Here, we pass our function and the
    # 3 arguments required by the function.
    pool.apply_async(func=compute_selling_price, args=[1200, 8, 4])
    # Stop more tasks from being added to the pool
    pool.close()
    # Use pool.join() to wait for the worker processes to finish
    pool.join()
    # End time
    end_time = time.time()
    # Compute the time taken to execute our code in seconds, rounded to 1 decimal place.
    time_taken = round(end_time - start_time, 1)
    print(f"Execution finished in {time_taken} seconds.")
Output:
Waiting for 4 seconds Execution finished in 4.0 seconds.
This time, after using pool.close() and pool.join(), the code took the expected 4 seconds to execute.
Using the get() method
This is an alternative to pool.join(). The get([timeout]) method can be called on the AsyncResult object returned by pool.apply_async() to access the value(s) returned by the function we called.
The method returns the result when it arrives. If timeout is not given, get() waits until the result is received; if timeout is given and the result does not arrive within timeout seconds, multiprocessing.TimeoutError is raised.
You can use get() by replacing the code between the lines start_time = time.time() and end_time = time.time() in the previous example with the following.
pool = Pool()
result = pool.apply_async(func=compute_selling_price, args=[1200, 8, 4])
print(result.get())
Output:
Waiting for 4 seconds 1104.0 Execution finished in 4.0 seconds.
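The snippet above calls get() with no timeout. As a sketch of the timeout behaviour described earlier (slow_task is a placeholder for demonstration only), passing a timeout smaller than the task duration raises multiprocessing.TimeoutError:

from multiprocessing import TimeoutError
from multiprocessing.pool import Pool
import time

def slow_task(seconds):
    # Placeholder task used only for this sketch
    time.sleep(seconds)
    return seconds

if __name__ == "__main__":
    pool = Pool()
    result = pool.apply_async(slow_task, args=[4])
    try:
        # Give up if the result has not arrived within 1 second
        print(result.get(timeout=1))
    except TimeoutError:
        print("Result was not ready within 1 second")
    # Without a timeout, get() blocks until the result arrives
    print(result.get())
    pool.close()
    pool.join()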
Another Example – The Power of Parallel Processing
To appreciate the power of parallel processing using the multiprocessing module, let’s call compute_selling_price() 5 times with and without parallel processing.
Without parallel processing
import time

# Start timer
start_time = time.time()
# Market price, discount rate, and waiting time for each item
amounts = [(1200, 8, 3), (1500, 9, 4), (2000, 10, 5), (1000, 7, 2), (1950, 9.2, 3.5)]
# Call compute_selling_price once per item, one after the other
for amount in amounts:
    market_price, discount_rate, waiting_time = amount
    selling_price = compute_selling_price(market_price, discount_rate, waiting_time)
    print("Selling Price: ", selling_price)
# End time
end_time = time.time()
# Compute the time taken to execute our code in seconds, rounded to 1 decimal place.
time_taken = round(end_time - start_time, 1)
print(f"Execution finished in {time_taken} seconds.")
Output (truncated):
Waiting for 3 seconds Selling Price: 1104.0 … Waiting for 3.5 seconds Selling Price: 1770.6 Execution finished in 17.5 seconds.
Without parallel computing, it took 17.5 seconds to calculate the discounts using the compute_selling_price() function, because the waiting times add up: 3 + 4 + 5 + 2 + 3.5 = 17.5 seconds. In the next example, let’s call the same function 5 times, distributing the tasks across all our worker processes.
With parallel processing
import time
from multiprocessing.pool import Pool

if __name__ == "__main__":
    # Start timer
    start_time = time.time()
    # Initialize the Pool object. We did not specify the processes argument,
    # so one worker per CPU core available on the computer will be used.
    pool = Pool()
    # Market price, discount rate, and waiting time for each item
    amounts = [(1200, 8, 3), (1500, 9, 4), (2000, 10, 5), (1000, 7, 2), (1950, 9.2, 3.5)]
    # Submit the jobs to the pool: pool.starmap_async() takes the function and
    # an iterable of argument tuples
    results = pool.starmap_async(compute_selling_price, amounts)
    # Print the results - a list
    print(results.get())
    # End time
    end_time = time.time()
    # Compute the time taken to execute our code in seconds, rounded to 1 decimal place.
    time_taken = round(end_time - start_time, 1)
    print(f"Execution finished in {time_taken} seconds.")
Output (truncated):
Waiting for 3 seconds … Waiting for 3.5 seconds [1104.0, 1365.0, 1800.0, 930.0, 1770.6] Execution finished in 5.0 seconds.
With parallel processing, the execution time drops to 5 seconds, the longest single waiting time, because the tasks run concurrently. You can save significant computational time if you have several values to process.
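As a closing aside (a minimal sketch, not part of the timed examples above), Pool can also be used as a context manager. Because the blocking starmap() call only returns once every result is ready, no explicit close()/join() is needed in this pattern; the with block cleans up the pool on exit by calling terminate().

from multiprocessing.pool import Pool
import time

def compute_selling_price(market_price, discount_rate, waiting_time):
    print(f"Waiting for {waiting_time} seconds")
    time.sleep(waiting_time)
    return market_price - market_price * (discount_rate / 100)

if __name__ == "__main__":
    amounts = [(1200, 8, 3), (1500, 9, 4), (2000, 10, 5), (1000, 7, 2), (1950, 9.2, 3.5)]
    # starmap() blocks until all the results are ready; the with block then
    # terminates the pool automatically
    with Pool() as pool:
        results = pool.starmap(compute_selling_price, amounts)
    print(results)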
References:
- multiprocessing.pool.Pool: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool
- pool.join() : https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.join