RabbitMQ — это полноценная и щедро удобренная фичами очередь сообщений. В отличие от ZeroMQ, который встраивается в приложения, RabbitMQ — сервис-посредник. Он разграничивает права доступа, поддерживает шифрование, сохранение сообщений на диск (чтобы пережить плановое отключение электричества), работу в кластерах и даже дублирование сервисов для повышенной живучести. К тому же он написан на Erlang, за что автоматически становится неубиваемым и поддерживаемым на большинстве популярных ОС.
В этом посте мы посмотрим, насколько тяжело отправлять и получать сообщения с RabbitMQ, да и вообще, на что он похож вблизи. В качестве платформы будет Убунта (запертая внутри Docker контейнера), но сгодился бы и Mac, и Windows.
Установка
Я не буду фокусироваться на установке слишком сильно, тем более что официльная документация весьма ничего, но самый простой способ получить работающий RabbitMQ на своей машине — через Docker и rabbitmq образ.
1 2 |
$ docker run -ti rabbitmq bash # root@714dbe064eef:/# |
Внутри получившегося контейнера будет полностью рабочий сервер сообщений, который нужно просто запустить:
1 2 |
$ service rabbitmq-server start #[ ok ] Starting message broker: rabbitmq-server. |
Если с Докером у вас не сложилось, то на настоящей машине установить RabbitMQ тоже не проблема. Вот на что это похоже на Убунте:
1 2 3 4 |
echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add - sudo apt-get update sudo apt-get install rabbitmq-server |
На других системах, возможно, будет даже легче, но я всё равно очень рекомендую потратить немного времени и разобраться с Docker, потому что он идеально подходит для изучения таких вот вещей. Просто берем образ с уже установленным RabbitMQ, Redis, Elasticsearch, Jenkins — да вообще с чем угодно, играемся вволю, а потом удаляем контейнер целиком.
Отправка и получение сообщений с rabbitmqadmin
Отправить и получить сообщение можно не написав ни строчки кода. В комплект с RabbitMQ идёт management plugin, в котором есть милая python-утилитка — rabbitmqadmin . С ней можно создавать и удалять очереди сообщений, проверять их статус, а так же отправлять, собственно, сами сообщения. Management plugin выключен по умолчанию, так что его придётся сначала включить. Правда, можно было бы просто взять rabbitmq:management образ, где это уже сделано, но мы бы сэкономили всего одну команду.
Включаем rabbitmqadmin
1 2 3 4 5 6 7 8 9 10 |
$ rabbitmq-plugins enable rabbitmq_management #The following plugins have been enabled: # mochiweb # webmachine # rabbitmq_web_dispatch # amqp_client # rabbitmq_management_agent # rabbitmq_management # #Applying plugin configuration to rabbit@fb252aa8ddbd... started 6 plugins. |
..и всё. rabbitmqadmin расположен в удивительной глубины и непроизносимости папке, так что его сразу стоит добавить либо в PATH переменную, либо сразу в /usr/local/bin/.
1 2 |
find /var/lib/rabbitmq -name rabbitmqadmin | xargs -I{} sudo ln -s {} /usr/local/bin/rabbitmqadmin sudo chmod +x /usr/local/bin/rabbitmqadmin |
Теперь попробуем что-нибудь создать.
Играем с очередями и сообщениями
RabbitMQ — сервис-посредник, который создаёт и управляет очередями сообщений. В архитектуре распределенного приложения он будет где-то посередине:
Во-первых, давайте посмотрим, какие очереди в нём есть по-умолчанию:
1 2 |
$ rabbitmqadmin list queues # No items |
Никаких. Но это вполне себе поправимо:
1 2 3 4 5 6 7 8 |
$ rabbitmqadmin declare queue name=demoqueue durable=false #queue declared $ rabbitmqadmin list queues +-----------+----------+ | name | messages | +-----------+----------+ | demoqueue | 0 | +-----------+----------+ |
Теперь у нас есть новая очередь с названием «demoqueue». Так как она держит сообщения в памяти ( durable=false ), то падение сервиса похоронит под собой все сообщения, которые он не успел отправить. Если это проблема, то можно сделать durable=true .
Кстати о сообщениях — отправим-ка чего:
1 2 3 4 5 6 7 8 9 |
$ rabbitmqadmin publish exchange=amq.default routing_key=demoqueue \ payload="Behold! This is the message" # Message published $ rabbitmqadmin list queues +-----------+----------+ | name | messages | +-----------+----------+ | demoqueue | 1 | +-----------+----------+ |
Команда отправки сообщения не содержит ни одного слова, хоть отдалённо напоминающего «отправка» и «сообщение», так что стоит чуть-чуть углубиться в терминологию.
Кто такой AMQP
В мире очередей сообщений есть попытка прийти к общему стандарту, и эта попытка сегодня называется AMQP (Advanced Message Queuing Protocol). По этому стандарту между отправителем сообщения и очередью должен быть еще один игрок — exchange. Отправитель кладёт сообщение в exchange, а уже exchange в зависимости от своего типа и того, какие очереди к нему привязаны, решает, куда оно отправится.
Например, в AMQP есть exchange типа «fanout», который отправляет копии сообщения сразу всем «своим» очередям.
Другой тип exchange — «direct» — отправляет сообщение только одной очереди. Какой именно — определяет маршрутный ключ (routing key), подорожником прикладываемый к сообщению.
Наконец, чтобы полностью соответствовать протоколу AMQP, у брокера должны быть несколько exchange по-умолчанию. Один из них — amq.default — должен иметь тип ‘direct’. Как только создаётся новая очередь, она автоматически к нему привязывается, и routing key будет совпадать с её именем.
Так как RabbitMQ поддерживает AMQP полностью, то в нём есть и exchange, и amq.default, так что отправка сообщения такой странной командой теперь выглядит чуть-чуть более логичной:
1 |
$ rabbitmqadmin publish exchange=amq.default routing_key=demoqueue payload="..." |
Наконец, опубликованное сообщение можно прочитать назад:
1 2 3 4 5 6 |
$ rabbitmqadmin get queue=demoqueue requeue=true +-------------+----------+---------------+-----------------------------+---------------+------------------+------------+-------------+ | routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | properties | redelivered | +-------------+----------+---------------+-----------------------------+---------------+------------------+------------+-------------+ | demoqueue | | 0 | Behold! This is the message | 27 | string | | False | +-------------+----------+---------------+-----------------------------+---------------+------------------+------------+-------------+ |
Читают сообщение напрямую из очереди, без exchange. Я добавил в конец параметр requeue=true , чтобы после того, как сообщение прочитали, оно вернулось в очередь.
Отправка и получение сообщений через NodeJS
Отправлять и получать сообщения через командную строку, конечно, весело, но только до определенного момента. Интересно, насколько сложнее сделать то же самое из кода.
Ни насколько. Для примера я выбрал JavaScript, но любой язык, на который портирован AMQP клиент, подошёл бы. В NPM есть пакет с говорящим названием amqplib и это как раз то, что нам понадобится:
1 |
npm install amqplib |
Чтобы что-то отправить, нужно сначала открыть соединение и канал:
1 2 3 4 |
const amqp = require('amqplib'); amqp.connect('amqp://localhost') .then(connection => connection.createChannel()) |
В amqplib есть маленькая возможность схитрить — сообщение можно отправить напрямую очереди, в обход exchange, и этой хитростью я сейчас и воспользуюсь.
Итак, соединение есть, канал есть, осталось только убедиться, что очередь тоже есть:
1 2 3 4 |
//.... .then(channel, function () { return channel.assertQueue('demoqueue', {durable: false}); }) |
Даже если очереди не было, assertQueue теперь её создал.
Но что, теперь всё есть. Пора бы что-нибудь и отправить:
1 2 3 4 5 6 |
.then(function () { return channel.sendToQueue( 'demoqueue', new Buffer('Behold! (One more time)') ); }); |
Всё! Пример небольшой получился, но его хватило, чтобы подключиться к брокеру, создать очередь сообщений и положить туда посылку через AMQ протокол.
Чтобы прочитать сообщение обратно, нужно поменять в примере всего один метод. В этот раз я приведу код целиком:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
const amqp = require('amqplib'); const queueName = 'demoqueue'; amqp.connect('amqp://localhost') .then(connection => connection.createChannel()) .then(function (channel) { return channel .assertQueue(queueName, {durable: false}) .then(function () { return channel.consume(queueName, function (msg) { console.log(`message received ${msg.content.toString()}`); }); }); }) |
В consume можно было бы добавить параметр {noAck: false} — не подтверждать получение, чтобы оно осталось в очереди.
Итак
В этой гигантской по моим меркам статье мы лишь вкратце прошлись по тому, что умеет RabbitMQ. Настолько вкратце, что ни одна из фич, о которых я упомянул вначале, тут так и не встретилась. Но фокус был в другом — хотя RabbitMQ просто нафарширован фичами, не все из которых я могу выговорить, пользоваться им очень легко.
Но, с другой стороны, писать hello-world на инструменте, который может масштабироваться по облакам, тоже чуть-чуть издёвка, так что в следующий раз мы попробуем что-нибудь посложнее.