The number of threads that can simultaneously run within a ThreadPoolExecutor
is limited by the _max_workers
parameter. Submitting tasks whose execution is dependent on other tasks submitted to the same ThreadPoolExecutor
may result in a thread-starvation deadlock. An example of this type of deadlock is shown in the following article [Brownlee, 2021], which describes it as “Deadlock 1: Submit and Wait for a Task Within a Task”. Submitting a task will add it to an internal ThreadPoolExecutor
queue. The task will be removed from the queue when one of the worker threads becomes available, i.e., after finishing its current task. If all workers are busy and all their current tasks are waiting for results from tasks that are waiting in the queue, the program will run indefinitely as no worker will be able to complete its task and take one of the blocking tasks from the queue.
The noncompliant01.py
code example shows how a thread-starvation deadlock could happen when one task depends on the results of other, nested tasks. The ReportTableGenerator
class creates a table out of the input data by building each row concurrently. The client can call the generate_string_table()
method by providing a list of String arguments. Each argument is parsed in a separate thread that executes the inner method _create_table_row()
. Creation of the row may consist of multiple steps, such as reformating the string, which could be performed in separate sub-threads. The _reformat_string()
method is submitted within _create_table_row()
and uses the same executor. Before the row is built, all of the building threads must return the results.
""" Non-compliant Code Example """
from concurrent.futures import ThreadPoolExecutor
from typing import List
class ReportTableGenerator(object):
def __init__(self):
self.executor = ThreadPoolExecutor()
def generate_string_table(self, inputs: List[str]) -> str:
futures = []
aggregated = "|Data|Length|\n"
for i in inputs:
futures.append(self.executor.submit(self._create_table_row, i))
for future in futures:
aggregated += future.result()
return aggregated
def _create_table_row(self, row: str) -> str:
print(f"Creating a row out of: {row}")
future = self.executor.submit(self._reformat_string, row)
return f"|{future.result()}|{len(row)}|\n"
def _reformat_string(self, row: str) -> str:
print(f"Reformatting {row}")
row_reformated = row.capitalize()
return row_reformated
#####################
# exploiting above code example
#####################
report_table_generator = ReportTableGenerator()
attacker_messages = [str(msg) for msg in range(1000)]
print("ATTACKER: start sending messages")
result = report_table_generator.generate_string_table(attacker_messages)
print(
f"ATTACKER: done sending {len(attacker_messages)} messages, got {len(result)} messages "
f"back")
print(f"ATTACKER: result = {result}")
The problem arises when the number of concurrently built rows exceeds the number of _max_workers
. In this example, each worker thread could be occupied with the execution of the _create_table_row()
method, while the tasks of executing _reformat_string()
are waiting in the queue. Since the _create_table_row()
method spawns a _reformat_string()
thread and waits for its result, every worker will wait for another worker to finish their part of the row-building process. Because all workers are busy with the _create_table_row()
task, no worker will be able to take the string reformatting out of the queue, causing a deadlock.
The compliant01.py
code example illustrates how interdependent tasks could be removed. In order to avoid thread-starvation deadlocks, one task should not submit nested tasks to the same thread pool and then wait for their results.
""" Compliant Code Example """
from concurrent.futures import ThreadPoolExecutor
from typing import List
class ReportTableGenerator(object):
def __init__(self):
self.executor = ThreadPoolExecutor()
def generate_string_table(self, inputs: List[str]) -> str:
futures = []
aggregated = "|Data|Length|\n"
for i in inputs:
futures.append(self.executor.submit(self._create_table_row, i))
for future in futures:
aggregated += future.result()
return aggregated
def _create_table_row(self, row: str) -> str:
print(f"Creating a row out of: {row}")
return f"|{self._reformat_string(row)}|{len(row)}|\n"
def _reformat_string(self, row: str) -> str:
print(f"Reformatting {row}")
row_reformated = row.capitalize()
return row_reformated
#####################
# exploiting above code example
#####################
report_table_generator = ReportTableGenerator()
attacker_messages = [str(msg) for msg in range(1000)]
print("ATTACKER: start sending messages")
result = report_table_generator.generate_string_table(attacker_messages)
print(
f"ATTACKER: done sending {len(attacker_messages)} messages, got {len(result)} messages "
f"back")
print(f"ATTACKER: result = {result}")
Now, the _create_table_row()
method is executed in a separate thread but does not spawn any more threads, instead reformatting the string sequentially. Because each input argument can have its row built separately, no thread will need to wait for another to finish, which avoids thread-starvation deadlocks.
This code example introduces the idea of fork-join parallelism. Here, each task can spawn a number of parallel subtasks. The concept of fork-join parallelism has been described with code examples in Java [Gafter, 2006].
The BankingService class allows to concurrently validate the cards of all of the registered clients. Each level of granularity is represented by a separate method. First, the for_each_client()
method is called to fetch the data of all of the clients. Then, in order to check the clients’ accounts, each call of the for_each_client()
method spawns several threads that run the method. The for_each_account() method creates several threads for the for_each_card()
method and finally, the for_each_card()
method creates several threads for the check_card_validity()
method. Overall, we have 4 layers of tasks, among which the 3 highest ones (for_each_client(), for_each_account(), and for_each_card())
can “fork” a number of subtasks depending on the value of the number_of_times
variable. If each thread waits for its sub-threads to finish while sharing the same ThreadPoolExecutor
, they may cause a thread-starvation deadlock.
""" Non-compliant Code Example """
from concurrent.futures import ThreadPoolExecutor, wait
from threading import Lock
from typing import Callable
class BankingService(object):
def __init__(self, n: int):
self.executor = ThreadPoolExecutor()
self.number_of_times = n
self.count = 0
self.lock = Lock()
def for_each_client(self):
print("For each client")
self.invoke_method(self.for_each_account)
def for_each_account(self):
print("For each account")
self.invoke_method(self.for_each_card)
def for_each_card(self):
print("For each card")
self.invoke_method(self.check_card_validity)
def check_card_validity(self):
with self.lock:
self.count += 1
print(f"Number of checked cards: {self.count}")
def invoke_method(self, method: Callable):
futures = []
for _ in range(self.number_of_times):
futures.append(self.executor.submit(method))
wait(futures)
#####################
# exploiting above code example
#####################
browser_manager = BankingService(5)
browser_manager.for_each_client()
For the sake of the example, let’s assume we have 5 clients, with 5 accounts and 5 cards for each account number_of_times = 5
. In that case, we end up calling check_card_validity() 125
times, exhausting _max_workers
which is typically at around 32
. [Source: Python Docs] You can additionally call self.executor._work_queue.qsize()
to get the exact number of tasks that are waiting in the queue, unable to be executed by the already busy workers.
Python, as opposed to Java, does not provide CallerRunsPolicy
, which was suggested as a solution in [Gafter, 2006]. It is possible to recreate that functionality by manually keeping a count of currently executed tasks.
""" Compliant Code Example """
from concurrent.futures import ThreadPoolExecutor, wait
from threading import Lock
from typing import Callable
class BankingService(object):
def __init__(self, n: int):
self.executor = ThreadPoolExecutor()
self.number_of_times = n
self.count = 0
self.all_tasks = []
self.lock = Lock()
def for_each_client(self):
print("Per client")
self.invoke_method(self.for_each_account)
def for_each_account(self):
print("Per account")
self.invoke_method(self.for_each_card)
def for_each_card(self):
print("Per card")
self.invoke_method(self.check_card_validity)
def check_card_validity(self):
with self.lock:
self.count += 1
print(f"Number of checked cards: {self.count}")
def invoke_method(self, method: Callable):
futures = []
for _ in range(self.number_of_times):
if self.can_fit_in_executor():
self.lock.acquire()
future = self.executor.submit(method)
self.all_tasks.append(future)
self.lock.release()
futures.append(future)
else:
# Execute the method in the thread that invokes it
method()
wait(futures)
def can_fit_in_executor(self):
running = 0
for future in self.all_tasks:
if future.running():
running += 1
print(f"Max workers: {self.executor._max_workers}, Used workers: {running}")
# Depending on the order of submitted subtasks, the script can sometimes deadlock
# if we don't leave a spare worker.
return self.executor._max_workers > running + 1
#####################
# exploiting above code example
#####################
browser_manager = BankingService(5)
browser_manager.for_each_client()
This compliant code example adds a list of all submitted tasks to the BankingService
class. The lock, apart from controlling the counter, now ensures that only one thread can access the all_tasks
list at a time. Before a new task is submitted, can_fit_in_executor()
checks if submitting a new task will exhaust the thread pool. If so, the task is executed in the thread that tried to submit it.
unknown
[Brownlee, 2021] | Brownlee, J. (2021). How To Identify Deadlocks With The ThreadPoolExecutor in Python [online]. Available from: https://superfastpython.com/threadpoolexecutor-deadlock/ [accessed 12 April 2023] |
[Python docs] | Python Software Foundation. (2023). concurrent.futures - Launching parallel tasks [online]. Available from: https://docs.python.org/3/library/concurrent.futures.html?highlight=threadpoolexecutor#module-concurrent.futures [accessed 26 April 2023] |
[Gafter, 2006] | Gafter, N. (2006). A Thread Pool Puzzler [online]. Available from: http://gafter.blogspot.com/2006/11/thread-pool-puzzler.html [accessed 2 November 2023] |