How best to keep the job queue clean of retry / duplicate jobs (using sidekiq and redis-semaphore)

I have a rails application that retrieves many emails from multiple IMAP accounts.

  • I use sidekiq for job processing.
  • I use sidetiq to schedule tasks.
  • I use redis-semaphore to ensure that duplicate jobs for the same user do not bump into each other.

2 questions:

  • 1: When a job reaches "if s.lock", redis-semaphore puts it on hold until all previous jobs for that user have finished. I need the job to be canceled instead of waiting in the queue.
  • 2: If an exception occurs during a job and causes a crash, sidekiq puts the job back into the queue for a retry. I need the job to be canceled instead of requeued. Putting "sidekiq_options :retry => false" in the code does not seem to make a difference.

My code is:

    class FetchMailsJobs
      include Sidekiq::Worker
      include Sidetiq::Schedulable

      tiq { hourly.minute_of_hour(0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55) }

      def perform(last_occurrence, current_occurrence)
        users = User.all
        users.each do |user|
          if user.imap_accounts.exists?
            ImapJob.perform_async(user._id.to_s)
          end
        end
      end
    end

    class ImapJob
      include Sidekiq::Worker

      def perform(user_id)
        s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, connection: "localhost")
        if s.lock
          user = User.where(_id: user_id).first
          emails = ImapMails.receive_mails(user)
          s.unlock
        end
      end
    end
2 answers

1. Subclass Redis and override blpop so that a timeout of -1 means a non-blocking lpop.

redis-semaphore calls @redis.blpop in Redis::Semaphore#lock. Although you could monkey-patch the lock method to use @redis.lpop instead, a much simpler approach is to pass a custom Redis instance to the semaphore.

Put the following in the lib directory of your Rails application and require it in your config/initializers/sidekiq.rb (or load the class below in whatever way you prefer).

    class NonBlockingRedis < Redis
      def blpop(key, timeout)
        if timeout == -1
          result = lpop(key)
          return result if result.nil?
          return [key, result]
        else
          super(key, timeout)
        end
      end
    end

Whenever you call Redis::Semaphore.new, pass the key :redis with a new instance of the NonBlockingRedis class.

Call s.lock with -1 as an argument to use lpop instead of blpop.

    # Redis.new takes host: (not connection:), so the host is passed that way here.
    s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, redis: NonBlockingRedis.new(host: "localhost"))
    if s.lock(-1)
      user = User.where(_id: user_id).first
      emails = ImapMails.receive_mails(user)
      s.unlock
    end
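If you want to see the effect of the override without a Redis server, here is a minimal sketch in plain Ruby. FakeRedis and NonBlockingFakeRedis are hypothetical in-memory stand-ins, not the redis gem; they only mimic the return shapes of lpop and blpop. The stand-in is written to reject negative timeouts, so -1 only works through the override.

```ruby
# In-memory stand-in for the Redis list commands used above.
# Hypothetical: this is NOT the redis gem; it only mimics the return shapes.
class FakeRedis
  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
  end

  def rpush(key, value)
    @lists[key].push(value)
    @lists[key].size
  end

  # Non-blocking pop: returns the value, or nil when the list is empty.
  def lpop(key)
    @lists[key].shift
  end

  # Blocking pop: polls until a value arrives or `timeout` seconds pass
  # (0 means wait forever). Negative timeouts are rejected by this stand-in.
  def blpop(key, timeout)
    raise ArgumentError, "timeout is negative" if timeout.negative?

    deadline = timeout.zero? ? nil : Time.now + timeout
    loop do
      value = lpop(key)
      return [key, value] unless value.nil?
      return nil if deadline && Time.now >= deadline

      sleep 0.01
    end
  end
end

# Same override as NonBlockingRedis, applied to the stand-in.
class NonBlockingFakeRedis < FakeRedis
  def blpop(key, timeout)
    return super unless timeout == -1

    value = lpop(key)
    value.nil? ? nil : [key, value]
  end
end

redis = NonBlockingFakeRedis.new
empty_result = redis.blpop("tokens", -1) # list is empty: returns at once
redis.rpush("tokens", "0")
token_result = redis.blpop("tokens", -1) # one token is available
puts empty_result.inspect
puts token_result.inspect
```

The point of the design is that an empty list yields nil immediately, which Redis::Semaphore#lock treats as "lock not acquired", so the job can simply finish instead of waiting.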

2. Using sidekiq_options retry: false in the worker class should work; see below for an example.

In your question, you did not say which worker's jobs were ending up in the retry queue. Because FetchMailsJobs enqueues the ImapJob jobs, an exception in the former may make it look as if ImapJob is being requeued.
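A toy simulation in plain Ruby shows why this can look like ImapJob being requeued. It does not use Sidekiq: QUEUE and enqueue_all are hypothetical stand-ins for the ImapJob queue and FetchMailsJobs#perform, and it assumes a retry simply runs perform again from the start.

```ruby
# Hypothetical stand-ins: QUEUE plays the role of the ImapJob queue and
# enqueue_all plays the role of FetchMailsJobs#perform.
QUEUE = []

def enqueue_all(user_ids, fail_at: nil)
  user_ids.each do |user_id|
    raise "boom while handling #{user_id}" if user_id == fail_at

    QUEUE << user_id
  end
end

user_ids = %w[u1 u2 u3]

begin
  enqueue_all(user_ids, fail_at: "u3") # first attempt crashes on u3
rescue RuntimeError
  enqueue_all(user_ids)                # a retry re-runs the whole method
end

puts QUEUE.inspect
```

The users handled before the crash end up in the queue twice, even though ImapJob itself never failed.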

With your semaphore lock, it is also a good idea to wrap the work in a begin/rescue/ensure block so that the lock is always released.

    class FetchMailsJobs
      include Sidekiq::Worker
      include Sidetiq::Schedulable

      sidekiq_options retry: false

      tiq { hourly.minute_of_hour(0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55) }

      def perform(last_occurrence, current_occurrence)
        users = User.all
        users.each do |user|
          if user.imap_accounts.exists?
            ImapJob.perform_async(user._id.to_s)
          end
        end
      end
    end

    class ImapJob
      include Sidekiq::Worker

      sidekiq_options retry: false

      def perform(user_id)
        # Redis.new takes host: (not connection:), so the host is passed that way here.
        s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, redis: NonBlockingRedis.new(host: "localhost"))
        # Parentheses matter: "s.lock - 1" would call s.lock and then subtract 1.
        if s.lock(-1)
          begin
            user = User.where(_id: user_id).first
            emails = ImapMails.receive_mails(user)
          rescue => e
            # ignore; do nothing
          ensure
            s.unlock
          end
        end
      end
    end

See the sidekiq wiki page "Advanced Options: Workers" for more details.
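The begin/rescue/ensure pattern can be tried in isolation with the sketch below. It is only an illustration: with_try_lock is a hypothetical helper, and a plain Ruby Mutex stands in for the Redis semaphore. Unlike the worker above, it logs the error instead of ignoring it, which is a choice of mine and not part of the original code.

```ruby
# Hypothetical helper: runs the block only if the lock is free and always
# releases it afterwards. A Mutex stands in for the Redis semaphore.
def with_try_lock(lock)
  return :skipped unless lock.try_lock # someone else holds it: give up

  begin
    yield
    :done
  rescue StandardError => e
    warn "job failed: #{e.message}"    # log rather than silently ignore
    :failed
  ensure
    lock.unlock                        # runs on success and on failure
  end
end

lock = Mutex.new
failed = with_try_lock(lock) { raise "IMAP connection dropped" }
nested = nil
outcome = with_try_lock(lock) do
  # The lock is already taken here, so this inner call is skipped.
  nested = with_try_lock(lock) { :never_runs }
end
puts [failed, nested, outcome, lock.locked?].inspect
```

Because the unlock sits in ensure, a crashed job does not leave the lock held for the next scheduled run.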


Would it not be possible to drop redis-semaphore and use the sidekiq-unique-jobs gem instead? It seems to be a well-built tool that does exactly what you are looking for.
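To give a rough idea of what "unique jobs" means, here is a sketch in plain Ruby. UniqueQueue is a hypothetical stand-in that I made up for illustration; it is not how sidekiq-unique-jobs is implemented or configured, so check the gem's README for the actual options.

```ruby
require "set"

# Hypothetical stand-in for the idea behind unique jobs: an enqueue is
# dropped while a job with the same key is still pending or running.
class UniqueQueue
  def initialize
    @pending = Set.new
    @jobs = []
  end

  # Returns true if the job was queued, false if it was a duplicate.
  def push(job_class, *args)
    key = [job_class, args]
    return false unless @pending.add?(key)

    @jobs << key
    true
  end

  # Runs the oldest job and frees its key so it can be queued again.
  def work_one
    key = @jobs.shift
    return nil if key.nil?

    begin
      yield(*key)
    ensure
      @pending.delete(key)
    end
  end

  def size
    @jobs.size
  end
end

queue = UniqueQueue.new
first = queue.push("ImapJob", "user-1")
second = queue.push("ImapJob", "user-1") # duplicate while the first is pending
other = queue.push("ImapJob", "user-2")
puts [first, second, other, queue.size].inspect
```

With this approach the duplicate is rejected at enqueue time, so it never reaches a worker and there is nothing to cancel.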


Source: https://habr.com/ru/post/944294/
