API Reference
PGMQueue
The PGMQueue class provides a high-level interface for managing PGMQ queues. It handles session management internally.
- class pgmq_sqlalchemy.PGMQueue(dsn: str | None = None, engine: Engine | AsyncEngine | None = None, session_maker: sessionmaker | None = None)¶
- __init__(dsn: str | None = None, engine: Engine | AsyncEngine | None = None, session_maker: sessionmaker | None = None) None¶
- There are 3 ways to initialize the PGMQueue class:
1. Initialize with a dsn:
    from pgmq_sqlalchemy import PGMQueue

    pgmq_client = PGMQueue(dsn='postgresql+psycopg://postgres:postgres@localhost:5432/postgres')
    # or async dsn
    async_pgmq_client = PGMQueue(dsn='postgresql+asyncpg://postgres:postgres@localhost:5432/postgres')
2. Initialize with an engine or async_engine:
    from pgmq_sqlalchemy import PGMQueue
    from sqlalchemy import create_engine
    from sqlalchemy.ext.asyncio import create_async_engine

    engine = create_engine('postgresql+psycopg://postgres:postgres@localhost:5432/postgres')
    pgmq_client = PGMQueue(engine=engine)
    # or async engine
    async_engine = create_async_engine('postgresql+asyncpg://postgres:postgres@localhost:5432/postgres')
    async_pgmq_client = PGMQueue(engine=async_engine)
3. Initialize with a session_maker:
    from pgmq_sqlalchemy import PGMQueue
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

    engine = create_engine('postgresql+psycopg://postgres:postgres@localhost:5432/postgres')
    session_maker = sessionmaker(bind=engine)
    pgmq_client = PGMQueue(session_maker=session_maker)
    # or async session_maker
    async_engine = create_async_engine('postgresql+asyncpg://postgres:postgres@localhost:5432/postgres')
    async_session_maker = sessionmaker(bind=async_engine, class_=AsyncSession)
    async_pgmq_client = PGMQueue(session_maker=async_session_maker)
- Parameters:
dsn (Optional[str]) – Database connection string.
engine (Optional[ENGINE_TYPE]) – SQLAlchemy engine (sync or async).
session_maker (Optional[sessionmaker]) – SQLAlchemy session maker.
Note
PGMQueue will automatically create the pgmq extension (and the pg_partman extension, if the method relates to partitioned queues) if it does not yet exist in the database. However, you must make sure that the pgmq extension (or pg_partman extension) is already installed on the Postgres server.
- create_queue(queue_name: str, unlogged: bool = False, *, session: Session | AsyncSession | None = None, commit: bool = True) None¶
Create a new queue.
If unlogged is True, the queue will be created as an UNLOGGED TABLE.
queue_name must be less than 48 characters.
    pgmq_client.create_queue('my_queue')
    # or unlogged table queue
    pgmq_client.create_queue('my_queue', unlogged=True)
- async create_queue_async(queue_name: str, unlogged: bool = False, *, session: Session | AsyncSession | None = None, commit: bool = True) None¶
Create a new queue.
If unlogged is True, the queue will be created as an UNLOGGED TABLE.
queue_name must be less than 48 characters.
    await pgmq_client.create_queue_async('my_queue')
    # or unlogged table queue
    await pgmq_client.create_queue_async('my_queue', unlogged=True)
- create_partitioned_queue(queue_name: str, partition_interval: int = 10000, retention_interval: int = 100000, *, session: Session | AsyncSession | None = None, commit: bool = True) None¶
Create a new partitioned queue.
    # Numeric partitioning (by msg_id)
    pgmq_client.create_partitioned_queue('my_partitioned_queue', partition_interval=10000, retention_interval=100000)
    # Time-based partitioning (by enqueued_at)
    pgmq_client.create_partitioned_queue('my_time_queue', partition_interval='1 day', retention_interval='7 days')
- Parameters:
queue_name (str) – The name of the queue, should be less than 48 characters.
partition_interval (Union[int, str]) – For numeric partitioning, the number of messages per partition. For time-based partitioning, a PostgreSQL interval string (e.g., ‘1 day’, ‘1 hour’).
retention_interval (Union[int, str]) – For numeric partitioning, messages with msg_id less than max(msg_id) - retention_interval will be dropped. For time-based partitioning, a PostgreSQL interval string (e.g., ‘7 days’).
Note
Supports both numeric (by msg_id) and time-based (by enqueued_at) partitioning. For time-based partitioning, use interval strings such as ‘1 day’, ‘1 hour’, or ‘7 days’. For numeric partitioning, use integer values.
Important
You must make sure that the pg_partman extension is already installed on the Postgres server. pgmq-sqlalchemy will automatically create the pg_partman extension if it does not yet exist in the database.
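The numeric retention rule above (messages with msg_id less than max(msg_id) - retention_interval are dropped) can be checked with a little arithmetic. The sketch below is only an illustration of that formula; the helper names `retention_cutoff` and `ids_eligible_for_drop` are hypothetical and not part of pgmq-sqlalchemy, and in practice retention is applied by pg_partman to whole partitions rather than to individual rows.

```python
from typing import Iterable, List


def retention_cutoff(max_msg_id: int, retention_interval: int) -> int:
    """Return the msg_id threshold; ids below it are eligible to be dropped."""
    return max_msg_id - retention_interval


def ids_eligible_for_drop(msg_ids: Iterable[int], retention_interval: int) -> List[int]:
    """List the msg_ids that fall below max(msg_id) - retention_interval."""
    ids = sorted(msg_ids)
    if not ids:
        return []
    cutoff = retention_cutoff(ids[-1], retention_interval)
    return [i for i in ids if i < cutoff]


# With the default retention_interval=100000 and a newest msg_id of 250000,
# the cutoff is 150000.
print(retention_cutoff(250000, 100000))
```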
- async create_partitioned_queue_async(queue_name: str, partition_interval: int = 10000, retention_interval: int = 100000, *, session: Session | AsyncSession | None = None, commit: bool = True) None¶
Create a new partitioned queue.
    # Numeric partitioning (by msg_id)
    await pgmq_client.create_partitioned_queue_async('my_partitioned_queue', partition_interval=10000, retention_interval=100000)
    # Time-based partitioning (by enqueued_at)
    await pgmq_client.create_partitioned_queue_async('my_time_queue', partition_interval='1 day', retention_interval='7 days')
- Parameters:
queue_name (str) – The name of the queue, should be less than 48 characters.
partition_interval (Union[int, str]) – For numeric partitioning, the number of messages per partition. For time-based partitioning, a PostgreSQL interval string (e.g., ‘1 day’, ‘1 hour’).
retention_interval (Union[int, str]) – For numeric partitioning, messages with msg_id less than max(msg_id) - retention_interval will be dropped. For time-based partitioning, a PostgreSQL interval string (e.g., ‘7 days’).
Note
Supports both numeric (by msg_id) and time-based (by enqueued_at) partitioning. For time-based partitioning, use interval strings such as ‘1 day’, ‘1 hour’, or ‘7 days’. For numeric partitioning, use integer values.
Important
You must make sure that the pg_partman extension is already installed on the Postgres server. pgmq-sqlalchemy will automatically create the pg_partman extension if it does not yet exist in the database.
- validate_queue_name(queue_name: str, *, session: Session | AsyncSession | None = None, commit: bool = True) None¶
Raises an error if queue_name is too long; the name must be less than 48 characters.
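As a rough illustration, the length rule can also be checked on the client side before any database round trip. This is a minimal sketch that assumes the "less than 48 characters" limit stated for create_queue(); `check_queue_name_length` and `MAX_QUEUE_NAME_LENGTH` are hypothetical names, and the authoritative check remains the one performed by validate_queue_name().

```python
# Assumption: names must be strictly shorter than 48 characters,
# as stated in the create_queue() description.
MAX_QUEUE_NAME_LENGTH = 48


def check_queue_name_length(queue_name: str) -> None:
    """Raise ValueError if the queue name is 48 characters or longer."""
    if len(queue_name) >= MAX_QUEUE_NAME_LENGTH:
        raise ValueError(
            f"queue name has {len(queue_name)} characters; "
            f"it must be less than {MAX_QUEUE_NAME_LENGTH}"
        )


check_queue_name_length("my_queue")  # passes silently
```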
- async validate_queue_name_async(queue_name: str, *, session: Session | AsyncSession | None = None, commit: bool = True) None¶
Raises an error if queue_name is too long; the name must be less than 48 characters.
- drop_queue(queue: str, partitioned: bool = False, *, session: Session | AsyncSession | None = None, commit: bool = True) bool¶
Drop a queue.
    pgmq_client.drop_queue('my_queue')
    # for partitioned queue
    pgmq_client.drop_queue('my_partitioned_queue', partitioned=True)
Warning
All messages and the queue itself (the pgmq.q_<queue_name> table) will be deleted. The archive table (pgmq.a_<queue_name>) will be dropped as well. See archive() for more details.
- async drop_queue_async(queue: str, partitioned: bool = False, *, session: Session | AsyncSession | None = None, commit: bool = True) bool¶
Drop a queue.
    await pgmq_client.drop_queue_async('my_queue')
    # for partitioned queue
    await pgmq_client.drop_queue_async('my_partitioned_queue', partitioned=True)
Warning
All messages and the queue itself (the pgmq.q_<queue_name> table) will be deleted. The archive table (pgmq.a_<queue_name>) will be dropped as well. See archive() for more details.
- list_queues(*, session: Session | AsyncSession | None = None, commit: bool = True) List[str]¶
List all queues.
    queue_list = pgmq_client.list_queues()
    print(queue_list)
- async list_queues_async(*, session: Session | AsyncSession | None = None, commit: bool = True) List[str]¶
List all queues.
    queue_list = await pgmq_client.list_queues_async()
    print(queue_list)
- send(queue_name: str, message: dict, delay: int = 0, *, session: Session | AsyncSession | None = None, commit: bool = True) int¶
Send a message to a queue.
    msg_id = pgmq_client.send('my_queue', {'key': 'value', 'key2': 'value2'})
    print(msg_id)
Example with delay:
    import time

    msg_id = pgmq_client.send('my_queue', {'key': 'value', 'key2': 'value2'}, delay=10)
    # the message is not visible until the delay has elapsed
    msg = pgmq_client.read('my_queue')
    assert msg is None
    time.sleep(10)
    msg = pgmq_client.read('my_queue')
    assert msg is not None
- async send_async(queue_name: str, message: dict, delay: int = 0, *, session: Session | AsyncSession | None = None, commit: bool = True) int¶
Send a message to a queue.
    msg_id = await pgmq_client.send_async('my_queue', {'key': 'value', 'key2': 'value2'})
    print(msg_id)
Example with delay:
    import asyncio

    msg_id = await pgmq_client.send_async('my_queue', {'key': 'value', 'key2': 'value2'}, delay=10)
    # the message is not visible until the delay has elapsed
    msg = await pgmq_client.read_async('my_queue')
    assert msg is None
    await asyncio.sleep(10)
    msg = await pgmq_client.read_async('my_queue')
    assert msg is not None
- send_batch(queue_name: str, messages: List[dict], delay: int = 0, *, session: Session | AsyncSession | None = None, commit: bool = True) List[int]¶
Send a batch of messages to a queue.
    msgs = [{'key': 'value', 'key2': 'value2'}, {'key': 'value', 'key2': 'value2'}]
    msg_ids = pgmq_client.send_batch('my_queue', msgs)
    print(msg_ids)
    # send with delay
    msg_ids = pgmq_client.send_batch('my_queue', msgs, delay=10)
- async send_batch_async(queue_name: str, messages: List[dict], delay: int = 0, *, session: Session | AsyncSession | None = None, commit: bool = True) List[int]¶
Send a batch of messages to a queue.
    msgs = [{'key': 'value', 'key2': 'value2'}, {'key': 'value', 'key2': 'value2'}]
    msg_ids = await pgmq_client.send_batch_async('my_queue', msgs)
    print(msg_ids)
    # send with delay
    msg_ids = await pgmq_client.send_batch_async('my_queue', msgs, delay=10)
- read(queue_name: str, vt: int | None = None, *, session: Session | AsyncSession | None = None, commit: bool = True) Message | None¶
Read a message from the queue.
- Returns:
Message or None if the queue is empty.
Note
PGMQ uses a FOR UPDATE SKIP LOCKED lock to make sure that a message is read by only one consumer. See the pgmq.read function for more details. A consumer retry mechanism (e.g. marking a message as failed after a certain number of retries) can be implemented using the read_ct field of the Message object.
Important
vt is the visibility timeout in seconds. When a message is read from the queue, it is invisible to other consumers for the duration of vt.
Usage:
    from pgmq_sqlalchemy.schema import Message

    msg: Message = pgmq_client.read('my_queue')
    print(msg.msg_id)
    print(msg.message)
    print(msg.read_ct)  # read count, how many times the message has been read
Example with vt:
    import time

    # assumes `read_vt_demo` is empty
    pgmq_client.send('read_vt_demo', {'key': 'value', 'key2': 'value2'})
    msg = pgmq_client.read('read_vt_demo', vt=10)
    assert msg is not None
    # try to read immediately
    msg = pgmq_client.read('read_vt_demo')
    assert msg is None  # returns None because the message is still invisible
    # try to read after 5 seconds
    time.sleep(5)
    msg = pgmq_client.read('read_vt_demo')
    assert msg is None  # still invisible after 5 seconds
    # try to read after 11 seconds
    time.sleep(6)
    msg = pgmq_client.read('read_vt_demo')
    assert msg is not None  # the message is visible again after 10 seconds
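The note above mentions that read_ct can drive a consumer retry mechanism. One possible shape for this is sketched below; it is an assumption-laden illustration, not the library's prescribed pattern. `consume_once`, `process`, and `max_reads` are hypothetical names, and `client` is any object exposing read(), delete(), and archive() with the signatures documented in this reference.

```python
from typing import Any, Callable


def consume_once(
    client: Any,
    queue_name: str,
    process: Callable[[dict], None],
    max_reads: int = 3,
    vt: int = 30,
) -> str:
    """Read one message and either process, retry later, or archive it."""
    msg = client.read(queue_name, vt=vt)
    if msg is None:
        return "empty"
    if msg.read_ct > max_reads:
        # Read too many times already: treat as failed and move to the archive.
        client.archive(queue_name, msg.msg_id)
        return "archived"
    try:
        process(msg.message)
    except Exception:
        # Leave the message in the queue; it becomes visible again after vt seconds.
        return "retry"
    client.delete(queue_name, msg.msg_id)
    return "done"
```

Archiving rather than deleting a message that exceeded the limit keeps it available for later inspection; deleting it instead would be an equally valid choice.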
- async read_async(queue_name: str, vt: int | None = None, *, session: Session | AsyncSession | None = None, commit: bool = True) Message | None¶
Read a message from the queue.
- Returns:
Message or None if the queue is empty.
Note
PGMQ uses a FOR UPDATE SKIP LOCKED lock to make sure that a message is read by only one consumer. See the pgmq.read function for more details. A consumer retry mechanism (e.g. marking a message as failed after a certain number of retries) can be implemented using the read_ct field of the Message object.
Important
vt is the visibility timeout in seconds. When a message is read from the queue, it is invisible to other consumers for the duration of vt.
Usage:
    from pgmq_sqlalchemy.schema import Message

    msg: Message = await pgmq_client.read_async('my_queue')
    print(msg.msg_id)
    print(msg.message)
    print(msg.read_ct)  # read count, how many times the message has been read
Example with vt:
    import asyncio

    # assumes `read_vt_demo` is empty
    await pgmq_client.send_async('read_vt_demo', {'key': 'value', 'key2': 'value2'})
    msg = await pgmq_client.read_async('read_vt_demo', vt=10)
    assert msg is not None
    # try to read immediately
    msg = await pgmq_client.read_async('read_vt_demo')
    assert msg is None  # returns None because the message is still invisible
    # try to read after 5 seconds
    await asyncio.sleep(5)
    msg = await pgmq_client.read_async('read_vt_demo')
    assert msg is None  # still invisible after 5 seconds
    # try to read after 11 seconds
    await asyncio.sleep(6)
    msg = await pgmq_client.read_async('read_vt_demo')
    assert msg is not None  # the message is visible again after 10 seconds
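The visibility-timeout behaviour described for read() and read_async() can be modelled without a database. The toy queue below is only a mental model under simplifying assumptions (a single process, a manually advanced clock, a default vt of 30 chosen arbitrarily); `ToyQueue` and `ToyMessage` are hypothetical and do not reflect how PGMQ stores or locks rows.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ToyMessage:
    msg_id: int
    message: dict
    visible_at: float = 0.0  # time from which the message may be read
    read_ct: int = 0  # how many times the message has been read


@dataclass
class ToyQueue:
    now: float = 0.0  # simulated clock, in seconds
    _messages: List[ToyMessage] = field(default_factory=list)

    def send(self, message: dict, delay: int = 0) -> int:
        msg_id = len(self._messages) + 1
        self._messages.append(ToyMessage(msg_id, message, self.now + delay))
        return msg_id

    def read(self, vt: int = 30) -> Optional[ToyMessage]:
        for msg in self._messages:
            if msg.visible_at <= self.now:
                # Hide the message from other readers for vt seconds.
                msg.visible_at = self.now + vt
                msg.read_ct += 1
                return msg
        return None

    def advance(self, seconds: float) -> None:
        self.now += seconds


queue = ToyQueue()
queue.send({"key": "value"})
first = queue.read(vt=10)  # the message is now hidden for 10 seconds
queue.advance(11)
second = queue.read(vt=10)  # visible again, read_ct is incremented
print(first.msg_id == second.msg_id, second.read_ct)
```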
- read_batch(queue_name: str, batch_size: int = 1, vt: int | None = None, *, session: Session | AsyncSession | None = None, commit: bool = True) List[Message] | None¶
- Read a batch of messages from the queue.
- Returns:
List of Message or None if the queue is empty.
Usage:
    from typing import List

    from pgmq_sqlalchemy.schema import Message

    msgs: List[Message] = pgmq_client.read_batch('my_queue', batch_size=10)
    # with vt
    msgs: List[Message] = pgmq_client.read_batch('my_queue', batch_size=10, vt=10)
- async read_batch_async(queue_name: str, batch_size: int = 1, vt: int | None = None, *, session: Session | AsyncSession | None = None, commit: bool = True) List[Message] | None¶
- Read a batch of messages from the queue.
- Returns:
List of Message or None if the queue is empty.
Usage:
    from typing import List

    from pgmq_sqlalchemy.schema import Message

    msgs: List[Message] = await pgmq_client.read_batch_async('my_queue', batch_size=10)
    # with vt
    msgs: List[Message] = await pgmq_client.read_batch_async('my_queue', batch_size=10, vt=10)
- read_with_poll(queue_name: str, vt: int | None = None, qty: int = 1, max_poll_seconds: int = 5, poll_interval_ms: int = 100, *, session: Session | AsyncSession | None = None, commit: bool = True) List[Message] | None¶
- Read messages from a queue with long-polling. When the queue is empty, the function blocks for at most max_poll_seconds seconds. During polling, the function checks the queue every poll_interval_ms milliseconds, until the queue has qty messages.
- Parameters:
queue_name (str) – The name of the queue.
vt (Optional[int]) – The visibility timeout in seconds.
qty (int) – The number of messages to read.
max_poll_seconds (int) – The maximum number of seconds to poll.
poll_interval_ms (int) – The interval in milliseconds to poll.
- Returns:
List of Message or None if the queue is empty.
Usage:
    msg_id = pgmq_client.send('my_queue', {'key': 'value'}, delay=6)
    # the following code will block for 5 seconds
    msgs = pgmq_client.read_with_poll('my_queue', qty=1, max_poll_seconds=5, poll_interval_ms=100)
    assert msgs is None
    # try read_with_poll again
    # the following code will only block for 1 second
    msgs = pgmq_client.read_with_poll('my_queue', qty=1, max_poll_seconds=5, poll_interval_ms=100)
    assert msgs is not None
Another example:
    msg = {'key': 'value'}
    msg_ids = pgmq_client.send_batch('my_queue', [msg, msg, msg, msg], delay=3)
    # the following code will block for 3 seconds
    msgs = pgmq_client.read_with_poll('my_queue', qty=3, max_poll_seconds=5, poll_interval_ms=100)
    assert len(msgs) == 3  # will read at most 3 messages (qty=3)
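The roles of max_poll_seconds and poll_interval_ms can be illustrated with a generic polling loop. This is a simplified sketch and an assumption about the general technique only: it stops at the first non-empty result, which may differ from PGMQ's exact stopping rule, and the actual polling happens inside the database. `poll_until_available` and `fetch` are hypothetical names; `clock` and `sleep` are injectable so the loop can be exercised without waiting.

```python
import time
from typing import Callable, List, Optional, TypeVar

T = TypeVar("T")


def poll_until_available(
    fetch: Callable[[], List[T]],
    max_poll_seconds: float = 5,
    poll_interval_ms: int = 100,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[List[T]]:
    """Call fetch() repeatedly until it returns items or the deadline passes."""
    deadline = clock() + max_poll_seconds
    while True:
        items = fetch()
        if items:
            return items
        if clock() >= deadline:
            return None  # mirrors the "None if the queue is empty" convention
        sleep(poll_interval_ms / 1000)
```

A smaller poll_interval_ms lowers latency at the cost of more queries; a larger max_poll_seconds keeps the caller blocked longer when the queue stays empty.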
- async read_with_poll_async(queue_name: str, vt: int | None = None, qty: int = 1, max_poll_seconds: int = 5, poll_interval_ms: int = 100, *, session: Session | AsyncSession | None = None, commit: bool = True) List[Message] | None¶
- Read messages from a queue with long-polling. When the queue is empty, the function blocks for at most max_poll_seconds seconds. During polling, the function checks the queue every poll_interval_ms milliseconds, until the queue has qty messages.
- Parameters:
queue_name (str) – The name of the queue.
vt (Optional[int]) – The visibility timeout in seconds.
qty (int) – The number of messages to read.
max_poll_seconds (int) – The maximum number of seconds to poll.
poll_interval_ms (int) – The interval in milliseconds to poll.
- Returns:
List of Message or None if the queue is empty.
Usage:
    msg_id = await pgmq_client.send_async('my_queue', {'key': 'value'}, delay=6)
    # the following code will block for 5 seconds
    msgs = await pgmq_client.read_with_poll_async('my_queue', qty=1, max_poll_seconds=5, poll_interval_ms=100)
    assert msgs is None
    # try read_with_poll_async again
    # the following code will only block for 1 second
    msgs = await pgmq_client.read_with_poll_async('my_queue', qty=1, max_poll_seconds=5, poll_interval_ms=100)
    assert msgs is not None
Another example:
    msg = {'key': 'value'}
    msg_ids = await pgmq_client.send_batch_async('my_queue', [msg, msg, msg, msg], delay=3)
    # the following code will block for 3 seconds
    msgs = await pgmq_client.read_with_poll_async('my_queue', qty=3, max_poll_seconds=5, poll_interval_ms=100)
    assert len(msgs) == 3  # will read at most 3 messages (qty=3)
- set_vt(queue_name: str, msg_id: int, vt: int, *, session: Session | AsyncSession | None = None, commit: bool = True) Message | None¶
Set the visibility timeout for a message.
- Parameters:
queue_name (str) – The name of the queue.
msg_id (int) – The message id.
vt (int) – The visibility timeout in seconds.
- Returns:
Message or None if the message does not exist.
Usage:
    # send without a delay so that the message can be read immediately
    msg_id = pgmq_client.send('my_queue', {'key': 'value'})
    msg = pgmq_client.read('my_queue')
    assert msg is not None
    # make the message invisible for the next 10 seconds
    msg = pgmq_client.set_vt('my_queue', msg.msg_id, 10)
    assert msg is not None
Tip
set_vt() can be used to implement an exponential backoff retry. For example:
    from pgmq_sqlalchemy import PGMQueue
    from pgmq_sqlalchemy.schema import Message

    def _exp_backoff_retry(msg: Message) -> int:
        # exponential backoff retry, capped at 2 ** 5 seconds
        if msg.read_ct < 5:
            return 2 ** msg.read_ct
        return 2 ** 5

    def consumer_with_backoff_retry(pgmq_client: PGMQueue, queue_name: str):
        msg = pgmq_client.read(
            queue_name=queue_name,
            vt=1000,  # set vt to 1000 seconds temporarily
        )
        if msg is None:
            return
        # set exponential backoff retry
        pgmq_client.set_vt(
            queue_name=queue_name,
            msg_id=msg.msg_id,
            vt=_exp_backoff_retry(msg),
        )
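To make the backoff schedule in the tip concrete, the same rule can be written as a function of read_ct alone and tabulated. This is just a worked restatement of the formula above; `exp_backoff_vt` and `cap_exponent` are hypothetical names introduced for the illustration.

```python
def exp_backoff_vt(read_ct: int, cap_exponent: int = 5) -> int:
    """Return 2 ** read_ct seconds, capped at 2 ** cap_exponent."""
    return 2 ** min(read_ct, cap_exponent)


# vt (in seconds) applied after the 1st, 2nd, ... 7th read of a message
schedule = [exp_backoff_vt(read_ct) for read_ct in range(1, 8)]
print(schedule)  # [2, 4, 8, 16, 32, 32, 32]
```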
- async set_vt_async(queue_name: str, msg_id: int, vt: int, *, session: Session | AsyncSession | None = None, commit: bool = True) Message | None¶
Set the visibility timeout for a message.
- Parameters:
queue_name (str) – The name of the queue.
msg_id (int) – The message id.
vt (int) – The visibility timeout in seconds.
- Returns:
Message or None if the message does not exist.
Usage:
    # send without a delay so that the message can be read immediately
    msg_id = await pgmq_client.send_async('my_queue', {'key': 'value'})
    msg = await pgmq_client.read_async('my_queue')
    assert msg is not None
    # make the message invisible for the next 10 seconds
    msg = await pgmq_client.set_vt_async('my_queue', msg.msg_id, 10)
    assert msg is not None
Tip
set_vt_async() can be used to implement an exponential backoff retry. For example:
    from pgmq_sqlalchemy import PGMQueue
    from pgmq_sqlalchemy.schema import Message

    def _exp_backoff_retry(msg: Message) -> int:
        # exponential backoff retry, capped at 2 ** 5 seconds
        if msg.read_ct < 5:
            return 2 ** msg.read_ct
        return 2 ** 5

    async def consumer_with_backoff_retry(pgmq_client: PGMQueue, queue_name: str):
        msg = await pgmq_client.read_async(
            queue_name=queue_name,
            vt=1000,  # set vt to 1000 seconds temporarily
        )
        if msg is None:
            return
        # set exponential backoff retry
        await pgmq_client.set_vt_async(
            queue_name=queue_name,
            msg_id=msg.msg_id,
            vt=_exp_backoff_retry(msg),
        )
- pop(queue_name: str, *, session: Session | AsyncSession | None = None, commit: bool = True) Message | None¶
Reads a single message from a queue and deletes it upon read.
    msg = pgmq_client.pop('my_queue')
    print(msg.msg_id)
    print(msg.message)
- async pop_async(queue_name: str, *, session: Session | AsyncSession | None = None, commit: bool = True) Message | None¶
Reads a single message from a queue and deletes it upon read.
    msg = await pgmq_client.pop_async('my_queue')
    print(msg.msg_id)
    print(msg.message)
- delete(queue_name: str, msg_id: int, *, session: Session | AsyncSession | None = None, commit: bool = True) bool¶
Delete a message from the queue.
Raises an error if queue_name does not exist.
Returns True if the message is deleted successfully.
If the message does not exist, returns False.
    msg_id = pgmq_client.send('my_queue', {'key': 'value'})
    assert pgmq_client.delete('my_queue', msg_id)
    # deleting the same message again returns False
    assert not pgmq_client.delete('my_queue', msg_id)
- async delete_async(queue_name: str, msg_id: int, *, session: Session | AsyncSession | None = None, commit: bool = True) bool¶
Delete a message from the queue.
Raises an error if queue_name does not exist.
Returns True if the message is deleted successfully.
If the message does not exist, returns False.
    msg_id = await pgmq_client.send_async('my_queue', {'key': 'value'})
    assert await pgmq_client.delete_async('my_queue', msg_id)
    # deleting the same message again returns False
    assert not await pgmq_client.delete_async('my_queue', msg_id)
- delete_batch(queue_name: str, msg_ids: List[int], *, session: Session | AsyncSession | None = None, commit: bool = True) List[int]¶
Delete a batch of messages from the queue.
Note
Instead of returning a bool like delete(), this method returns a list of message IDs (List[int]).
    msg_ids = pgmq_client.send_batch('my_queue', [{'key': 'value'}, {'key': 'value'}])
    assert pgmq_client.delete_batch('my_queue', msg_ids) == msg_ids
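Because the batch call returns a list rather than a bool, a caller may want to know which requested IDs are missing from the result. The helper below is a small sketch that assumes the returned list contains the IDs that were actually deleted, as the example above suggests; `ids_not_deleted` is a hypothetical name, not part of the library.

```python
from typing import List


def ids_not_deleted(requested: List[int], deleted: List[int]) -> List[int]:
    """Return the requested msg_ids that are absent from the deleted list."""
    deleted_set = set(deleted)
    return [msg_id for msg_id in requested if msg_id not in deleted_set]


# Example: message 2 was requested but is not in the returned list.
print(ids_not_deleted([1, 2, 3], [1, 3]))  # [2]
```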
- async delete_batch_async(queue_name: str, msg_ids: List[int], *, session: Session | AsyncSession | None = None, commit: bool = True) List[int]¶
Delete a batch of messages from the queue.
Note
Instead of returning a bool like delete(), this method returns a list of message IDs (List[int]).
    msg_ids = await pgmq_client.send_batch_async('my_queue', [{'key': 'value'}, {'key': 'value'}])
    assert await pgmq_client.delete_batch_async('my_queue', msg_ids) == msg_ids
- archive(queue_name: str, msg_id: int, *, session: Session | AsyncSession | None = None, commit: bool = True) bool¶
Archive a message from a queue.
- The message is deleted from the queue and moved to the archive table: it is removed from pgmq.q_<queue_name> and inserted into the pgmq.a_<queue_name> table.
Raises an error if queue_name does not exist.
Returns True if the message is archived successfully.
    msg_id = pgmq_client.send('my_queue', {'key': 'value'})
    assert pgmq_client.archive('my_queue', msg_id)
    # since the message is archived, the queue will be empty
    assert pgmq_client.read('my_queue') is None
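The table naming convention mentioned above (pgmq.q_<queue_name> for the live queue, pgmq.a_<queue_name> for the archive) can be captured in two small helpers, for example when inspecting archived messages with ad hoc SQL. This is a sketch under the assumption that the queue name is used verbatim in the table name; `queue_table` and `archive_table` are hypothetical helpers and PGMQ may normalise names differently.

```python
def queue_table(queue_name: str) -> str:
    """Qualified name of the table holding live messages."""
    return f"pgmq.q_{queue_name}"


def archive_table(queue_name: str) -> str:
    """Qualified name of the table holding archived messages."""
    return f"pgmq.a_{queue_name}"


print(queue_table("my_queue"))    # pgmq.q_my_queue
print(archive_table("my_queue"))  # pgmq.a_my_queue
```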
- async archive_async(queue_name: str, msg_id: int, *, session: Session | AsyncSession | None = None, commit: bool = True) bool¶
Archive a message from a queue.
- The message is deleted from the queue and moved to the archive table: it is removed from pgmq.q_<queue_name> and inserted into the pgmq.a_<queue_name> table.
Raises an error if queue_name does not exist.
Returns True if the message is archived successfully.
    msg_id = await pgmq_client.send_async('my_queue', {'key': 'value'})
    assert await pgmq_client.archive_async('my_queue', msg_id)
    # since the message is archived, the queue will be empty
    assert await pgmq_client.read_async('my_queue') is None
- archive_batch(queue_name: str, msg_ids: List[int], *, session: Session | AsyncSession | None = None, commit: bool = True) List[int]¶
Archive multiple messages from a queue.
Messages will be deleted from the queue and moved to the archive table.
Returns a list of the msg_ids that were successfully archived.
    msg_ids = pgmq_client.send_batch('my_queue', [{'key': 'value'}, {'key': 'value'}])
    assert pgmq_client.archive_batch('my_queue', msg_ids) == msg_ids
    assert pgmq_client.read('my_queue') is None
- async archive_batch_async(queue_name: str, msg_ids: List[int], *, session: Session | AsyncSession | None = None, commit: bool = True) List[int]¶
Archive multiple messages from a queue.
Messages will be deleted from the queue and moved to the archive table.
Returns a list of the msg_ids that were successfully archived.
    msg_ids = await pgmq_client.send_batch_async('my_queue', [{'key': 'value'}, {'key': 'value'}])
    assert await pgmq_client.archive_batch_async('my_queue', msg_ids) == msg_ids
    assert await pgmq_client.read_async('my_queue') is None
- purge(queue_name: str, *, session: Session | AsyncSession | None = None, commit: bool = True) int¶
Delete all messages from a queue, return the number of messages deleted.
Archive tables will not be affected.
    msg_ids = pgmq_client.send_batch('my_queue', [{'key': 'value'}, {'key': 'value'}])
    assert pgmq_client.purge('my_queue') == 2
    assert pgmq_client.read('my_queue') is None
- async purge_async(queue_name: str, *, session: Session | AsyncSession | None = None, commit: bool = True) int¶
Delete all messages from a queue, return the number of messages deleted.
Archive tables will not be affected.
    msg_ids = await pgmq_client.send_batch_async('my_queue', [{'key': 'value'}, {'key': 'value'}])
    assert await pgmq_client.purge_async('my_queue') == 2
    assert await pgmq_client.read_async('my_queue') is None
- metrics(queue_name: str, *, session: Session | AsyncSession | None = None, commit: bool = True) QueueMetrics | None¶
Get metrics for a queue.
- Returns:
QueueMetrics or None if the queue does not exist.
Usage:
    from pgmq_sqlalchemy.schema import QueueMetrics

    metrics: QueueMetrics = pgmq_client.metrics('my_queue')
    print(metrics.queue_name)
    print(metrics.queue_length)
- async metrics_async(queue_name: str, *, session: Session | AsyncSession | None = None, commit: bool = True) QueueMetrics | None¶
Get metrics for a queue.
- Returns:
QueueMetrics or None if the queue does not exist.
Usage:
    from pgmq_sqlalchemy.schema import QueueMetrics

    metrics: QueueMetrics = await pgmq_client.metrics_async('my_queue')
    print(metrics.queue_name)
    print(metrics.queue_length)
- metrics_all(*, session: Session | AsyncSession | None = None, commit: bool = True) List[QueueMetrics] | None¶
Get metrics for all queues.
- Returns:
List of QueueMetrics or None if there are no queues.
Usage:
    from typing import List

    from pgmq_sqlalchemy.schema import QueueMetrics

    metrics: List[QueueMetrics] = pgmq_client.metrics_all()
    for m in metrics:
        print(m.queue_name)
        print(m.queue_length)
Warning
You should use a distributed lock to avoid race conditions when calling metrics_all() concurrently with drop_queue(). Since the default PostgreSQL isolation level is READ COMMITTED, a queue whose metrics are being fetched may no longer exist if there are concurrent drop_queue() operations. Check the pgmq.metrics_all function for more details.
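Since metrics_all() may return None when there are no queues, code that aggregates its result should handle that case explicitly. The helper below is a minimal sketch; `total_backlog` is a hypothetical name, and it relies only on the queue_length attribute shown in the usage example.

```python
from typing import Any, Iterable, Optional


def total_backlog(all_metrics: Optional[Iterable[Any]]) -> int:
    """Sum queue_length over all queues, treating None as 'no queues'."""
    if not all_metrics:
        return 0
    return sum(m.queue_length for m in all_metrics)
```

With a PGMQueue instance this could be called as total_backlog(pgmq_client.metrics_all()).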
- async metrics_all_async(*, session: Session | AsyncSession | None = None, commit: bool = True) List[QueueMetrics] | None¶
Get metrics for all queues.
- Returns:
List of QueueMetrics or None if there are no queues.
Usage:
    from typing import List

    from pgmq_sqlalchemy.schema import QueueMetrics

    metrics: List[QueueMetrics] = await pgmq_client.metrics_all_async()
    for m in metrics:
        print(m.queue_name)
        print(m.queue_length)
Warning
You should use a distributed lock to avoid race conditions when calling metrics_all_async() concurrently with drop_queue_async(). Since the default PostgreSQL isolation level is READ COMMITTED, a queue whose metrics are being fetched may no longer exist if there are concurrent drop operations. Check the pgmq.metrics_all function for more details.
PGMQOperation (op)
The PGMQOperation class (aliased as op) provides static methods for transaction-friendly operations.
All methods require a session to be passed in, giving you full control over transaction boundaries.
This is useful when you need to combine PGMQ operations with your existing business logic within the same transaction.
- class pgmq_sqlalchemy.operation.PGMQOperation¶
Static operations for PGMQ that accept user-provided sessions.
All methods are static and require a session to be passed in. Users are responsible for session management and transaction handling.
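Combining a business write and a queue operation in one transaction might look like the sketch below. It is an illustration under stated assumptions rather than the library's recommended recipe: `save_and_enqueue` and `record` are hypothetical, `send` is expected to be a callable with the documented signature of PGMQOperation.send (keyword-only session and commit), and `session` is expected to behave like a SQLAlchemy Session (add, commit, rollback).

```python
from typing import Any, Callable


def save_and_enqueue(
    session: Any,
    send: Callable[..., int],
    record: Any,
    queue_name: str,
    payload: dict,
) -> int:
    """Persist a record and enqueue a message, committing both together."""
    try:
        session.add(record)
        # commit=False leaves the transaction open so that both writes
        # succeed or fail as a unit.
        msg_id = send(queue_name, payload, session=session, commit=False)
        session.commit()
        return msg_id
    except Exception:
        session.rollback()
        raise
```

In real use, `send` would be PGMQOperation.send imported from pgmq_sqlalchemy.operation, and `session` would come from your own sessionmaker. Passing commit=False is what lets the caller decide where the transaction boundary lies.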
- static check_pgmq_ext(*, session: Session, commit: bool = True) None¶
Check if pgmq extension exists and create it if not.
- Parameters:
session – SQLAlchemy session.
commit – Whether to commit the transaction.
- async static check_pgmq_ext_async(*, session: AsyncSession, commit: bool = True) None¶
Check if pgmq extension exists and create it if not (async).
- Parameters:
session – Async SQLAlchemy session.
commit – Whether to commit the transaction.
- static check_pg_partman_ext(*, session: Session, commit: bool = True) None¶
Check if pg_partman extension exists and create it if not.
- Parameters:
session – SQLAlchemy session.
commit – Whether to commit the transaction.
- async static check_pg_partman_ext_async(*, session: AsyncSession, commit: bool = True) None¶
Check if pg_partman extension exists and create it if not (async).
- Parameters:
session – Async SQLAlchemy session.
commit – Whether to commit the transaction.
- static create_queue(queue_name: str, unlogged: bool = False, *, session: Session, commit: bool = True) None¶
Create a new queue.
- Parameters:
queue_name – The name of the queue.
unlogged – If True, creates an unlogged table.
session – SQLAlchemy session.
commit – Whether to commit the transaction.
- async static create_queue_async(queue_name: str, unlogged: bool = False, *, session: AsyncSession, commit: bool = True) None¶
Create a new queue asynchronously.
- Parameters:
queue_name – The name of the queue.
unlogged – If True, creates an unlogged table.
session – Async SQLAlchemy session.
commit – Whether to commit the transaction.
- static create_partitioned_queue(queue_name: str, partition_interval: str, retention_interval: str, *, session: Session, commit: bool = True) None¶
Create a new partitioned queue.
- Parameters:
queue_name – The name of the queue.
partition_interval – Partition interval as string.
retention_interval – Retention interval as string.
session – SQLAlchemy session.
commit – Whether to commit the transaction.
- async static create_partitioned_queue_async(queue_name: str, partition_interval: str, retention_interval: str, *, session: AsyncSession, commit: bool = True) None¶
Create a new partitioned queue asynchronously.
- Parameters:
queue_name – The name of the queue.
partition_interval – Partition interval as string.
retention_interval – Retention interval as string.
session – Async SQLAlchemy session.
commit – Whether to commit the transaction.
- static validate_queue_name(queue_name: str, *, session: Session, commit: bool = True) None¶
Validate the length of a queue name.
- Parameters:
queue_name – The name of the queue.
session – SQLAlchemy session.
commit – Whether to commit the transaction.
- async static validate_queue_name_async(queue_name: str, *, session: AsyncSession, commit: bool = True) None¶
Validate the length of a queue name asynchronously.
- Parameters:
queue_name – The name of the queue.
session – Async SQLAlchemy session.
commit – Whether to commit the transaction.
- static drop_queue(queue: str, partitioned: bool = False, *, session: Session, commit: bool = True) bool¶
Drop a queue.
- Parameters:
queue – The name of the queue.
partitioned – Whether the queue is partitioned.
session – SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
True if the queue was dropped successfully.
- async static drop_queue_async(queue: str, partitioned: bool = False, *, session: AsyncSession, commit: bool = True) bool¶
Drop a queue asynchronously.
- Parameters:
queue – The name of the queue.
partitioned – Whether the queue is partitioned.
session – Async SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
True if the queue was dropped successfully.
- static list_queues(*, session: Session, commit: bool = True) List[str]¶
List all queues.
- Parameters:
session – SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
List of queue names.
- async static list_queues_async(*, session: AsyncSession, commit: bool = True) List[str]¶
List all queues asynchronously.
- Parameters:
session – Async SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
List of queue names.
- static send(queue_name: str, message: dict, delay: int = 0, *, session: Session, commit: bool = True) int¶
Send a message to a queue.
- Parameters:
queue_name – The name of the queue.
message – The message as a dictionary.
delay – Delay in seconds before the message becomes visible.
session – SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
The message ID.
- async static send_async(queue_name: str, message: dict, delay: int = 0, *, session: AsyncSession, commit: bool = True) int¶
Send a message to a queue asynchronously.
- Parameters:
queue_name – The name of the queue.
message – The message as a dictionary.
delay – Delay in seconds before the message becomes visible.
session – Async SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
The message ID.
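Because send accepts a caller-owned session and a commit flag, a message can be enqueued in the same transaction as other writes. The sketch below illustrates that pattern under stated assumptions: enqueue_with_work and its work callback are hypothetical names, and ops stands in for the class that exposes these static methods.

```python
from typing import Callable

def enqueue_with_work(ops, session, queue_name: str, payload: dict,
                      work: Callable, delay: int = 0) -> int:
    """Run `work(session)` and enqueue `payload` in one transaction."""
    try:
        work(session)  # hypothetical caller-supplied writes on the same session
        # commit=False defers the commit so both steps succeed or fail together.
        msg_id = ops.send(queue_name, payload, delay, session=session, commit=False)
        session.commit()
        return msg_id
    except Exception:
        session.rollback()  # neither the writes nor the message are persisted
        raise
```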
- static send_batch(queue_name: str, messages: List[dict], delay: int = 0, *, session: Session, commit: bool = True) -> List[int]
Send a batch of messages to a queue.
- Parameters:
queue_name – The name of the queue.
messages – The messages as a list of dictionaries.
delay – Delay in seconds before the messages become visible.
session – SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
List of message IDs.
- async static send_batch_async(queue_name: str, messages: List[dict], delay: int = 0, *, session: AsyncSession, commit: bool = True) -> List[int]
Send a batch of messages to a queue asynchronously.
- Parameters:
queue_name – The name of the queue.
messages – The messages as a list of dictionaries.
delay – Delay in seconds before the messages become visible.
session – Async SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
List of message IDs.
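For large payload lists it can help to send in fixed-size chunks and commit once at the end. The coroutine below is a sketch of that idea, not part of the library: send_in_chunks and chunk_size are hypothetical, ops stands in for the class that exposes these static methods, and session is assumed to be an AsyncSession (whose commit() must be awaited).

```python
from typing import List

async def send_in_chunks(ops, session, queue_name: str, messages: List[dict],
                         chunk_size: int = 100, delay: int = 0) -> List[int]:
    """Send `messages` in chunks of `chunk_size`; commit once after the last chunk."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    msg_ids: List[int] = []
    for start in range(0, len(messages), chunk_size):
        chunk = messages[start:start + chunk_size]
        # commit=False keeps every chunk in the same transaction.
        ids = await ops.send_batch_async(queue_name, chunk, delay,
                                         session=session, commit=False)
        msg_ids.extend(ids)
    await session.commit()
    return msg_ids
```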
- static read(queue_name: str, vt: int, *, session: Session, commit: bool = True) -> Message | None
Read one message from the queue. The message stays hidden from other readers for vt seconds.
- Parameters:
queue_name – The name of the queue.
vt – Visibility timeout in seconds.
session – SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
Message or None if the queue is empty.
- async static read_async(queue_name: str, vt: int, *, session: AsyncSession, commit: bool = True) -> Message | None
Read one message from the queue asynchronously. The message stays hidden from other readers for vt seconds.
- Parameters:
queue_name – The name of the queue.
vt – Visibility timeout in seconds.
session – Async SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
Message or None if the queue is empty.
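A typical consumer reads a message, handles it, and deletes it only after the handler succeeds. The sketch below shows that flow under stated assumptions: process_one and handler are hypothetical names, ops stands in for the class that exposes these static methods, and the returned Message is assumed to expose msg_id and message attributes.

```python
from typing import Callable

def process_one(ops, session, queue_name: str,
                handler: Callable[[dict], None], vt: int = 30) -> bool:
    """Read, handle, and delete one message; return False if the queue is empty."""
    msg = ops.read(queue_name, vt, session=session)
    if msg is None:
        return False
    # If handler raises, the message is not deleted and becomes
    # readable again once the visibility timeout expires.
    handler(msg.message)  # assumption: Message has a .message payload
    ops.delete(queue_name, msg.msg_id, session=session)  # assumption: .msg_id
    return True
```

Choose vt to be longer than the handler's expected runtime, otherwise another reader may pick up the same message while it is still being processed.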
- static read_batch(queue_name: str, vt: int, batch_size: int = 1, *, session: Session, commit: bool = True) -> List[Message] | None
Read a batch of messages from the queue.
- Parameters:
queue_name – The name of the queue.
vt – Visibility timeout in seconds.
batch_size – Number of messages to read.
session – SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
List of messages or None if the queue is empty.
- async static read_batch_async(queue_name: str, vt: int, batch_size: int = 1, *, session: AsyncSession, commit: bool = True) -> List[Message] | None
Read a batch of messages from the queue asynchronously.
- Parameters:
queue_name – The name of the queue.
vt – Visibility timeout in seconds.
batch_size – Number of messages to read.
session – Async SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
List of messages or None if the queue is empty.
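Since read_batch returns None rather than an empty list when the queue is empty, callers usually normalize the result before iterating. The sketch below does that and acknowledges only the messages whose handler succeeded. process_batch and handler are hypothetical names, ops stands in for the class that exposes these static methods, and Message is assumed to expose msg_id and message.

```python
from typing import Callable, List

def process_batch(ops, session, queue_name: str, handler: Callable[[dict], None],
                  vt: int = 30, batch_size: int = 10) -> List[int]:
    """Handle up to `batch_size` messages; return the IDs that were deleted."""
    # `or []` turns the documented None (empty queue) into an empty list.
    messages = ops.read_batch(queue_name, vt, batch_size, session=session) or []
    done: List[int] = []
    for msg in messages:
        try:
            handler(msg.message)
        except Exception:
            continue  # stays in the queue; visible again after `vt` seconds
        done.append(msg.msg_id)
    if not done:
        return []
    return ops.delete_batch(queue_name, done, session=session)
```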
- static read_with_poll(queue_name: str, vt: int, qty: int = 1, max_poll_seconds: int = 5, poll_interval_ms: int = 100, *, session: Session, commit: bool = True) -> List[Message] | None
Read messages from a queue with polling.
- Parameters:
queue_name – The name of the queue.
vt – Visibility timeout in seconds.
qty – Number of messages to read.
max_poll_seconds – Maximum number of seconds to keep polling before giving up.
poll_interval_ms – Interval between polls, in milliseconds.
session – SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
List of messages or None if the queue is empty.
- async static read_with_poll_async(queue_name: str, vt: int, qty: int = 1, max_poll_seconds: int = 5, poll_interval_ms: int = 100, *, session: AsyncSession, commit: bool = True) -> List[Message] | None
Read messages from a queue with polling asynchronously.
- Parameters:
queue_name – The name of the queue.
vt – Visibility timeout in seconds.
qty – Number of messages to read.
max_poll_seconds – Maximum number of seconds to keep polling before giving up.
poll_interval_ms – Interval between polls, in milliseconds.
session – Async SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
List of messages or None if the queue is empty.
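Polling suits a worker that should keep consuming while messages arrive and stop once the queue has been idle for a while. The sketch below treats a None result as the idle signal. drain_until_idle and handler are hypothetical names, ops stands in for the class that exposes these static methods, and Message is assumed to expose msg_id and message.

```python
from typing import Callable

def drain_until_idle(ops, session, queue_name: str, handler: Callable[[dict], None],
                     vt: int = 30, qty: int = 10, max_poll_seconds: int = 5) -> int:
    """Process messages until a poll returns nothing; return the number handled."""
    handled = 0
    while True:
        messages = ops.read_with_poll(
            queue_name, vt, qty,
            max_poll_seconds=max_poll_seconds,
            poll_interval_ms=100,
            session=session,
        )
        if not messages:  # None: nothing arrived within max_poll_seconds
            return handled
        for msg in messages:
            handler(msg.message)
            ops.delete(queue_name, msg.msg_id, session=session)
            handled += 1
```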
- static set_vt(queue_name: str, msg_id: int, vt: int, *, session: Session, commit: bool = True) -> Message | None
Set the visibility timeout for a message.
- Parameters:
queue_name – The name of the queue.
msg_id – The message ID.
vt – New visibility timeout, as an offset in seconds from now.
session – SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
Message or None if the message does not exist.
- async static set_vt_async(queue_name: str, msg_id: int, vt: int, *, session: AsyncSession, commit: bool = True) -> Message | None
Set the visibility timeout for a message asynchronously.
- Parameters:
queue_name – The name of the queue.
msg_id – The message ID.
vt – New visibility timeout, as an offset in seconds from now.
session – Async SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
Message or None if the message does not exist.
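One use of set_vt is to renew the visibility timeout while a long job is in progress, so the message does not reappear for other readers mid-run. The sketch below renews before each step and stops if the message no longer exists. run_with_lease, steps, and lease_seconds are hypothetical names, and ops stands in for the class that exposes these static methods.

```python
from typing import Callable, Iterable

def run_with_lease(ops, session, queue_name: str, msg_id: int,
                   steps: Iterable[Callable[[], None]], lease_seconds: int = 30) -> bool:
    """Run each step, renewing the message's visibility timeout before it.

    Returns False as soon as set_vt reports that the message no longer exists.
    """
    for step in steps:
        renewed = ops.set_vt(queue_name, msg_id, lease_seconds, session=session)
        if renewed is None:
            return False  # message is gone; stop working on it
        step()
    return True
```

Each step should finish well within lease_seconds for the renewal to be effective.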
- static pop(queue_name: str, *, session: Session, commit: bool = True) -> Message | None
Read and delete a message from the queue.
- Parameters:
queue_name – The name of the queue.
session – SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
Message or None if the queue is empty.
- async static pop_async(queue_name: str, *, session: AsyncSession, commit: bool = True) -> Message | None
Read and delete a message from the queue asynchronously.
- Parameters:
queue_name – The name of the queue.
session – Async SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
Message or None if the queue is empty.
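Because pop deletes the message as part of the read, it fits cases where losing a message on a crash is acceptable and no separate delete call is wanted. The sketch below drains a queue into a list of payloads. drain and limit are hypothetical names, ops stands in for the class that exposes these static methods, and Message is assumed to expose a message attribute.

```python
from typing import List

def drain(ops, session, queue_name: str, limit: int = 1000) -> List[dict]:
    """Pop up to `limit` messages and return their payloads in read order."""
    payloads: List[dict] = []
    for _ in range(limit):
        msg = ops.pop(queue_name, session=session)
        if msg is None:  # queue is empty
            break
        payloads.append(msg.message)
    return payloads
```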
- static delete(queue_name: str, msg_id: int, *, session: Session, commit: bool = True) -> bool
Delete a message from the queue.
- Parameters:
queue_name – The name of the queue.
msg_id – The message ID.
session – SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
True if the message was deleted successfully.
- async static delete_async(queue_name: str, msg_id: int, *, session: AsyncSession, commit: bool = True) -> bool
Delete a message from the queue asynchronously.
- Parameters:
queue_name – The name of the queue.
msg_id – The message ID.
session – Async SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
True if the message was deleted successfully.
- static delete_batch(queue_name: str, msg_ids: List[int], *, session: Session, commit: bool = True) -> List[int]
Delete a batch of messages from the queue.
- Parameters:
queue_name – The name of the queue.
msg_ids – List of message IDs.
session – SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
List of message IDs that were successfully deleted.
- async static delete_batch_async(queue_name: str, msg_ids: List[int], *, session: AsyncSession, commit: bool = True) -> List[int]
Delete a batch of messages from the queue asynchronously.
- Parameters:
queue_name – The name of the queue.
msg_ids – List of message IDs.
session – Async SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
List of message IDs that were successfully deleted.
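delete_batch returns only the IDs it actually deleted, so comparing the result with the request shows which IDs were not found. A minimal sketch follows; delete_and_report is a hypothetical helper, and ops stands in for the class that exposes these static methods.

```python
from typing import List, Tuple

def delete_and_report(ops, session, queue_name: str,
                      msg_ids: List[int]) -> Tuple[List[int], List[int]]:
    """Delete `msg_ids`; return (deleted IDs, requested IDs that were not deleted)."""
    deleted = ops.delete_batch(queue_name, msg_ids, session=session)
    deleted_set = set(deleted)
    # Preserve the caller's ordering for the IDs that were not deleted.
    missing = [msg_id for msg_id in msg_ids if msg_id not in deleted_set]
    return deleted, missing
```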
- static archive(queue_name: str, msg_id: int, *, session: Session, commit: bool = True) -> bool
Archive a message, moving it from the queue to the queue's archive table.
- Parameters:
queue_name – The name of the queue.
msg_id – The message ID.
session – SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
True if the message was archived successfully.
- async static archive_async(queue_name: str, msg_id: int, *, session: AsyncSession, commit: bool = True) -> bool
Archive a message asynchronously, moving it from the queue to the queue's archive table.
- Parameters:
queue_name – The name of the queue.
msg_id – The message ID.
session – Async SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
True if the message was archived successfully.
- static archive_batch(queue_name: str, msg_ids: List[int], *, session: Session, commit: bool = True) -> List[int]
Archive a batch of messages from the queue.
- Parameters:
queue_name – The name of the queue.
msg_ids – List of message IDs.
session – SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
List of message IDs that were successfully archived.
- async static archive_batch_async(queue_name: str, msg_ids: List[int], *, session: AsyncSession, commit: bool = True) -> List[int]
Archive a batch of messages from the queue asynchronously.
- Parameters:
queue_name – The name of the queue.
msg_ids – List of message IDs.
session – Async SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
List of message IDs that were successfully archived.
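Archiving is a convenient way to set aside messages that keep failing while retaining them for inspection. The sketch below archives messages that have been read more than a threshold number of times. It rests on assumptions that this section does not confirm: archive_poison_messages and max_reads are hypothetical, ops stands in for the class that exposes these static methods, and Message is assumed to expose msg_id and a read_ct read counter.

```python
from typing import List

def archive_poison_messages(ops, session, queue_name: str, messages: List,
                            max_reads: int = 5) -> List:
    """Archive messages read more than `max_reads` times; return the remainder."""
    # assumption: each message carries a .read_ct delivery counter
    poison_ids = [m.msg_id for m in messages if m.read_ct > max_reads]
    if not poison_ids:
        return list(messages)
    archived = set(ops.archive_batch(queue_name, poison_ids, session=session))
    # Keep anything that was not archived, including IDs the call did not return.
    return [m for m in messages if m.msg_id not in archived]
```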
- static purge(queue_name: str, *, session: Session, commit: bool = True) -> int
Purge all messages from a queue.
- Parameters:
queue_name – The name of the queue.
session – SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
Number of messages purged.
- async static purge_async(queue_name: str, *, session: AsyncSession, commit: bool = True) -> int
Purge all messages from a queue asynchronously.
- Parameters:
queue_name – The name of the queue.
session – Async SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
Number of messages purged.
- static metrics(queue_name: str, *, session: Session, commit: bool = True) -> QueueMetrics | None
Get metrics for a queue.
- Parameters:
queue_name – The name of the queue.
session – SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
QueueMetrics or None if the queue does not exist.
- async static metrics_async(queue_name: str, *, session: AsyncSession, commit: bool = True) -> QueueMetrics | None
Get metrics for a queue asynchronously.
- Parameters:
queue_name – The name of the queue.
session – Async SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
QueueMetrics or None if the queue does not exist.
- static metrics_all(*, session: Session, commit: bool = True) -> List[QueueMetrics] | None
Get metrics for all queues.
- Parameters:
session – SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
List of QueueMetrics or None if no queues exist.
- async static metrics_all_async(*, session: AsyncSession, commit: bool = True) -> List[QueueMetrics] | None
Get metrics for all queues asynchronously.
- Parameters:
session – Async SQLAlchemy session.
commit – Whether to commit the transaction.
- Returns:
List of QueueMetrics or None if no queues exist.
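metrics_all can drive a simple backlog check across every queue. The sketch below normalizes the None result and lists queues whose length exceeds a threshold. queues_over_backlog and threshold are hypothetical, ops stands in for the class that exposes these static methods, and QueueMetrics is assumed to expose queue_name and queue_length attributes, which this section does not list.

```python
from typing import List

def queues_over_backlog(ops, session, threshold: int) -> List[str]:
    """Return the names of queues holding more than `threshold` messages, sorted."""
    # `or []` covers the documented None result when no queues exist.
    all_metrics = ops.metrics_all(session=session) or []
    # assumption: QueueMetrics has .queue_name and .queue_length
    return sorted(m.queue_name for m in all_metrics if m.queue_length > threshold)
```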