I am following the RabbitMQ Work Queues tutorial for Elixir ( Elizir Work Queues ), which works pretty well. In addition, now I am trying to launch several users and control their supervision.
This last part is a bit complicated. If I run the code below in two separate iex sessions, both will receive and process messages from RabbitMQ.
Customer (consumer)
defmodule MT.Client do
require Logger
@host Application.get_env(:mt, :host)
@username Application.get_env(:mt, :username)
@password Application.get_env(:mt, :password)
@channel Application.get_env(:mt, :channel)
def start_link do
MT.Client.connect
end
def connect do
{:ok, connection} = AMQP.Connection.open(host: @host, username: @username, password: @password)
{:ok, channel} = AMQP.Channel.open(connection)
AMQP.Queue.declare(channel, @channel, durable: true)
AMQP.Basic.qos(channel, prefetch_count: 1)
AMQP.Basic.consume(channel, @channel)
Logger.info "[*] Waiting for messages"
MT.Client.loop(channel)
end
def loop(channel) do
receive do
{:basic_deliver, payload, meta} ->
Logger.info "[x] Received #{payload}"
payload
|> to_char_list
|> Enum.count(fn x -> x == ?. end)
|> Kernel.*(1000)
|> :timer.sleep
Logger.info "[x] Done."
AMQP.Basic.ack(channel, meta.delivery_tag)
MT.Client.loop(channel)
end
end
end
Supervisor
defmodule MT.Client.Supervisor do
use Supervisor
require Logger
@name MTClientSupervisor
def start_link do
Supervisor.start_link(__MODULE__, :ok, name: @name)
end
def init(:ok) do
children = [
worker(MT.Client, [], restart: :transient, id: "MTClient01"),
worker(MT.Client, [], restart: :transient, id: "MTClient02"),
worker(MT.Client, [], restart: :transient, id: "MTClient03")
]
supervise(children, strategy: :one_for_one)
end
end
When launched in iex session:
iex -S mix
MT.Client.Supervisor.start_link
It is written below:
08:46:50.746 [info] [*] Waiting for messages
08:46:50.746 [info] [x] Received {"job":"TestMessage","data":{"message":"message........"}}
08:46:58.747 [info] [x] Done.
08:46:58.748 [info] [x] Received {"job":"TestMessage","data":{"message":"last........"}}
08:47:06.749 [info] [x] Done.
It is so clear that only in 1 consumer is active, which consumes messages sequentially.
Running below in 2 iex sessions:
MT.Client.start_link
I do not add logs here, but in this case I get 2 messages processing calls at the same time
, Agent/GenServer/Supervisor. - , MT.Client MT.Client.Supervisor , , ?
; - pid AMQP.Basic.consume(channel, @channel, pid), .