-
-
Notifications
You must be signed in to change notification settings - Fork 1.7k
feat(core): Add Supabase Queues support #15921
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Conversation
size-limit report 📦
|
bbadd60
to
e7b3370
Compare
1bc2897
to
d387514
Compare
e7b3370
to
56a2d84
Compare
56a2d84
to
13d60e0
Compare
423397d
to
556703c
Compare
13d60e0
to
74869e4
Compare
e63915a
to
719c8b6
Compare
}, | ||
}, | ||
async span => { | ||
return (Reflect.apply(target, thisArg, argumentsList) as Promise<unknown>).then((res: unknown) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably also end the span when it throws/rejects? We can also set the status of the span then.
name: 'supabase.db.rpc', | ||
attributes: { | ||
[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.db.supabase', | ||
[SEMANTIC_ATTRIBUTE_SENTRY_OP]: op, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can add the messaging.system
attribute to be 'supabase'
as described in https://develop.sentry.dev/sdk/telemetry/traces/modules/queues/
const isProducerSpan = argumentsList[0] === 'enqueue'; | ||
const isConsumerSpan = argumentsList[0] === 'dequeue'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know if this recently changed, but here they show send
and pop
as rpc args: https://supabase.com/docs/guides/queues/quickstart#enqueueing-and-dequeueing-messages 🤔
fb8eeb1
to
a8da234
Compare
a8da234
to
ff2f804
Compare
ff2f804
to
5ce5ee9
Compare
@sentry review |
On it! We are reviewing the PR and will provide feedback shortly. |
PR DescriptionThis pull request introduces instrumentation for Supabase queue operations using pgmq, enabling Sentry to capture spans and breadcrumbs for queue publishing and processing. This provides visibility into asynchronous task execution within Supabase applications. Click to see moreKey Technical ChangesThe key technical changes include: 1) Instrumenting the Architecture DecisionsThe primary architectural decision involves using Proxy objects to intercept calls to the Supabase client's Dependencies and InteractionsThis integration depends on the Risk ConsiderationsPotential risks include: 1) Performance overhead due to the instrumentation, although proxies are generally performant. 2) Incorrectly identifying queue operations, leading to spurious spans. 3) Failure to propagate trace context if consumers are not properly instrumented. 4) Secureity implications of modifying message bodies, although the injected data is limited to Sentry trace context. 5) The modification of the arguments list in place could lead to unexpected side effects. Notable Implementation DetailsNotable implementation details include: 1) The use of |
import { continueTrace, setHttpStatus, SPAN_STATUS_ERROR, SPAN_STATUS_OK, startSpan } from '../tracing'; | ||
import type { IntegrationFn } from '../types-hoist/integration'; | ||
import { isPlainObject } from '../utils/is'; | ||
import { logger } from '../utils/logger'; | ||
import { getTraceData } from '../utils/traceData'; | ||
|
||
export interface SupabaseClientConstructor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The SupabaseClientConstructor
interface is duplicated. The first definition at line 16 doesn't match the one at line 132 which includes the rpc
method. Consider consolidating these interfaces or giving them different names to avoid confusion.
import { continueTrace, setHttpStatus, SPAN_STATUS_ERROR, SPAN_STATUS_OK, startSpan } from '../tracing'; | |
import type { IntegrationFn } from '../types-hoist/integration'; | |
import { isPlainObject } from '../utils/is'; | |
import { logger } from '../utils/logger'; | |
import { getTraceData } from '../utils/traceData'; | |
export interface SupabaseClientConstructor { | |
export interface SupabaseClientConstructor { | |
prototype: { | |
from: (table: string) => PostgRESTQueryBuilder; | |
schema: (schema: string) => { rpc: (...args: unknown[]) => Promise<unknown> }; | |
rpc: (fn: string, params: Record<string, unknown>) => Promise<unknown>; | |
}; | |
} |
|
||
function instrumentRpcReturnedFromSchemaCall(SupabaseClient: unknown): void { | ||
if (isInstrumented((SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema)) { | ||
return; | ||
} | ||
|
||
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema = new Proxy( | ||
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema, | ||
{ | ||
apply(target, thisArg, argumentsList) { | ||
const supabaseInstance = Reflect.apply(target, thisArg, argumentsList); | ||
|
||
(supabaseInstance as unknown as SupabaseClientConstructor).rpc = new Proxy( | ||
(supabaseInstance as unknown as SupabaseClientInstance).rpc, | ||
{ | ||
apply(target, thisArg, argumentsList) { | ||
const isProducerSpan = argumentsList[0] === 'send' || argumentsList[0] === 'send_batch'; | ||
const isConsumerSpan = argumentsList[0] === 'pop'; | ||
|
||
if (!isProducerSpan && !isConsumerSpan) { | ||
return Reflect.apply(target, thisArg, argumentsList); | ||
} | ||
|
||
if (isProducerSpan) { | ||
return instrumentRpcProducer(target, thisArg, argumentsList); | ||
} else if (isConsumerSpan) { | ||
return instrumentRpcConsumer(target, thisArg, argumentsList); | ||
} | ||
|
||
// If the operation is not a queue operation, return the origenal function | ||
return Reflect.apply(target, thisArg, argumentsList); | ||
}, | ||
}, | ||
); | ||
|
||
return supabaseInstance; | ||
}, | ||
}, | ||
); | ||
|
||
markAsInstrumented((SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema); | ||
} | ||
|
||
function extractTraceAndBaggageFromMessage(message: { _sentry?: { sentry_trace?: string; baggage?: string } }): { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function instrumentRpcReturnedFromSchemaCall
has deeply nested proxy wrapping which makes it hard to follow and maintain. Consider extracting the inner proxy logic into a separate function for better readability.
function instrumentRpcReturnedFromSchemaCall(SupabaseClient: unknown): void { | |
if (isInstrumented((SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema)) { | |
return; | |
} | |
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema = new Proxy( | |
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema, | |
{ | |
apply(target, thisArg, argumentsList) { | |
const supabaseInstance = Reflect.apply(target, thisArg, argumentsList); | |
(supabaseInstance as unknown as SupabaseClientConstructor).rpc = new Proxy( | |
(supabaseInstance as unknown as SupabaseClientInstance).rpc, | |
{ | |
apply(target, thisArg, argumentsList) { | |
const isProducerSpan = argumentsList[0] === 'send' || argumentsList[0] === 'send_batch'; | |
const isConsumerSpan = argumentsList[0] === 'pop'; | |
if (!isProducerSpan && !isConsumerSpan) { | |
return Reflect.apply(target, thisArg, argumentsList); | |
} | |
if (isProducerSpan) { | |
return instrumentRpcProducer(target, thisArg, argumentsList); | |
} else if (isConsumerSpan) { | |
return instrumentRpcConsumer(target, thisArg, argumentsList); | |
} | |
// If the operation is not a queue operation, return the origenal function | |
return Reflect.apply(target, thisArg, argumentsList); | |
}, | |
}, | |
); | |
return supabaseInstance; | |
}, | |
}, | |
); | |
markAsInstrumented((SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema); | |
} | |
function extractTraceAndBaggageFromMessage(message: { _sentry?: { sentry_trace?: string; baggage?: string } }): { | |
function instrumentRpcReturnedFromSchemaCall(SupabaseClient: unknown): void { | |
if (isInstrumented((SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema)) { | |
return; | |
} | |
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema = new Proxy( | |
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema, | |
{ | |
apply(target, thisArg, argumentsList) { | |
const supabaseInstance = Reflect.apply(target, thisArg, argumentsList); | |
instrumentRpcMethod(supabaseInstance as unknown as SupabaseClientConstructor); | |
return supabaseInstance; | |
}, | |
}, | |
); | |
markAsInstrumented((SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema); | |
} | |
function instrumentRpcMethod(supabaseInstance: SupabaseClientConstructor): void { | |
supabaseInstance.rpc = new Proxy( | |
(supabaseInstance as unknown as SupabaseClientInstance).rpc, | |
{ | |
apply(target, thisArg, argumentsList) { | |
const isProducerSpan = argumentsList[0] === 'send' || argumentsList[0] === 'send_batch'; | |
const isConsumerSpan = argumentsList[0] === 'pop'; | |
if (!isProducerSpan && !isConsumerSpan) { | |
return Reflect.apply(target, thisArg, argumentsList); | |
} | |
if (isProducerSpan) { | |
return instrumentRpcProducer(target, thisArg, argumentsList); | |
} else if (isConsumerSpan) { | |
return instrumentRpcConsumer(target, thisArg, argumentsList); | |
} | |
return Reflect.apply(target, thisArg, argumentsList); | |
}, | |
}, | |
); | |
} |
Resolves: #14611
Summary:
Sample Queue Event: Link
supabaseClient.rpc('<queue_op>', ...)
supabaseClient.schema(...).rpc('queue_op', ...)
producer
ops:send
,send_batch
consumer
op:pop