Advanced Features

Oracle Advanced Queuing(AQ)

We made a very simple implement of Advanced Queue as it’s Oracledb’s exclusive feature. Here’s a rough draft with basic examples shows you how to use it .

Basic

Before running python codes below , you need to create queues in the database via console mode ,use tools such as SQL*Plus , run the following SQL:

Create a queue for basic test:

BEGIN
    DBMS_AQADM.CREATE_QUEUE_TABLE('MY_QUEUE_TABLE', 'RAW');
    DBMS_AQADM.CREATE_QUEUE('DEMO_RAW_QUEUE', 'MY_QUEUE_TABLE');
    DBMS_AQADM.START_QUEUE('DEMO_RAW_QUEUE');
END;

Create your own data type:

CREATE OR REPLACE TYPE udt_book AS OBJECT (
    Title   VARCHAR2(100),
    Authors VARCHAR2(100),
    Price   NUMBER(5,2)
);

Create a queue to contain modified data type:

BEGIN
    DBMS_AQADM.CREATE_QUEUE_TABLE('BOOK_QUEUE_TAB', 'UDT_BOOK');
    DBMS_AQADM.CREATE_QUEUE('DEMO_BOOK_QUEUE', 'BOOK_QUEUE_TAB');
    DBMS_AQADM.START_QUEUE('DEMO_BOOK_QUEUE');
END;

Then you can get access to AQ through the following code:

import cx_Oracle_async
import asyncio

async def features(pool):
    async with oracle_pool.acquire() as conn:
        # Basic put
        queue = await conn.queue("DEMO_RAW_QUEUE")
        PAYLOAD_DATA = [
            "The first message",
            "The second message",
            "The third message"
        ]
        for data in PAYLOAD_DATA:
            await queue.enqOne(conn.msgproperties(payload=data))
        await conn.commit()

        # Basic get
        queue = await conn.queue("DEMO_RAW_QUEUE")
        for _ in range(len(PAYLOAD_DATA)):
            msg = await queue.deqOne()
        print(msg.payload.decode(conn.encoding))
        await conn.commit()

async def main():
    dsn = cx_Oracle_async.makedsn(
        host = 'localhost',
        port = '1521',
        service_name='orcl'
    )
    async with cx_Oracle_async.create_pool(user = '' , password = '' , dsn = dsn) as pool:
        await features(pool)

asyncio.run(main())

You can define a queue contains your own type of data:

async def features(pool):
    async with pool.acquire() as conn:
        booksType = await conn.gettype("UDT_BOOK")
        book = booksType.newobject()
        book.TITLE = "Quick Brown Fox"
        book.AUTHORS = "The Dog"
        book.PRICE = 123

        # Put and get modified data
        queue = await conn.queue("DEMO_BOOK_QUEUE", booksType)
        await queue.enqOne(conn.msgproperties(payload=book))
        msg = await queue.deqOne()
        print(msg.payload.TITLE)
        await conn.commit()

Put many and get many:

async def features(pool):
    async with pool.acquire() as conn:
        # Put many
        messages = [
            "1",
            "2",
            "3",
            "4",
            "5",
            "6"
        ]
        queue = await conn.queue("DEMO_RAW_QUEUE")
        await queue.enqMany(conn.msgproperties(payload=m) for m in messages)
        await conn.commit()

        # Get many
        async for m in queue.deqMany(maxMessages=5):
            print(m.payload.decode(conn.encoding))
        await queue.deqOne() # clean
        await conn.commit()

As syntactic sugar ,there’re some equivalent replacements perticularly in this async library designed for convenient use. Queue.pack(m) is equal to Connection.msgproperties(payload=m) , Queue.unpack(m) is equal to m.payload.decode(conn.encoding) so that you don’t have to operate severl different type of objects.

Further more , Queue.unpack(m) will automatically detect whether the input is a single object or a iterable,

async def features(pool):
    async with pool.acquire() as conn:
        queue = await conn.queue("DEMO_RAW_QUEUE")
        async for _ in queue.deqMany():... # Clear queue
        message = "Hello World"

        # Queue.pack(m) is equal to Connection.msgproperties(payload=m)
        await queue.enqOne(queue.pack(message))
        await queue.enqOne(conn.msgproperties(payload=message))
        await conn.commit()

        # Queue.unpack(m) is equal to m.payload.decode(conn.encoding)
        ret1 = queue.unpack(await queue.deqOne())
        ret2 = (await queue.deqOne()).payload.decode(conn.encoding)
        await conn.commit()
        assert ret1 == ret2 == message

        # Queue.unpack(m) will do automatically treatment
        # depends on whether input a single object or a iterable.
        await queue.enqMany(queue.pack(m) for m in map(str , range(10)))
        await conn.commit()
        # This returns a list but not a single object.
        ret1 = queue.unpack(await queue.deqMany())
        await conn.commit()

        ret2 = []
        await queue.enqMany(queue.pack(m) for m in map(str , range(10)))
        await conn.commit()
        async for m in queue.deqMany():
            # This returns a single object since one input eachtime.
            ret2.append(queue.unpack(m))
        assert ret1 == ret2

It is noteworthy that since we were not implement this library asynchronous in a very basic level ,yet it’s just a wrapper of synchronous functions via threads , that makes it not gonna work if you are doing two different things in a single connection at a time. For example in the following situation the code will NOT work:

from cx_Oracle_async import makedsn , create_pool
import asyncio

async def coro_to_get_from_queue(conn , queue , oracle_pool):
    print(f"coroutine start fetching")
    ret = (await queue.deqOne()).payload.decode(conn.encoding)
    print(f"coroutine returned , {ret=}")
    await conn.commit()

async def main():
    loop = asyncio.get_running_loop()
    dsn = makedsn(
        host = 'localhost',
        port = '1521',
        service_name='orcl'
    )
    async with create_pool(user = '' , password = '' , dsn = dsn) as oracle_pool:
        async with oracle_pool.acquire() as conn:
            queue = await conn.queue("DEMO_RAW_QUEUE")
            loop.create_task(coro_to_get_from_queue(conn , queue , oracle_pool))

            await asyncio.sleep(1)

            data = 'Hello World'
            print(f"mainthread put some thing in queue ,{data=}")
            await queue.enqOne(conn.msgproperties(payload=data))
            await conn.commit()
            print(f"mainthread put some thing done")

    await asyncio.sleep(1)
    print('Process terminated.')

asyncio.run(main())

As we planned , there should be a fetching thread(coroutine) start fetcing , this action will block since the queue is empty , and will return until there’s something put into the queue. Then after one second sleep , the main thread will put ‘Hello World’ into AQ and that will trigger the blocked fetching thread , and then the whole program terminated.

However we will find the program blocking forever in real practice. That’s because since queue.deqOptions.wait equals to cx_Oracle.DEQ_WAIT_FOREVER thus while there’s nothing in the queue , the query will block AND this will take over the control of connection thread , which makes it impossible for the following code to put anything into the queue using the same thread, thus makes it a deadlock.

If you would like to achieve the same result , you should do that in ANOTHER connection thread. Simply modify the code as follow:

from cx_Oracle_async import makedsn , create_pool
import asyncio
from async_timeout import timeout

async def coro_to_get_from_queue(conn , queue , oracle_pool):
    try:
        async with timeout(2):
            print(f"coroutine start fetching")
            ret = (await queue.deqOne()).payload.decode(conn.encoding)
            print(f"coroutine returned , {ret=}")
            await conn.commit()
    except asyncio.TimeoutError:
        print('two seconds passed , timeout triggered.')
        async with oracle_pool.acquire() as conn2:
            queue2 = await conn2.queue("DEMO_RAW_QUEUE")
            data = 'Hello World'
            print(f"another connection put some thing in queue ,{data=}")
            await queue2.enqOne(conn2.msgproperties(payload=data))
            await conn2.commit()
            print(f"another connection put some thing done")

async def main():
    loop = asyncio.get_running_loop()
    dsn = makedsn(
        host = 'localhost',
        port = '1521',
        service_name='orcl'
    )
    async with create_pool(user = '' , password = '' , dsn = dsn) as oracle_pool:
        async with oracle_pool.acquire() as conn:
            queue = await conn.queue("DEMO_RAW_QUEUE")
            loop.create_task(coro_to_get_from_queue(conn , queue , oracle_pool))

            await asyncio.sleep(1)

            cursor = await conn.cursor()
            await cursor.execute(f"SELECT COUNT(*) FROM DEPT")
            fetch_result = await cursor.fetchall()
            print(f"main thread continue , {fetch_result=}")

    await asyncio.sleep(1)
    print('Process terminated.')

asyncio.run(main())

Special Explanation for queue.deqMany()

Queue.deqMany has a little bit complexity in usage , here’re some further instructions.

There’re two ways of calling deqMany , you can use it as a normal asynchronous call , OR you can use it as a asynchronous generator. For example:

from cx_Oracle_async import makedsn , create_pool
import asyncio
import random

async def main():
    loop = asyncio.get_running_loop()
    dsn = makedsn(
        host = 'localhost',
        port = '1521',
        service_name='orcl'
    )
    async with create_pool(user = '' , password = '' , dsn = dsn) as oracle_pool:
        async with oracle_pool.acquire() as conn:

            # Init and clear a queue
            queue = await conn.queue("DEMO_RAW_QUEUE")
            async for _ in queue.deqMany():...

            # Fill up
            await queue.enqMany(queue.pack(m) for m in map(str , range(10)))
            await conn.commit()

            # The First way , use it as a normal asynchronous call ,
            # This method use the original cx_Oracle.Queue.deqMany , so its
            # your choice if you're looking for efficiency concern. The
            # sub thread will block until all results returned.

            ret = await queue.deqMany(maxMessages = 10)
            await conn.commit()
            assert list(map(queue.unpack , ret)) == list(map(str , range(10)))

            # The second way , you can call deqMany as a asynchronous generator.
            # This is a self implemented method which yield queue.deqMany() with
            # queue.deqOptions = DEQ_NO_WAIT until it reaches the message limit or
            # there's nothing in the queue. The benifits is you will get immediate
            # response.

            await queue.enqMany(queue.pack(m) for m in map(str , range(10)))
            await conn.commit()

            ret = []
            async for m in queue.deqMany(maxMessages = 10):
                ret.append(queue.unpack(m))
            await conn.commit()
            assert ret == list(map(str , range(10)))

asyncio.run(main())

It is worth mentioning that , the two means act differently when there’s a empty queue.

If you are using the await mode , for example ret = await queue.deqMany() if do have something there in the queue , this method will quickly return , while if there’s nothing in the queue , the method will block until there’s something new come into the queue , this will sometime make it a deadlock in main threadloop under improper use. So do please make sure you’re clear about what you’re doing.

Of course you can change deqOptions into non-blocking mode like queue.deqOptions.wait = cx_Oracle_async.DEQ_NO_WAIT to aviod it.

On the other hand , If you are using the async with mode , it will never block your main thread , which means it will not be affected by Queue.deqOptions , no matter what setting Queue.deqOptions is , it will return immediately eventhough there’s nothing in the queue.

So taking into consideration that when argument maxMessages equals to -1 (default value), it means unlimit fetch untill the queue is empty (where if youre using await mode , the “unlimit” do have a soft upper bound of 65535 , and no limit with async for mode cause its assembled from multiple querys). It’s convenient to clear the whole queue with the following code:

queue = await ...
messages = list(map(str , range(random.randint(0,10000))))
await queue.enqMany(queue.pack(m) for m in messages)
await conn.commit()

# You are not clear about how large the queue size is
# (there's also chance it's empty)
# and want to take out all stuffs in it if its not empty.
ret = []
async for m in queue.deqMany():
    ret.append(queue.unpack(m))
print(ret)

# Do something keep on.
...