Dbms_aq.dequeue_array, the first message is returned twice

Introduction

I am seeing very strange behavior on my Oracle database server (Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production) when using Oracle Advanced Queuing (AQ).

Problem

I enqueue X messages, but dequeue_array returns X + 1 messages, with the first message duplicated (as seen from the message ID).

Reproduction:

I was able to write a simple PoC that reproduces the error. The code is pretty straightforward; the enqueue/dequeue part is standard Oracle AQ. It performs the following steps twice (two test runs):

  • Purge the queue table
  • Enqueue X messages
  • Dequeue all messages using the dbms_aq.dequeue_array call
  • Check how many messages have been dequeued

When the PoC is started on a new connection, the first test run completes without errors, but every subsequent test run fails. After that, each time the script is executed again on the same connection, both test runs fail.

What I have tried so far:

  • Running the script on a "new" connection: only the first test run succeeds
  • Any additional script execution on the same connection: all test runs fail
  • When using / creating a new queue: only the first test run succeeds
  • When using "delete from <queue_table>" instead of dbms_aqadm.purge_queue_table(): everything is fine
  • When using the "normal" single-message dequeue: everything is fine
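The last workaround in the list (dequeuing one message at a time instead of calling dbms_aq.dequeue_array) can be sketched roughly as follows. This is only a minimal illustration, assuming the TEST_QUEUE queue and the SYS.AQ$_JMS_BYTES_MESSAGE payload type from the PoC; the local variable names and the E_NoMoreMessages exception name are made up for this example.

```sql
declare
  -- Queue name taken from the PoC; all other names are illustrative.
  C_QueueName constant varchar2(30) := 'TEST_QUEUE';

  L_DequeueOptions    dbms_aq.dequeue_options_t;
  L_MessageProperties dbms_aq.message_properties_t;
  L_Payload           sys.aq$_jms_bytes_message;
  L_MsgId             raw(16);
  L_MsgCnt            pls_integer := 0;

  -- ORA-25228: timeout or end-of-fetch during message dequeue
  E_NoMoreMessages exception;
  pragma exception_init(E_NoMoreMessages, -25228);
begin
  -- Do not block when the queue is empty; raise ORA-25228 instead.
  L_DequeueOptions.wait := dbms_aq.no_wait;

  loop
    begin
      dbms_aq.dequeue(queue_name         => C_QueueName
                     ,dequeue_options    => L_DequeueOptions
                     ,message_properties => L_MessageProperties
                     ,payload            => L_Payload
                     ,msgid              => L_MsgId);
    exception
      when E_NoMoreMessages then
        exit; -- queue is drained
    end;

    L_MsgCnt := L_MsgCnt + 1;
    dbms_output.put_line('#' || L_MsgCnt || ' MsgId=' || rawtohex(L_MsgId));
  end loop;

  dbms_output.put_line('Dequeued: ' || L_MsgCnt);
end;
/
```

Like the PoC, this sketch does not commit, so the dequeued messages are only removed permanently once the session commits.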

Conclusion:

I can neither explain this behavior nor find an error in my code. Please take a look at it; it should be directly executable in your favorite SQL client (tested with PL/SQL Developer).

If you need more information or have problems with the PoC, just ask; I will check this topic regularly. I tried to make the PoC as readable as possible, including detailed output about what is going on.

Code:

declare
  C_QueueName        constant varchar2(32767) := 'TEST_QUEUE';
  C_QueueTable       constant varchar2(32767) := 'TEST_Q_TABLE';
  C_MsgCount         constant pls_integer := 1;
  C_TestRuns         constant pls_integer := 2;
  C_DequeueArraySize constant pls_integer := 10;

  /*
   * Create the queue and the queue table used for these tests
   */
  procedure CreateQueueIfMissing is
    L_Present pls_integer;
  begin
    dbms_output.put_line('START CreateQueueIfMissing');

    execute immediate 'select count(*) from USER_OBJECTS where OBJECT_NAME = ''' || C_QueueName || ''' and OBJECT_TYPE = ''QUEUE''' into L_Present;

    if L_Present = 1 then
      dbms_output.put_line('Skipping queue creation, already present.');
      dbms_output.put_line('END CreateQueueIfMissing');
      return;
    end if;

    dbms_output.put_line(' Creating queue table ' || C_QueueTable);
    DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table        => C_QueueTable
                                 ,storage_clause     => 'LOGGING NOCACHE NOPARALLEL MONITORING'
                                 ,sort_list          => 'priority,enq_time'
                                 ,multiple_consumers => false
                                 ,queue_payload_type => 'SYS.AQ$_JMS_BYTES_MESSAGE'
                                 ,comment            => 'Queue for messages');

    dbms_output.put_line(' Creating queue ' || C_QueueName);
    DBMS_AQADM.CREATE_QUEUE(queue_name  => C_QueueName
                           ,queue_table => C_QueueTable
                           ,max_retries => 8640
                           ,retry_delay => 30
                           ,comment     => 'Queue for messages');

    dbms_output.put_line(' Starting queue ' || C_QueueName);
    DBMS_AQADM.START_QUEUE(queue_name => C_QueueName);

    dbms_output.put_line('END CreateQueueIfMissing');
  end CreateQueueIfMissing;

  -- ================================================================================================

  /*
   * This procedure is the root of all evil.
   * The error only occurs when using the purge_queue_tables procedure.
   * When using a normal "delete from <queue_table>" then everything is just fine.
   */
  procedure CleanQueueTable is
    L_PurgeOptions dbms_aqadm.aq$_purge_options_t;
    L_Count        pls_integer;
  begin
    dbms_output.put_line('START CleanQueueTable');

    execute immediate 'select count(*) from ' || C_QueueTable into L_Count;
    dbms_output.put_line(' Messages in queue table BEFORE purge: ' || L_Count);

    dbms_aqadm.purge_queue_table(queue_table     => C_QueueTable
                                ,purge_condition => null
                                ,purge_options   => L_PurgeOptions);

    execute immediate 'select count(*) from ' || C_QueueTable into L_Count;
    dbms_output.put_line(' Messages in queue table AFTER purge: ' || L_Count);

    dbms_output.put_line('END CleanQueueTable');
  end CleanQueueTable;

  -- ================================================================================================

  /*
   * Enqueue the configured count of messages on the queue
   */
  procedure EnqueueMessages is
    L_BodyId            pls_integer;
    L_Msg               sys.aq$_jms_bytes_message;
    L_MsgId             raw(16);
    L_Count             pls_integer;
    L_EnqueueOptions    DBMS_AQ.ENQUEUE_OPTIONS_T;
    L_MessageProperties DBMS_AQ.MESSAGE_PROPERTIES_T;
  begin
    dbms_output.put_line('START EnqueueMessages');

    execute immediate 'select count(*) from ' || C_QueueTable into L_Count;
    dbms_output.put_line(' Messages in queue table BEFORE enqueue: ' || L_Count);

    for i in 1 .. C_MsgCount loop
      dbms_output.put_line(' Construct #' || i);
      L_Msg := sys.aq$_jms_bytes_message.construct;

      -- set the JMS header
      L_Msg.set_type('JmsBytesMessage');
      L_Msg.set_userid(1);
      L_Msg.set_appid('test');
      L_Msg.set_groupid('cs');
      L_Msg.set_groupseq(1);

      -- set JMS message content
      L_BodyId := L_Msg.clear_body(-1);
      L_Msg.write_bytes(L_BodyId, to_blob(utl_raw.cast_to_raw('<test>Lorem Ipsum</test>')));
      L_Msg.flush(L_BodyId);
      L_Msg.clean(L_BodyId);

      dbms_output.put_line(' Enqueue #' || i);
      DBMS_AQ.ENQUEUE (queue_name         => C_QueueName
                      ,enqueue_options    => L_EnqueueOptions
                      ,message_properties => L_MessageProperties
                      ,payload            => L_Msg
                      ,msgid              => L_MsgId);
    end loop;

    execute immediate 'select count(*) from ' || C_QueueTable into L_Count;
    dbms_output.put_line(' Messages in queue table AFTER enqueue: ' || L_Count);

    dbms_output.put_line('END EnqueueMessages');
  end EnqueueMessages;

  -- ================================================================================================

  /*
   * Dequeues messages using dequeue_array from the configured queue.
   */
  procedure DequeueMessages is
    L_DequeueOptions dbms_aq.dequeue_options_t;
    L_MsgPropArr     dbms_aq.message_properties_array_t := dbms_aq.message_properties_array_t();
    L_PayloadArr     sys.aq$_jms_bytes_messages;
    L_MsgIdArr       dbms_aq.msgid_array_t;
    L_MsgCnt         pls_integer := 0;
    L_Count          pls_integer;
  begin
    dbms_output.put_line('START DequeueMessages');

    execute immediate 'select count(*) from ' || C_QueueTable into L_Count;
    dbms_output.put_line(' Messages in queue table BEFORE dequeue: ' || L_Count);

    L_MsgCnt := dbms_aq.dequeue_array(queue_name               => C_QueueName
                                     ,dequeue_options          => L_DequeueOptions
                                     ,array_size               => C_DequeueArraySize
                                     ,message_properties_array => L_MsgPropArr
                                     ,payload_array            => L_PayloadArr
                                     ,msgid_array              => L_MsgIdArr);

    execute immediate 'select count(*) from ' || C_QueueTable into L_Count;
    dbms_output.put_line(' Messages in queue table AFTER dequeue: ' || L_Count);

    dbms_output.put_line(' Expected: ' || C_MsgCount || ', Received: ' || L_MsgCnt);

    if C_MsgCount != L_MsgCnt then
      dbms_output.put_line(' *****************************************');
      dbms_output.put_line(' TOO MANY ITEMS DEQUEUED?!?');
      dbms_output.put_line(' *****************************************');
      for i in 1 .. L_MsgCnt loop
        dbms_output.put_line(' #' || i || ' MsdId=' || L_MsgIdArr(i));
      end loop;
    end if;

    dbms_output.put_line('END DequeueMessages');
  end DequeueMessages;

  -- ================================================================================================

  /*
   * This is the testcase
   */
  procedure RunTestCase is
  begin
    CreateQueueIfMissing;

    for i in 1 .. C_TestRuns loop
      dbms_output.put_line(null);
      dbms_output.put_line('=========== START test run #' || i || '===========');
      CleanQueueTable;
      EnqueueMessages;
      DequeueMessages;
    end loop;
  end;

  -- ================================================================================================

begin
  RunTestCase;
end;

Output Example:

START CreateQueueIfMissing
Skipping queue creation, already present.
END CreateQueueIfMissing

=========== START test run #1===========
START CleanQueueTable
 Messages in queue table BEFORE purge: 0
 Messages in queue table AFTER purge: 0
END CleanQueueTable
START EnqueueMessages
 Messages in queue table BEFORE enqueue: 0
 Construct #1
 Enqueue #1
 Messages in queue table AFTER enqueue: 1
END EnqueueMessages
START DequeueMessages
 Messages in queue table BEFORE dequeue: 1
 Messages in queue table AFTER dequeue: 0
 Expected: 1, Received: 1
END DequeueMessages

=========== START test run #2===========
START CleanQueueTable
 Messages in queue table BEFORE purge: 0
 Messages in queue table AFTER purge: 0
END CleanQueueTable
START EnqueueMessages
 Messages in queue table BEFORE enqueue: 0
 Construct #1
 Enqueue #1
 Messages in queue table AFTER enqueue: 1
END EnqueueMessages
START DequeueMessages
 Messages in queue table BEFORE dequeue: 1
 Messages in queue table AFTER dequeue: 0
 Expected: 1, Received: 2
 *****************************************
 TOO MANY ITEMS DEQUEUED?!?
 *****************************************
 #1 MsdId=2949A0FF2EE456A7E0540010E0467A30
 #2 MsdId=2949A0FF2EE456A7E0540010E0467A30
END DequeueMessages
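
The duplication shown in the output above can also be detected programmatically rather than by eye. The following is a rough standalone sketch, not part of the PoC: it fills a dbms_aq.msgid_array_t with the two message IDs from the sample output and reports any ID that appears more than once. The T_SeenSet type and the variable names are made up for this illustration.

```sql
declare
  -- Set of already-seen message IDs, keyed by their hex representation.
  type T_SeenSet is table of boolean index by varchar2(32);

  L_MsgIdArr dbms_aq.msgid_array_t;
  L_Seen     T_SeenSet;
  L_Key      varchar2(32);
begin
  -- Sample data copied from the output of test run #2.
  L_MsgIdArr(1) := hextoraw('2949A0FF2EE456A7E0540010E0467A30');
  L_MsgIdArr(2) := hextoraw('2949A0FF2EE456A7E0540010E0467A30');

  for i in 1 .. L_MsgIdArr.count loop
    L_Key := rawtohex(L_MsgIdArr(i));

    if L_Seen.exists(L_Key) then
      dbms_output.put_line('Duplicate at #' || i || ' MsgId=' || L_Key);
    else
      L_Seen(L_Key) := true;
    end if;
  end loop;
end;
/
```

In the PoC, the same loop could run over the L_MsgIdArr returned by dbms_aq.dequeue_array, bounded by the returned message count.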
1 answer

This looks like bug 20659700. There is a bit more information in My Oracle Support document 2002148.1.

You (or your database administrator) should raise a service request to confirm this and to find out whether a patch is available for your platform.
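
When raising the service request, it usually helps to include the exact database version and patch history. A minimal sketch of how that information might be collected is shown below; it assumes the session is allowed to query v$version and dba_registry_history, which often requires DBA privileges.

```sql
-- Database version banner (the question reports 11.2.0.4.0).
select banner
  from v$version;

-- Recorded patch and upgrade actions; requires access to the DBA_ views.
select action_time, action, version, id, comments
  from dba_registry_history
 order by action_time;
```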


Source: https://habr.com/ru/post/1240607/

