RabbitMQ

Why RabbitMQ?

For an introduction to messaging systems, please refer to: https://sathishkariyanna.blogspot.com/p/kafka.html
  • Reliable and open source
  • Supports several standardized protocols, such as AMQP 0-9-1 (Advanced Message Queuing Protocol), AMQP 1.0, MQTT, STOMP, and STOMP and MQTT over WebSockets.
  • Loosely couples the applications that send and receive messages.
  • Highly scalable
    1. You only have to maintain the producers and consumers that send messages to and receive messages from the queue. Under heavy load, if a queue grows, the standard reaction is to add more consumers and parallelize the work. This is a simple and effective way to scale.
    2. You can also scale the broker itself (add more CPU, disk, or memory) so it can hold more messages in its queues. Keep in mind, though, that RabbitMQ works fastest with short queues.
In this article, we assume you are using RabbitMQ over the AMQP 0-9-1 protocol.

Architecture Diagram of RabbitMQ

Messages are not published directly to a queue. Instead, the producer sends messages to an exchange. Exchanges are message routing agents, defined per virtual host within RabbitMQ. An exchange is responsible for routing the messages to different queues with the help of header attributes, bindings, and routing keys.
A binding is a "link" that you set up to bind a queue to an exchange.
The routing key is a message attribute the exchange looks at when deciding how to route the message to queues (depending on exchange type).
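The sketch below is a toy in-memory model of a direct exchange in plain Java. It makes the relationship between exchanges, bindings and routing keys concrete. It is not the RabbitMQ client API and never talks to a broker. The names DirectExchangeModel, bind, unbind and route are hypothetical. It only shows the rule: a binding links a queue to an exchange under a binding key, and the exchange sends a message to every queue whose binding key equals the message's routing key.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Toy model of a direct exchange (hypothetical names, no broker involved).
class DirectExchangeModel {
    // A binding links a queue to this exchange under a binding key.
    record Binding(String queue, String bindingKey) {}

    // A set, so binding the same queue with the same key twice has no extra effect.
    private final Set<Binding> bindings = new LinkedHashSet<>();

    void bind(String queue, String bindingKey) {
        bindings.add(new Binding(queue, bindingKey));
    }

    void unbind(String queue, String bindingKey) {
        bindings.remove(new Binding(queue, bindingKey));
    }

    // Queues that a message with this routing key would be routed to.
    // An empty result means the message is unroutable.
    Set<String> route(String routingKey) {
        Set<String> targets = new LinkedHashSet<>();
        for (Binding binding : bindings) {
            if (binding.bindingKey().equals(routingKey)) {
                targets.add(binding.queue());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        DirectExchangeModel exchange = new DirectExchangeModel();
        exchange.bind("error-log", "error");
        exchange.bind("all-logs", "error");
        exchange.bind("all-logs", "info");
        System.out.println(exchange.route("error")); // [error-log, all-logs]
        System.out.println(exchange.route("info"));  // [all-logs]
        System.out.println(exchange.route("debug")); // []
    }
}
```

One queue can have several bindings (all-logs above). Several queues can also share a binding key ("error"), in which case each of them gets its own copy of the message.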

Exchange Types:
  • Direct Exchange
    • Delivers messages to queues based on the message routing key.
    • The routing key is a message attribute that the producer sets when it publishes. Think of the routing key as an "address" that the exchange uses to decide how to route the message. A message goes to the queue(s) whose binding key exactly matches the message's routing key.
  • Default Exchange
    • The default exchange is a pre-declared direct exchange with no name, usually referred to by an empty string. When you use the default exchange, your message is delivered to the queue whose name equals the message's routing key, because every queue is automatically bound to the default exchange with a routing key equal to the queue name.
  • Topic Exchange
    • Routes a message to one or many queues based on wildcard matches between the message's routing key and the routing pattern specified by each queue binding.
    • The routing key must be a list of words delimited by periods (.), for example agreements.us or agreements.us.dallas, which here identify agreements set up for a company with offices in many different locations. In a binding pattern, * substitutes for exactly one word and # for zero or more words. So agreements.us.* matches agreements.us.dallas but not agreements.us, while agreements.# matches both.
  • Fanout Exchange
    • Copies and routes a received message to all queues bound to it, without the routing-key or pattern matching done by direct and topic exchanges.
    • Any routing key provided is simply ignored.
    • Useful when the same message needs to be sent to one or more queues whose consumers may process it in different ways.
  • Headers Exchange
    • Routes messages based on binding arguments containing headers and optional values.
    • Very similar to topic exchanges, but routes messages based on header values instead of routing keys.
    • A message matches if the value of a header equals the value specified in the binding.
    • A special argument named "x-match", added to the binding between the exchange and the queue, specifies whether all headers must match or just one.
      • "x-match" can take two values, "any" or "all", where "all" is the default. A value of "all" means all header pairs (key, value) must match, while "any" means at least one of the header pairs must match.
    • Headers can use a wider range of data types than strings, for example integers or hashes. The headers exchange type (used with the binding argument "any") is useful for directing messages that contain a subset of known (unordered) criteria.
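  • Dead Letter Exchange
    • Not a separate exchange type: any exchange can be configured as a queue's dead letter exchange (for example with the "x-dead-letter-exchange" queue argument). A queue dead-letters a message in cases such as these: a consumer rejects it without requeueing, its TTL expires, or the queue exceeds its length limit.
    • Unroutable messages are a different case. If no matching queue can be found for a message, it is silently dropped (mandatory flag set to False) or returned to the publisher (mandatory flag set to True when publishing).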
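The sketch below turns the topic and headers matching rules described above into plain Java. It is a hypothetical helper for illustration only: in RabbitMQ the broker does this matching, and the names ExchangeMatching, topicMatches and headersMatch are invented. It assumes the rules stated in this article: "*" matches exactly one word and "#" matches zero or more words. "x-match" defaults to "all". Header values are compared for exact equality, and binding arguments starting with "x-" are not treated as matching criteria. A fanout exchange needs no matcher at all, because every bound queue receives the message.

```java
import java.util.Map;
import java.util.Objects;

// Toy matchers for topic and headers exchange bindings (hypothetical helper, no broker involved).
class ExchangeMatching {

    // Topic exchange: "*" matches exactly one word, "#" matches zero or more words.
    static boolean topicMatches(String pattern, String routingKey) {
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return matchFrom(pattern.split("\\.", -1), 0, words, 0);
    }

    private static boolean matchFrom(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length;               // both exhausted: match
        }
        if (pattern[p].equals("#")) {
            // "#" either stops here (consumes nothing) or swallows one more word.
            return matchFrom(pattern, p + 1, words, w)
                || (w < words.length && matchFrom(pattern, p, words, w + 1));
        }
        if (w == words.length) {
            return false;                           // pattern needs a word, key has none left
        }
        boolean wordOk = pattern[p].equals("*") || pattern[p].equals(words[w]);
        return wordOk && matchFrom(pattern, p + 1, words, w + 1);
    }

    // Headers exchange: compare binding arguments with message headers.
    // Keys starting with "x-" (such as "x-match") are not matching criteria.
    static boolean headersMatch(Map<String, Object> bindingArgs, Map<String, Object> headers) {
        boolean any = "any".equals(bindingArgs.get("x-match")); // "all" is the default
        for (Map.Entry<String, Object> criterion : bindingArgs.entrySet()) {
            if (criterion.getKey().startsWith("x-")) {
                continue;
            }
            boolean hit = headers.containsKey(criterion.getKey())
                && Objects.equals(headers.get(criterion.getKey()), criterion.getValue());
            if (any && hit) {
                return true;                        // "any": one matching pair is enough
            }
            if (!any && !hit) {
                return false;                       // "all": one mismatch rules it out
            }
        }
        return !any;                                // "all": every pair matched; "any": none did
    }

    public static void main(String[] args) {
        System.out.println(topicMatches("agreements.us.*", "agreements.us.dallas")); // true
        System.out.println(topicMatches("agreements.us.*", "agreements.us"));        // false
        System.out.println(topicMatches("agreements.#", "agreements.us.dallas"));    // true

        Map<String, Object> binding = Map.of("x-match", "any", "format", "pdf", "type", "report");
        System.out.println(headersMatch(binding, Map.of("format", "pdf")));          // true
        System.out.println(headersMatch(binding, Map.of("format", "zip")));          // false
    }
}
```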

    Understanding each component and its configuration

    Connection
    AMQP 0-9-1 lets several lightweight logical connections, called channels, share (multiplex) a single TCP connection. An application can therefore open multiple channels on one connection. AMQP 0-9-1 clients open one or more channels after connecting and perform protocol operations (manage topology, publish, consume) on the channels.
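Under the hood, a channel is just a number carried in every frame. The sketch below is a simplified plain-Java illustration, not a client library and not a complete protocol implementation; FrameMultiplexing and Frame are hypothetical names. It writes frames for channels 1 and 2 onto one byte stream, standing in for the single TCP connection, and then reads them back grouped by channel. It follows the AMQP 0-9-1 general frame layout: a 1-byte type, a 2-byte channel number, a 4-byte payload size, the payload and a 0xCE frame-end octet. The payloads, however, are plain strings rather than encoded protocol methods.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of channel multiplexing: frames for different channels share one byte stream.
// Payloads are plain text here, not real AMQP method encodings.
class FrameMultiplexing {
    static final int FRAME_METHOD = 1;
    static final int FRAME_END = 0xCE;

    record Frame(int channel, String payload) {}

    // Interleave frames from several channels onto one "connection" (a byte array).
    static byte[] encode(List<Frame> frames) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (Frame frame : frames) {
                byte[] payload = frame.payload().getBytes(StandardCharsets.UTF_8);
                out.writeByte(FRAME_METHOD);      // frame type
                out.writeShort(frame.channel());  // channel number
                out.writeInt(payload.length);     // payload size
                out.write(payload);
                out.writeByte(FRAME_END);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Read the shared stream back and group payloads by channel number.
    static Map<Integer, List<String>> demultiplex(byte[] wire) {
        Map<Integer, List<String>> byChannel = new TreeMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire))) {
            while (in.available() > 0) {
                in.readUnsignedByte();                     // frame type (not needed here)
                int channel = in.readUnsignedShort();
                byte[] payload = new byte[in.readInt()];
                in.readFully(payload);
                if (in.readUnsignedByte() != FRAME_END) {
                    throw new IOException("malformed frame: missing frame-end octet");
                }
                byChannel.computeIfAbsent(channel, c -> new ArrayList<>())
                         .add(new String(payload, StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return byChannel;
    }

    public static void main(String[] args) {
        byte[] wire = encode(List.of(
            new Frame(1, "publish A"),
            new Frame(2, "consume B"),
            new Frame(1, "publish C")));
        System.out.println(demultiplex(wire)); // {1=[publish A, publish C], 2=[consume B]}
    }
}
```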

    Connection Lifecycle
    In order for a client to interact with RabbitMQ, it must first open a connection. This process involves a number of steps:
    • The application configures its client library with a connection endpoint (e.g. hostname and port)
    • The library resolves the hostname to one or more IP addresses
    • The library opens a TCP connection to the target IP address and port
    • After the server has accepted the TCP connection, a protocol-specific negotiation is performed
    • The server then authenticates the client
    • The client can now perform operations, each of which involves an authorization check by the server
    • The client keeps the connection open for as long as it needs to communicate with RabbitMQ
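The first steps of this lifecycle can be reproduced with plain JDK sockets. The sketch below is minimal and assumes a broker on localhost at the default AMQP port 5672; if none is running, it just prints an error. The class and method names are hypothetical. In practice a client library, such as the Java client used later in this article, performs all of these steps plus authentication for you. An AMQP 0-9-1 client starts the negotiation by sending the 8-byte protocol header. The broker then answers either with a method frame (Connection.Start) or, if it does not accept the requested version, with its own protocol header before closing the socket.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;

// Sketch of the first connection steps, using only the JDK (no RabbitMQ client library).
class ConnectionLifecycleSketch {

    // Protocol header an AMQP 0-9-1 client sends first: "AMQP", 0, then version 0-9-1.
    static byte[] amqp091ProtocolHeader() {
        return new byte[] {'A', 'M', 'Q', 'P', 0, 0, 9, 1};
    }

    // Step 2: resolve the hostname to one or more IP addresses.
    static InetAddress[] resolve(String host) throws UnknownHostException {
        return InetAddress.getAllByName(host);
    }

    // Steps 3 and 4: open a TCP connection and start the protocol negotiation.
    // Returns the first byte the broker answers with: 1 is a method frame
    // (Connection.Start), 'A' means the broker sent back its own protocol header,
    // and -1 means the connection was closed.
    static int openAndGreet(String host, int port, int timeoutMillis) throws IOException {
        InetAddress address = resolve(host)[0];
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(address, port), timeoutMillis);
            socket.setSoTimeout(timeoutMillis);
            OutputStream out = socket.getOutputStream();
            out.write(amqp091ProtocolHeader());
            out.flush();
            // Authentication (step 5) and the rest of the handshake are left to a real client library.
            return socket.getInputStream().read();
        }
    }

    public static void main(String[] args) {
        try {
            System.out.println("First byte from broker: " + openAndGreet("localhost", 5672, 2000));
        } catch (IOException e) {
            System.out.println("No broker reachable on localhost:5672: " + e);
        }
    }
}
```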

    Parameters while opening a connection (these names are from the Python pika client's ConnectionParameters; other client libraries use different names)
    • host - Hostname or IP Address to connect to
    • port - TCP port to connect to
    • virtual_host - RabbitMQ virtual host to use
    • credentials - auth credentials
    • channel_max - Maximum number of channels to allow
    • frame_max - The maximum byte size for an AMQP frame
    • heartbeat - Controls AMQP heartbeat timeout negotiation during connection tuning. An integer value always overrides the value proposed by the broker. Use 0 to deactivate heartbeats and None to always accept the broker's proposal. If a callable is given, it is called with the connection instance and the heartbeat timeout proposed by the broker as its arguments; it should return a non-negative integer, which overrides the broker's proposal.
    • ssl_options - None for plaintext or SSLOptions instance for SSL/TLS. Defaults to None.
    • connection_attempts - Maximum number of retry attempts
    • retry_delay - Time to wait, in seconds, before the next connection attempt
    • socket_timeout - Positive socket connect timeout in seconds.
    • stack_timeout - Positive full protocol stack (TCP/[SSL]/AMQP) bring-up timeout in seconds. It's recommended to set this value higher than socket_timeout.
    • blocked_connection_timeout - If not None, a non-negative timeout, in seconds, for the connection to remain blocked (triggered by Connection.Blocked from the broker). If the timeout expires before the connection becomes unblocked, the connection is torn down and the client app is informed through the adapter-specific mechanism: a ConnectionBlockedTimeout exception is passed to on_close_callback in asynchronous adapters or raised in BlockingConnection.
    • client_properties - None or a dict of client properties used to override fields in the default client properties reported to RabbitMQ via the Connection.StartOk method.
    • tcp_options - None or a dict of TCP options to set for the socket
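The three forms of the heartbeat setting are easier to compare side by side. The plain-Java model below only illustrates the semantics described in the list above; those names come from pika, a Python client, and this is not pika's implementation. Several things are assumptions of this sketch: the names HeartbeatNegotiation and negotiate, representing None as null, and passing the callback only the broker's proposal instead of the connection too. The Java client exposes a similar knob as ConnectionFactory#setRequestedHeartbeat(int), but it follows its own negotiation rules.

```java
import java.util.function.IntUnaryOperator;

// Models the three documented forms of the `heartbeat` connection parameter.
// Simplification: the callback receives only the broker's proposal, not the connection.
class HeartbeatNegotiation {

    static int negotiate(Integer fixedValue, IntUnaryOperator callback, int brokerProposal) {
        if (callback != null) {
            int chosen = callback.applyAsInt(brokerProposal);   // callable form
            if (chosen < 0) {
                throw new IllegalArgumentException("heartbeat callback must return a non-negative integer");
            }
            return chosen;
        }
        if (fixedValue != null) {
            return fixedValue;      // integer form: always overrides; 0 deactivates heartbeats
        }
        return brokerProposal;      // None (null here): accept the broker's proposal
    }

    public static void main(String[] args) {
        int broker = 60;
        System.out.println(negotiate(null, null, broker));                 // 60
        System.out.println(negotiate(30, null, broker));                   // 30
        System.out.println(negotiate(0, null, broker));                    // 0 (heartbeats off)
        System.out.println(negotiate(null, p -> Math.min(p, 20), broker)); // 20
    }
}
```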
    Exchange
    Parameters you can set while declaring an exchange
    • durable - Durable exchanges survive broker restarts and last until they are explicitly deleted
    • temporary - Temporary (non-durable) exchanges exist until RabbitMQ is shut down or restarted.
    • auto-delete - Auto-deleted exchanges are removed once the last bound object is unbound from the exchange.
    • exchange_type
      • Options - direct, fanout, headers, topic, … (the default exchange is a pre-declared direct exchange, not a separate type)
    • arguments - Custom key/value pair arguments for the exchange
    • internal
      • Options - True, False
      • If True, the exchange can only receive messages from other exchanges (through exchange-to-exchange bindings); clients cannot publish to it directly
    Queue
    Parameters you can set while declaring a queue
    • name - Queue name, unique within the virtual host (an empty name asks the broker to generate one)
    • durable - If set to True, the queue survives broker restarts
      • options - True/False
    • exclusive - The queue can be used only by its declaring connection and is deleted when that connection closes
      • options - True/False
    • auto-delete - The queue is deleted when its last consumer cancels or disconnects (a queue that never had a consumer is not deleted)
      • options - True/False
    • arguments - Custom key/value arguments for the queue
    Binding a queue to an exchange
    • name - Name of the queue to bind
    • exchange - The source exchange to bind to
    • routing_key - The routing key to bind on
    • arguments - Custom key/value pair arguments for the binding
    Unbinding a queue from an exchange
    • Takes the same parameters as binding

    Types of Queues
    Ephemeral
    • Auto-Delete Queue
    • Queue Expiration (TTL)
    • Exclusive Queues
    • Auto-delete Exchange
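Queue expiration and per-queue message TTL are set through the queue's arguments. The minimal sketch below only builds the arguments map; EphemeralQueueArgs is a hypothetical helper. With the Java client, such a map would be passed as the last parameter of channel.queueDeclare(name, durable, exclusive, autoDelete, arguments), whose exclusive and autoDelete flags cover two of the other ephemeral options. Both values are in milliseconds. RabbitMQ also lets you set them through policies instead of hard-coding them in the application.

```java
import java.util.HashMap;
import java.util.Map;

// Builds the optional `arguments` map for declaring a short-lived queue.
// Exclusive and auto-delete are separate declare flags, not arguments.
class EphemeralQueueArgs {

    static Map<String, Object> build(Integer queueExpiresMillis, Integer messageTtlMillis) {
        Map<String, Object> args = new HashMap<>();
        if (queueExpiresMillis != null) {
            // Queue expiration: delete the queue after it has been unused for this long
            args.put("x-expires", queueExpiresMillis);
        }
        if (messageTtlMillis != null) {
            // Per-queue message TTL: messages older than this are discarded (or dead-lettered)
            args.put("x-message-ttl", messageTtlMillis);
        }
        return args;
    }

    public static void main(String[] args) {
        // e.g. a queue that disappears after 30 minutes unused and keeps messages for at most 60 s
        System.out.println(build(30 * 60 * 1000, 60_000));
    }
}
```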
    Lazy Queues
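    • Memory-optimized queues: messages are moved to disk as early as possible and loaded into RAM only when consumers need them (enabled with the "x-queue-mode" = "lazy" queue argument)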
    Priority Queues
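    • Not strictly FIFO: higher-priority messages are delivered first (messages with equal priority stay in FIFO order)
    • The publisher sets the priority on each message; the queue must be declared with the "x-max-priority" argument
    • The priority queue moves higher-priority messages ahead of lower-priority ones
    • Drawbacks: low-priority messages can be blocked (starved) while higher-priority messages keep arriving. Also, when a queue with a length limit is full, an arriving low-priority message can eject a high-priority one, because messages are dropped from the head of the queue.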
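The ordering rules can be illustrated with java.util.PriorityQueue. This is a hypothetical in-memory model, not how RabbitMQ stores messages. It assumes, as the RabbitMQ documentation describes, that a message without a priority counts as 0. A priority above the queue's x-max-priority is treated as the maximum. With a real broker, the map from declareArguments() would be passed when declaring the queue, and the publisher would set the priority property on each message. Only messages still waiting in the queue can be reordered; messages already delivered to a consumer (for example within its prefetch window) are not affected.

```java
import java.util.Map;
import java.util.PriorityQueue;

// Toy model of how a priority queue orders waiting messages (no broker involved).
class PriorityQueueModel {
    record Message(String body, int priority, long seq) {}

    private final int maxPriority;
    private long nextSeq = 0;
    // Higher priority first; messages with the same priority keep FIFO order (by sequence).
    private final PriorityQueue<Message> waiting = new PriorityQueue<Message>((a, b) ->
        a.priority() != b.priority()
            ? Integer.compare(b.priority(), a.priority())
            : Long.compare(a.seq(), b.seq()));

    PriorityQueueModel(int maxPriority) {
        this.maxPriority = maxPriority;
    }

    // Arguments a real declaration would need to make the queue a priority queue.
    Map<String, Object> declareArguments() {
        return Map.of("x-max-priority", maxPriority);
    }

    // No priority counts as 0; values above the maximum are treated as the maximum.
    void publish(String body, Integer priority) {
        int effective = priority == null ? 0 : Math.min(priority, maxPriority);
        waiting.add(new Message(body, effective, nextSeq++));
    }

    // Next message to deliver, or null when the queue is empty.
    String consume() {
        Message next = waiting.poll();
        return next == null ? null : next.body();
    }

    public static void main(String[] args) {
        PriorityQueueModel queue = new PriorityQueueModel(5);
        queue.publish("low", 1);
        queue.publish("urgent", 9);   // capped to 5
        queue.publish("plain", null); // treated as 0
        queue.publish("high", 5);
        String body;
        while ((body = queue.consume()) != null) {
            System.out.println(body); // prints urgent, high, low, plain (one per line)
        }
    }
}
```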

    Virtual Hosts
    To make it possible for a single broker to host multiple isolated "environments" (groups of users, exchanges, queues and so on), AMQP 0-9-1 includes the concept of virtual hosts (vhosts). They are similar to virtual hosts used by many popular Web servers and provide completely isolated environments in which AMQP entities live. Protocol clients specify what vhosts they want to use during connection negotiation.

    Consumers
    Messages can be consumed by registering a consumer (subscription), which means RabbitMQ will push messages to the client, or fetched individually for protocols that support this (e.g. the basic.get AMQP 0-9-1 method), similarly to HTTP GET.

    RabbitMQ pushes messages to consumers, and consumers acknowledge them.
    A consumer subscribes to a queue (declaring a new one or using an existing one). It is then your choice whether the consumer acknowledges each message by sending an ack to the broker.
    If you use these manual acknowledgements to make the system more reliable (this lowers throughput, but it is worth it when you need reliability), the broker removes a message from the queue only after receiving the consumer's acknowledgement. If the consumer's channel or connection closes before it acks, the message is requeued and delivered again.
    Basic steps while setting up a consumer
    • Make a connection
    • Get a channel from connection
    • Declare queue (if it is not setup)
    • Set QoS (quality of service) on the channel
      • prefetch_size - Specifies the prefetch window size in bytes. The server sends a message in advance if it is equal to or smaller in size than the available prefetch size (and also falls within the other prefetch limits). May be set to zero, meaning "no specific limit", although other prefetch limits may still apply. RabbitMQ does not implement this field, so leave it at 0.
      • prefetch_count - Specifies a prefetch window in terms of whole messages. This field may be used in combination with the prefetch_size field; a message is only sent in advance if both prefetch windows (and those at the channel and connection level) allow it.
      • global - In the AMQP 0-9-1 specification, whether the QoS settings apply to all channels on the connection. RabbitMQ reinterprets it: False applies the limit separately to each new consumer on the channel, and True shares one limit among all consumers on the channel.
    Note: Both prefetch_size and prefetch_count are ignored when no-ack is set to true (no-ack is what RabbitMQ and the AMQP docs call automatic acknowledgement mode). Messages are then sent to the client without any prefetch limit and removed from the queue as soon as they are sent.
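The effect of prefetch_count with manual acknowledgements can be simulated without a broker. PrefetchModel and its methods are invented for this illustration. In the model, the "broker" keeps pushing messages while the number of unacknowledged deliveries is below prefetch_count, and an ack frees a slot. If the consumer goes away, its unacknowledged messages return to the queue to be delivered again. With the Java client, you would call channel.basicQos(prefetchCount) before consuming, pass autoAck = false to basicConsume, and call channel.basicAck(deliveryTag, false) in the delivery callback.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of prefetch_count for a single consumer using manual acks (no broker involved).
class PrefetchModel {
    private final int prefetchCount;                        // 0 means "no limit"
    private final Deque<String> ready = new ArrayDeque<>(); // waiting in the queue
    private final List<String> unacked = new ArrayList<>(); // delivered, not yet acked

    PrefetchModel(int prefetchCount) {
        this.prefetchCount = prefetchCount;
    }

    void enqueue(String message) {
        ready.addLast(message);
    }

    // Broker side: push messages while the prefetch window still has room.
    List<String> deliver() {
        List<String> pushed = new ArrayList<>();
        while (!ready.isEmpty() && (prefetchCount == 0 || unacked.size() < prefetchCount)) {
            String message = ready.pollFirst();
            unacked.add(message);
            pushed.add(message);
        }
        return pushed;
    }

    // Consumer side: an ack removes the message for good and frees one slot.
    void ack(String message) {
        unacked.remove(message);
    }

    // Consumer disconnects without acking: its messages return to the head of the queue.
    void consumerLost() {
        for (int i = unacked.size() - 1; i >= 0; i--) {
            ready.addFirst(unacked.get(i));
        }
        unacked.clear();
    }

    public static void main(String[] args) {
        PrefetchModel queue = new PrefetchModel(2);
        for (String m : List.of("m1", "m2", "m3")) {
            queue.enqueue(m);
        }
        System.out.println(queue.deliver()); // [m1, m2]  window is full
        System.out.println(queue.deliver()); // []        nothing more until an ack
        queue.ack("m1");
        System.out.println(queue.deliver()); // [m3]
        queue.consumerLost();
        System.out.println(queue.deliver()); // [m2, m3]  redelivered
    }
}
```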
    Parameters when starting a consumer (basic_consume; names as in the Python pika client)
    • queue - The queue from which to consume
    • on_message_callback - Function called for each message delivered from the queue
    • auto_ack
      • Options - True/False
      • If set to True, automatic acknowledgement mode is used. This corresponds to the 'no_ack' parameter of the basic.consume AMQP 0-9-1 method
    • exclusive - If set to True, no other consumers are allowed on the queue
    • consumer_tag - You may specify your own consumer tag; if left empty, a consumer tag is generated automatically
    • arguments - Custom key/value pair arguments for the consumer

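    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;

    public class ConsumerExample {
      private static final String EXCHANGE_NAME = "logs";

      public static void main(String[] argv) throws Exception {
        // Make a connection to a broker on localhost (default port 5672, virtual host "/")
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        // Get a channel; connection and channel stay open so the consumer keeps running
        Channel channel = connection.createChannel();

        // Declare the fanout exchange (a no-op if it already exists with the same settings)
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // Server-named, non-durable, exclusive, auto-delete queue
        String queueName = channel.queueDeclare().getQueue();
        // Fanout exchanges ignore the routing key, so an empty binding key is used
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // Called for every message the broker pushes to this consumer
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        // autoAck = true: a message counts as acknowledged as soon as it is delivered
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
      }
    }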


    Publisher
    Steps generally followed by publisher/producer
    • Make a connection
    • Get a channel from the connection; one connection can be multiplexed into multiple channels. (You can share one connection across your application and create several channels for different kinds of publishing; how you split them depends on your strategy.)
    • Publish Message
    Parameters you can set while publishing a message to RabbitMQ
    • exchange - The exchange to publish to
    • routing_key - The routing key the exchange uses to route the message
    • body - The message body; an empty string if there is no body
    • properties
      • Message properties
      • You can set various properties, such as delivery_mode (Persistent: the message is saved to disk if the queue is durable, so it is not lost if the broker restarts; or Transient), priority, and headers (for headers exchanges)
      • Writing a persistent message to disk can even happen after the message has already been consumed by a consumer
      • Edge case: what if the broker crashes before saving a persistent message (routed to a durable queue) to disk, and you can't afford that? → The solution is Publisher Confirms or Transactional Channels
    • mandatory
      • If set to True and the message is unroutable, it is returned to the publisher
      • Prerequisite: you should have set up a handler for returned messages
    • Using standard AMQP 0-9-1, the only way to guarantee that a message isn't lost is by using transactions: make the channel transactional, then publish each message or set of messages and commit. However, transactions are unnecessarily heavyweight and can decrease throughput by a factor of 250. To remedy this, a confirmation mechanism was introduced. It mimics the consumer acknowledgements mechanism already present in the protocol.
    Publisher Confirms
    • To enable confirms, a client sends the confirm.select method. Depending on whether no-wait was set or not, the broker may respond with a confirm.select-ok. Once the confirm.select method is used on a channel, it is said to be in confirm mode. A transactional channel cannot be put into confirm mode and once a channel is in confirm mode, it cannot be made transactional.
    • Once a channel is in confirm mode, both the broker and the client count messages (counting starts at 1 on the first confirm.select). The broker then confirms messages as it handles them by sending a basic.ack on the same channel. The delivery-tag field contains the sequence number of the confirmed message. The broker may also set the multiple field in basic.ack to indicate that all messages up to and including the one with the sequence number have been handled.
    • To track which messages the broker has confirmed, you can store the published messages in a map (key: message sequence number, value: message body)
    • The sequence number can be obtained with Channel#getNextPublishSeqNo() before publishing
    • Once a message is confirmed (its ack is received by the publisher), you can remove it from this map.
    • For routable messages, the basic.ack is sent when a message has been accepted by all the queues. For persistent messages routed to durable queues, this means persisting to disk.
    • Persisting happens in batches: the RabbitMQ message store writes messages to disk after an interval (a few hundred milliseconds) to minimise the number of fsync(2) calls, or when a queue is idle.
    • This means that under a constant load, latency for basic.ack can reach a few hundred milliseconds. To improve throughput, applications are strongly advised to process acknowledgements asynchronously (as a stream) or publish batches of messages and wait for outstanding confirms. The exact API for this varies between client libraries.
    • For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack. The same is true for negative acknowledgements (basic.nack).
    • Techniques to implement Publisher Confirms (see https://www.rabbitmq.com/tutorials/tutorial-seven-java.html for a more detailed explanation):
      • Publishing messages individually and waiting for each confirmation synchronously: simple, but very limited throughput.
      • Publishing messages in batches and waiting synchronously for each batch to be confirmed: simple, reasonable throughput, but hard to reason about when something goes wrong.
      • Asynchronous handling: best performance and use of resources, good control in case of error, but more involved to implement correctly.
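The map-based tracking described above, combined with the multiple flag of basic.ack, can be sketched as follows. This mirrors the approach of the RabbitMQ Java tutorial linked above, but the class and method names are hypothetical and the sketch has no broker dependency. In real code, channel.confirmSelect() enables confirm mode. You would call published(channel.getNextPublishSeqNo(), body) just before channel.basicPublish(...). The acked and nacked methods would be invoked from the callbacks registered with channel.addConfirmListener(ackCallback, nackCallback).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Tracks published-but-unconfirmed messages, keyed by publish sequence number.
class OutstandingConfirms {
    // Concurrent map: confirm callbacks may run on a different thread than the publisher.
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

    // Record a message just before publishing it, using the channel's next sequence number.
    void published(long seqNo, String body) {
        outstanding.put(seqNo, body);
    }

    // basic.ack: with multiple = true, every sequence number up to and including seqNo is confirmed.
    void acked(long seqNo, boolean multiple) {
        if (multiple) {
            outstanding.headMap(seqNo, true).clear();
        } else {
            outstanding.remove(seqNo);
        }
    }

    // basic.nack: remove the affected messages and return them so they can be republished or logged.
    List<String> nacked(long seqNo, boolean multiple) {
        List<String> failed = new ArrayList<>();
        if (multiple) {
            ConcurrentNavigableMap<Long, String> affected = outstanding.headMap(seqNo, true);
            failed.addAll(affected.values());
            affected.clear();
        } else {
            String body = outstanding.remove(seqNo);
            if (body != null) {
                failed.add(body);
            }
        }
        return failed;
    }

    int pending() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        OutstandingConfirms confirms = new OutstandingConfirms();
        for (long seq = 1; seq <= 4; seq++) {
            confirms.published(seq, "message-" + seq);
        }
        confirms.acked(2, true);                       // confirms 1 and 2
        System.out.println(confirms.nacked(3, false)); // [message-3]
        System.out.println(confirms.pending());        // 1 (message-4 still unconfirmed)
    }
}
```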

    How to make the system fully reliable (if you can't afford any message loss)?
    • Save messages to disk on the RabbitMQ server
      • Make the exchange and queue durable, so that they still exist after a server restart or shutdown and are not removed automatically
      • Set the message's persistent delivery mode, so that the message is saved to disk
    • Use manual acknowledgements in the consumer
    • Use Publisher Confirms or Transactional Channels
    • Handle messages that are published or consumed more than once by making the consumer's processing idempotent.
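One common way to make processing idempotent is to give every message a unique ID and skip IDs that were already handled. The minimal in-memory sketch below assumes the publisher sets such an ID. For example, it could use the standard message_id property, which the Java client exposes through AMQP.BasicProperties.Builder#messageId and BasicProperties#getMessageId. IdempotentConsumer and its methods are hypothetical. A production version would keep the processed IDs in durable storage, ideally updated atomically with the side effect. Then a crash between doing the work and recording the ID cannot cause the work to be repeated.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Idempotent processing keyed by a message ID (in-memory and single-threaded for simplicity).
class IdempotentConsumer {
    private final Set<String> processedIds = new HashSet<>(); // a real system would use a durable store
    private final List<String> results = new ArrayList<>();   // stand-in for the task's side effects

    // Returns true if the message was processed now, false if it was a duplicate and skipped.
    // Either way, the caller would still ack the delivery.
    boolean handle(String messageId, String body) {
        if (processedIds.contains(messageId)) {
            return false;
        }
        results.add(body.toUpperCase(Locale.ROOT)); // the actual work
        processedIds.add(messageId);                // record the ID only after the work succeeded
        return true;
    }

    List<String> results() {
        return results;
    }

    public static void main(String[] args) {
        IdempotentConsumer consumer = new IdempotentConsumer();
        consumer.handle("order-1", "ship order 1");
        consumer.handle("order-1", "ship order 1"); // redelivery or duplicate publish: skipped
        consumer.handle("order-2", "ship order 2");
        System.out.println(consumer.results());     // [SHIP ORDER 1, SHIP ORDER 2]
    }
}
```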

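    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class ProducerExample {

      private static final String EXCHANGE_NAME = "logs";

      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // try-with-resources closes the channel and the connection after publishing
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // Declare the same fanout exchange the consumer binds its queue to
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            String message = argv.length < 1 ? "info: Hello World!" :
                                String.join(" ", argv);

            // The routing key "" is ignored by a fanout exchange; null properties mean a
            // transient (non-persistent) message without headers
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
      }
    }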


    RabbitMQ Admin console

