
Client
public class Main {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
int count = 0;
while (count < 100) {
String time = getTime();
System.out.println(time);
Thread.sleep(1000);
count++;
}
}
private static String getTime() throws IOException, InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String requestQueueName = "rpc01";
String replyQueueName = channel.queueDeclare().getQueue();
String corrId = UUID.randomUUID().toString();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
String message = "Mahmood";
// use to block until there is a response available
BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
channel.basicPublish("", requestQueueName, properties, message.getBytes("UTF-8"));
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
if (properties.getCorrelationId().equals(corrId)) {
// save in the response queue
// so the queue can release the blockage
response.offer(new String(body, "UTF-8"));
}
}
};
channel.basicConsume(replyQueueName, true, consumer);
// block until there is a response available
String result=response.take();
channel.close();
connection.close();
return result;
}
}
Server
public class Main {
private static String requestQueueName = "rpc01";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(requestQueueName, false, false, false, null);
channel.basicQos(1);
System.out.println(" [x] Awaiting RPC requests");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String responseQueueName = properties.getReplyTo();
String correlationId = properties.getCorrelationId();
System.out.println(String.format("CorrelationId: %s, Response Queue: %s", correlationId, responseQueueName));
String message = new String(body, "UTF-8");
String response = String.format("Hello %s, Time is : %s", message, new Date().toString());
AMQP.BasicProperties responseProperties = new AMQP.BasicProperties().builder()
.correlationId(correlationId)
.build();
channel.basicPublish("", responseQueueName, responseProperties, response.getBytes("UTF-8"));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(requestQueueName, false, consumer);
}
}
References
https://www.rabbitmq.com/tutorials/tutorial-six-java.html
https://github.com/mhdr/RabbitMQSamples/tree/master/006_RPC