asyncio.Semaphore waiters deque doesn't work #90155
import asyncio

async def process(idx: int, semaphore: asyncio.Semaphore) -> None:
    while True:
        async with semaphore:
            print(f'ACQUIRE {idx}')
            await asyncio.sleep(1)

async def main() -> None:
    semaphore = asyncio.Semaphore(5)
    await asyncio.gather(*[process(idx, semaphore) for idx in range(20)])

asyncio.run(main())

In console:

An ugly fix is to add asyncio.sleep(0) right before acquiring the semaphore:

while True:
    await asyncio.sleep(0)
    async with semaphore:
        ...

Also, I found a comment on Stack Overflow about a race condition in the Semaphore implementation, but I don't know about the quality of that comment: |
Good point. In a tight loop, a task can re-acquire the lock right after releasing it, even if there are pending waiters that were scheduled earlier. This is also true for Lock, Condition, Event, etc. The solution requires an async release method. Since that change is not backward compatible, a new method should be added; the async context manager can then easily be switched to the new method without backward compatibility problems. A hero who can help is welcome! |
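A minimal sketch of the "async release" idea from the comment above. The helper `release_and_yield` is hypothetical (it is not an asyncio API): it releases the semaphore and then yields to the event loop once, so a waiter that was just woken gets to run before the releasing task loops around and tries to acquire again.

```python
import asyncio


async def release_and_yield(sem: asyncio.Semaphore) -> None:
    """Hypothetical helper, not part of asyncio: release, then yield once."""
    sem.release()
    # One loop iteration is enough for an already-woken waiter to run first.
    await asyncio.sleep(0)


async def worker(name: str, sem: asyncio.Semaphore, log: list) -> None:
    for _ in range(3):
        await sem.acquire()
        log.append(name)
        await asyncio.sleep(0)  # stand-in for real work inside the critical section
        await release_and_yield(sem)


async def demo() -> list:
    sem = asyncio.Semaphore(1)
    log = []
    await asyncio.gather(worker("A", sem, log), worker("B", sem, log))
    return log


acquisition_order = asyncio.run(demo())
print(acquisition_order)
```

With the explicit yield after each release, the two workers take turns instead of one of them re-acquiring in a tight loop.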
Or maybe there is a way to do everything without changing the public API. The idea is: _wake_up_next can create a future which is set by the *woken-up task* once it acquires. The acquire method should wait for this future first, before entering the `while self._value <= 0:` loop. If the future is cancelled, ... If no *acquire waiting* future exists, do everything as usual. All other lock objects should be modified as well. |
Thanks for the response!
I will try to make a PR. |
When people state "X is broken" it gets my hackles up a bit. Can you explain in words what's broken? How badly is it broken? When does user code experience it? I see the code you showed but I need an explanation. |
The semaphore is broken. It gets corrupted and no longer usable.
The broken semaphore blocks a task when the task should be allowed to proceed. In this sense it is completely unusable. How bad the result is depends on what the task is doing. The symptom could be no terminal output, a network connection timeout, or any of the other bad things you can expect when a task cannot proceed.
What I found is that it happens when one task cancels another at a certain moment. This is a race condition, so I can't give an exact description of the condition that triggers the problem. Someone has documented his own experience here.

To add a bit of explanation about the code itself, this is the story in plain words: Player Zero and One share a hotel room whose door is locked, and there is only one key. They arrive at the door at about the same time. Zero sleeps 0 seconds, then grabs the key (at time 0). He goes into the room and sleeps for another 2 seconds. During his sleep One wakes up from his first sleep (at time 1). One tries to grab the key but he can't, because the only key is currently held by Zero. So One waits outside on the lock. Zero wakes up (at time 2). He gets out of the room, returns the key, then murders One at the door. Now One has died without ever getting the key. A moment later Police Main comes to the door (at time 3) to investigate. He is the only one there, and there is a key, but he just cannot open the door with it. Why? Because the lock is damaged. Now Main waits forever on the damaged lock.

Cast:
|
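The story can be replayed in a condensed form. The sketch below is a reconstruction, not the original reproducer: it uses an explicit `asyncio.sleep(0)` in place of the timed sleeps, plays Zero and Main from the same coroutine, and adds a timeout so it cannot hang. The names `replay_story` and `key_still_usable` are made up for illustration. Whether the final acquire succeeds depends on the Semaphore implementation in the interpreter that runs it.

```python
import asyncio


async def replay_story() -> bool:
    sem = asyncio.Semaphore(1)                # one key for the room

    await sem.acquire()                       # Zero grabs the key
    one = asyncio.create_task(sem.acquire())  # One arrives at the door
    await asyncio.sleep(0)                    # let One start waiting on the semaphore

    sem.release()                             # Zero returns the key ...
    one.cancel()                              # ... and cancels One before One gets to run
    await asyncio.gather(one, return_exceptions=True)

    # Main arrives: nobody else is around, so the key should be available.
    try:
        await asyncio.wait_for(sem.acquire(), timeout=0.5)
    except asyncio.TimeoutError:
        return False                          # the "damaged lock" outcome
    return True


key_still_usable = asyncio.run(replay_story())
print("Main got the key:", key_still_usable)
```

A `False` result here would correspond to the stuck semaphore described in the story.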
Presumably the problem is that some code gets interrupted by the cancellation and exits without restoring an invariant. Not surprising given the complexity of the code. But cancellation only happens in await (there is no pre-emption in asyncio) so it should not be hard to find, right? |
9d59381#diff-0fee1befb15023abc0dad2623effa93a304946796929f6cb445d11a57821e737R368 Roughly speaking it's |
I'm beginning to wonder if the root of the problem isn't that the Semaphore implementation decided to remove the completed Future in release() instead of in acquire(). If we compare to Lock, it removes the future in acquire():

try:
    try:
        await fut
    finally:
        self._waiters.remove(fut)
except exceptions.CancelledError:
    if not self._locked:
        self._wake_up_first()
    raise

In its

if (not self._locked and (self._waiters is None or
        all(w.cancelled() for w in self._waiters))):
    self._locked = True
    return True

IOW it only grabs it when nobody else is waiting. Adapting this approach for Semaphore would probably improve the code -- no more

@kumaraditya303 What do you think? And @cykerway?

Looking at things from this perspective, I think the Lock class is fair and does not break the lock when acquire() is cancelled. I'm less sure about Queue -- like Semaphore, it removes the waiter from the queue of waiters when waking up, but it has much more thorough handling of cancellation (using in |
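The fairness of Lock described above can be observed with a small experiment. This is only an illustrative sketch (the names `worker`, `demo` and `lock_order` are made up): two tasks contend for one `asyncio.Lock` in a tight loop, and because `acquire()` refuses the fast path while a non-cancelled waiter is queued, the task that just released cannot jump the queue.

```python
import asyncio


async def worker(name: str, lock: asyncio.Lock, log: list) -> None:
    for _ in range(3):
        # Tight loop: release in __aexit__ is followed directly by acquire.
        async with lock:
            log.append(name)
            await asyncio.sleep(0)  # hold the lock across one loop iteration


async def demo() -> list:
    lock = asyncio.Lock()
    log = []
    await asyncio.gather(worker("A", lock, log), worker("B", lock, log))
    return log


lock_order = asyncio.run(demo())
print(lock_order)
```

The tasks alternate even though neither yields between releasing and re-acquiring, which is the behaviour the comment proposes to carry over to Semaphore.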
That's a great observation. I didn't really pay attention to The result has been pushed to #93222. It can almost pass all the tests, except that I have to change some |
I hadn't seen this before I reviewed your new version; I'll look at those tests and see if I can understand why sleep(0.01) is now needed. It definitely irks me too. I think the idea is that |
Hope you can post something here once you completely figure out sleep(0) vs sleep(0.01). The coroutine model is a categorization of coroutines across two dimensions: asymmetric/symmetric and stackful/stackless. Asymmetric means the callee coroutine always passes control to its caller on yield. Symmetric means the callee coroutine passes control to anyone it likes. Stackful means coroutine preserves its entire stack on yield (like fiber). Stackless means coroutine is implemented as a state machine. It seems Python generators, async functions, etc. fall into the asymmetric+stackless category. I believe knowing this helps understand how coroutines are scheduled in Python, which in turn helps answer the sleep(0) vs sleep(0.01) question. |
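As a small illustration of the asymmetric part of that classification, here is a sketch with plain generators (all names are made up for the example). A `yield` deep inside a delegated generator does not pick a target: control goes back to whoever resumed the chain, here the driver code at module level.

```python
def child(trace):
    trace.append("child runs")
    received = yield "child yielded"  # suspends; control returns to the resumer
    trace.append(f"child resumed with {received!r}")


def parent(trace):
    trace.append("parent runs")
    yield from child(trace)           # delegate to the child generator
    trace.append("parent continues")


trace = []
gen = parent(trace)
first = next(gen)                     # runs parent, then child, up to the yield
trace.append("driver has control")
try:
    gen.send("hello")                 # resume the child where it stopped
except StopIteration:
    pass

print(first)
print(trace)
```

In asyncio the event loop plays the role of the driver: a coroutine that awaits something pending hands control back to the loop, which then decides what runs next.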
I think I've figured out why some tests appeared to need Thanks for the clarification of your coroutine classification scheme -- I'm so used to Python's particular brand of coroutines that I forget the other quadrants of the design space. :-) And yes, once the event loop passes control to a particular coroutine, that coroutine runs without intervention until it next yields -- |
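The point that a coroutine runs without intervention until it next yields can be checked directly. In this sketch (names are illustrative) each task appends several entries without awaiting, yields once with `asyncio.sleep(0)`, and then appends more; entries from the two tasks interleave only at the yield point.

```python
import asyncio


async def worker(name: str, log: list) -> None:
    for i in range(3):
        log.append(f"{name}{i}")  # no await here, so no other task can run
    await asyncio.sleep(0)        # the only point where control is handed back
    for i in range(3, 6):
        log.append(f"{name}{i}")


async def demo() -> list:
    log = []
    await asyncio.gather(worker("a", log), worker("b", log))
    return log


interleaving = asyncio.run(demo())
print(interleaving)
```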
Okay so I finally got time to look into this: Currently, the semaphore does not provide FIFO ordering. This was supposed to be fixed by #31910 but it wasn't. The correct fix for semaphore should have the following semantics:
Here's my simplified implementation which maintains the FIFO ordering:

import asyncio
import collections

class Semaphore:
    def __init__(self, value: int) -> None:
        self.value = value
        self.waiters = collections.deque()

    async def __aenter__(self):
        await self.acquire()

    async def __aexit__(self, exc_type, exc, tb):
        self.release()

    def locked(self) -> bool:
        return self.value == 0

    async def acquire(self) -> None:
        if not self.locked() and not self.waiters:
            # No need to wait as the semaphore is not locked
            # and no one is waiting
            self.value -= 1
            return
        # if there are waiters or the semaphore is locked
        fut = asyncio.Future()
        self.waiters.append(fut)
        try:
            await fut
        finally:
            self.waiters.remove(fut)
        self.value -= 1
        if not self.locked():
            # This is required for strict FIFO ordering
            # otherwise it can cause starvation on the waiting tasks
            # The next loop iteration will wake up the task and switch
            self._wakeup_next()
        return

    def _wakeup_next(self) -> None:
        if self.waiters:
            # Wake up the first waiter, it is removed by the waiting task
            waiter = self.waiters[0]
            if not waiter.done():
                # This schedules the task to be woken up on next loop iteration
                # It requires exactly one iteration of loop, See Task.__wakeup in pure python
                waiter.set_result(None)

    def release(self) -> None:
        self.value += 1
        self._wakeup_next()

async def process(idx: int, semaphore: Semaphore) -> None:
    while True:
        async with semaphore:
            print(f'ACQUIRE {idx}')
            await asyncio.sleep(1)

async def main() -> None:
    semaphore = Semaphore(5)
    await asyncio.gather(*[process(idx, semaphore) for idx in range(20)])

asyncio.run(main())
|
You didn't handle cancelled tasks. This program would hang.

from asyncio import create_task, gather, run, sleep

async def main():
    # Semaphore is the simplified class from the previous comment.
    sem = Semaphore(1)
    async def c1(): await sem.acquire()
    async def c2(): await sem.acquire()
    async def c3(): await sem.acquire()
    t1 = create_task(c1())
    t2 = create_task(c2())
    t3 = create_task(c3())
    await sleep(0)
    sem.release()
    sem.release()
    t2.cancel()
    await gather(t1, t2, t3, return_exceptions=True)

run(main())
|
Right, but I did say "simplified implementation", and I left out cancellation handling to make it easier to understand how tasks are woken in FIFO order, which is the main point of confusion. |
But the cancellation is what makes it so challenging. :-( For example, the acquiring task can go through the following states (not counting the "trivial success" at the top):
That last state is surprising -- after the future's result is set the future cannot be cancelled, but the task can -- and CancelledError will be thrown into the task in that case. (You can only get there by calling sem.release() and task.cancel() without sleeping in between.) I have been trying to come up with a different version that passes all tests, but so far without success. If nothing else, this would give me more confidence that the tests are correct. |
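That last state can be reproduced in isolation. The sketch below (names are illustrative) sets the future's result and cancels the waiting task without sleeping in between: `fut.cancel()` is refused because the future is already done, yet the task still receives CancelledError at its `await fut`.

```python
import asyncio


async def demo():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    seen = []

    async def waiter():
        try:
            await fut
        except asyncio.CancelledError:
            # The future completed normally, but the task was cancelled anyway.
            seen.append((fut.done(), fut.cancelled()))
            raise

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)            # let the waiter block on fut

    fut.set_result(None)              # like sem.release() waking a waiter
    future_cancel_ok = fut.cancel()   # a done future cannot be cancelled
    task.cancel()                     # but the task waiting on it still can
    await asyncio.gather(task, return_exceptions=True)
    return future_cancel_ok, task.cancelled(), seen


future_cancel_ok, task_cancelled, seen = asyncio.run(demo())
print(future_cancel_ok, task_cancelled, seen)
```

An acquire() implementation therefore has to treat "future done, task cancelled" as its own case, typically by handing the granted slot back.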
…antee (pythonGH-93222) The main problem was that an unluckily timed task cancellation could cause the semaphore to be stuck. There were also doubts about strict FIFO ordering of tasks allowed to pass. The Semaphore implementation was rewritten to be more similar to Lock. Many tests for edge cases (including cancellation) were added. (cherry picked from commit 24e0379) Co-authored-by: Cyker Way <cykerway@gmail.com>
I blogged a bit about this: https://neopythonic.blogspot.com/2022/10/reasoning-about-asynciosemaphore.html |