How to asynchronously collect results from threads created at runtime in Ruby

I would like to constantly poll a table in the database for commands to execute. Some commands may take 4 minutes, others 10 seconds.

Therefore, I would like to run them in threads. Each record spawns a new thread, and once the thread has been created, the record is deleted.

Since the database polling runs in an endless loop and keeps creating threads, how do I get a response back from a thread? (Each thread will run a shell command and get an exit code that I would like to read.)

I was thinking about creating two threads, each with an infinite loop: the first to poll the database and create new threads, the second to somehow read the results of those threads and act on each response.

Or maybe I should use fork or the OS to create a new process instead?

1 answer

You can have each thread push its results onto a queue; your main thread can then read from that queue. Reading from a queue is a blocking operation by default, so if there are no results yet, your code will block and wait for one.

http://ruby-doc.org/stdlib-2.0.0/libdoc/thread/rdoc/Queue.html
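If the main thread also has to keep polling the database, a blocking read may not be what you want. As a minimal sketch: `Queue#pop` accepts a `non_block` argument and raises `ThreadError` when the queue is empty. The `try_pop` helper below is a hypothetical name, not part of the standard library.

```ruby
results = Queue.new

# Hypothetical helper: returns the next result, or nil if none is ready yet.
def try_pop(queue)
  queue.pop(true) # non_block = true raises ThreadError on an empty queue
rescue ThreadError
  nil
end

p try_pop(results) # nothing has been pushed yet
results << 'done'
p try_pop(results) # a result is now available
```

With this, a single loop could alternate between checking the table and draining any finished results, at the cost of needing a short sleep to avoid spinning.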

Here is an example:

require 'thread'

jobs = Queue.new
results = Queue.new

thread_pool = []
pool_size = 5

(1..pool_size).each do |i|
  thread_pool << Thread.new do 
    loop do 
      job = jobs.shift #blocks waiting for a task
      break if job == "!NO-MORE-JOBS!"

      #Otherwise, do job...
      puts "#{i}...."
      sleep rand(1..5) #Simulate the time it takes to do a job
      results << "thread#{i} finished #{job}"  #Push some result from the job onto the Queue
      #Go back and get another task from the Queue
    end
  end
end


#All threads are now blocking waiting for a job...
puts 'db_stuff'
db_stuff = [
  'job1', 
  'job2', 
  'job3', 
  'job4', 
  'job5',
  'job6',
  'job7',
]

db_stuff.each do |job|
  jobs << job
end

#Threads are now attacking the Queue like hungry dogs.

pool_size.times do
  jobs << "!NO-MORE-JOBS!"
end

result_count = 0

loop do
  result = results.shift #blocks waiting for a result
  puts "result: #{result}"
  result_count += 1
  break if result_count == db_stuff.size #stop once every job has reported back
end
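
The example above only simulates work with sleep. Since the question is about shell commands and their exit codes, here is a rough sketch of the same pattern with real commands. It assumes Ruby 2.3+ (for `Queue#close`) and a POSIX shell; `run_command` and `run_all` are hypothetical helper names, and the commands are placeholders for whatever the database rows contain.

```ruby
require 'open3'

# Hypothetical helper: runs one shell command and returns its output and exit code.
def run_command(cmd)
  output, status = Open3.capture2e(cmd) # stdout and stderr combined
  { command: cmd, output: output, exit_code: status.exitstatus }
end

# Hypothetical helper: runs all commands on a small pool of worker threads
# and returns one result hash per command (in completion order).
def run_all(commands, pool_size: 3)
  jobs = Queue.new
  results = Queue.new

  workers = Array.new(pool_size) do
    Thread.new do
      # pop returns nil once the queue is closed and empty, which ends the loop
      while (cmd = jobs.pop)
        results << run_command(cmd)
      end
    end
  end

  commands.each { |cmd| jobs << cmd }
  jobs.close # replaces the "!NO-MORE-JOBS!" sentinel
  workers.each(&:join)

  Array.new(commands.size) { results.pop }
end

run_all(["sh -c 'exit 3'", 'true', 'echo hello']).each do |r|
  puts "#{r[:command].inspect} exited with #{r[:exit_code]}"
end
```

This version processes a fixed batch and then returns. For the endless polling described in the question, the job queue would stay open and the main loop would keep pushing new commands while reading results as they arrive.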

Source: https://habr.com/ru/post/1548141/

