Multithreading with MQ

I have a problem with the MQSeries Perl module in a multi-threaded environment. Here is what I tried:

  • create two descriptors in another thread with $mqMgr = MQSeries::QueueManager->new(). I thought this would give me two different connections to MQ, but instead I got a return code of 2219 in the second call to MQOPEN(), which probably means I got the same disruptive connection to mq from the two call to new () methods .
  • declares only one $ mqMgr as a shared global variable . But I cannot assign a reference to the MQSeries :: QueueManager object on $mqMgr. The reason is that "Type of argument 1 in streams :: shared :: share must be one of [$ @%] (not a subheading)"
  • declares only one $mqMgras a global variable. Got the same code 2219.
  • Tried to get MQCNO_HANDLE_SHARE_NO_BLOCKin MQSeries::QueueManager->new()so that one connection could be common to the stream. But I can’t find a way to convey it.

My question is: with the Perl MQSeries module

  • How / can I get a separate connection to the MQ queue manager from different threads?
  • How can I share the connection with the MQ queue manager in different threads?

I looked around, but with little luck. Any information would be appreciated.

related question:


Update 1: add an example in which two local MQSeries :: QueueManager objects in two threads cause error code MQ 2219.

use threads;
use Thread::Queue;
use MQSeries;
use MQSeries::QueueManager;
use MQSeries::Queue;

# globals
our $jobQ = Thread::Queue->new();
our $resultQ = Thread::Queue->new();

# ----------------------------------------------------------------------------
# sub routines
# ----------------------------------------------------------------------------

sub worker {
    # fetch work from $jobQ and put result to $resultQ
    # ...
}

sub monitor {
    # fetch result from $resultQ and put it onto another MQ queue
    my $mqQMgr = MQSeries::QueueManager->new( ... );

    # different queue from the one in main
    # this would cause error with MQ code 2219
    my $mqQ = MQSeries::Queue->new( ... );

    while (defined(my $result = $resultQ->dequeue())) {
        # create an mq message and put it into $mqQ
        my $mqMsg = MQSeries::Message->new();
        $mqQ->put($mqMsg);
    }   
}

# main
unless (caller()) {
    # create connection to MQ
    my $mqQMgr = MQSeries::QueueManager->new( ... );
    my $mqQ = MQSeries::Queue->new( ... );

    # create worker and monitor thread
    my @workers;
    for (1 .. $nThreads) {
        push(@workers, threads->create('worker'));
    }
    my $monitor = threads->create('monitor');

    while (True) {
        my $mqMsg = MQSeries::Message->new ();

        my $retCode = $mqQ->get(
            Message => $mqMsg,
            GetMsgOptions => $someOption,
            Wait => $sometime
        );

        die("error") if ($retCode == 0);
        next if ($retCode == -1); # no message

        # not we have some job to do
        $jobQ->enqueue($mqMsg->Data);
    }
}
+4
source share
1 answer

, . , - , - , , ..

/ , , () .

, , :

, , . , , . , , .

/ threads::shared ( , ).

, :

  • "comms", , , , IPC, . Thread::Queue .

  • . ( require import - not use, ) . ( '' , , , ..)

  • , .

fork parallelism, , fork " ", .

Edit:

, , MQSeries:

  • BEGIN, MQSeries , use .

, , - , , , , , - " , " "BEGIN".

, , , ( , ):

require MQSeries; 
MQSeries->import;

- use - . . creates .

+2

Source: https://habr.com/ru/post/1612694/


All Articles