API Reference

Tip

For a more detailed explanation or implementation of each PGMQ function,

PGMQueue

The PGMQueue class provides a high-level interface for managing PGMQ queues. It handles session management internally.

class pgmq_sqlalchemy.PGMQueue(dsn: str | None = None, engine: Engine | AsyncEngine | None = None, session_maker: sessionmaker | None = None)
__init__(dsn: str | None = None, engine: Engine | AsyncEngine | None = None, session_maker: sessionmaker | None = None) None
There are three ways to initialize the PGMQueue class:
1. Initialize with a dsn:
from pgmq_sqlalchemy import PGMQueue

pgmq_client = PGMQueue(dsn='postgresql+psycopg://postgres:postgres@localhost:5432/postgres')
# or async dsn
async_pgmq_client = PGMQueue(dsn='postgresql+asyncpg://postgres:postgres@localhost:5432/postgres')
2. Initialize with an engine or async_engine:
from pgmq_sqlalchemy import PGMQueue
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_engine('postgresql+psycopg://postgres:postgres@localhost:5432/postgres')
pgmq_client = PGMQueue(engine=engine)
# or async engine
async_engine = create_async_engine('postgresql+asyncpg://postgres:postgres@localhost:5432/postgres')
async_pgmq_client = PGMQueue(engine=async_engine)
3. Initialize with a session_maker:
from pgmq_sqlalchemy import PGMQueue
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

engine = create_engine('postgresql+psycopg://postgres:postgres@localhost:5432/postgres')
session_maker = sessionmaker(bind=engine)
pgmq_client = PGMQueue(session_maker=session_maker)
# or async session_maker
async_engine = create_async_engine('postgresql+asyncpg://postgres:postgres@localhost:5432/post
async_session_maker = sessionmaker(bind=async_engine, class_=AsyncSession)
async_pgmq_client = PGMQueue(session_maker=async_session_maker)
Parameters:
  • dsn (Optional[str]) – Database connection string.

  • engine (Optional[ENGINE_TYPE]) – SQLAlchemy engine (sync or async).

  • session_maker (Optional[sessionmaker]) – SQLAlchemy session maker.

Note

PGMQueue automatically creates the pgmq extension (and the pg_partman extension, for methods related to partitioned queues) in the database if it has not been created yet.
The pgmq extension (and pg_partman, where needed) must already be installed on the Postgres server.
create_queue(queue_name: str, unlogged: bool = False, *, session: Session | AsyncSession | None = None, commit: bool = True) None

Create a new queue.

  • If unlogged is True, the queue is created as an UNLOGGED TABLE.

  • queue_name must be less than 48 characters.

    pgmq_client.create_queue('my_queue')
    # or unlogged table queue
    pgmq_client.create_queue('my_queue', unlogged=True)
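To catch an over-long name before calling create_queue(), the length rule can be checked locally. The following is a minimal sketch, assuming the limit is exactly as stated above (fewer than 48 characters); MAX_QUEUE_NAME_LEN and is_valid_queue_name are hypothetical names and not part of pgmq-sqlalchemy.

```python
# Hypothetical helper, not part of pgmq-sqlalchemy.
MAX_QUEUE_NAME_LEN = 48  # assumption: taken from the "less than 48 characters" rule above


def is_valid_queue_name(queue_name: str) -> bool:
    """Return True if the name is non-empty and shorter than the limit."""
    return 0 < len(queue_name) < MAX_QUEUE_NAME_LEN


print(is_valid_queue_name("my_queue"))  # True
print(is_valid_queue_name("q" * 48))    # False
```

The database-side validation remains the source of truth; this check only avoids an unnecessary round trip.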
    
async create_queue_async(queue_name: str, unlogged: bool = False, *, session: Session | AsyncSession | None = None, commit: bool = True) None

Create a new queue.

  • If unlogged is True, the queue is created as an UNLOGGED TABLE.

  • queue_name must be less than 48 characters.

    await pgmq_client.create_queue_async('my_queue')
    # or unlogged table queue
    await pgmq_client.create_queue_async('my_queue', unlogged=True)
    
create_partitioned_queue(queue_name: str, partition_interval: int = 10000, retention_interval: int = 100000, *, session: Session | AsyncSession | None = None, commit: bool = True) None

Create a new partitioned queue.

# Numeric partitioning (by msg_id)
pgmq_client.create_partitioned_queue('my_partitioned_queue', partition_interval=10000, retention_interval=100000)

# Time-based partitioning (by enqueued_at)
pgmq_client.create_partitioned_queue('my_time_queue', partition_interval='1 day', retention_interval='7 days')
Parameters:
  • queue_name (str) – The name of the queue; must be less than 48 characters.

  • partition_interval (Union[int, str]) – For numeric partitioning, the number of messages per partition. For time-based partitioning, a PostgreSQL interval string (e.g., ‘1 day’, ‘1 hour’).

  • retention_interval (Union[int, str]) –

    For numeric partitioning, messages with msg_id less than max(msg_id) - retention_interval will be dropped.

    For time-based partitioning, a PostgreSQL interval string (e.g., ‘7 days’).

    Note

    Supports both numeric (by msg_id) and time-based (by enqueued_at) partitioning.
    For time-based partitioning, use interval strings like ‘1 day’, ‘1 hour’, ‘7 days’, etc.
    For numeric partitioning, use integer values.

Important

The pg_partman extension must already be installed on the Postgres server.
pgmq-sqlalchemy then creates the pg_partman extension in the database automatically if it does not exist yet.
For more details about pgmq with pg_partman, check out the PGMQ: Partitioned Queues.
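The two partitioning modes and the numeric retention rule described above can be illustrated with a little arithmetic. This is only a sketch of the documented rule, not code from pgmq or pg_partman; partition_mode and retention_cutoff are hypothetical helper names.

```python
from typing import Union


def partition_mode(interval: Union[int, str]) -> str:
    """Classify an interval the way the parameter docs describe it."""
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(interval, bool):
        raise TypeError("interval must be an int or an interval string")
    if isinstance(interval, int):
        return "numeric"      # partitioned by msg_id
    if isinstance(interval, str):
        return "time-based"   # partitioned by enqueued_at
    raise TypeError("interval must be an int or an interval string")


def retention_cutoff(max_msg_id: int, retention_interval: int) -> int:
    """msg_id below which messages are dropped under numeric partitioning."""
    return max_msg_id - retention_interval


print(partition_mode(10000))             # numeric
print(partition_mode("1 day"))           # time-based
print(retention_cutoff(250000, 100000))  # 150000
```

With the default retention_interval of 100000 and a newest msg_id of 250000, messages with a msg_id below 150000 fall outside the retention window.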
async create_partitioned_queue_async(queue_name: str, partition_interval: int = 10000, retention_interval: int = 100000, *, session: Session | AsyncSession | None = None, commit: bool = True) None

Create a new partitioned queue.

# Numeric partitioning (by msg_id)
await pgmq_client.create_partitioned_queue_async('my_partitioned_queue', partition_interval=10000, retention_interval=100000)

# Time-based partitioning (by enqueued_at)
await pgmq_client.create_partitioned_queue_async('my_time_queue', partition_interval='1 day', retention_interval='7 days')
Parameters:
  • queue_name (str) – The name of the queue; must be less than 48 characters.

  • partition_interval (Union[int, str]) – For numeric partitioning, the number of messages per partition. For time-based partitioning, a PostgreSQL interval string (e.g., ‘1 day’, ‘1 hour’).

  • retention_interval (Union[int, str]) –

    For numeric partitioning, messages with msg_id less than max(msg_id) - retention_interval will be dropped.

    For time-based partitioning, a PostgreSQL interval string (e.g., ‘7 days’).

    Note

    Supports both numeric (by msg_id) and time-based (by enqueued_at) partitioning.
    For time-based partitioning, use interval strings like ‘1 day’, ‘1 hour’, ‘7 days’, etc.
    For numeric partitioning, use integer values.

Important

The pg_partman extension must already be installed on the Postgres server.
pgmq-sqlalchemy then creates the pg_partman extension in the database automatically if it does not exist yet.
For more details about pgmq with pg_partman, check out the PGMQ: Partitioned Queues.
validate_queue_name(queue_name: str, *, session: Session | AsyncSession | None = None, commit: bool = True) None
  • Will raise an error if the queue_name is more than 48 characters.

async validate_queue_name_async(queue_name: str, *, session: Session | AsyncSession | None = None, commit: bool = True) None
  • Will raise an error if the queue_name is more than 48 characters.

drop_queue(queue: str, partitioned: bool = False, *, session: Session | AsyncSession | None = None, commit: bool = True) bool

Drop a queue.

pgmq_client.drop_queue('my_queue')
# for partitioned queue
pgmq_client.drop_queue('my_partitioned_queue', partitioned=True)

Warning

All messages and the queue itself (the pgmq.q_<queue_name> table) will be deleted.
The archive table (pgmq.a_<queue_name>) will be dropped as well.

See archive() for more details.
async drop_queue_async(queue: str, partitioned: bool = False, *, session: Session | AsyncSession | None = None, commit: bool = True) bool

Drop a queue.

await pgmq_client.drop_queue_async('my_queue')
# for partitioned queue
await pgmq_client.drop_queue_async('my_partitioned_queue', partitioned=True)

Warning

All messages and the queue itself (the pgmq.q_<queue_name> table) will be deleted.
The archive table (pgmq.a_<queue_name>) will be dropped as well.

See archive() for more details.
list_queues(*, session: Session | AsyncSession | None = None, commit: bool = True) List[str]

List all queues.

queue_list = pgmq_client.list_queues()
print(queue_list)
async list_queues_async(*, session: Session | AsyncSession | None = None, commit: bool = True) List[str]

List all queues.

queue_list = await pgmq_client.list_queues_async()
print(queue_list)
send(queue_name: str, message: dict, delay: int = 0, *, session: Session | AsyncSession | None = None, commit: bool = True) int

Send a message to a queue.

msg_id = pgmq_client.send('my_queue', {'key': 'value', 'key2': 'value2'})
print(msg_id)

Example with delay:

import time

msg_id = pgmq_client.send('my_queue', {'key': 'value', 'key2': 'value2'}, delay=10)
msg = pgmq_client.read('my_queue')
assert msg is None
time.sleep(10)
msg = pgmq_client.read('my_queue')
assert msg is not None
async send_async(queue_name: str, message: dict, delay: int = 0, *, session: Session | AsyncSession | None = None, commit: bool = True) int

Send a message to a queue.

msg_id = await pgmq_client.send_async('my_queue', {'key': 'value', 'key2': 'value2'})
print(msg_id)

Example with delay:

import asyncio

msg_id = await pgmq_client.send_async('my_queue', {'key': 'value', 'key2': 'value2'}, delay=10)
msg = await pgmq_client.read_async('my_queue')
assert msg is None
await asyncio.sleep(10)
msg = await pgmq_client.read_async('my_queue')
assert msg is not None
send_batch(queue_name: str, messages: List[dict], delay: int = 0, *, session: Session | AsyncSession | None = None, commit: bool = True) List[int]

Send a batch of messages to a queue.

msgs = [{'key': 'value', 'key2': 'value2'}, {'key': 'value', 'key2': 'value2'}]
msg_ids = pgmq_client.send_batch('my_queue', msgs)
print(msg_ids)
# send with delay
msg_ids = pgmq_client.send_batch('my_queue', msgs, delay=10)
async send_batch_async(queue_name: str, messages: List[dict], delay: int = 0, *, session: Session | AsyncSession | None = None, commit: bool = True) List[int]

Send a batch of messages to a queue.

msgs = [{'key': 'value', 'key2': 'value2'}, {'key': 'value', 'key2': 'value2'}]
msg_ids = await pgmq_client.send_batch_async('my_queue', msgs)
print(msg_ids)
# send with delay
msg_ids = await pgmq_client.send_batch_async('my_queue', msgs, delay=10)
read(queue_name: str, vt: int | None = None, *, session: Session | AsyncSession | None = None, commit: bool = True) Message | None

Read a message from the queue.

Returns:

Message or None if the queue is empty.

Note

PGMQ uses FOR UPDATE SKIP LOCKED to make sure a message is read by only one consumer at a time.
See the pgmq.read function for more details.

A consumer retry mechanism (e.g. marking a message as failed after a certain number of retries) can be implemented using the read_ct field of the Message object.

Important

vt is the visibility timeout in seconds.
When a message is read from the queue, it will be invisible to other consumers for the duration of the vt.

Usage:

from pgmq_sqlalchemy.schema import Message

msg:Message = pgmq_client.read('my_queue')
print(msg.msg_id)
print(msg.message)
print(msg.read_ct) # read count, how many times the message has been read

Example with vt:

import time

# assumes the `read_vt_demo` queue is empty
pgmq_client.send('read_vt_demo', {'key': 'value', 'key2': 'value2'})
msg = pgmq_client.read('read_vt_demo', vt=10)
assert msg is not None

# try to read immediately
msg = pgmq_client.read('read_vt_demo')
assert msg is None # will return None because the message is still invisible

# try to read after 5 seconds
time.sleep(5)
msg = pgmq_client.read('read_vt_demo')
assert msg is None # still invisible after 5 seconds

# try to read after 11 seconds
time.sleep(6)
msg = pgmq_client.read('read_vt_demo')
assert msg is not None # the message is visible after 10 seconds
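The timeline in the example above can be replayed without a database using a small in-memory model. This is a sketch for intuition only and not how PGMQ is implemented; ToyQueue, ToyMessage, the hand-advanced clock and the default_vt value used when vt is None are all assumptions made for the illustration.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ToyMessage:
    msg_id: int
    message: dict
    read_ct: int = 0
    visible_at: float = 0.0  # fake-clock time at which the message becomes readable


class ToyQueue:
    """In-memory model of visibility timeouts; not how PGMQ is implemented."""

    def __init__(self, default_vt: int = 30) -> None:
        self._messages: List[ToyMessage] = []
        self._default_vt = default_vt  # assumption: value used when vt is None
        self.now = 0.0  # fake clock in seconds, advanced by hand instead of sleeping

    def send(self, message: dict, delay: int = 0) -> int:
        msg = ToyMessage(len(self._messages) + 1, message, visible_at=self.now + delay)
        self._messages.append(msg)
        return msg.msg_id

    def read(self, vt: Optional[int] = None) -> Optional[ToyMessage]:
        for msg in self._messages:
            if msg.visible_at <= self.now:
                msg.read_ct += 1
                # Hide the message from other readers until the timeout passes.
                msg.visible_at = self.now + (self._default_vt if vt is None else vt)
                return msg
        return None


queue = ToyQueue()
queue.send({"key": "value"})
timeline = []
timeline.append(queue.read(vt=10) is not None)  # t=0: readable
timeline.append(queue.read() is not None)       # t=0: hidden by the first read
queue.now = 5
timeline.append(queue.read() is not None)       # t=5: still hidden
queue.now = 11
timeline.append(queue.read() is not None)       # t=11: visible again
print(timeline)  # [True, False, False, True]
```

In this model each successful read increments read_ct, which is why the message ends the run with a read count of 2.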
async read_async(queue_name: str, vt: int | None = None, *, session: Session | AsyncSession | None = None, commit: bool = True) Message | None

Read a message from the queue.

Returns:

Message or None if the queue is empty.

Note

PGMQ uses FOR UPDATE SKIP LOCKED to make sure a message is read by only one consumer at a time.
See the pgmq.read function for more details.

A consumer retry mechanism (e.g. marking a message as failed after a certain number of retries) can be implemented using the read_ct field of the Message object.

Important

vt is the visibility timeout in seconds.
When a message is read from the queue, it will be invisible to other consumers for the duration of the vt.

Usage:

from pgmq_sqlalchemy.schema import Message

msg:Message = await pgmq_client.read_async('my_queue')
print(msg.msg_id)
print(msg.message)
print(msg.read_ct) # read count, how many times the message has been read

Example with vt:

import asyncio

# assumes the `read_vt_demo` queue is empty
await pgmq_client.send_async('read_vt_demo', {'key': 'value', 'key2': 'value2'})
msg = await pgmq_client.read_async('read_vt_demo', vt=10)
assert msg is not None

# try to read immediately
msg = await pgmq_client.read_async('read_vt_demo')
assert msg is None # will return None because the message is still invisible

# try to read after 5 seconds
await asyncio.sleep(5)
msg = await pgmq_client.read_async('read_vt_demo')
assert msg is None # still invisible after 5 seconds

# try to read after 11 seconds
await asyncio.sleep(6)
msg = await pgmq_client.read_async('read_vt_demo')
assert msg is not None # the message is visible after 10 seconds
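The retry mechanism mentioned in the note above (using read_ct to stop retrying after a number of attempts) might look like the following sketch. It relies only on the documented read_async(), archive_async() and delete_async() methods; handle_with_retry_limit, the max_reads threshold, the returned status labels and the stub classes are hypothetical and exist only so the example runs without a database.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def handle_with_retry_limit(
    client: Any,
    queue_name: str,
    handler: Callable[[dict], Awaitable[None]],
    max_reads: int = 3,
    vt: int = 30,
) -> str:
    """Process one message; archive it once it has been read too often."""
    msg = await client.read_async(queue_name, vt=vt)
    if msg is None:
        return "empty"
    if msg.read_ct > max_reads:
        # Too many attempts: move the message out of the queue for inspection.
        await client.archive_async(queue_name, msg.msg_id)
        return "archived"
    try:
        await handler(msg.message)
    except Exception:
        # Leave the message in place; it becomes visible again after vt seconds.
        return "retry"
    await client.delete_async(queue_name, msg.msg_id)
    return "done"


class _StubMessage:
    def __init__(self, msg_id, message, read_ct):
        self.msg_id, self.message, self.read_ct = msg_id, message, read_ct


class _StubClient:
    """Stand-in with PGMQueue's method names so the sketch runs without a database."""

    def __init__(self, msg):
        self.msg = msg

    async def read_async(self, queue_name, vt=None):
        return self.msg

    async def archive_async(self, queue_name, msg_id):
        return True

    async def delete_async(self, queue_name, msg_id):
        return True


async def _ok(payload: dict) -> None:
    return None


async def _boom(payload: dict) -> None:
    raise RuntimeError("handler failed")


async def _demo():
    fresh = _StubClient(_StubMessage(1, {"key": "value"}, read_ct=1))
    worn = _StubClient(_StubMessage(2, {"key": "value"}, read_ct=4))
    return [
        await handle_with_retry_limit(fresh, "my_queue", _ok),
        await handle_with_retry_limit(fresh, "my_queue", _boom),
        await handle_with_retry_limit(worn, "my_queue", _ok),
        await handle_with_retry_limit(_StubClient(None), "my_queue", _ok),
    ]


results = asyncio.run(_demo())
print(results)  # ['done', 'retry', 'archived', 'empty']
```

Archiving rather than deleting the exhausted message keeps it available in the archive table for later inspection; deleting it instead is an equally valid design choice.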
read_batch(queue_name: str, batch_size: int = 1, vt: int | None = None, *, session: Session | AsyncSession | None = None, commit: bool = True) List[Message] | None
Read a batch of messages from the queue.
Returns:

List of Message or None if the queue is empty.

Usage:

from pgmq_sqlalchemy.schema import Message

msgs:List[Message] = pgmq_client.read_batch('my_queue', batch_size=10)
# with vt
msgs:List[Message] = pgmq_client.read_batch('my_queue', batch_size=10, vt=10)
async read_batch_async(queue_name: str, batch_size: int = 1, vt: int | None = None, *, session: Session | AsyncSession | None = None, commit: bool = True) List[Message] | None
Read a batch of messages from the queue.
Returns:

List of Message or None if the queue is empty.

Usage:

from pgmq_sqlalchemy.schema import Message

msgs:List[Message] = await pgmq_client.read_batch_async('my_queue', batch_size=10)
# with vt
msgs:List[Message] = await pgmq_client.read_batch_async('my_queue', batch_size=10, vt=10)
read_with_poll(queue_name: str, vt: int | None = None, qty: int = 1, max_poll_seconds: int = 5, poll_interval_ms: int = 100, *, session: Session | AsyncSession | None = None, commit: bool = True) List[Message] | None
Read messages from a queue with long-polling.

When the queue is empty, the function blocks for at most max_poll_seconds seconds.
While polling, the function checks the queue every poll_interval_ms milliseconds until the queue has qty messages.
Parameters:
  • queue_name (str) – The name of the queue.

  • vt (Optional[int]) – The visibility timeout in seconds.

  • qty (int) – The number of messages to read.

  • max_poll_seconds (int) – The maximum number of seconds to poll.

  • poll_interval_ms (int) – The interval in milliseconds to poll.

Returns:

List of Message or None if the queue is empty.

Usage:

msg_id = pgmq_client.send('my_queue', {'key': 'value'}, delay=6)

# the following code will block for 5 seconds
msgs = pgmq_client.read_with_poll('my_queue', qty=1, max_poll_seconds=5, poll_interval_ms=100)
assert msgs is None

# try read_with_poll again
# the following code will only block for 1 second
msgs = pgmq_client.read_with_poll('my_queue', qty=1, max_poll_seconds=5, poll_interval_ms=100)
assert msgs is not None

Another example:

msg = {'key': 'value'}
msg_ids = pgmq_client.send_batch('my_queue', [msg, msg, msg, msg], delay=3)

# the following code will block for 3 seconds
msgs = pgmq_client.read_with_poll('my_queue', qty=3, max_poll_seconds=5, poll_interval_ms=100)
assert len(msgs) == 3 # will read at most 3 messages (qty=3)
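To make the interplay of max_poll_seconds and poll_interval_ms concrete, here is a simplified client-side model of a long-polling loop. It is a sketch for intuition only and is not the library's or PGMQ's implementation: it returns as soon as the fetch callback yields anything and does not model qty. The poll function, the fetch callback and FakeClock are hypothetical.

```python
import time
from typing import Callable, List, Optional


def poll(
    fetch: Callable[[], List[dict]],
    max_poll_seconds: float = 5,
    poll_interval_ms: int = 100,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[List[dict]]:
    """Call fetch() every poll_interval_ms until it returns messages or time runs out."""
    deadline = clock() + max_poll_seconds
    while True:
        msgs = fetch()
        if msgs:
            return msgs
        if clock() >= deadline:
            return None  # mirrors the documented "None if the queue is empty"
        sleep(poll_interval_ms / 1000)


class FakeClock:
    """Manually advanced clock (integer milliseconds) so the demo runs instantly."""

    def __init__(self) -> None:
        self.ms = 0

    def now(self) -> float:
        return self.ms / 1000

    def sleep(self, seconds: float) -> None:
        self.ms += round(seconds * 1000)


clock = FakeClock()
# The message "arrives" one second after polling starts.
result = poll(lambda: [{"key": "value"}] if clock.ms >= 1000 else [],
              clock=clock.now, sleep=clock.sleep)
print(result, clock.now())  # [{'key': 'value'}] 1.0
```

With the defaults, an empty queue is checked roughly 50 times (5 seconds at 100 ms intervals) before the loop gives up.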
async read_with_poll_async(queue_name: str, vt: int | None = None, qty: int = 1, max_poll_seconds: int = 5, poll_interval_ms: int = 100, *, session: Session | AsyncSession | None = None, commit: bool = True) List[Message] | None
Read messages from a queue with long-polling.

When the queue is empty, the function blocks for at most max_poll_seconds seconds.
While polling, the function checks the queue every poll_interval_ms milliseconds until the queue has qty messages.
Parameters:
  • queue_name (str) – The name of the queue.

  • vt (Optional[int]) – The visibility timeout in seconds.

  • qty (int) – The number of messages to read.

  • max_poll_seconds (int) – The maximum number of seconds to poll.

  • poll_interval_ms (int) – The interval in milliseconds to poll.

Returns:

List of Message or None if the queue is empty.

Usage:

msg_id = await pgmq_client.send_async('my_queue', {'key': 'value'}, delay=6)

# the following code will block for 5 seconds
msgs = await pgmq_client.read_with_poll_async('my_queue', qty=1, max_poll_seconds=5, poll_interval_ms=100)
assert msgs is None

# try read_with_poll again
# the following code will only block for 1 second
msgs = await pgmq_client.read_with_poll_async('my_queue', qty=1, max_poll_seconds=5, poll_interval_ms=100)
assert msgs is not None

Another example:

msg = {'key': 'value'}
msg_ids = await pgmq_client.send_batch_async('my_queue', [msg, msg, msg, msg], delay=3)

# the following code will block for 3 seconds
msgs = await pgmq_client.read_with_poll_async('my_queue', qty=3, max_poll_seconds=5, poll_interval_ms=100)
assert len(msgs) == 3 # will read at most 3 messages (qty=3)
set_vt(queue_name: str, msg_id: int, vt: int, *, session: Session | AsyncSession | None = None, commit: bool = True) Message | None

Set the visibility timeout for a message.

Parameters:
  • queue_name (str) – The name of the queue.

  • msg_id (int) – The message id.

  • vt (int) – The visibility timeout in seconds.

Returns:

Message or None if the message does not exist.

Usage:

msg_id = pgmq_client.send('my_queue', {'key': 'value'})
msg = pgmq_client.read('my_queue')
assert msg is not None
msg = pgmq_client.set_vt('my_queue', msg.msg_id, 10)
assert msg is not None

Tip

read() and set_vt() can be used together to implement an exponential backoff mechanism.
For example:
from pgmq_sqlalchemy import PGMQueue
from pgmq_sqlalchemy.schema import Message

def _exp_backoff_retry(msg: Message)->int:
    # exponential backoff retry
    if msg.read_ct < 5:
        return 2 ** msg.read_ct
    return 2 ** 5

def consumer_with_backoff_retry(pgmq_client: PGMQueue, queue_name: str):
    msg = pgmq_client.read(
        queue_name=queue_name,
        vt=1000, # set vt to 1000 seconds temporarily
    )
    if msg is None:
        return

    # set exponential backoff retry
    pgmq_client.set_vt(
        queue_name=queue_name,
        msg_id=msg.msg_id,
        vt=_exp_backoff_retry(msg)
    )
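To see which visibility timeouts the backoff rule above produces, the same formula can be evaluated on plain read counts. backoff_seconds is a hypothetical restatement of _exp_backoff_retry that takes an int instead of a Message, so it runs without a database.

```python
def backoff_seconds(read_ct: int, cap_exponent: int = 5) -> int:
    """Same rule as _exp_backoff_retry above, expressed on a plain read count."""
    return 2 ** min(read_ct, cap_exponent)


schedule = {read_ct: backoff_seconds(read_ct) for read_ct in range(1, 8)}
print(schedule)  # {1: 2, 2: 4, 3: 8, 4: 16, 5: 32, 6: 32, 7: 32}
```

The timeout doubles with every read until it reaches the cap of 32 seconds at the fifth read.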
async set_vt_async(queue_name: str, msg_id: int, vt: int, *, session: Session | AsyncSession | None = None, commit: bool = True) Message | None

Set the visibility timeout for a message.

Parameters:
  • queue_name (str) – The name of the queue.

  • msg_id (int) – The message id.

  • vt (int) – The visibility timeout in seconds.

Returns:

Message or None if the message does not exist.

Usage:

msg_id = await pgmq_client.send_async('my_queue', {'key': 'value'})
msg = await pgmq_client.read_async('my_queue')
assert msg is not None
msg = await pgmq_client.set_vt_async('my_queue', msg.msg_id, 10)
assert msg is not None

Tip

read_async() and set_vt_async() can be used together to implement an exponential backoff mechanism.
For example:
from pgmq_sqlalchemy import PGMQueue
from pgmq_sqlalchemy.schema import Message

def _exp_backoff_retry(msg: Message)->int:
    # exponential backoff retry
    if msg.read_ct < 5:
        return 2 ** msg.read_ct
    return 2 ** 5

async def consumer_with_backoff_retry(pgmq_client: PGMQueue, queue_name: str):
    msg = await pgmq_client.read_async(
        queue_name=queue_name,
        vt=1000, # set vt to 1000 seconds temporarily
    )
    if msg is None:
        return

    # set exponential backoff retry
    await pgmq_client.set_vt_async(
        queue_name=queue_name,
        msg_id=msg.msg_id,
        vt=_exp_backoff_retry(msg)
    )
pop(queue_name: str, *, session: Session | AsyncSession | None = None, commit: bool = True) Message | None

Reads a single message from a queue and deletes it upon read.

msg = pgmq_client.pop('my_queue')
print(msg.msg_id)
print(msg.message)
async pop_async(queue_name: str, *, session: Session | AsyncSession | None = None, commit: bool = True) Message | None

Reads a single message from a queue and deletes it upon read.

msg = await pgmq_client.pop_async('my_queue')
print(msg.msg_id)
print(msg.message)
delete(queue_name: str, msg_id: int, *, session: Session | AsyncSession | None = None, commit: bool = True) bool

Delete a message from the queue.

  • Raises an error if the queue_name does not exist.

  • Returns True if the message is deleted successfully.

  • If the message does not exist, returns False.

msg_id = pgmq_client.send('my_queue', {'key': 'value'})
assert pgmq_client.delete('my_queue', msg_id)
assert not pgmq_client.delete('my_queue', msg_id)
async delete_async(queue_name: str, msg_id: int, *, session: Session | AsyncSession | None = None, commit: bool = True) bool

Delete a message from the queue.

  • Raises an error if the queue_name does not exist.

  • Returns True if the message is deleted successfully.

  • If the message does not exist, returns False.

msg_id = await pgmq_client.send_async('my_queue', {'key': 'value'})
assert await pgmq_client.delete_async('my_queue', msg_id)
assert not await pgmq_client.delete_async('my_queue', msg_id)
delete_batch(queue_name: str, msg_ids: List[int], *, session: Session | AsyncSession | None = None, commit: bool = True) List[int]

Delete a batch of messages from the queue.

Note

Instead of returning a bool like delete(),
delete_batch() returns a list of the msg_ids that were successfully deleted.
msg_ids = pgmq_client.send_batch('my_queue', [{'key': 'value'}, {'key': 'value'}])
assert pgmq_client.delete_batch('my_queue', msg_ids) == msg_ids
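Since delete_batch() reports only the ids it deleted, the ids that were not deleted have to be derived by the caller. A minimal sketch, with not_deleted as a hypothetical helper name:

```python
from typing import Iterable, List


def not_deleted(requested: Iterable[int], deleted: Iterable[int]) -> List[int]:
    """Return the requested ids missing from delete_batch's result, in request order."""
    deleted_set = set(deleted)
    return [msg_id for msg_id in requested if msg_id not in deleted_set]


print(not_deleted([1, 2, 3, 4], [1, 3]))  # [2, 4]
```

In practice the second argument would be the return value of delete_batch(), for example not_deleted(msg_ids, pgmq_client.delete_batch('my_queue', msg_ids)).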
async delete_batch_async(queue_name: str, msg_ids: List[int], *, session: Session | AsyncSession | None = None, commit: bool = True) List[int]

Delete a batch of messages from the queue.

Note

Instead of returning a bool like delete_async(),
delete_batch_async() returns a list of the msg_ids that were successfully deleted.
msg_ids = await pgmq_client.send_batch_async('my_queue', [{'key': 'value'}, {'key': 'value'}])
assert await pgmq_client.delete_batch_async('my_queue', msg_ids) == msg_ids
archive(queue_name: str, msg_id: int, *, session: Session | AsyncSession | None = None, commit: bool = True) bool

Archive a message from a queue.

  • The message is deleted from the queue and moved to the archive table.
    • It is deleted from pgmq.q_<queue_name> and inserted into the pgmq.a_<queue_name> table.

  • Raises an error if the queue_name does not exist.

  • Returns True if the message is archived successfully.

msg_id = pgmq_client.send('my_queue', {'key': 'value'})
assert pgmq_client.archive('my_queue', msg_id)
# since the message is archived, queue will be empty
assert pgmq_client.read('my_queue') is None
async archive_async(queue_name: str, msg_id: int, *, session: Session | AsyncSession | None = None, commit: bool = True) bool

Archive a message from a queue.

  • The message is deleted from the queue and moved to the archive table.
    • It is deleted from pgmq.q_<queue_name> and inserted into the pgmq.a_<queue_name> table.

  • Raises an error if the queue_name does not exist.

  • Returns True if the message is archived successfully.

msg_id = await pgmq_client.send_async('my_queue', {'key': 'value'})
assert await pgmq_client.archive_async('my_queue', msg_id)
# since the message is archived, queue will be empty
assert await pgmq_client.read_async('my_queue') is None
archive_batch(queue_name: str, msg_ids: List[int], *, session: Session | AsyncSession | None = None, commit: bool = True) List[int]

Archive multiple messages from a queue.

  • Messages will be deleted from the queue and moved to the archive table.

  • Returns a list of msg_id that are successfully archived.

msg_ids = pgmq_client.send_batch('my_queue', [{'key': 'value'}, {'key': 'value'}])
assert pgmq_client.archive_batch('my_queue', msg_ids) == msg_ids
assert pgmq_client.read('my_queue') is None
async archive_batch_async(queue_name: str, msg_ids: List[int], *, session: Session | AsyncSession | None = None, commit: bool = True) List[int]

Archive multiple messages from a queue.

  • Messages will be deleted from the queue and moved to the archive table.

  • Returns a list of msg_id that are successfully archived.

msg_ids = await pgmq_client.send_batch_async('my_queue', [{'key': 'value'}, {'key': 'value'}])
assert await pgmq_client.archive_batch_async('my_queue', msg_ids) == msg_ids
assert await pgmq_client.read_async('my_queue') is None
purge(queue_name: str, *, session: Session | AsyncSession | None = None, commit: bool = True) int
  • Deletes all messages from a queue and returns the number of messages deleted.

  • Archive tables will not be affected.

msg_ids = pgmq_client.send_batch('my_queue', [{'key': 'value'}, {'key': 'value'}])
assert pgmq_client.purge('my_queue') == 2
assert pgmq_client.read('my_queue') is None
async purge_async(queue_name: str, *, session: Session | AsyncSession | None = None, commit: bool = True) int
  • Deletes all messages from a queue and returns the number of messages deleted.

  • Archive tables will not be affected.

msg_ids = await pgmq_client.send_batch_async('my_queue', [{'key': 'value'}, {'key': 'value'}])
assert await pgmq_client.purge_async('my_queue') == 2
assert await pgmq_client.read_async('my_queue') is None
metrics(queue_name: str, *, session: Session | AsyncSession | None = None, commit: bool = True) QueueMetrics | None

Get metrics for a queue.

Returns:

QueueMetrics or None if the queue does not exist.

Usage:

from pgmq_sqlalchemy.schema import QueueMetrics

metrics:QueueMetrics = pgmq_client.metrics('my_queue')
print(metrics.queue_name)
print(metrics.queue_length)
async metrics_async(queue_name: str, *, session: Session | AsyncSession | None = None, commit: bool = True) QueueMetrics | None

Get metrics for a queue.

Returns:

QueueMetrics or None if the queue does not exist.

Usage:

from pgmq_sqlalchemy.schema import QueueMetrics

metrics:QueueMetrics = await pgmq_client.metrics_async('my_queue')
print(metrics.queue_name)
print(metrics.queue_length)
metrics_all(*, session: Session | AsyncSession | None = None, commit: bool = True) List[QueueMetrics] | None

Get metrics for all queues.

Returns:

List of QueueMetrics or None if there are no queues.

Usage:

from pgmq_sqlalchemy.schema import QueueMetrics

metrics:List[QueueMetrics] = pgmq_client.metrics_all()
for m in metrics:
    print(m.queue_name)
    print(m.queue_length)

Warning

Use a distributed lock to avoid race conditions if metrics_all() can run concurrently with drop_queue().

Since the default PostgreSQL isolation level is READ COMMITTED, the queue metrics to be fetched may not exist if there are concurrent drop_queue() operations.
Check the pgmq.metrics_all function for more details.
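Because metrics_all() is documented to return None when there are no queues, code that summarises the result should handle that case explicitly. The sketch below uses only the queue_name and queue_length fields shown in the usage example; queue_lengths is a hypothetical helper, and SimpleNamespace objects stand in for QueueMetrics so the example runs without a database.

```python
from types import SimpleNamespace
from typing import Any, Dict, Iterable, Optional


def queue_lengths(metrics: Optional[Iterable[Any]]) -> Dict[str, int]:
    """Map queue_name to queue_length; None (no queues) becomes an empty dict."""
    if metrics is None:
        return {}
    return {m.queue_name: m.queue_length for m in metrics}


# SimpleNamespace objects stand in for QueueMetrics in this sketch.
sample = [
    SimpleNamespace(queue_name="my_queue", queue_length=2),
    SimpleNamespace(queue_name="my_partitioned_queue", queue_length=0),
]
print(queue_lengths(sample))  # {'my_queue': 2, 'my_partitioned_queue': 0}
print(queue_lengths(None))    # {}
```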
async metrics_all_async(*, session: Session | AsyncSession | None = None, commit: bool = True) List[QueueMetrics] | None

Get metrics for all queues.

Returns:

List of QueueMetrics or None if there are no queues.

Usage:

from pgmq_sqlalchemy.schema import QueueMetrics

metrics:List[QueueMetrics] = await pgmq_client.metrics_all_async()
for m in metrics:
    print(m.queue_name)
    print(m.queue_length)

Warning

Use a distributed lock to avoid race conditions if metrics_all_async() can run concurrently with drop_queue().

Since the default PostgreSQL isolation level is READ COMMITTED, the queue metrics to be fetched may not exist if there are concurrent drop_queue() operations.
Check the pgmq.metrics_all function for more details.

PGMQOperation (op)

The PGMQOperation class (aliased as op) provides static methods for transaction-friendly operations. All methods require a session to be passed in, giving you full control over transaction boundaries. This is useful when you need to combine PGMQ operations with your existing business logic within the same transaction.

class pgmq_sqlalchemy.operation.PGMQOperation

Static operations for PGMQ that accept user-provided sessions.

All methods are static and require a session to be passed in. Users are responsible for session management and transaction handling.
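As an illustration of combining a PGMQ operation with business logic in one transaction, the sketch below sends a message with commit=False and lets a single session.commit() cover everything. It assumes that commit=False leaves the commit to the caller, as the parameter description suggests. enqueue_in_transaction is a hypothetical function; op is passed in as a parameter (in real code it would be PGMQOperation), and the stub classes exist only so the example runs without a database.

```python
from typing import Any, List


def enqueue_in_transaction(op: Any, session: Any, queue_name: str, payload: dict) -> int:
    """Send a message and commit it together with other pending work on `session`."""
    try:
        # Business-logic writes on `session` would go here.
        msg_id = op.send(queue_name, payload, session=session, commit=False)
        session.commit()  # one commit covers the business writes and the message
        return msg_id
    except Exception:
        session.rollback()  # neither the writes nor the message are persisted
        raise


class _StubSession:
    """Records commit/rollback calls; stands in for a SQLAlchemy Session."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def commit(self) -> None:
        self.events.append("commit")

    def rollback(self) -> None:
        self.events.append("rollback")


class _StubOp:
    """Stands in for PGMQOperation; only mimics the documented send() signature."""

    @staticmethod
    def send(queue_name, message, delay=0, *, session, commit=True):
        session.events.append(f"send(commit={commit})")
        return 1


session = _StubSession()
msg_id = enqueue_in_transaction(_StubOp, session, "my_queue", {"key": "value"})
print(msg_id, session.events)  # 1 ['send(commit=False)', 'commit']
```

Keeping the commit in the caller means a failure in the business logic rolls back the queued message as well, which is the main reason to prefer these static operations over the session-managing PGMQueue class in such cases.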

static check_pgmq_ext(*, session: Session, commit: bool = True) None

Check if pgmq extension exists and create it if not.

Parameters:
  • session – SQLAlchemy session.

  • commit – Whether to commit the transaction.

async static check_pgmq_ext_async(*, session: AsyncSession, commit: bool = True) None

Check if pgmq extension exists and create it if not (async).

Parameters:
  • session – Async SQLAlchemy session.

  • commit – Whether to commit the transaction.

static check_pg_partman_ext(*, session: Session, commit: bool = True) None

Check if pg_partman extension exists and create it if not.

Parameters:
  • session – SQLAlchemy session.

  • commit – Whether to commit the transaction.

async static check_pg_partman_ext_async(*, session: AsyncSession, commit: bool = True) None

Check if pg_partman extension exists and create it if not (async).

Parameters:
  • session – Async SQLAlchemy session.

  • commit – Whether to commit the transaction.

static create_queue(queue_name: str, unlogged: bool = False, *, session: Session, commit: bool = True) None

Create a new queue.

Parameters:
  • queue_name – The name of the queue.

  • unlogged – If True, creates an unlogged table.

  • session – SQLAlchemy session.

  • commit – Whether to commit the transaction.

async static create_queue_async(queue_name: str, unlogged: bool = False, *, session: AsyncSession, commit: bool = True) None

Create a new queue asynchronously.

Parameters:
  • queue_name – The name of the queue.

  • unlogged – If True, creates an unlogged table.

  • session – Async SQLAlchemy session.

  • commit – Whether to commit the transaction.

static create_partitioned_queue(queue_name: str, partition_interval: str, retention_interval: str, *, session: Session, commit: bool = True) None

Create a new partitioned queue.

Parameters:
  • queue_name – The name of the queue.

  • partition_interval – Partition interval as string.

  • retention_interval – Retention interval as string.

  • session – SQLAlchemy session.

  • commit – Whether to commit the transaction.

async static create_partitioned_queue_async(queue_name: str, partition_interval: str, retention_interval: str, *, session: AsyncSession, commit: bool = True) None

Create a new partitioned queue asynchronously.

Parameters:
  • queue_name – The name of the queue.

  • partition_interval – Partition interval as string.

  • retention_interval – Retention interval as string.

  • session – Async SQLAlchemy session.

  • commit – Whether to commit the transaction.

static validate_queue_name(queue_name: str, *, session: Session, commit: bool = True) None

Validate the length of a queue name.

Parameters:
  • queue_name – The name of the queue.

  • session – SQLAlchemy session.

  • commit – Whether to commit the transaction.

async static validate_queue_name_async(queue_name: str, *, session: AsyncSession, commit: bool = True) None

Validate the length of a queue name asynchronously.

Parameters:
  • queue_name – The name of the queue.

  • session – Async SQLAlchemy session.

  • commit – Whether to commit the transaction.

static drop_queue(queue: str, partitioned: bool = False, *, session: Session, commit: bool = True) bool

Drop a queue.

Parameters:
  • queue – The name of the queue.

  • partitioned – Whether the queue is partitioned.

  • session – SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

True if the queue was dropped successfully.

async static drop_queue_async(queue: str, partitioned: bool = False, *, session: AsyncSession, commit: bool = True) bool

Drop a queue asynchronously.

Parameters:
  • queue – The name of the queue.

  • partitioned – Whether the queue is partitioned.

  • session – Async SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

True if the queue was dropped successfully.

static list_queues(*, session: Session, commit: bool = True) List[str]

List all queues.

Parameters:
  • session – SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

List of queue names.
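
A minimal sketch of combining list_queues and drop_queue so that a queue is dropped only when it is listed. The helper name drop_queue_if_exists is hypothetical, and ops is a placeholder for the class that exposes the static methods documented in this section; it is passed in as an argument so the sketch does not assume an import path.

```python
from typing import Any


def drop_queue_if_exists(ops: Any, session: Any, queue_name: str) -> bool:
    """Drop `queue_name` only when list_queues reports it.

    `ops` is a placeholder for the class exposing the static methods
    documented here; `session` is a SQLAlchemy Session.
    """
    if queue_name not in ops.list_queues(session=session):
        return False  # nothing to drop
    return ops.drop_queue(queue_name, session=session)
```

The check and the drop are two separate calls, so another client could still drop the queue in between; the sketch does not guard against that.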

async static list_queues_async(*, session: AsyncSession, commit: bool = True) List[str]

List all queues asynchronously.

Parameters:
  • session – Async SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

List of queue names.

static send(queue_name: str, message: dict, delay: int = 0, *, session: Session, commit: bool = True) int

Send a message to a queue.

Parameters:
  • queue_name – The name of the queue.

  • message – The message as a dictionary.

  • delay – Delay in seconds before the message becomes visible.

  • session – SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

The message ID.
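
To illustrate the delay parameter, the hypothetical helper below enqueues a message that should only become visible after a given number of seconds. ops stands in for the class exposing send (an assumption of this sketch, passed as an argument), and the negative-delay check is a choice made here, not documented library behaviour.

```python
from typing import Any


def send_delayed(ops: Any, session: Any, queue_name: str,
                 payload: dict, delay_seconds: int) -> int:
    """Enqueue `payload` so that it becomes visible after `delay_seconds`."""
    if delay_seconds < 0:
        # Guard added for this sketch; not documented library behaviour
        raise ValueError("delay_seconds must be >= 0")
    # Returns the message ID, as documented for send
    return ops.send(queue_name, payload, delay=delay_seconds, session=session)
```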

async static send_async(queue_name: str, message: dict, delay: int = 0, *, session: AsyncSession, commit: bool = True) int

Send a message to a queue asynchronously.

Parameters:
  • queue_name – The name of the queue.

  • message – The message as a dictionary.

  • delay – Delay in seconds before the message becomes visible.

  • session – Async SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

The message ID.

static send_batch(queue_name: str, messages: List[dict], delay: int = 0, *, session: Session, commit: bool = True) List[int]

Send a batch of messages to a queue.

Parameters:
  • queue_name – The name of the queue.

  • messages – The messages as a list of dictionaries.

  • delay – Delay in seconds before the messages become visible.

  • session – SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

List of message IDs.
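
The commit flag suggests that several operations can share one transaction. The sketch below assumes that commit=False leaves the transaction open for the caller to finish with session.commit(); that reading, the helper name send_to_both, and the ops placeholder (the class exposing send_batch) are all assumptions of this example.

```python
from typing import Any, List, Tuple


def send_to_both(ops: Any, session: Any, first_queue: str, second_queue: str,
                 messages: List[dict]) -> Tuple[List[int], List[int]]:
    """Send the same batch to two queues, committing once at the end."""
    try:
        # commit=False: assumed to defer the commit to the caller
        first_ids = ops.send_batch(first_queue, messages,
                                   session=session, commit=False)
        second_ids = ops.send_batch(second_queue, messages,
                                    session=session, commit=False)
        session.commit()
    except Exception:
        # Undo both sends if either one (or the commit) fails
        session.rollback()
        raise
    return first_ids, second_ids
```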

async static send_batch_async(queue_name: str, messages: List[dict], delay: int = 0, *, session: AsyncSession, commit: bool = True) List[int]

Send a batch of messages to a queue asynchronously.

Parameters:
  • queue_name – The name of the queue.

  • messages – The messages as a list of dictionaries.

  • delay – Delay in seconds before the messages become visible.

  • session – Async SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

List of message IDs.

static read(queue_name: str, vt: int, *, session: Session, commit: bool = True) Message | None

Read a message from the queue.

Parameters:
  • queue_name – The name of the queue.

  • vt – Visibility timeout in seconds.

  • session – SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

Message or None if the queue is empty.
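
A typical consumer reads a message, processes it, and deletes it on success. The sketch below shows that pattern; process_next and handler are hypothetical names, and ops is a placeholder for the class exposing read and delete. It relies on the usual PGMQ behaviour that a message which is read but not deleted becomes visible again once vt seconds have passed.

```python
from typing import Any, Callable


def process_next(ops: Any, session: Any, queue_name: str,
                 handler: Callable[[dict], None], vt: int = 30) -> bool:
    """Process at most one message; return False when the queue is empty."""
    msg = ops.read(queue_name, vt, session=session)
    if msg is None:
        return False
    # If handler raises, the message is not deleted and can be read
    # again after the visibility timeout expires.
    handler(msg.message)
    ops.delete(queue_name, msg.msg_id, session=session)
    return True
```

Choose vt to be longer than the expected handler runtime, otherwise another consumer may pick up the same message while it is still being processed.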

async static read_async(queue_name: str, vt: int, *, session: AsyncSession, commit: bool = True) Message | None

Read a message from the queue asynchronously.

Parameters:
  • queue_name – The name of the queue.

  • vt – Visibility timeout in seconds.

  • session – Async SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

Message or None if the queue is empty.
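
The asynchronous variants can be combined into a small worker loop. The following is a sketch under stated assumptions: run_worker, handler, idle_sleep and max_idle_rounds are hypothetical names, and ops is a placeholder for the class exposing read_async and delete_async.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_worker(ops: Any, session: Any, queue_name: str,
                     handler: Callable[[dict], Awaitable[None]],
                     vt: int = 30, idle_sleep: float = 1.0,
                     max_idle_rounds: int = 3) -> int:
    """Process messages until the queue stays empty for max_idle_rounds reads."""
    processed = 0
    idle_rounds = 0
    while idle_rounds < max_idle_rounds:
        msg = await ops.read_async(queue_name, vt, session=session)
        if msg is None:
            # Empty queue: back off briefly before trying again
            idle_rounds += 1
            await asyncio.sleep(idle_sleep)
            continue
        idle_rounds = 0
        await handler(msg.message)
        # Delete only after the handler has finished without raising
        await ops.delete_async(queue_name, msg.msg_id, session=session)
        processed += 1
    return processed
```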

static read_batch(queue_name: str, vt: int, batch_size: int = 1, *, session: Session, commit: bool = True) List[Message] | None

Read a batch of messages from the queue.

Parameters:
  • queue_name – The name of the queue.

  • vt – Visibility timeout in seconds.

  • batch_size – Number of messages to read.

  • session – SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

List of messages or None if the queue is empty.
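
Because read_batch is documented to return None rather than an empty list, callers need to normalise the result before iterating. A minimal sketch, assuming ops is the class exposing read_batch and delete_batch and using the hypothetical helper name drain_in_batches:

```python
from typing import Any, Callable


def drain_in_batches(ops: Any, session: Any, queue_name: str,
                     handler: Callable[[dict], None],
                     batch_size: int = 10, vt: int = 30) -> int:
    """Process the queue batch by batch; return how many messages were handled."""
    total = 0
    while True:
        # `or []` turns the documented None (empty queue) into an empty list
        batch = ops.read_batch(queue_name, vt, batch_size, session=session) or []
        if not batch:
            return total
        for msg in batch:
            handler(msg.message)
        ops.delete_batch(queue_name, [m.msg_id for m in batch], session=session)
        total += len(batch)
```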

async static read_batch_async(queue_name: str, vt: int, batch_size: int = 1, *, session: AsyncSession, commit: bool = True) List[Message] | None

Read a batch of messages from the queue asynchronously.

Parameters:
  • queue_name – The name of the queue.

  • vt – Visibility timeout in seconds.

  • batch_size – Number of messages to read.

  • session – Async SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

List of messages or None if the queue is empty.

static read_with_poll(queue_name: str, vt: int, qty: int = 1, max_poll_seconds: int = 5, poll_interval_ms: int = 100, *, session: Session, commit: bool = True) List[Message] | None

Read messages from a queue with polling.

Parameters:
  • queue_name – The name of the queue.

  • vt – Visibility timeout in seconds.

  • qty – Number of messages to read.

  • max_poll_seconds – Maximum number of seconds to poll.

  • poll_interval_ms – Interval between polls, in milliseconds.

  • session – SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

List of messages or None if the queue is empty.
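
The polling parameters can be read as "wait up to max_poll_seconds for messages, checking every poll_interval_ms". The sketch below wraps that in a hypothetical helper, wait_for_messages; ops is a placeholder for the class exposing read_with_poll, and the specific values chosen are illustrative only.

```python
from typing import Any, List


def wait_for_messages(ops: Any, session: Any, queue_name: str,
                      qty: int = 5, max_wait_seconds: int = 2) -> List[Any]:
    """Wait briefly for up to `qty` messages; return [] if none arrive in time."""
    messages = ops.read_with_poll(
        queue_name,
        30,                                 # vt: seconds the messages stay hidden
        qty,
        max_poll_seconds=max_wait_seconds,  # upper bound on the wait
        poll_interval_ms=200,               # pause between checks
        session=session,
    )
    # Normalise the documented None (nothing arrived) to an empty list
    return messages or []
```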

async static read_with_poll_async(queue_name: str, vt: int, qty: int = 1, max_poll_seconds: int = 5, poll_interval_ms: int = 100, *, session: AsyncSession, commit: bool = True) List[Message] | None

Read messages from a queue with polling asynchronously.

Parameters:
  • queue_name – The name of the queue.

  • vt – Visibility timeout in seconds.

  • qty – Number of messages to read.

  • max_poll_seconds – Maximum number of seconds to poll.

  • poll_interval_ms – Interval between polls, in milliseconds.

  • session – Async SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

List of messages or None if the queue is empty.

static set_vt(queue_name: str, msg_id: int, vt: int, *, session: Session, commit: bool = True) Message | None

Set the visibility timeout for a message.

Parameters:
  • queue_name – The name of the queue.

  • msg_id – The message ID.

  • vt – Visibility timeout in seconds.

  • session – SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

Message or None if the message does not exist.
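
One use of set_vt is to keep a message hidden while a long-running job is still working on it. The sketch below assumes, as in upstream PGMQ, that vt is counted in seconds from the time of the call; extend_lease is a hypothetical helper name and ops a placeholder for the class exposing set_vt.

```python
from typing import Any


def extend_lease(ops: Any, session: Any, queue_name: str,
                 msg_id: int, seconds: int) -> Any:
    """Push back the visibility timeout of a message still being processed."""
    updated = ops.set_vt(queue_name, msg_id, seconds, session=session)
    if updated is None:
        # Documented return value when the message does not exist
        raise LookupError(f"message {msg_id} not found in queue {queue_name!r}")
    # Message.vt is the datetime field of the returned Message
    return updated.vt
```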

async static set_vt_async(queue_name: str, msg_id: int, vt: int, *, session: AsyncSession, commit: bool = True) Message | None

Set the visibility timeout for a message asynchronously.

Parameters:
  • queue_name – The name of the queue.

  • msg_id – The message ID.

  • vt – Visibility timeout in seconds.

  • session – Async SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

Message or None if the message does not exist.

static pop(queue_name: str, *, session: Session, commit: bool = True) Message | None

Read and delete a message from the queue.

Parameters:
  • queue_name – The name of the queue.

  • session – SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

Message or None if the queue is empty.
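
Since pop reads and deletes in a single call, it suits cases where losing a message on consumer failure is acceptable. A minimal sketch of emptying a queue this way; drain_with_pop and limit are hypothetical, and ops is a placeholder for the class exposing pop.

```python
from typing import Any, List


def drain_with_pop(ops: Any, session: Any, queue_name: str,
                   limit: int = 100) -> List[dict]:
    """Pop up to `limit` messages and return their payloads."""
    payloads: List[dict] = []
    for _ in range(limit):
        msg = ops.pop(queue_name, session=session)
        if msg is None:
            break  # queue is empty
        # The message is already removed from the queue at this point
        payloads.append(msg.message)
    return payloads
```

When a message must survive a crash of the consumer, reading with a visibility timeout and deleting after successful processing is the safer pattern.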

async static pop_async(queue_name: str, *, session: AsyncSession, commit: bool = True) Message | None

Read and delete a message from the queue asynchronously.

Parameters:
  • queue_name – The name of the queue.

  • session – Async SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

Message or None if the queue is empty.

static delete(queue_name: str, msg_id: int, *, session: Session, commit: bool = True) bool

Delete a message from the queue.

Parameters:
  • queue_name – The name of the queue.

  • msg_id – The message ID.

  • session – SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

True if the message was deleted successfully.

async static delete_async(queue_name: str, msg_id: int, *, session: AsyncSession, commit: bool = True) bool

Delete a message from the queue asynchronously.

Parameters:
  • queue_name – The name of the queue.

  • msg_id – The message ID.

  • session – Async SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

True if the message was deleted successfully.

static delete_batch(queue_name: str, msg_ids: List[int], *, session: Session, commit: bool = True) List[int]

Delete a batch of messages from the queue.

Parameters:
  • queue_name – The name of the queue.

  • msg_ids – List of message IDs.

  • session – SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

List of message IDs that were successfully deleted.
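
Because delete_batch returns only the IDs that were actually deleted, comparing the result with the request shows which messages were missed. A short sketch, with the hypothetical helper name delete_and_report and ops standing in for the class exposing delete_batch:

```python
from typing import Any, List


def delete_and_report(ops: Any, session: Any, queue_name: str,
                      msg_ids: List[int]) -> List[int]:
    """Delete `msg_ids` and return the IDs that were NOT deleted."""
    deleted = set(ops.delete_batch(queue_name, msg_ids, session=session))
    # Keep the caller's ordering for the leftover IDs
    return [msg_id for msg_id in msg_ids if msg_id not in deleted]
```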

async static delete_batch_async(queue_name: str, msg_ids: List[int], *, session: AsyncSession, commit: bool = True) List[int]

Delete a batch of messages from the queue asynchronously.

Parameters:
  • queue_name – The name of the queue.

  • msg_ids – List of message IDs.

  • session – Async SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

List of message IDs that were successfully deleted.

static archive(queue_name: str, msg_id: int, *, session: Session, commit: bool = True) bool

Archive a message, moving it from the queue to the queue's archive table.

Parameters:
  • queue_name – The name of the queue.

  • msg_id – The message ID.

  • session – SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

True if the message was archived successfully.
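
Archiving is a natural way to set aside messages that keep failing. The sketch below uses Message.read_ct to decide when to give up; it assumes read_ct counts how many times the message has been read, including the current read. handle_or_archive, handler and max_reads are hypothetical names, and ops is a placeholder for the class exposing read, delete and archive.

```python
from typing import Any, Callable


def handle_or_archive(ops: Any, session: Any, queue_name: str,
                      handler: Callable[[dict], None],
                      max_reads: int = 3, vt: int = 30) -> str:
    """Process one message, archiving it once it has been read too often."""
    msg = ops.read(queue_name, vt, session=session)
    if msg is None:
        return "empty"
    if msg.read_ct > max_reads:
        # Earlier attempts did not end in a delete; stop retrying
        ops.archive(queue_name, msg.msg_id, session=session)
        return "archived"
    handler(msg.message)  # on failure the message reappears after vt
    ops.delete(queue_name, msg.msg_id, session=session)
    return "processed"
```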

async static archive_async(queue_name: str, msg_id: int, *, session: AsyncSession, commit: bool = True) bool

Archive a message asynchronously, moving it from the queue to the queue's archive table.

Parameters:
  • queue_name – The name of the queue.

  • msg_id – The message ID.

  • session – Async SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

True if the message was archived successfully.

static archive_batch(queue_name: str, msg_ids: List[int], *, session: Session, commit: bool = True) List[int]

Archive a batch of messages, moving them from the queue to the queue's archive table.

Parameters:
  • queue_name – The name of the queue.

  • msg_ids – List of message IDs.

  • session – SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

List of message IDs that were successfully archived.

async static archive_batch_async(queue_name: str, msg_ids: List[int], *, session: AsyncSession, commit: bool = True) List[int]

Archive a batch of messages asynchronously, moving them from the queue to the queue's archive table.

Parameters:
  • queue_name – The name of the queue.

  • msg_ids – List of message IDs.

  • session – Async SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

List of message IDs that were successfully archived.

static purge(queue_name: str, *, session: Session, commit: bool = True) int

Purge all messages from a queue.

Parameters:
  • queue_name – The name of the queue.

  • session – SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

Number of messages purged.

async static purge_async(queue_name: str, *, session: AsyncSession, commit: bool = True) int

Purge all messages from a queue asynchronously.

Parameters:
  • queue_name – The name of the queue.

  • session – Async SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

Number of messages purged.

static metrics(queue_name: str, *, session: Session, commit: bool = True) QueueMetrics | None

Get metrics for a queue.

Parameters:
  • queue_name – The name of the queue.

  • session – SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

QueueMetrics or None if the queue does not exist.
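
The fields of QueueMetrics lend themselves to simple health checks. A minimal sketch, assuming thresholds chosen by the caller; is_backlogged, max_length and max_age_sec are hypothetical names, and ops is a placeholder for the class exposing metrics.

```python
from typing import Any


def is_backlogged(ops: Any, session: Any, queue_name: str,
                  max_length: int = 1000, max_age_sec: int = 300) -> bool:
    """Return True when the queue is too long or its oldest message too old."""
    m = ops.metrics(queue_name, session=session)
    if m is None:
        # Documented return value when the queue does not exist
        raise LookupError(f"queue {queue_name!r} does not exist")
    too_long = m.queue_length > max_length
    # oldest_msg_age_sec is typed int | None, so guard before comparing
    too_old = m.oldest_msg_age_sec is not None and m.oldest_msg_age_sec > max_age_sec
    return too_long or too_old
```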

async static metrics_async(queue_name: str, *, session: AsyncSession, commit: bool = True) QueueMetrics | None

Get metrics for a queue asynchronously.

Parameters:
  • queue_name – The name of the queue.

  • session – Async SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

QueueMetrics or None if the queue does not exist.

static metrics_all(*, session: Session, commit: bool = True) List[QueueMetrics] | None

Get metrics for all queues.

Parameters:
  • session – SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

List of QueueMetrics or None if no queues exist.
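
As a small illustration of metrics_all, the sketch below ranks queues by queue_length. The helper name longest_queues and the top parameter are hypothetical, and ops is a placeholder for the class exposing metrics_all.

```python
from typing import Any, List, Tuple


def longest_queues(ops: Any, session: Any, top: int = 3) -> List[Tuple[str, int]]:
    """Return (queue_name, queue_length) pairs for the longest queues."""
    # `or []` covers the documented None when no queues exist
    all_metrics = ops.metrics_all(session=session) or []
    ranked = sorted(all_metrics, key=lambda m: m.queue_length, reverse=True)
    return [(m.queue_name, m.queue_length) for m in ranked[:top]]
```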

async static metrics_all_async(*, session: AsyncSession, commit: bool = True) List[QueueMetrics] | None

Get metrics for all queues asynchronously.

Parameters:
  • session – Async SQLAlchemy session.

  • commit – Whether to commit the transaction.

Returns:

List of QueueMetrics or None if no queues exist.

Schema Classes

class pgmq_sqlalchemy.schema.Message(msg_id: int, read_ct: int, enqueued_at: datetime, vt: datetime, message: dict)

Attributes:
  • enqueued_at: datetime

  • message: dict

  • msg_id: int

  • read_ct: int

  • vt: datetime

class pgmq_sqlalchemy.schema.QueueMetrics(queue_name: str, queue_length: int, newest_msg_age_sec: int | None, oldest_msg_age_sec: int | None, total_messages: int)

Attributes:
  • newest_msg_age_sec: int | None

  • oldest_msg_age_sec: int | None

  • queue_length: int

  • queue_name: str

  • total_messages: int