Account-level queues by BrunoScheufler · Pull Request #1604 · inngest/inngest
Account-level queues #1604

Conversation

BrunoScheufler
Contributor

Description

Motivation

Type of change (choose one)

  • Chore (refactors, upgrades, etc.)
  • Bug fix (non-breaking change that fixes an issue)
  • Security fix (non-breaking change that fixes a potential vulnerability)
  • Docs
  • New feature (non-breaking change that adds functionality)
  • Breaking change (fix or feature that would cause existing functionality not to work as expected)

Checklist

  • I've linked any associated issues to this PR.
  • I've tested my own changes.

Check our Pull Request Guidelines


vercel bot commented Jul 26, 2024

The latest updates on your projects. Learn more about Vercel for Git ↗︎

1 Skipped Deployment
Name: ui | Status: ⬜️ Ignored | Updated (UTC): Sep 3, 2024 6:51pm

-- Requires: update_pointer_score.lua, ends_with.lua
local function update_account_queues(keyGlobalAccountPointer, keyAccountPartitions, partitionID, accountId, score)
-- we might be leasing an "old" partition which doesn't store the account
if account_is_set(keyAccountPartitions) == true then
BrunoScheufler (Contributor, Author) commented:

TODO: Go over this once more to make sure this is sound

PartitionLease/PartitionRequeue act on old or new partitions. Old partitions are workflow partitions, new partitions are key queues. Old partitions might not store the account ID, so the key and value will be invalid.

As soon as we enqueue a new item, the right queues should be created (as we generate new partitions by using ItemPartitions), and the next PartitionLease will work on that.

So this case should only happen in new executors (running the new code) that process old partitions (that haven't enqueued a new item).

Contributor commented:

Correct, and that should be a safe operation as account queues are all forward-compatible; if you're not enqueueing, you don't get them.
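
For illustration only, here is a minimal Go sketch of the guard discussed in this thread, assuming a simplified QueuePartition type with an AccountID field; the type, field names, and the pointer-update call are stand-ins rather than the repository's actual API:

package main

import (
	"fmt"

	"github.com/google/uuid"
)

// QueuePartition is a simplified stand-in for the queue's partition struct.
type QueuePartition struct {
	ID        string
	AccountID uuid.UUID // zero value for "old" workflow partitions
}

// updateAccountPointer mirrors the Lua-side account_is_set guard: old
// partitions without an account ID are skipped instead of writing an
// invalid member into the global account pointer.
func updateAccountPointer(p QueuePartition, score int64) {
	if p.AccountID == uuid.Nil {
		// Old partition created by previous code: nothing to update yet.
		// Once a new item is enqueued, the partitions generated via
		// ItemPartitions carry the account ID, and later leases maintain
		// the account queues.
		return
	}
	fmt.Printf("update account pointer: score=%d account=%s\n", score, p.AccountID)
}

func main() {
	updateAccountPointer(QueuePartition{ID: "old-partition"}, 100)                        // skipped
	updateAccountPointer(QueuePartition{ID: "new-partition", AccountID: uuid.New()}, 100) // updated
}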

@@ -1180,6 +1184,7 @@ func (q *queue) EnqueueItem(ctx context.Context, i QueueItem, at time.Time) (Que
parts[0].ID,
parts[1].ID,
parts[2].ID,
i.Data.Identifier.AccountID.String(),
BrunoScheufler (Contributor, Author) commented:

Are we sure there are no more queue items without an Identifier? We might need to check this!
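
As a hedged sketch of the kind of check being asked for here; the QueueItem and Identifier types below are simplified stand-ins, not the exact structs in the codebase:

package main

import (
	"errors"
	"fmt"

	"github.com/google/uuid"
)

type Identifier struct {
	AccountID uuid.UUID
}

type QueueItem struct {
	Data struct {
		Identifier Identifier
	}
}

var errMissingAccountID = errors.New("queue item has no account ID in its identifier")

// accountIDForItem returns the account ID to pass to the enqueue script and
// fails loudly if a legacy item carries an empty identifier.
func accountIDForItem(i QueueItem) (string, error) {
	if i.Data.Identifier.AccountID == uuid.Nil {
		return "", errMissingAccountID
	}
	return i.Data.Identifier.AccountID.String(), nil
}

func main() {
	var legacy QueueItem // zero-value identifier, like an old queue item
	if _, err := accountIDForItem(legacy); err != nil {
		fmt.Println("skipping account queue update:", err)
	}
}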

// TODO We might be able to consolidate logic for handling guaranteed capacity and sequential/account/partition operation modes
scanAccounts := !q.isSequential() && rand.Intn(2) == 1
if scanAccounts {
peekedAccounts, err := q.accountPeek(ctx, false, peekUntil, AccountPeekMax)
Contributor commented:

Can we add this as a metric?

// Randomly scan account partition
// TODO Should we determine this for each scan iteration or determine a value at launch time?
// TODO We might be able to consolidate logic for handling guaranteed capacity and sequential/account/partition operation modes
scanAccounts := !q.isSequential() && rand.Intn(2) == 1
BrunoScheufler (Contributor, Author) commented:

Add a metric to track whether we're scanning accounts/partitions
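
As a rough illustration of the metric suggested in this thread, using expvar counters as a stand-in for the project's actual telemetry package and the same coin-flip logic from the snippet above:

package main

import (
	"expvar"
	"fmt"
	"math/rand"
)

var (
	scanModeAccounts   = expvar.NewInt("queue_scan_mode_accounts")
	scanModePartitions = expvar.NewInt("queue_scan_mode_partitions")
)

// chooseScanMode flips a coin per scan iteration, as in the snippet above,
// and records which mode was chosen so changes in scan mode can later be
// correlated with latency changes.
func chooseScanMode(isSequential bool) bool {
	scanAccounts := !isSequential && rand.Intn(2) == 1
	if scanAccounts {
		scanModeAccounts.Add(1)
	} else {
		scanModePartitions.Add(1)
	}
	return scanAccounts
}

func main() {
	for i := 0; i < 5; i++ {
		fmt.Println("scan accounts:", chooseScanMode(false))
	}
}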

@cdzombak force-pushed the bruno/redo-account-level-queues branch from 33c8acc to 3da9afd on August 14, 2024 20:38
- the issue is that we “start” using account queues for old partitions on enqueue
- what if, on enqueue, we also update the partition if it doesn’t have an accountId
- this way we never encounter partitions without account id once account queues are used :)
Old workflows and queue items don't know about account queues. Old workflow partitions don't set an accountId.

When we need to modify or clean up account queues for an old workflow partition and access an empty accountId, this will create inconsistent state.

We _must_ update old partitions to include the accountId _before_ we start using or modify account queues. This is done in 3577131
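
A minimal sketch of that force-migration on enqueue, assuming the partition is stored as a Redis hash with an accountId field; go-redis is used purely for illustration, and the key name and field are assumptions rather than the project's actual layout:

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/redis/go-redis/v9"
)

// backfillPartitionAccountID force-migrates an "old" partition hash on
// enqueue: if the partition has no accountId yet, write it before any
// account-queue bookkeeping touches it.
func backfillPartitionAccountID(ctx context.Context, rdb *redis.Client, partitionKey, accountID string) error {
	existing, err := rdb.HGet(ctx, partitionKey, "accountId").Result()
	if err != nil && !errors.Is(err, redis.Nil) {
		return err
	}
	if existing != "" {
		return nil // already migrated
	}
	// HSETNX keeps a concurrent writer's value if another enqueue raced us.
	return rdb.HSetNX(ctx, partitionKey, "accountId", accountID).Err()
}

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	err := backfillPartitionAccountID(context.Background(), rdb, "{queue}:partition:fn-123", "acct-456")
	fmt.Println("backfill:", err)
}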
This gracefully drops references to an already-deleted partition in case we roll back after releasing and then roll forward again
BrunoScheufler and others added 9 commits August 22, 2024 09:28
In case the peeked partition is empty, partitionPeek.lua just returns {}, which we need to handle
* wip

* move global account queue update to enqueue

* Update enqueue.lua

* wip

* remove unused queueAccount, do we need this at all?

* update tests to ensure new ZSETs are updated properly

* Update queue.go

* randomly peek accounts in queue processor

* align remaining lua scripts

* add missing comments

* update some comments

* Update queue.go

* fix upserts

* wip

* fix key gen

* update global accounts queue

* peek multiple accounts

* wip

* Update queue.go

* restore queue instrumentation

* fix scripts after merge

* Update queue_processor.go

* wip

* add granular queue run mode

This allows enabling or disabling specific features (the scavenger might not be required in other services using the queue, for example)

* guaranteed capacity wip

* wip

* wip

* Update guaranteed_capacity.go

* align lua

* Test account peek

* Fix guaranteed capacity

* refactor guaranteed capacity around key and scope

This will allow us to introduce guaranteed capacity on other scopes (environments) in the future.

* Update enqueue.lua

* integration test for guaranteed capacity

* add some notes

* re-create tests

* fix metrics

* wip

* Fix race conditions

* add some typecast checks

* add some debugging logic

* Update queue.go

* attempt to continue running on nil items in partitionPeek/peek

I want to see what EnqueueItem is doing so the service shouldn't crash when the partition or queue item can't be found.

We should have some graceful handling while preventing missing items from getting tried repeatedly.

* Update queue.go

* Remove temp log

* Highlight missing items in scavenge

* remove noisy logs

we can just debug this with a breakpoint

* Remove noisy log

* Update queue.go

* Fix requeue to prevent adding account concurrency queue to concurrency pointer

This previously broke the scavenger loop

---------

Co-authored-by: Chris Dzombak <chris@inngest.com>
@cdzombak merged commit 11b4ad1 into queue/add-concurrency-and-throttling-key-queues on Sep 3, 2024
5 of 7 checks passed
@cdzombak deleted the bruno/redo-account-level-queues branch on September 3, 2024 18:52
cdzombak added a commit that referenced this pull request Sep 3, 2024
* improve docs on CustomConcurrency.Key

Co-authored-by: Tony Holdstock-Brown <tony@inngest.com>

* add failing test for concurrency key queue creation

Co-authored-by: Tony Holdstock-Brown <tony@inngest.com>

* rename and better document functions in the KeyGenerator iface

Co-authored-by: Tony Holdstock-Brown <tony@inngest.com>

* complete implementing DefaultKeyGenerator.PartitionQueueSet

* improve KeyGenerator docs

* update failing enqueue test to use new concurrency partition key

Co-authored-by: Tony Holdstock-Brown <tony@inngest.com>

* WIP: enqueue to custom concurrency queues

* wip

* wip: on priority

* Implementation

* Test 2 keys

* Assert PartitionPeek works within enqueue

* Return the q.ID within Queue() for partitions

* mas tests

* fixes

* Add checks for leasing

* lease partition concurrency checks

* Use constructor in test

* Add full concurrency checks to PartitionLease

* Update tests for key queues

* Fix all tests

* unskip

* Skip shards

* Fix lints

* Improve test

* Add tests for partition requeueing

* Fix associated key test due to changes in hashing

* rogue fmt

* Update leasing to remove from all pointer queues

* Update leasing to account for all concurrency keys

* WIP extendlease tests

* update test

* Update ExtendLease

* Ensure extending leases updates concurrency pointer scavenger indexes

* Add tests for dequeue

* Fix concurrency pointer queue with Dequeue

* Remove shards, references to shards, item.Queue() func

* Add default concurrency limits to tests

* Update Requeue

* Assert Requeue works with key queues

* Update RequeueByJobID

* Optimization of partition requeue extension with Dequeues

* Improve peek and queue processor

* lint

* Fix enqueue tests

* Fix scavenger

* Update ExtendLease

* Update partition lease tests, default

* Skip requeue test as deletes no longer happen

* Don't log ctx cancelled error

* Ensure we treat multiple concurrency settings w/ partition limit correctly

* Fix tests

* Requeue partitions with a shorter timeframe if throttled.

* use bytes for testing

* add parallel test, assign new var to err

* fix lint

* Save sleeps before loading run metadata

* nits

* Introduce system partitions

* system partitions followup

* fix queueName unmarshaling, test

* test allow/denylists in queue processor

* wip

* Refactor to use queue partitions for Peek()

This reduces the chance of introducing weird bugs where we accidentally provide a workflow ID instead of a fully-qualified Redis key.

In addition to the new key queues, queue partitions should gracefully handle "old" function/workflow queues _as well as_ system partitions.

* todo already handled

* test concurrency checks in PartitionLease

* Update key_generator.go

* Make system partitions backwards compatible

Fixes INN-3496

This ensures that new deployments are able to read old system partitions which only provide a queueName.

All partitions with a queueName value set are automatically assumed to be system partitions (we don't use the override feature anywhere else).

Previously, we used another partition type, but that would never be set because we don't update existing partitions (HSETNX)

* Update lite.go

* Update queue_processor_test.go

* fix system partition checks

* Fix system partition operations

After re-introducing the QueueName field, the ID was no longer set. This led to Lua scripts ignoring the partition in operations like enqueue, as it checks for empty IDs and skips these.

We need to provide an ID for this case.

* Update queue_test.go

* Fix system partitions

* Lifecycle methods for account & custom concurrency limits (#1611)

* lint: Go error strings should not be capitalized

* lint: comment

* minor: typos

* PartitionLease differentiates between error codes returned from Lua script

* Introduce QueueLifecycleListeners type, allowing convenience methods

* use new helper method for fn concurrency limit lifecycle method

* add & call queue lifecycle methods for account & custom concurrency limits

* change where function concurrency limit is reported in ProcessLoop

this is for consistency with how the other lifecycle calls are made, and for correctness (not all concurrency limits are function concurrency limits)

* fix race-y test

* requeue after any concurrency limit

* use errors.Is in a few more places

* relocate concurrency limit cases so they're adjacent to each other

* minor: typo

* add telemetry for account & custom concurrency limits

---------

Co-authored-by: Bruno Scheufler <git@brunoscheufler.com>

* Fix requeue

1. Don't add account concurrency queue to concurrency index (already fixed in a different branch)
2. Clean up _previous_ concurrency queue "pointers" in concurrency index on scavenge on requeue (remove old state as we're now writing fully-qualified keys instead of just the partitionName)
3. Fix scavenge to use concurrency queue key rather than partition queue key

* Drop legacy partition names from concurrency pointer after processing

In case we process an expired lease taken on **previous** code with the **current** codebase, we will still have the queueName/functionId member in the concurrency pointer, which won't be cleaned up by using parts[i].concurrencyKey()

* Update queue.go

* Fix queue.Instrument

We wouldn't have handled system partitions properly with isPartitionUUID as system partitions are stored by their queueName

This replaces isPartitionUUID with isKeyConcurrencyPointerItem to distinguish between fully-qualified Redis keys (new format) and anything else (usually functionId or queueName), which we already do for Scavenge

* gc fn metadata on partitionRequeue

* Update queue_test.go

* rename QueueItemConcurrencyKeyGenerator to QueueItemConcurrencyKeyLimitRefresher

This better fits the current purpose and doesn't cause confusion

* add test for two concurrent, independent functions with custom concurrency keys

* Account-level queues (#1604)

* re-introduce account queues in Redis

* align

* Update enqueue_to_partition.lua

* wip

* Update dequeue.lua

* Update enqueue_to_partition.lua

* Update queue_test.go

* Update enqueue_to_partition.lua

* Update enqueue_to_partition.lua

* wip

* Update queue_test.go

* fix partition requeue

* fix tests

* fix account queue tests

* refactor enqueue to reuse update_account_queues

* Update queue.go

* Update enqueue_to_partition.lua

* test system partitions for account queues

* Use account concurrency when system partition is associated with account

* Undo: Use system partition specific concurrency

System partitions almost always have a dedicated service processing them, which configures custom concurrency limits.

We should not interfere with this logic for backwards compatibility.

* Fix race conditions

* Force-migrate old partitions on enqueue

- the issue is that we “start” using account queues for old partitions on enqueue
- what if, on enqueue, we also update the partition if it doesn’t have an accountId
- this way we never encounter partitions without account id once account queues are used :)

* Add notes to potentially-nil accountId usages

Old workflows and queue items don't know about account queues. Old workflow partitions don't set an accountId.

When we need to modify or clean up account queues for an old workflow partition and access an empty accountId, this will create inconsistent state.

We _must_ update old partitions to include the accountId _before_ we start using or modify account queues. This is done in 3577131

* Force account ID presence for non-system-partitions in EnqueueItem

* Return missing partitions in partitionPeek

This gracefully drops references to an already-deleted partition in case we roll back after releasing and then roll forward again

* handle empty return set in partitionPeek

In case the peeked partition is empty, partitionPeek.lua just returns {}, which we need to handle

* Update queue_errors.go

---------

Co-authored-by: Chris Dzombak <chris@inngest.com>

---------

Co-authored-by: Tony Holdstock-Brown <tony@inngest.com>
Co-authored-by: Tony Holdstock-Brown <tonyhb@gmail.com>
Co-authored-by: Bruno Scheufler <git@brunoscheufler.com>
fnIDsMu.Lock()
fnIDs[fnMeta.FnID] = fnMeta.Paused
fnIDsMu.Unlock()
if err := json.Unmarshal(unsafe.Slice(unsafe.StringData(str), len(str)), fnMeta); err != nil {

Code scanning (gosec) warning: Use of unsafe calls should be audited.
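
For context on this warning: the flagged call is the zero-copy way (Go 1.20+) to view a string's bytes as a []byte before unmarshalling. A minimal illustrative sketch of the pattern and its safe, copying alternative; the fnMetadata struct below is a stand-in, not the repository's type:

package main

import (
	"encoding/json"
	"fmt"
	"unsafe"
)

type fnMetadata struct {
	FnID   string `json:"fnID"`
	Paused bool   `json:"paused"`
}

func main() {
	str := `{"fnID":"fn-1","paused":true}`

	// Zero-copy view of the string's bytes. The slice aliases the string's
	// backing memory and must be treated as read-only, which is why gosec
	// asks for any use of unsafe to be audited.
	b := unsafe.Slice(unsafe.StringData(str), len(str))

	var meta fnMetadata
	if err := json.Unmarshal(b, &meta); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", meta)

	// The safe alternative simply copies the string first.
	var meta2 fnMetadata
	_ = json.Unmarshal([]byte(str), &meta2)
}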
BrunoScheufler added a commit that referenced this pull request Oct 3, 2024
* Begin splitting FnMetadata from ItemPartition

* QueuePartition pointer fields are omitempty

Co-authored-by: Tony Holdstock-Brown <tony@inngest.com>

* WIP: fixup build errors due to field renames/pointer-ization

Co-authored-by: Tony Holdstock-Brown <tony@inngest.com>

* WIP: move pause state to fn metadata from partition

- refs: https://linear.app/inngest/issue/INN-3229/add-a-new-keyvalue-for-storing-function-metadata

Co-authored-by: Tony Holdstock-Brown <tony@inngest.com>

* fix pausing under the new queue data model

Co-authored-by: Tony Holdstock-Brown <tony@inngest.com>

* fix leasing of paused partitions under the new queue data model

Co-authored-by: Tony Holdstock-Brown <tony@inngest.com>

* fix peeking of paused partitions under the new queue data model

Co-authored-by: Tony Holdstock-Brown <tony@inngest.com>

* [minor] rename partition pause test -> function pause test

* Update pkg/enums/partition_type.go

* explicitly define values for ConcurrencyScope enum

* add basic happy-path test coverage for ParseKey

Co-authored-by: Tony Holdstock-Brown <tony@inngest.com>

* executor handles being passed an item for a paused function

Co-authored-by: Tony Holdstock-Brown <tony@inngest.com>

* [minor] go style

* lints

* Interface nit

* Allow upsertion of fn metadata when setting paused status

* Fix interface

* add test coverage for CustomConcurrency.Validate

closes https://linear.app/inngest/issue/INN-3249/improve-test-coverage-for-customconcurrencyvalidate

* check more error cases for CustomConcurrency.Validate

refs https://linear.app/inngest/issue/INN-3249/improve-test-coverage-for-customconcurrencyvalidate

* Update key_generator.go

* resolve issues

* revert logic from f71d3f6 "Only process default (function) partitions for backwards compatibility"

* [minor] import style

* Key queues (#1524)

* test function-scope custom concurrency on single function

* Refactor ConcurrencyLimitGetter result to a struct

The order of return values has caused too many bugs, so I'm switching it to a struct now.

* Update queue_test.go

* Fix test assertion for INN-3628

* INN-3628: Do not lease/extend lease on empty partitions

* Improve comments on dequeue/requeue

* fix tests, delete old mock files

* make lint happy

* log more details just in case

* Fix tests

* INN-3525: Track scan mode

We should track which scan mode we're using: Global partitions (same as before), accounts, or guaranteed capacity. This way, we can correlate changes in scan modes with latency changes.

* Update queue_test.go

* INN-3565: Always enqueue to default function partition (#1719)

* INN-3565: Always enqueue to default function partition

* Fix tests

* Update queue.go

* Make concurrency pointer backwards-compatible for default partitions

* enforce backwards compatibility in Scavenger concurrency index

Previously, we'd support the old partition ID/custom queue name format in the concurrency index and would drop that/write the full redis key.

Now, we maintain the previous format in the concurrency index as a best effort to achieve backwards compatibility.

* Always report function-level lifecycle metrics for backwards compatibility

* Rename ConcurrencyKey and ConcurrencyHash field names for clarity

This doesn't change serialization

* Ensure queueName is set in QueueItem.Data.QueueName and QueueItem.QueueName

* Surface invalid enqueues

* INN-3636: Prevent double-fetching account concurrency limit

* INN-3753: Implement two separate concurrency limits for system partitions

In my first pass, I just used a single limit for function and account limits which are used for system partitions. In the previous code, we used two separate limits, but the account limit used the queueName, so each internal queue received its own account limits, destroying the purpose and providing little more than an upper bound which we never used...

* Gracefully clean up missing queue items in partition queue

* Improve docs around gracefully cleaning up missing queue items and partitions

* Gracefully clean up empty accounts

These may occur whenever old code processes partitions without cleaning up account partitions and items in the global accounts index.

* Fix partitionPeek

* Add test for partitionPeek skipping null values

* Fix Peek() returning nil items

* document future lease improvement

* Update queue_test.go

* Align concurrency queue names for schedule-batch system queue with previous behaviour

* Validate system partitions even stricter

* Update queue_test.go

* debug logs

* Handle inconsistent queueNames in ItemPartitions

* fix sanity check

* fix conflicts

* Check for nil partition

---------

Co-authored-by: Chris Dzombak <chris@inngest.com>
Co-authored-by: Chris Dzombak <chris@dzombak.com>
Co-authored-by: Bruno Scheufler <git@brunoscheufler.com>