Dequeue successfully processed queue messages before shutting down in drain mode by zahalzel · Pull Request #50426 · Azure/azure-sdk-for-net · GitHub
Dequeue successfully processed queue messages before shutting down in drain mode #50426

Open
zahalzel wants to merge 16 commits into main

Conversation

zahalzel
Member

@zahalzel zahalzel commented Jun 4, 2025

We have a bug in FlexConsumption scenarios where drain mode triggers a cancellation token in CompleteProcessingMessageAsync. As a result, messages that are successfully processed are not dequeued and therefore end up triggering further executions.

A previous attempt to resolve this bug was only partially successful, because the call that puts the host in drain mode still sends a cancellation token that gets passed into the DeleteMessage call. Whenever that cancellation token has been canceled, the queue processor does not delete the message, even if the message was successfully processed.
 
Related discussions / incidents:

https://learn.microsoft.com/en-us/answers/questions/2184443/problems-with-flex-consumption-function
https://portal.microsofticm.com/imp/v5/incidents/details/636224894/summary

This PR removes explicit CancellationToken parameters from several overloads of CompleteProcessingMessageAsync and related queue message operations, updating both production and test code accordingly.

The idea is that any time a message has been processed, we should always make a best-effort attempt to dequeue it before shutting down the queue listener. Furthermore, there is no compelling reason to send cancellation tokens to the calls that delete the message, so we remove the cancellation token parameter altogether.

  • Removed CancellationToken parameters from calls to CompleteProcessingMessageAsync and associated helper methods in production code.
  • Updated unit tests and mock setups in QueueProcessorTests.cs and QueueListenerTests.cs to align with the modified API signature.
  • Introduced new tests to verify cancellation token behavior during shutdown scenarios.
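
The failure mode described above can be sketched in isolation. This is a minimal illustration, not the extension's code: `FakeQueue`, `DrainModeDemo`, and `MessageSurvivesAsync` are hypothetical names, and the sketch assumes the delete call observes its token before doing any work.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in for a storage queue; not the Azure SDK API.
public sealed class FakeQueue
{
    private readonly HashSet<string> _messages = new HashSet<string>();

    public void Add(string id) => _messages.Add(id);

    public bool Contains(string id) => _messages.Contains(id);

    // Assumption: like most async APIs, a canceled token aborts the call.
    public Task DeleteAsync(string id, CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();
        _messages.Remove(id);
        return Task.CompletedTask;
    }
}

public static class DrainModeDemo
{
    // Returns true if the message is still queued after the delete attempt.
    public static async Task<bool> MessageSurvivesAsync(CancellationToken deleteToken)
    {
        var queue = new FakeQueue();
        queue.Add("msg-1");

        try
        {
            await queue.DeleteAsync("msg-1", deleteToken).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // The function had already succeeded, but the delete never ran,
            // so the message will become visible again and be reprocessed.
        }

        return queue.Contains("msg-1");
    }

    public static async Task Main()
    {
        using var cts = new CancellationTokenSource();
        cts.Cancel(); // simulates the token that drain mode has already canceled

        Console.WriteLine(await MessageSurvivesAsync(cts.Token));              // True
        Console.WriteLine(await MessageSurvivesAsync(CancellationToken.None)); // False
    }
}
```

With the canceled token the message stays in the queue; with `CancellationToken.None` the delete goes through, which is the behavior this PR is after for successfully processed messages.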

@Copilot Copilot AI review requested due to automatic review settings June 4, 2025 19:35
@github-actions github-actions bot added the Storage Storage Service (Queues, Blobs, Files) label Jun 4, 2025
Contributor

@Copilot Copilot AI left a comment

Pull Request Overview

This PR removes explicit CancellationToken parameters from several overloads of CompleteProcessingMessageAsync and related queue message operations, updating both production and test code accordingly.

  • Removed CancellationToken parameters from calls to CompleteProcessingMessageAsync and associated helper methods in production code.
  • Updated unit tests and mock setups in QueueProcessorTests.cs and QueueListenerTests.cs to align with the modified API signature.
  • Introduced new tests to verify cancellation token behavior during shutdown scenarios.

Reviewed Changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.

| File | Description |
| --- | --- |
| sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/tests/QueueProcessorTests.cs | Removed cancellation token arguments from processor method calls in tests. |
| sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/tests/QueueListenerTests.cs | Adjusted mock setups and added tests for shutdown token behavior. |
| sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueProcessor.cs | Updated API calls by removing cancellation token parameters in message handling methods. |
| sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueListener.cs | Refined cancellation token usage and improved log messages in StopAsync and ProcessMessageAsync. |
Comments suppressed due to low confidence (1)

sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueProcessor.cs:132

  • Using CancellationToken.None here instead of a propagating token changes the cancellation behavior during poison message handling. Verify that a best-effort message addition is acceptable in scenarios where cancellation might be desired.
await poisonQueue.AddMessageAndCreateIfNotExistsAsync(message.Body, CancellationToken.None).ConfigureAwait(false);

@@ -113,7 +126,7 @@ public QueueListener(QueueClient queue,

  // if the function runs longer than this, the invisibility will be updated
  // on a timer periodically for the duration of the function execution
- _visibilityTimeout = TimeSpan.FromMinutes(10);
+ _visibilityTimeout = _queueOptions.VisibilityTimeout;
Member

I don't think you need to make these changes in this PR. I agree this is somewhat confusing, but it is discussed in more detail in Azure/azure-webjobs-sdk#1040. The summary is that the configured QueuesOptions.VisibilityTimeout IS used when messages fail processing (see code). This hardcoded value is just the initial timeout used when dequeuing messages. If we want to make any changes to this, I think we should discuss and do that separately, because it's not really related to the cancellation token changes, right?

Member Author

That's fair, it is indeed a separate scenario, I'll remove from this PR :)

Member

But I do appreciate the attempt to fix a perceived issue :)

zahalzel and others added 2 commits June 5, 2025 13:12
…tests/QueueListenerTests.cs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@zahalzel zahalzel changed the title from "Updates" to "Dequeue successfully processed queue messages before shutting down in drain mode" Jun 5, 2025
@mathewc
Member

mathewc commented Jun 5, 2025

Please update this PR with a more descriptive title than just "updates", and also provide some context on why the changes are being made. I know there's an internal teams thread with the details, but people on the Azure SDK team who are looking at PRs in this repo won't have that context.

Comment on lines 192 to 196
ThrowIfDisposed();
_timer.Cancel();
await Task.WhenAll(_processing).ConfigureAwait(false);
await _timer.StopAsync(cancellationToken).ConfigureAwait(false);
_logger.LogDebug($"Drain mode is not enabled, storage queue listener stopped ({_details})");
Member

This is introducing a lot of code duplication between these two branches. Instead, just isolate the lines that differ in your if/else block, and have the common code outside.

To simplify, probably you can just remove the "using (cancellationToken.Register" - I don't think the shutdown tasks need to be within a using scope

Member Author

@mathewc How about this refactor? I can also remove the using scope entirely, but this is maybe a good compromise while preserving the existing behavior?

github-actions bot commented Jun 5, 2025

API Change Check

APIView identified API level changes in this PR and created the following API reviews

Microsoft.Azure.WebJobs.Extensions.Storage.Common

{
// Cancel the execution token when drain mode is not enabled or drain mode manager is not set.
_logger.LogDebug($"Drain mode is not enabled, storage queue listener will stop immediately ({_details})");
Member

This "will stop immediately" log isn't really true - all you're doing differently here in this branch is cancelling the token passed into functions. That's just a signal to functions that shutdown is happening, it doesn't forcefully terminate any of the individual invocations. I.e. below the Task.WhenAll will still be gated on all the invocations completing, and that's up to the function implementation.
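
A small sketch of that point, using hypothetical stand-ins rather than the listener's real fields: cancelling the token is only a signal, and `Task.WhenAll` still waits for every invocation, including one that never looks at its token.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CooperativeCancellationDemo
{
    // Hypothetical function body that never checks its token.
    public static async Task<string> IgnoresTokenAsync(CancellationToken token)
    {
        await Task.Delay(200).ConfigureAwait(false); // token deliberately not passed
        return "ran to completion";
    }

    // Hypothetical function body that observes its token and exits early.
    public static async Task<string> ObservesTokenAsync(CancellationToken token)
    {
        try
        {
            await Task.Delay(Timeout.Infinite, token).ConfigureAwait(false);
            return "unreachable";
        }
        catch (OperationCanceledException)
        {
            return "stopped early";
        }
    }

    // Hypothetical stand-in for the shutdown path: cancel, then wait for all work.
    public static async Task<string[]> StopAsync()
    {
        using var executionCts = new CancellationTokenSource();
        Task<string>[] processing =
        {
            IgnoresTokenAsync(executionCts.Token),
            ObservesTokenAsync(executionCts.Token),
        };

        executionCts.Cancel(); // a signal only; nothing is forcefully terminated
        return await Task.WhenAll(processing).ConfigureAwait(false);
    }

    public static async Task Main()
    {
        foreach (string outcome in await StopAsync())
        {
            Console.WriteLine(outcome);
        }
    }
}
```

The first invocation still runs for its full delay before `StopAsync` returns, so how quickly shutdown completes depends on the function implementation, not on the log message.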

@@ -89,17 +89,17 @@ internal protected virtual async Task CompleteProcessingMessageAsync(QueueMessag
  {
  if (result.Succeeded)
  {
- await DeleteMessageAsync(message, cancellationToken).ConfigureAwait(false);
+ await DeleteMessageAsync(message, CancellationToken.None).ConfigureAwait(false);
Member

@mathewc mathewc Jun 9, 2025

I don't think we should be ignoring the cancellationTokens passed into these methods - it should instead be up to the caller to decide to pass None in for operations that shouldn't be cancelled. I believe you're doing that in the QueueProcessor already, passing CancellationToken.None to CompleteProcessingMessageAsync.

- // Only cancel completion or update of the message if a non-graceful shutdown is requested via _shutdownCancellationTokenSource.
- await _queueProcessor.CompleteProcessingMessageAsync(message, result, _shutdownCancellationTokenSource.Token).ConfigureAwait(false);
+ // No cancellation token for message completion: always best-effort to complete
+ await _queueProcessor.CompleteProcessingMessageAsync(message, result, CancellationToken.None).ConfigureAwait(false);
Member

@mathewc mathewc Jun 9, 2025

I do agree with your general assertion that if a message has been processed by the user function, we don't want to cancel the completion actions. E.g. if the method succeeded, we need to delete the processed message from the queue.

However, it looks like _shutdownCancellationTokenSource is only canceled in 2 cases. 1) when the listener is disposed here, and 2) if the listener StopAsync call is cancelled here. Are you saying you think one of these is happening? I don't see how.
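
The two cancellation paths named above can be sketched roughly as follows. `SketchListener` and `RegisterDemo` are hypothetical, and the wiring is an assumption based on the `cancellationToken.Register` usage mentioned earlier in this review, not a copy of QueueListener.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical listener; only the token wiring is of interest here.
public sealed class SketchListener : IDisposable
{
    private readonly CancellationTokenSource _shutdownCts = new CancellationTokenSource();

    public CancellationToken ShutdownToken => _shutdownCts.Token;

    public async Task StopAsync(Task pendingWork, CancellationToken cancellationToken)
    {
        // Case 2: the caller cancels StopAsync itself, which is forwarded
        // to the shutdown token for as long as the registration is alive.
        using (cancellationToken.Register(() => _shutdownCts.Cancel()))
        {
            await pendingWork.ConfigureAwait(false);
        }
    }

    // Case 1: the listener is disposed.
    public void Dispose()
    {
        _shutdownCts.Cancel();
        _shutdownCts.Dispose();
    }
}

public static class RegisterDemo
{
    public static async Task Main()
    {
        // Graceful stop: the StopAsync token is never canceled.
        var graceful = new SketchListener();
        await graceful.StopAsync(Task.CompletedTask, CancellationToken.None);
        Console.WriteLine(graceful.ShutdownToken.IsCancellationRequested); // False

        // Forced stop: the StopAsync token is already canceled, so the
        // registered callback runs immediately and cancels the shutdown token.
        var forced = new SketchListener();
        using var stopCts = new CancellationTokenSource();
        stopCts.Cancel();
        await forced.StopAsync(Task.CompletedTask, stopCts.Token);
        Console.WriteLine(forced.ShutdownToken.IsCancellationRequested); // True
    }
}
```

Under this wiring, a graceful drain-mode stop would leave the shutdown token untouched, which is why it matters which of the two paths is actually being hit.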

Fetched URL: http://github.com/Azure/azure-sdk-for-net/pull/50426
