-
Notifications
You must be signed in to change notification settings - Fork 5k
Dequeue successfully processed queue messages before shutting down in drain mode #50426
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR removes explicit CancellationToken parameters from several overloads of CompleteProcessingMessageAsync and related queue message operations, updating both production and test code accordingly.
- Removed CancellationToken parameters from calls to CompleteProcessingMessageAsync and associated helper methods in production code.
- Updated unit tests and mock setups in QueueProcessorTests.cs and QueueListenerTests.cs to align with the modified API signature.
- Introduced new tests to verify cancellation token behavior during shutdown scenarios.
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.
File | Description |
---|---|
sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/tests/QueueProcessorTests.cs | Removed cancellation token arguments from processor method calls in tests. |
sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/tests/QueueListenerTests.cs | Adjusted mock setups and added tests for shutdown token behavior. |
sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueProcessor.cs | Updated API calls by removing cancellation token parameters in message handling methods. |
sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueListener.cs | Refined cancellation token usage and improved log messages in StopAsync and ProcessMessageAsync. |
Comments suppressed due to low confidence (1)
sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueProcessor.cs:132
- Using CancellationToken.None here instead of a propagating token changes the cancellation behavior during poison message handling. Verify that a best-effort message addition is acceptable in scenarios where cancellation might be desired.
await poisonQueue.AddMessageAndCreateIfNotExistsAsync(message.Body, CancellationToken.None).ConfigureAwait(false);
sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/tests/QueueListenerTests.cs
Outdated
Show resolved
Hide resolved
@@ -113,7 +126,7 @@ public QueueListener(QueueClient queue, | |||
|
|||
// if the function runs longer than this, the invisibility will be updated | |||
// on a timer periodically for the duration of the function execution | |||
_visibilityTimeout = TimeSpan.FromMinutes(10); | |||
_visibilityTimeout = _queueOptions.VisibilityTimeout; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think you need to make these changes in this PR. I agree this is somewhat confusing but is discussed more in detail in Azure/azure-webjobs-sdk#1040. Summary is, that the configured QueuesOptions.VisibilityTimeout IS used when messages fail processing (see code). This hardcoded value is just the initial timeout used when dequeuing messages. If we want to make any changes to this, I think we should discuss and do that separately, because it's not really related to the cancellation token changes right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's fair, it is indeed a separate scenario, I'll remove from this PR :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But I do appreciate the attempt to fix a perceived issue :)
…tests/QueueListenerTests.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Please update this PR with a more descriptive title than just "updates", and also provide some context on why the changes are being made. I know there's an internal teams thread with the details, but people on the Azure SDK team who are looking at PRs in this repo won't have that context. |
ThrowIfDisposed(); | ||
_timer.Cancel(); | ||
await Task.WhenAll(_processing).ConfigureAwait(false); | ||
await _timer.StopAsync(cancellationToken).ConfigureAwait(false); | ||
_logger.LogDebug($"Drain mode is not enabled, storage queue listener stopped ({_details})"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is introducing a lot of code duplication between these two branches. Instead, just isolate the lines that differ in your if/else block, and have the common code outside.
To simplify, probably you can just remove the "using (cancellationToken.Register" - I don't think the shutdown tasks need to be within a using scope
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mathewc How about this refactor? I can also remove the using scope entirely, but this is maybe a good compromise while preserving the existing behavior?
API Change CheckAPIView identified API level changes in this PR and created the following API reviews |
{ | ||
// Cancel the execution token when drain mode is not enabled or drain mode manager is not set. | ||
_logger.LogDebug($"Drain mode is not enabled, storage queue listener will stop immediately ({_details})"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This "will stop immediately" log isn't really true - all you're doing differently here in this branch is cancelling the token passed into functions. That's just a signal to functions that shutdown is happening, it doesn't forcefully terminate any of the individual invocations. I.e. below the Task.WhenAll will still be gated on all the invocations completing, and that's up to the function implementation.
@@ -89,17 +89,17 @@ internal protected virtual async Task CompleteProcessingMessageAsync(QueueMessag | |||
{ | |||
if (result.Succeeded) | |||
{ | |||
await DeleteMessageAsync(message, cancellationToken).ConfigureAwait(false); | |||
await DeleteMessageAsync(message, CancellationToken.None).ConfigureAwait(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we should be ignoring the cancellationTokens passed into these methods - it should instead be up to the caller to decide to pass None in for operations that shouldn't be cancelled. I believe you're doing that in the QueueProcessor already, passing CancellationToken.None to CompleteProcessingMessageAsync.
// Only cancel completion or update of the message if a non-graceful shutdown is requested via _shutdownCancellationTokenSource. | ||
await _queueProcessor.CompleteProcessingMessageAsync(message, result, _shutdownCancellationTokenSource.Token).ConfigureAwait(false); | ||
// No cancellation token for message completion: always best-effort to complete | ||
await _queueProcessor.CompleteProcessingMessageAsync(message, result, CancellationToken.None).ConfigureAwait(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do agree with your general assertion that if a message has been processed by the user function, we don't want to cancel the completion actions. E.g. if the method succeeded, we need to delete the processed message from the queue.
However, it looks like _shutdownCancellationTokenSource is only canceled in 2 cases. 1) when the listener is disposed here, and 2) if the listener StopAsync call is cancelled here. Are you saying you think one of these is happening? I don't see how.
We have a bug in FlexConsumption scenarios where drain mode triggers a cancellation token in CompleteProcessingMessageAsync. As a result, messages that are successfully processed are not dequeued and therefore end up triggering further executions.
A previous attempt to resolve this bug was only partially successful, because the call to put the host in drain mode is still sending a cancellation token that gets passed into the DeleteMessage call. Any time that cancellation token is active, the queue processor will not delete the message, even if the message was successfully processed.
Related discussions / incidents:
https://learn.microsoft.com/en-us/answers/questions/2184443/problems-with-flex-consumption-function
https://portal.microsofticm.com/imp/v5/incidents/details/636224894/summary
This PR removes explicit CancellationToken parameters from several overloads of CompleteProcessingMessageAsync and related queue message operations, updating both production and test code accordingly.
The idea is that any time a message has been processed, we should always make a best-case attempt to dequeue it before shutting down the queue listener. Furthermore, there is not a compelling reason to send cancellation tokens to the calls to delete the message, so we remove the cancellation token parameter altogether.
Removed CancellationToken parameters from calls to CompleteProcessingMessageAsync and associated helper methods in production code.
Updated unit tests and mock setups in QueueProcessorTests.cs and QueueListenerTests.cs to align with the modified API signature.
Introduced new tests to verify cancellation token behavior during shutdown scenarios.