Canceling a task in a thread pool only prevents it from being executed if it has not started yet. For a task to be interruptible after it has started, the task itself must check a thread-safe flag such as threading.Event.
Tasks can be submitted to the ThreadPoolExecutor by calling submit(). A submitted task can be canceled by calling cancel() on the Future object returned by submit(). If the task has not started yet, cancel() returns True and the task is never executed. If the task is already running, cancel() returns False and the task keeps running [Python 3.10.4 docs Future.cancel].
""" Non-compliant Code Example """
import time
from concurrent.futures import ThreadPoolExecutor


def take_time(x):
    print(f"Started Task: {x}")
    # Simulate work
    for i in range(10):
        time.sleep(1)
    print(f"Completed Task: {x}")


def run_thread(_executor, var):
    future = _executor.submit(take_time, var)
    return future


def interrupt(future):
    # cancel() returns False once the task is running, and the task carries on
    print(future.cancel())
    print(f"Interrupted: {future}")


#####################
# Exploiting above code example
#####################
with ThreadPoolExecutor() as executor:
    task = run_thread(executor, "A")
    interrupt(task)
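The two return values of cancel() described above can be observed side by side. The following is a minimal sketch, not part of the original example: the names demo_cancel, blocker, started and release are hypothetical, and a pool with a single worker is assumed so that the second task is guaranteed to stay queued while the first one runs.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def demo_cancel():
    """Return the cancel() results for a running task and for a queued task."""
    started = threading.Event()  # set by the task once it is running
    release = threading.Event()  # set by the caller to let the task finish

    def blocker():
        started.set()
        release.wait(timeout=5)  # timeout so the sketch cannot hang forever

    # Assumption: one worker only, so the second submission has to wait.
    with ThreadPoolExecutor(max_workers=1) as executor:
        running = executor.submit(blocker)
        started.wait(timeout=5)  # make sure the first task has really started
        queued = executor.submit(print, "never printed")

        running_cancelled = running.cancel()  # task is already running
        queued_cancelled = queued.cancel()    # task is still pending
        release.set()                         # unblock the worker

    return running_cancelled, queued_cancelled


if __name__ == "__main__":
    print(demo_cancel())  # (False, True)
```

Only the queued task is actually prevented from running; the running task ignores cancel() and has to be stopped cooperatively.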
Tasks submitted to the ThreadPoolExecutor can be interrupted by setting a thread-safe flag, such as threading.Event [Python 3.10.4 docs on threading.Event]. An Event object should be passed as an argument to the submitted task. Inside the task function, the flag must be checked manually by calling event.is_set(), and the task must handle the interruption itself, for example by saving partial results and returning. To set the flag, call event.set() on the Event object.
""" Compliant Code Example """
import time
from concurrent.futures import ThreadPoolExecutor
from threading import Event


def take_time(x, _event):
    print(f"Started Task: {x}")
    # Simulate work
    for _ in range(10):
        # Check the flag before each unit of work
        if _event.is_set():
            print(f"Interrupted Task: {x}")
            # Save partial results
            return
        time.sleep(1)
    print(f"Completed Task: {x}")


def run_thread(_executor, var):
    e = Event()
    future = _executor.submit(take_time, var, e)
    return future, e


def interrupt(future, e):
    """Cancel the task, just in case it is not yet running, and set the Event flag"""
    future.cancel()
    e.set()


#####################
# Exploiting above code example
#####################
with ThreadPoolExecutor() as executor:
    task, event = run_thread(executor, "A")
    interrupt(task, event)
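In the compliant example the flag is only checked between calls to time.sleep(1), so the task may take up to a second to notice an interruption. One possible variation, shown here as a sketch rather than as part of the original example, replaces the sleep with Event.wait(timeout), which returns True as soon as the flag is set. The steps and step_seconds parameters and the returned step count are assumptions added for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Event


def take_time(x, stop_event, steps=10, step_seconds=1.0):
    """Run up to `steps` units of work; return how many finished before a stop."""
    done = 0
    for _ in range(steps):
        # wait() returns True as soon as the flag is set, or False once the
        # timeout expires, so it doubles as an interruptible sleep.
        if stop_event.wait(timeout=step_seconds):
            print(f"Interrupted Task: {x} after {done} steps")
            return done  # partial result
        done += 1
    print(f"Completed Task: {x}")
    return done


if __name__ == "__main__":
    with ThreadPoolExecutor() as executor:
        event = Event()
        future = executor.submit(take_time, "A", event, step_seconds=0.2)
        if not future.cancel():  # False means the task is already running
            event.set()          # so ask it to stop cooperatively
            print(f"Steps finished: {future.result()}")
```

Returning the number of finished steps lets the caller see how far the task got through future.result(). The result is only read when cancel() returned False, because calling result() on a successfully canceled Future raises CancelledError.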
[Python 3.10.4 docs Future.cancel] concurrent.futures — Launching parallel tasks — Python 3.10.4 documentation. Available from: https://docs.python.org/3/library/concurrent.futures.html [Last Accessed May 2024]
[Python 3.10.4 docs on threading.Event] threading — Thread-based parallelism - Event Objects. Available from: https://docs.python.org/3/library/threading.html#event-objects [Last Accessed May 2024]