I am new to RabbitMQ and I am trying to figure out, following this tutorial (https://www.rabbitmq.com/tutorials/tutorial-six-python.html), how a client can request memory and CPU utilization information from a server. So the client asks for CPU and memory (I believe I will need two queues) and the server responds with the values.
Is there any way to simply create a client.py and a server.py for this case using the Pika library in Python?
Answer
I would recommend you to follow the first RabbitMQ tutorials if you haven't already. The RPC example builds on concepts covered in the previous examples (direct queues, exclusive queues, acknowledgements, etc.).
The RPC solution proposed in the tutorial requires at least two queues, depending on how many clients you want to use:
- One direct queue (`rpc_queue`), used to send requests from the client to the server.
- One exclusive queue per client, used to receive responses.
The request/response cycle:
- The client sends a message to the `rpc_queue`. Each message includes a `reply_to` property with the name of the client's exclusive queue the server should reply to, and a `correlation_id` property, which is just a unique id used to track the request.
- The server waits for messages on the `rpc_queue`. When a message arrives, it prepares the response, adds the `correlation_id` to the new message, and sends it to the queue defined in the `reply_to` message property.
- The client waits on its exclusive queue until it finds a message with the `correlation_id` that was originally generated.
Jumping straight to your problem, the first thing to do is to define the message format you want to use for your responses. You can use JSON, msgpack or any other serialization library. For example, using JSON, one message could look like this:

    {"cpu": 1.2, "memory": 0.3}
Then, on your `server.py`:
    import json

    import pika


    def on_request(channel, method, props, body):
        # gather the current metrics and reply to the queue named in `reply_to`
        response = {'cpu': current_cpu_usage(),
                    'memory': current_memory_usage()}
        properties = pika.BasicProperties(correlation_id=props.correlation_id)
        channel.basic_publish(exchange='',
                              routing_key=props.reply_to,
                              properties=properties,
                              body=json.dumps(response))
        channel.basic_ack(delivery_tag=method.delivery_tag)

    # ...
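In case it's useful, here is roughly how the rest of `server.py` could look. This is only a sketch: the metric helpers are one possible implementation based on psutil (my choice, not part of the original answer), and the consuming boilerplate follows the tutorial, assuming a broker running on localhost:

    import json

    import pika
    import psutil  # assumption: psutil provides the CPU/memory readings


    def current_cpu_usage():
        # system-wide CPU utilization, as a percentage
        return psutil.cpu_percent(interval=None)


    def current_memory_usage():
        # used RAM, as a percentage of the total
        return psutil.virtual_memory().percent


    # ... on_request() as defined above ...

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='rpc_queue')
    channel.basic_qos(prefetch_count=1)  # hand out one request at a time
    channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
    channel.start_consuming()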
And on your `client.py`:
    import json
    import time
    import uuid

    import pika


    class ResponseTimeout(Exception):
        pass


    class Client:
        # similar constructor as `FibonacciRpcClient` from tutorial...

        def on_response(self, channel, method, props, body):
            # only accept the response that matches our last request
            if self.correlation_id == props.correlation_id:
                self.response = json.loads(body.decode())

        def call(self, timeout=2):
            self.response = None
            self.correlation_id = str(uuid.uuid4())
            self.channel.basic_publish(exchange='',
                                       routing_key='rpc_queue',
                                       properties=pika.BasicProperties(
                                           reply_to=self.callback_queue,
                                           correlation_id=self.correlation_id),
                                       body='')
            start_time = time.time()
            while self.response is None:
                if (start_time + timeout) < time.time():
                    raise ResponseTimeout()
                self.connection.process_data_events()
            return self.response
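The constructor referenced in the comment above isn't shown here; it would look much like the tutorial's `FibonacciRpcClient`. A minimal sketch, assuming a broker on localhost:

    # inside the Client class:
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()
        # exclusive, broker-named queue where the responses arrive
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(queue=self.callback_queue,
                                   on_message_callback=self.on_response,
                                   auto_ack=True)

With that in place, using the client is as simple as:

    client = Client()
    print(client.call())  # e.g. {'cpu': 1.2, 'memory': 0.3}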
As you see, the code is pretty much the same as the original `FibonacciRpcClient`. The main differences are:
- We use JSON as the data format for our messages.
- Our client's `call()` method doesn't require a `body` argument (there's nothing to send to the server).
- We take care of response timeouts (if the server is down, or if it doesn't reply to our messages).
Still, there are a lot of things to improve here:
- No error handling: for example, if the client "forgets" to send a `reply_to` queue, our server is going to crash, and it will crash again on restart (the broken message will be requeued indefinitely as long as it isn't acknowledged by our server); see the sketch after this list.
- We don't handle broken connections (no reconnection mechanism).
- …
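As an illustration of the first point, the server could reject broken requests instead of letting them be requeued forever. A possible sketch (not part of the original answer):

    def on_request(channel, method, props, body):
        if not props.reply_to:
            # drop the malformed message instead of crashing and requeueing it forever
            channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
            return
        response = {'cpu': current_cpu_usage(),
                    'memory': current_memory_usage()}
        channel.basic_publish(exchange='',
                              routing_key=props.reply_to,
                              properties=pika.BasicProperties(
                                  correlation_id=props.correlation_id),
                              body=json.dumps(response))
        channel.basic_ack(delivery_tag=method.delivery_tag)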
You may also consider replacing the RPC approach with a publish/subscribe pattern; in this way, the server simply broadcasts its CPU/memory state every X time interval, and one or more clients receive the updates.
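If that fits your use case, a minimal sketch of such a publisher, assuming a fanout exchange named `metrics` and psutil for the readings (both are my choices, not part of the original answer):

    import json
    import time

    import pika
    import psutil

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.exchange_declare(exchange='metrics', exchange_type='fanout')

    while True:
        payload = {'cpu': psutil.cpu_percent(interval=None),
                   'memory': psutil.virtual_memory().percent}
        # broadcast to every queue currently bound to the exchange
        channel.basic_publish(exchange='metrics', routing_key='',
                              body=json.dumps(payload))
        time.sleep(5)  # publish every 5 seconds (the "X time interval")

Each client would then bind its own exclusive queue to the `metrics` exchange, exactly as in the publish/subscribe tutorial.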