Content-Length: 341388 | pFad | http://github.com/python/cpython/commit/ebb37fc3fdcb03db4e206db017eeef7aaffbae84

gh-90622: Do not spawn ProcessPool workers on demand via fork method.… · python/cpython@ebb37fc · GitHub
Skip to content

Commit

Permalink
gh-90622: Do not spawn ProcessPool workers on demand via fork method. (#91598)
Browse files Browse the repository at this point in the history

Do not spawn ProcessPool workers on demand when they spawn via fork.

This avoids potential deadlocks in the child processes due to forking from
a multithreaded process.
  • Loading branch information
gpshead authored May 8, 2022
1 parent a84a56d commit ebb37fc
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 11 deletions.
44 changes: 34 additions & 10 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,10 @@ def __init__(self, max_workers=None, mp_context=None,
mp_context = mp.get_context()
self._mp_context = mp_context

# https://github.com/python/cpython/issues/90622
self._safe_to_dynamically_spawn_children = (
self._mp_context.get_start_method(allow_none=False) != "fork")

if initializer is not None and not callable(initializer):
raise TypeError("initializer must be a callable")
self._initializer = initializer
Expand Down Expand Up @@ -714,6 +718,8 @@ def __init__(self, max_workers=None, mp_context=None,
def _start_executor_manager_thread(self):
if self._executor_manager_thread is None:
# Start the processes so that their sentinels are known.
if not self._safe_to_dynamically_spawn_children: # ie, using fork.
self._launch_processes()
self._executor_manager_thread = _ExecutorManagerThread(self)
self._executor_manager_thread.start()
_threads_wakeups[self._executor_manager_thread] = \
Expand All @@ -726,15 +732,32 @@ def _adjust_process_count(self):

process_count = len(self._processes)
if process_count < self._max_workers:
p = self._mp_context.Process(
target=_process_worker,
args=(self._call_queue,
self._result_queue,
self._initializer,
self._initargs,
self._max_tasks_per_child))
p.start()
self._processes[p.pid] = p
# Assertion disabled as this codepath is also used to replace a
# worker that unexpectedly dies, even when using the 'fork' start
# method. That means there is still a potential deadlock bug. If a
# 'fork' mp_context worker dies, we'll be forking a new one when
# we know a thread is running (self._executor_manager_thread).
#assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622'
self._spawn_process()

def _launch_processes(self):
# https://github.com/python/cpython/issues/90622
assert not self._executor_manager_thread, (
'Processes cannot be fork()ed after the thread has started, '
'deadlock in the child processes could result.')
for _ in range(len(self._processes), self._max_workers):
self._spawn_process()

def _spawn_process(self):
    """Create, start and register a single worker process.

    The worker is built from ``self._mp_context.Process`` with
    ``_process_worker`` as its target and is handed the call queue, the
    result queue, the initializer with its arguments, and the
    max-tasks-per-child setting.
    """
    p = self._mp_context.Process(
        target=_process_worker,
        args=(self._call_queue,
              self._result_queue,
              self._initializer,
              self._initargs,
              self._max_tasks_per_child))
    p.start()
    # Register only after start(): the pid used as the key is read from the
    # started process.
    self._processes[p.pid] = p

def submit(self, fn, /, *args, **kwargs):
with self._shutdown_lock:
Expand All @@ -755,7 +778,8 @@ def submit(self, fn, /, *args, **kwargs):
# Wake up queue management thread
self._executor_manager_thread_wakeup.wakeup()

self._adjust_process_count()
if self._safe_to_dynamically_spawn_children:
self._adjust_process_count()
self._start_executor_manager_thread()
return f
submit.__doc__ = _base.Executor.submit.__doc__
Expand Down
12 changes: 11 additions & 1 deletion Lib/test/test_concurrent_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,10 +497,16 @@ def acquire_lock(lock):
lock.acquire()

mp_context = self.get_context()
if mp_context.get_start_method(allow_none=False) == "fork":
# fork pre-spawns, not on demand.
expected_num_processes = self.worker_count
else:
expected_num_processes = 3

sem = mp_context.Semaphore(0)
for _ in range(3):
self.executor.submit(acquire_lock, sem)
self.assertEqual(len(self.executor._processes), 3)
self.assertEqual(len(self.executor._processes), expected_num_processes)
for _ in range(3):
sem.release()
processes = self.executor._processes
Expand Down Expand Up @@ -1021,6 +1027,8 @@ def test_saturation(self):
def test_idle_process_reuse_one(self):
executor = self.executor
assert executor._max_workers >= 4
if self.get_context().get_start_method(allow_none=False) == "fork":
raise unittest.SkipTest("Incompatible with the fork start method.")
executor.submit(mul, 21, 2).result()
executor.submit(mul, 6, 7).result()
executor.submit(mul, 3, 14).result()
Expand All @@ -1029,6 +1037,8 @@ def test_idle_process_reuse_one(self):
def test_idle_process_reuse_multiple(self):
executor = self.executor
assert executor._max_workers <= 5
if self.get_context().get_start_method(allow_none=False) == "fork":
raise unittest.SkipTest("Incompatible with the fork start method.")
executor.submit(mul, 12, 7).result()
executor.submit(mul, 33, 25)
executor.submit(mul, 25, 26).result()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Worker processes for :class:`concurrent.futures.ProcessPoolExecutor` are no
longer spawned on demand (a feature added in 3.9) when the multiprocessing
context start method is ``"fork"`` as that can lead to deadlocks in the
child processes due to a fork happening while threads are running.

0 comments on commit ebb37fc

Please sign in to comment.








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/python/cpython/commit/ebb37fc3fdcb03db4e206db017eeef7aaffbae84

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy