Content-Length: 397073 | pFad | http://github.com/tembo-io/pgmq/commit/f5c04a4f20305cba66907c6d7fafa7672d960b8b

26 Implement message headers (#338) · tembo-io/pgmq@f5c04a4 · GitHub
Skip to content

Commit

Permalink
Implement message headers (#338)
Browse files Browse the repository at this point in the history
* Implement headers

Implement message headers

* refactoring

* test fixes

* Archive headers

* Migration

* Fixes and upgrade script

* Use header in misc test

* More tests and bugfix
  • Loading branch information
v0idpwn authored Dec 10, 2024
1 parent 57d8a9b commit f5c04a4
Show file tree
Hide file tree
Showing 4 changed files with 384 additions and 57 deletions.
236 changes: 234 additions & 2 deletions pgmq-extension/sql/pgmq--1.4.4--1.5.0.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
-- Add conditional read
-- read
-- reads a number of messages from a queue, setting a visibility timeout on them
DROP FUNCTION IF EXISTS pgmq.read(TEXT, INTEGER, INTEGER);
Expand Down Expand Up @@ -32,7 +33,7 @@ BEGIN
read_ct = read_ct + 1
FROM cte
WHERE m.msg_id = cte.msg_id
RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message;
RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message, m.headers;
$QUERY$,
qtable, conditional, qtable, make_interval(secs => vt)
);
Expand Down Expand Up @@ -84,7 +85,7 @@ BEGIN
read_ct = read_ct + 1
FROM cte
WHERE m.msg_id = cte.msg_id
RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message;
RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message, headers;
$QUERY$,
qtable, conditional, qtable, make_interval(secs => vt)
);
Expand Down Expand Up @@ -191,6 +192,7 @@ BEGIN
END;
$$ LANGUAGE plpgsql;

-- Add queue visible length to metrics
ALTER TYPE pgmq.metrics_result ADD ATTRIBUTE queue_visible_length bigint;

DROP FUNCTION pgmq.metrics(queue_name TEXT);
Expand Down Expand Up @@ -235,3 +237,233 @@ BEGIN
RETURN result_row;
END;
$$ LANGUAGE plpgsql;

-- Headers
-- Update types
ALTER TYPE pgmq.message_record ADD ATTRIBUTE headers JSONB;

-- Update functions
DROP FUNCTION pgmq.send(TEXT, JSONB, INTEGER);
DROP FUNCTION pgmq.send_batch(TEXT, JSONB[], INTEGER);
DROP FUNCTION pgmq.archive(TEXT, BIGINT);
DROP FUNCTION pgmq.archive(TEXT, BIGINT[]);

-- send: 2 args, no delay or headers
CREATE FUNCTION pgmq.send(
queue_name TEXT,
msg JSONB
) RETURNS SETOF BIGINT AS $$
SELECT * FROM pgmq.send(queue_name, msg, NULL, clock_timestamp());
$$ LANGUAGE sql;

-- send: 3 args with headers
CREATE FUNCTION pgmq.send(
queue_name TEXT,
msg JSONB,
headers JSONB
) RETURNS SETOF BIGINT AS $$
SELECT * FROM pgmq.send(queue_name, msg, headers, clock_timestamp());
$$ LANGUAGE sql;

-- send: 3 args with integer delay
CREATE FUNCTION pgmq.send(
queue_name TEXT,
msg JSONB,
delay INTEGER
) RETURNS SETOF BIGINT AS $$
SELECT * FROM pgmq.send(queue_name, msg, NULL, clock_timestamp() + make_interval(secs => delay));
$$ LANGUAGE sql;

-- send: 3 args with timestamp
CREATE FUNCTION pgmq.send(
queue_name TEXT,
msg JSONB,
delay TIMESTAMP WITH TIME ZONE
) RETURNS SETOF BIGINT AS $$
SELECT * FROM pgmq.send(queue_name, msg, NULL, delay);
$$ LANGUAGE sql;

-- send: 4 args with integer delay
CREATE FUNCTION pgmq.send(
queue_name TEXT,
msg JSONB,
headers JSONB,
delay INTEGER
) RETURNS SETOF BIGINT AS $$
SELECT * FROM pgmq.send(queue_name, msg, headers, clock_timestamp() + make_interval(secs => delay));
$$ LANGUAGE sql;

-- send: actual implementation
CREATE FUNCTION pgmq.send(
queue_name TEXT,
msg JSONB,
headers JSONB,
delay TIMESTAMP WITH TIME ZONE
) RETURNS SETOF BIGINT AS $$
DECLARE
sql TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
sql := FORMAT(
$QUERY$
INSERT INTO pgmq.%I (vt, message, headers)
VALUES ($2, $1, $3)
RETURNING msg_id;
$QUERY$,
qtable
);
RETURN QUERY EXECUTE sql USING msg, delay, headers;
END;
$$ LANGUAGE plpgsql;

-- send batch: 2 args
CREATE FUNCTION pgmq.send_batch(
queue_name TEXT,
msgs JSONB[]
) RETURNS SETOF BIGINT AS $$
SELECT * FROM pgmq.send_batch(queue_name, msgs, NULL, clock_timestamp());
$$ LANGUAGE sql;

-- send batch: 3 args with headers
CREATE FUNCTION pgmq.send_batch(
queue_name TEXT,
msgs JSONB[],
headers JSONB[]
) RETURNS SETOF BIGINT AS $$
SELECT * FROM pgmq.send_batch(queue_name, msgs, headers, clock_timestamp());
$$ LANGUAGE sql;

-- send batch: 3 args with integer delay
CREATE FUNCTION pgmq.send_batch(
queue_name TEXT,
msgs JSONB[],
delay INTEGER
) RETURNS SETOF BIGINT AS $$
SELECT * FROM pgmq.send_batch(queue_name, msgs, NULL, clock_timestamp() + make_interval(secs => delay));
$$ LANGUAGE sql;

-- send batch: 3 args with timestamp
CREATE FUNCTION pgmq.send_batch(
queue_name TEXT,
msgs JSONB[],
delay TIMESTAMP WITH TIME ZONE
) RETURNS SETOF BIGINT AS $$
SELECT * FROM pgmq.send_batch(queue_name, msgs, NULL, delay);
$$ LANGUAGE sql;

-- send_batch: 4 args with integer delay
CREATE FUNCTION pgmq.send_batch(
queue_name TEXT,
msgs JSONB[],
headers JSONB[],
delay INTEGER
) RETURNS SETOF BIGINT AS $$
SELECT * FROM pgmq.send_batch(queue_name, msgs, headers, clock_timestamp() + make_interval(secs => delay));
$$ LANGUAGE sql;

-- send_batch: actual implementation
CREATE FUNCTION pgmq.send_batch(
queue_name TEXT,
msgs JSONB[],
headers JSONB[],
delay TIMESTAMP WITH TIME ZONE
) RETURNS SETOF BIGINT AS $$
DECLARE
sql TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
sql := FORMAT(
$QUERY$
INSERT INTO pgmq.%I (vt, message, headers)
SELECT $2, unnest($1), unnest(coalesce($3, ARRAY[]::jsonb[]))
RETURNING msg_id;
$QUERY$,
qtable
);
RETURN QUERY EXECUTE sql USING msgs, delay, headers;
END;
$$ LANGUAGE plpgsql;

-- archive
CREATE FUNCTION pgmq.archive(
queue_name TEXT,
msg_ids BIGINT[]
)
RETURNS SETOF BIGINT AS $$
DECLARE
sql TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
atable TEXT := pgmq.format_table_name(queue_name, 'a');
BEGIN
sql := FORMAT(
$QUERY$
WITH archived AS (
DELETE FROM pgmq.%I
WHERE msg_id = ANY($1)
RETURNING msg_id, vt, read_ct, enqueued_at, message, headers
)
INSERT INTO pgmq.%I (msg_id, vt, read_ct, enqueued_at, message, headers)
SELECT msg_id, vt, read_ct, enqueued_at, message, headers
FROM archived
RETURNING msg_id;
$QUERY$,
qtable, atable
);
RETURN QUERY EXECUTE sql USING msg_ids;
END;
$$ LANGUAGE plpgsql;

-- archive
CREATE FUNCTION pgmq.archive(
queue_name TEXT,
msg_id BIGINT
)
RETURNS BOOLEAN AS $$
DECLARE
sql TEXT;
result BIGINT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
atable TEXT := pgmq.format_table_name(queue_name, 'a');
BEGIN
sql := FORMAT(
$QUERY$
WITH archived AS (
DELETE FROM pgmq.%I
WHERE msg_id = $1
RETURNING msg_id, vt, read_ct, enqueued_at, message, headers
)
INSERT INTO pgmq.%I (msg_id, vt, read_ct, enqueued_at, message, headers)
SELECT msg_id, vt, read_ct, enqueued_at, message, headers
FROM archived
RETURNING msg_id;
$QUERY$,
qtable, atable
);
EXECUTE sql USING msg_id INTO result;
RETURN NOT (result IS NULL);
END;
$$ LANGUAGE plpgsql;

-- Update existing queues
DO $$
DECLARE
queue_record RECORD;
qtable TEXT;
atable TEXT;
BEGIN
FOR queue_record IN SELECT queue_name FROM pgmq.meta LOOP
qtable := pgmq.format_table_name(queue_record.queue_name, 'q');
atable := pgmq.format_table_name(queue_record.queue_name, 'a');

EXECUTE format(
'ALTER TABLE pgmq.%I ADD COLUMN headers JSONB',
qtable
);

EXECUTE format(
'ALTER TABLE pgmq.%I ADD COLUMN headers JSONB',
atable
);
END LOOP;
END;
$$;
Loading

0 comments on commit f5c04a4

Please sign in to comment.








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/tembo-io/pgmq/commit/f5c04a4f20305cba66907c6d7fafa7672d960b8b

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy