RabbitMQ remote call with Pika

I am new to RabbitMQ and am trying to figure out how a client can request memory and CPU utilization information from a server, following this tutorial (https://www.rabbitmq.com/tutorials/tutorial-six-python.html).

So the client requests the CPU and memory values (I believe I will need two queues) and the server responds with them.

Is there a simple way to create a client.py and a server.py for this case using the Pika library in Python?

Answer

I would recommend following the first RabbitMQ tutorials if you haven’t already. The RPC example builds on concepts covered in the previous examples (direct queues, exclusive queues, acknowledgements, etc.).

The RPC solution proposed in the tutorial requires at least two queues, depending on how many clients you want to use:

  • One direct queue (rpc_queue), used to send requests from the client to the server.
  • One exclusive queue per client, used to receive responses.

The request/response cycle:

  • The client sends a message to the rpc_queue. Each message includes a reply_to property, with the name of the client's exclusive queue that the server should reply to, and a correlation_id property, which is just a unique id used to track the request.
  • The server waits for messages on the rpc_queue. When a message arrives, it prepares the response, adds the correlation_id to the new message, and sends it to the queue defined in the reply_to message property.
  • The client waits on its exclusive queue until it finds a message whose correlation_id matches the one it originally generated.
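The three steps above can be sketched without a broker. This is only an in-memory illustration, not pika code: the `queues` dict, the queue name `client-1`, and the three function names are hypothetical stand-ins for what RabbitMQ and the tutorial classes do.

```python
import queue
import uuid

# In-memory stand-ins for broker queues; the queue names are illustrative.
queues = {"rpc_queue": queue.Queue(), "client-1": queue.Queue()}


def client_send(reply_to):
    """Step 1: publish a request tagged with reply_to and a fresh correlation_id."""
    correlation_id = str(uuid.uuid4())
    queues["rpc_queue"].put(
        {"reply_to": reply_to, "correlation_id": correlation_id, "body": ""}
    )
    return correlation_id


def server_handle_one(make_response):
    """Step 2: take one request and reply on the queue named in reply_to."""
    request = queues["rpc_queue"].get_nowait()
    queues[request["reply_to"]].put(
        {"correlation_id": request["correlation_id"], "body": make_response()}
    )


def client_receive(reply_to, correlation_id):
    """Step 3: read replies until one carries the expected correlation_id."""
    while True:
        # Raises queue.Empty if no matching reply is waiting.
        message = queues[reply_to].get_nowait()
        if message["correlation_id"] == correlation_id:
            return message["body"]
```

A reply with a different correlation_id (for example, one left over from an earlier request that timed out) is simply skipped in step 3, which is the reason the id exists.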

Jumping straight to your problem, the first thing to do is to define the message format you’ll want to use for your responses. You can use JSON, msgpack or any other serialization format. For example, if using JSON, one message could look something like this:

{
    "cpu": 1.2,
    "memory": 0.3
} 

Then, in your server.py:

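import json

import pika


def on_request(channel, method, props, body):
    # current_cpu_usage() and current_memory_usage() are not defined in this snippet.
    response = {'cpu': current_cpu_usage(),
                'memory': current_memory_usage()}
    # Echo the correlation_id so the client can match this reply to its request.
    properties = pika.BasicProperties(correlation_id=props.correlation_id)

    # Publish through the default exchange, straight to the client's reply queue.
    channel.basic_publish(exchange='',
                          routing_key=props.reply_to,
                          properties=properties,
                          body=json.dumps(response))
    # Acknowledge the request only after the reply has been published.
    channel.basic_ack(delivery_tag=method.delivery_tag)

# ...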
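The server snippet calls `current_cpu_usage()` and `current_memory_usage()` without defining them. One possible sketch, using only the standard library, is below. It is an assumption about what you might want to measure, not part of the tutorial: the CPU figure is the 1-minute load average per core rather than a true utilization percentage, the memory figure counts page cache as used, and both return None on platforms that do not expose the values. A library such as psutil (`psutil.cpu_percent()`, `psutil.virtual_memory()`) would likely be a better fit in practice.

```python
import os


def current_cpu_usage():
    """Approximate CPU load as the 1-minute load average per core (Unix only)."""
    try:
        load_1min, _, _ = os.getloadavg()
    except (AttributeError, OSError):
        return None  # load average is not available on this platform
    return round(load_1min / (os.cpu_count() or 1), 2)


def current_memory_usage():
    """Fraction of physical memory not currently free, or None if unknown."""
    try:
        total_pages = os.sysconf("SC_PHYS_PAGES")
        free_pages = os.sysconf("SC_AVPHYS_PAGES")
    except (AttributeError, ValueError, OSError):
        return None  # sysconf or these keys are missing on this platform
    if total_pages <= 0 or free_pages < 0:
        return None
    return round(1 - free_pages / total_pages, 2)
```

Both values serialize cleanly with `json.dumps`, since None becomes `null`.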

And in your client.py:

import json
import time
import uuid

import pika


class ResponseTimeout(Exception):
    pass


class Client:
    # similar constructor as `FibonacciRpcClient` from tutorial...

    def on_response(self, channel, method, props, body):
        # Ignore replies that do not belong to the request currently in flight.
        if self.correlation_id == props.correlation_id:
            self.response = json.loads(body.decode())

    def call(self, timeout=2):
        self.response = None
        self.correlation_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.correlation_id),
                                   body='')

        # Process connection events until the reply arrives or `timeout` seconds pass.
        start_time = time.time()
        while self.response is None:
            if (start_time + timeout) < time.time():
                raise ResponseTimeout()
            self.connection.process_data_events()
        return self.response

As you can see, the code is pretty much the same as the original FibonacciRpcClient. The main differences are:

  • We use JSON as the data format for our messages.
  • Our client call() method doesn’t require a body argument (there’s nothing to send to the server).
  • We take care of response timeouts (if the server is down, or if it doesn’t reply to our messages).
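The timeout loop in call() could also be factored into a small helper. The sketch below is a variation, not the tutorial's code: `wait_until`, `is_done` and `pump` are hypothetical names, and it uses `time.monotonic()` instead of `time.time()` so that a system clock adjustment cannot shorten or extend the wait.

```python
import time


class ResponseTimeout(Exception):
    pass


def wait_until(is_done, pump, timeout=2.0):
    """Call pump() repeatedly until is_done() is true or timeout seconds elapse."""
    deadline = time.monotonic() + timeout
    while not is_done():
        if time.monotonic() > deadline:
            raise ResponseTimeout()
        pump()  # e.g. let the connection process incoming frames
```

Inside call(), this would be used roughly as `wait_until(lambda: self.response is not None, self.connection.process_data_events, timeout)`. Passing a small `time_limit` to `process_data_events` should also keep the loop from spinning at full speed while it waits.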

Still, there are a lot of things to improve here:

  • No error handling: for example, if the client “forgets” to send a reply_to queue, our server will crash, and will crash again on restart (the broken message will be requeued indefinitely as long as it isn’t acknowledged by our server).
  • We don’t handle broken connections (no reconnection mechanism).
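For the first point, one option is to reject malformed requests before they reach the handler. The decorator below is a minimal sketch under that assumption; `drop_requests_without_reply_to` is a hypothetical name, and it covers only the missing reply_to case, not failures inside the handler or lost connections.

```python
import functools


def drop_requests_without_reply_to(handler):
    """Wrap a pika-style callback so requests lacking reply_to are discarded."""
    @functools.wraps(handler)
    def wrapper(channel, method, props, body):
        if not getattr(props, "reply_to", None):
            # requeue=False stops the broker from redelivering the bad message.
            channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
            return
        handler(channel, method, props, body)
    return wrapper
```

Applied as `@drop_requests_without_reply_to` on top of on_request, the server would discard such a message once instead of crashing on it after every restart.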

You may also consider replacing the RPC approach with a publish/subscribe pattern. That way, the server simply broadcasts its CPU/memory state at a fixed interval, and one or more clients receive the updates.
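A rough sketch of the publishing side of that alternative, assuming a fanout exchange as in the RabbitMQ publish/subscribe tutorial. The function name `broadcast_stats`, the `read_stats` parameter and the exchange name `stats` are made up for illustration; `channel` is expected to be an open pika channel.

```python
import json


def broadcast_stats(channel, read_stats, exchange="stats"):
    """Publish one stats snapshot to every queue bound to a fanout exchange."""
    # Declaring the exchange is idempotent, so it is safe to repeat on each call.
    channel.exchange_declare(exchange=exchange, exchange_type="fanout")
    # A fanout exchange ignores the routing key, so it is left empty.
    channel.basic_publish(exchange=exchange,
                          routing_key="",
                          body=json.dumps(read_stats()))
```

The server would call this in a loop with a `time.sleep(...)` between iterations, and each client would bind its own exclusive queue to the same exchange. No reply_to or correlation_id is needed in this design, since there is no request to match.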

User contributions licensed under: CC BY-SA