Elixir: Supervising Multiple RabbitMQ Consumers

I am following the RabbitMQ Work Queues tutorial for Elixir (Elixir Work Queues), which works well. Now I am trying to start several consumers and put them under a supervisor.

That last part is proving tricky. If I run the code below in two separate iex sessions, both sessions receive and process messages from RabbitMQ.

Client (consumer)

defmodule MT.Client do
  require Logger

  @host Application.get_env(:mt, :host)
  @username Application.get_env(:mt, :username)
  @password Application.get_env(:mt, :password)
  @channel Application.get_env(:mt, :channel)

  def start_link do
    MT.Client.connect
  end

  def connect do
    {:ok, connection} = AMQP.Connection.open(host: @host, username: @username, password: @password)
    {:ok, channel} = AMQP.Channel.open(connection)

    AMQP.Queue.declare(channel, @channel, durable: true)
    AMQP.Basic.qos(channel, prefetch_count: 1)

    AMQP.Basic.consume(channel, @channel)

    Logger.info "[*] Waiting for messages"

    MT.Client.loop(channel)
  end

  def loop(channel) do
    receive do
      {:basic_deliver, payload, meta} ->
        Logger.info "[x] Received #{payload}"
        payload
        |> to_char_list
        |> Enum.count(fn x -> x == ?. end)
        |> Kernel.*(1000)
        |> :timer.sleep
        Logger.info "[x] Done."
        AMQP.Basic.ack(channel, meta.delivery_tag)

        MT.Client.loop(channel)
    end
  end
end

Supervisor

defmodule MT.Client.Supervisor do
  use Supervisor
  require Logger

  @name MTClientSupervisor

  def start_link do
    Supervisor.start_link(__MODULE__, :ok, name: @name)
  end

  def init(:ok) do
    children = [
      worker(MT.Client, [], restart: :transient, id: "MTClient01"),
      worker(MT.Client, [], restart: :transient, id: "MTClient02"),
      worker(MT.Client, [], restart: :transient, id: "MTClient03")
    ]
    supervise(children, strategy: :one_for_one)
  end
end

When I start the supervisor in an iex session:

iex -S mix
MT.Client.Supervisor.start_link

the following is logged:

08:46:50.746 [info] [*] Waiting for messages
08:46:50.746 [info] [x] Received {"job":"TestMessage","data":{"message":"message........"}}
08:46:58.747 [info] [x] Done.
08:46:58.748 [info] [x] Received {"job":"TestMessage","data":{"message":"last........"}}
08:47:06.749 [info] [x] Done. 

So clearly only one consumer is active, and it consumes the messages sequentially.

If I instead run the following in two iex sessions:

MT.Client.start_link

I have not included the logs here, but in this case two messages are processed at the same time.

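I suspect I have not properly understood the Agent/GenServer/Supervisor concepts. What is wrong with MT.Client and MT.Client.Supervisor, and how should they be changed so that several consumers run at the same time?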
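One likely explanation for the single active consumer in the log above: a supervisor starts its children one after another and waits for each start function to return {:ok, pid}. MT.Client.start_link calls connect, which ends in loop/1 and never returns, so the supervisor would stay stuck in the first child. The sketch below illustrates the usual shape with plain OTP and no RabbitMQ. Demo.Worker, the :work message and the child ids are hypothetical names, and the map-style child specs assume Elixir 1.5 or later rather than the worker/3 and supervise/2 helpers used in the question.

```elixir
defmodule Demo.Worker do
  # Hypothetical stand-in for MT.Client; no RabbitMQ involved.
  # The receive loop runs in a separate linked process, so start_link/1
  # returns {:ok, pid} right away and the supervisor can start the next child.
  def start_link(name) do
    Task.start_link(fn -> loop(name) end)
  end

  defp loop(name) do
    receive do
      {:work, from} ->
        # Reply to the sender, then wait for the next message.
        send(from, {:done, name, self()})
        loop(name)
    end
  end
end

# Three children built from the same module, each with its own id.
children =
  for name <- [:w1, :w2, :w3] do
    %{id: name, start: {Demo.Worker, :start_link, [name]}, restart: :transient}
  end

{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)

# Number of children that are alive at the same time.
IO.inspect(Supervisor.count_children(sup).active)
```

The same idea applied to the question would mean that MT.Client.start_link spawns the process that connects and loops, instead of running the loop in the supervisor's own process.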

My guess is that I need to pass a pid to AMQP.Basic.consume(channel, @channel, pid), but I am not sure.
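On the pid question, a hedged sketch of how a consumer could be written as a GenServer. As far as I can tell from the amqp library's documentation, the pid argument of AMQP.Basic.consume is optional and defaults to the calling process, so a consumer that calls it from its own init/1 would not need to pass one; treat that as an assumption to verify against the version you use. To keep the example runnable without a broker, Demo.Consumer is a hypothetical module, the ack function is injected instead of calling AMQP.Basic.ack, and the delivery is simulated by sending the {:basic_deliver, payload, meta} message by hand.

```elixir
defmodule Demo.Consumer do
  use GenServer

  # Hypothetical module. `ack` is a one-argument function standing in for
  # fn tag -> AMQP.Basic.ack(channel, tag) end, so no broker is required.
  def start_link(ack) do
    GenServer.start_link(__MODULE__, ack)
  end

  @impl true
  def init(ack) do
    # A real consumer would open the connection and channel here and then
    # call AMQP.Basic.consume(channel, queue) from this process.
    {:ok, ack}
  end

  @impl true
  def handle_info({:basic_deliver, payload, meta}, ack) do
    # Same workload as MT.Client.loop/1: one second per dot in the payload.
    dots = payload |> String.graphemes() |> Enum.count(&(&1 == "."))
    Process.sleep(dots * 1000)
    ack.(meta.delivery_tag)
    {:noreply, ack}
  end

  def handle_info(_other, ack) do
    # Other notifications (for example :basic_consume_ok) are ignored here.
    {:noreply, ack}
  end
end

# Simulated delivery; with RabbitMQ the channel would send this message.
parent = self()
{:ok, pid} = Demo.Consumer.start_link(fn tag -> send(parent, {:acked, tag}) end)
send(pid, {:basic_deliver, "job.", %{delivery_tag: 1}})
```

Because GenServer.start_link returns {:ok, pid} as soon as init/1 finishes, several such consumers can be listed as children of one supervisor, each with a distinct id.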

Source: https://habr.com/ru/post/1674058/