How to implement semaphore thread binding in Perl?

My Perl script should run multiple threads at the same time ...

    use threads ('yield', 'exit' => 'threads_only');
    use threads::shared;
    use strict;
    use warnings;
    no warnings 'threads';
    use LWP::UserAgent;
    use HTTP::Request;
    use HTTP::Async;
    use ...

... and these threads need to fetch some information from the network, hence HTTP::Async.

    my $request = HTTP::Request->new;
    $request->protocol('HTTP/1.1');
    $request->method('GET');
    $request->header('User-Agent' => '...');

    my $async = HTTP::Async->new(
        slots            => 100,
        timeout          => REQUEST_TIMEOUT,
        max_request_time => REQUEST_TIMEOUT
    );

But some threads should only access the network once one or more other threads tell them to.

    my $start = [Time::HiRes::gettimeofday()];
    my @threads = ();
    foreach ... {
        $thread = threads->create(
            sub {
                local $SIG{KILL} = sub { threads->exit };
                my $url = shift;
                if ($url ... ) {
                    # wait for "go" signal from other threads
                }
                my ($response, $data);
                $request->url($url);
                $data = '';
                $async->add($request);
                while ($response = $async->wait_for_next_response) {
                    threads->yield();
                    $data .= $response->as_string;
                }
                if ($data ... ) {
                    # send "go" signal to waiting threads
                }
            }, $_);
        if (defined $thread) {
            $thread->detach;
            push (@threads, $thread);
        }
    }

There may be one or more threads waiting for the "go" signal, and one or more threads that can send it. Initially the semaphore state is "wait"; once it turns to "go", it stays that way.

Finally, the application checks the maximum run time. If the threads run for too long, a self-destruct signal is sent to them.

    my $running;
    do {
        $running = 0;
        foreach my $thread (@threads) {
            $running++ if $thread->is_running();
        }
        threads->yield();
    } until (($running == 0) || (Time::HiRes::tv_interval($start) > MAX_RUN_TIME));

    $running = 0;
    foreach my $thread (@threads) {
        if ($thread->is_running()) {
            $thread->kill('KILL');
            $running++;
        }
    }
    threads->yield();

Now, my questions:

  • How can I most efficiently code the waiting "semaphore" in the script (see the comments in the script above)? Should I simply use a shared variable with some dummy sleep loop?

  • Do I need to add a sleep loop at the end of the application to give the threads time to self-destruct?

2 answers

You can look at Thread::Queue to do this work. You can set up a queue that handles the signaling between the threads waiting for the "go" signal and the threads sending it. Here is a quick mock-up that I have not tested:

    ...
    use Thread::Queue;
    ...
    # In main body
    my $q = Thread::Queue->new();
    ...
    $thread = threads->create(
        sub {
            local $SIG{KILL} = sub { threads->exit };
            my $url = shift;
            if ($url ... ) {
                # wait for "go" signal from other threads
                my $mesg = $q->dequeue();
                # you could put in some termination code if the $mesg isn't 'go'
                if ($mesg ne 'go') { ... }
            }
            ...
            if ($data ... ) {
                # send "go" signal to waiting threads
                $q->enqueue('go');
            }
        }, $_);
    ...

Threads that must wait for a "go" signal will block in the dequeue method until something enters the queue. As soon as a message enters the queue, one and only one thread will grab the message and process it.
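Since each message is handed to exactly one thread, releasing several waiters takes one message per waiter. The following is a minimal sketch of that idea, not the asker's actual script: it assumes a perl built with ithreads, and the waiter count and the returned strings are made up for illustration.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $q       = Thread::Queue->new();
my $waiters = 3;    # hypothetical number of waiting threads

# Each waiter blocks in dequeue() until a message arrives.
my @workers;
for my $id (1 .. $waiters) {
    push @workers, threads->create(sub {
        my $mesg = $q->dequeue();
        return "waiter $id got '$mesg'";
    });
}

# Sending side: one message releases exactly one waiter,
# so enqueue one 'go' per waiter.
$q->enqueue(('go') x $waiters);

# join() blocks until the thread finishes and returns its result.
my @results = map { $_->join() } @workers;
print "$_\n" for @results;
```

If the number of waiters is not known in advance, a waiter could also put the message back with enqueue right after dequeuing it, so that the next waiter sees it too.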

If you want to stop the waiting threads so that they never start their work, you can insert a stop message at the head of the queue.

 $q->insert(0, 'stop') foreach (@threads); 
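To illustrate what inserting at index 0 does, here is a small single-threaded sketch (assuming a perl built with ithreads; the pending 'go' messages are only there for the demonstration). A message inserted at the head is dequeued before anything that was already waiting in the queue.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $q = Thread::Queue->new();
$q->enqueue('go', 'go');    # messages already pending in the queue
$q->insert(0, 'stop');      # index 0 is the head, ahead of the pending items

my $first  = $q->dequeue(); # the inserted 'stop' comes out first
my $second = $q->dequeue(); # then the older 'go'
print "first=$first second=$second\n";    # prints "first=stop second=go"
```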

There are examples in the Thread::Queue and threads CPAN distributions that show this in more detail.
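Because the question describes a one-way state ("wait" first, then "go" forever) with possibly several waiters and several senders, a shared variable combined with cond_wait and cond_broadcast from threads::shared is another option, and it avoids a dummy sleep loop. This is a different technique from the queue above, sketched under the assumption of a perl built with ithreads; the helper names wait_for_go and send_go are hypothetical.

```perl
use strict;
use warnings;
use threads;
use threads::shared;

my $go :shared = 0;    # 0 = "wait", 1 = "go"; never reset once set

# Hypothetical helper: block until the state is "go".
sub wait_for_go {
    lock($go);
    cond_wait($go) until $go;    # re-check after every wakeup
}

# Hypothetical helper: switch to "go" and wake every waiter.
sub send_go {
    lock($go);
    $go = 1;
    cond_broadcast($go);
}

my @waiters;
for my $id (1 .. 3) {
    push @waiters, threads->create(sub {
        wait_for_go();
        return "waiter $id released";
    });
}

send_go();
send_go();    # sending again is harmless; the state stays "go"

my @released = map { $_->join() } @waiters;
print "$_\n" for @released;
```

A thread that calls wait_for_go after the signal has already been sent returns immediately, which matches the "once go, always go" requirement.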

As for your second question, the answer, unfortunately, is that it depends. When you go about shutting down your threads, what kind of cleanup is required for a clean shutdown? What is the worst-case scenario if the rug gets pulled out from under a thread? You will want to allow time for any cleanup to take place. Another option is to wait on each thread until it has actually completed.

The reason for my comment asking whether the detach call could be removed is that detach lets the main thread exit without caring what happens to any child threads. Instead, if you remove this call and add:

 $_->join() foreach threads->list(); 

to the end of your main block, this will require the main application to wait for each thread to complete.
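A minimal sketch of the join approach, assuming a perl built with ithreads; the doubling workers are hypothetical stand-ins for the real download threads. Note that threads->list() only returns threads that are neither joined nor detached, so this only works once the detach call is gone.

```perl
use strict;
use warnings;
use threads;

# Hypothetical workers: each one just doubles its argument.
for my $n (1 .. 3) {
    my $thr = threads->create(sub { return $_[0] * 2 }, $n);
}

# join() blocks until the thread has finished and returns its result.
my @doubled = sort { $a <=> $b } map { $_->join() } threads->list();
print "@doubled\n";    # prints "2 4 6"
```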

If you leave the detach call in place, you will need to sleep at the end of your code if your threads have to do any cleanup. When you call detach on a thread, you are telling Perl that you do not care what the thread is doing when your main thread terminates. If the main thread exits while detached threads are still running, the program will exit without warning. However, if you do not need any cleanup and you still call detach, feel free to exit whenever you want.


Try something like this ....

    #!/usr/bin/perl
    use threads;
    use threads::shared;

    $| = 1;
    my ($global) :shared;
    my (@threads);

    push(@threads, threads->new(\&mySub, 1));
    push(@threads, threads->new(\&mySub, 2));
    push(@threads, threads->new(\&mySub, 3));

    my $i = 0;
    foreach my $myThread (@threads) {
        my @ReturnData = $myThread->join;    # was misspelled as $myTread
        print "Thread $i returned: @ReturnData\n";
        $i++;
    }

    sub mySub {
        my ($threadID) = @_;
        for (0 .. 1000) {
            {
                lock($global);    # guard the shared counter against concurrent updates
                $global++;
            }
            print "Thread ID: $threadID >> $_ >> GLB: $global\n";
            sleep(1);
        }
        return ($threadID);    # was $id, which is never defined
    }

Source: https://habr.com/ru/post/915126/

