I started playing with RabbitMQ the RPC example code provided in AMKP gem doc, trying to write very simple code that executes a synchronous remote call:
require "amqp" module RPC class Base include EM::Deferrable def rabbit(rabbit_callback) rabbit_loop = Proc.new { AMQP.connect do |connection| AMQP::Channel.new(connection) do |channel| channel.queue("rpc.queue", :exclusive => false, :durable => true) do |requests_queue| self.callback(&rabbit_callback) self.succeed(connection, channel, requests_queue) end # requests_queue end # AMQP.channel end # AMQP.connect Signal.trap("INT") { connection.close { EM.stop } } Signal.trap("TERM") { connection.close { EM.stop } } } if !EM.reactor_running? EM.run do rabbit_loop.call end else rabbit_loop.call end end end class Server < Base def run server_loop = Proc.new do |connection, channel, requests_queue| consumer = AMQP::Consumer.new(channel, requests_queue).consume consumer.on_delivery do |metadata, payload| puts "[requests] Got a request #{metadata.message_id}. Sending a reply to #{metadata.reply_to}..." channel.default_exchange.publish(Time.now.to_s, :routing_key => metadata.reply_to, :correlation_id => metadata.message_id, :mandatory => true) metadata.ack end end rabbit(server_loop) end end class Client < Base def sync_push(request) result = nil sync_request = Proc.new do |connection, channel, requests_queue| message_id = Kernel.rand(10101010).to_s response_queue = channel.queue("", :exclusive => true, :auto_delete => true) response_queue.subscribe do |headers, payload| if headers.correlation_id == message_id result = payload connection.close { EM.stop } end end EM.add_timer(0.1) do puts "[request] Sending a request...#{request} with id #{message_id}" channel.default_exchange.publish(request, :routing_key => requests_queue.name, :reply_to => response_queue.name, :message_id => message_id) end end rabbit(sync_request) result end end end
The idea is quite simple: I want the message queue to always be ready (this is handled by the rabbit method). Whenever a client wants to send a request, it starts by creating a temporary queue for the response along with the message identifier; he then publishes the request in the main message queue and waits for a response with the same message identifier in the time queue to know when the response for this particular request will be ready. I assume message_id is somehow redundant with the time queue (since the queue must also be unique).
Now I am running a dummy script using this client / server code
# server session >> server = RPC::Server.new =>
and
# client session >> client = RPC::Client.new => #<RPC::Client:0x007ffb6be6aed8> >> client.sync_push "test 1" Updating client properties [request] Sending a request...test 1 with id 3315740 => "2012-11-02 21:58:45 +0100" >> client.sync_push "test 2" AMQ::Client::ConnectionClosedError: Trying to send frame through a closed connection. Frame is #<AMQ::Protocol::MethodFrame:0x007ffb6b9c83d0 @payload="\x002\x00\n\x00\x00\x00\f\x00\x00\x00\x00", @channel=1>
There are two points that I really don’t understand:
- related to EventMachine: in
Client code, why do I need to call EM.add_timer if I want my message to actually be posted? And why use EM.next_tick not working? I understand that “everything” should be “ready” when the publication is called here. - AMQP related: why does my client crash due to a closed connection for the second request? It is assumed that a new EM / AMQP cycle will be created each time a new query is clicked.
Unfortunately, there is very little code available online regarding EM / AMQP, so any help would be greatly appreciated! Any comments regarding the effectiveness of this would also be greatly appreciated.