До сегодняшнего дня мы запускали Apache Kafka на одном хосте, в кластере из одного брокера и всего с одним монолитным топиком. Для платформы, которая проектировалась под распределенные сценарии, это, наверное, печально. Поэтому сегодня мы станем на шаг ближе к представлению о том, как Kafka работает в нагруженных системах.
Мы создадим кластер из нескольких брокеров, добавим туда разбитый на разделы топик и включим его резервное копирование. При такой конфигурации нужно будет очень постараться, чтобы с кластером случилось что-то неприятное.
Создаём кластер
Думаю, для сегодняшнего эксперимента трёх кафкианских хостов будет достаточно. В идеале, конечно, стоило бы клонировать и ZooKeeper — для надёжности. Но сегодня я почему-то уверен, что ему ничего не грозит, так что обойдёмся и одним.
Чтобы сэмитировать изолированные хосты, я помещу ZooKeeper и всех Кафка-брокеров в собственные Docker контейнеры и запущу их через docker-compose. Последний и упросит конфигурацию, и позаботится о настройке сети.
Dockerfile
Чтобы создать образы контейнеров для Kafka и ZooKeeper нужно всего две вещи: JDK и Kafka инсталлер. Образы будут настолько похожи, что, в принципе, можно обойтись одним Dockerfile и для брокеров, и для координатора.
Брокеров к тому же нужно еще настроить. Например, по-умолчанию Kafka подключается к ZooKeeper через localhost:2181, который внутри контейнера ведёт в никуда. С другой стороны, если дать контейнеру с ZooKeeper внятное название, например — zookeeper, то тогда к нему можно подключаться по zookeeper:2181. Нужно просто немного подредактировать конфигурацию брокера
Второй момент: у каждого брокера внутри кластера должен быть уникальный broker id. Он задаётся в server.properties, и в нашем случае его можно передать прямо из docker-compose в Dockerfile через ARG инструкцию, и потом уже добавить в конфигурацию, например, через sed . Вот в итоге что у меня получилось:
1 2 3 4 5 6 7 8 9 10 11 |
FROM openjdk ARG brokerId=1 ADD kafka_2.11-0.10.1.0.tgz / RUN mv /kafka_2.11-0.10.1.0 /kafka && \ sed -i s/localhost:2181/zookeeper:2181/g /kafka/config/server.properties && \ sed -i s/broker.id=0/broker.id=${brokerId}/g /kafka/config/server.properties EXPOSE 2181 9092 |
В образе openjdk есть JDK, а ADD инструкция добавит и распакует архив с Кафкой, который я предусмотрительно скачал до этого. Потом в RUN переименовываем папку с Кафкой и обновляем server.properties. Наконец, EXPOSE оставит в контейнере два открытых порта: 2181 (ZooKeeper) и 9092 (Kafka).
Проще простого.
docker-compose.yml
docker-compose.yml для запуска зоопарка контейнеров будет посложнее. Ведь чтобы получить внятный и функционирующий кластер, понадобятся контейнеры с ZooKeeper, тремя Kafka серверами, и продюсером и консьюмером сообщений (ну чтобы был хоть какой поток данных). Итого — шесть контейнеров. Это раз в шесть больше, чем обычно.
ZooKeeper
Конфигурация контейнера с ZooKeeper самая простая:
1 2 3 4 5 |
zookeeper: build: . command: /kafka/bin/zookeeper-server-start.sh /kafka/config/zookeeper.properties ports: - 2181:2181 |
Она просто соберет Dockerfile, созданный на предыдущем этапе, запустит ZooKeeper внутри контейнера и оставит порт 2181 открытым для хоста. Вдруг нам захочется с ним пообщаться.
Kafka сервера
Конфигурация для Kafka серверов чуть-чуть сложнее, но всё еще вменяемая. Конфигураций будет три, и для сервера под номером 2 она будет такой:
1 2 3 4 5 6 7 8 |
kafka2: build: context: . args: brokerId: 2 command: /kafka/bin/kafka-server-start.sh /kafka/config/server.properties depends_on: - zookeeper |
depends_on поможет запустить контейнер с Кафкой обязательно после контейнера с ZooKeeper, а brokerId: 2 пойдёт прямиком в Dockerfile, откуда перекочует в server.properties.
Продюсер
Продюсер — это маленький филиал ада. И всё из-за команды command . Ведь чтобы запустить продюсер, отправляющий сообщения, нужно всего-то:
1 2 3 4 5 6 7 8 9 10 11 |
sleep 4 /kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 3 --partitions 2 --topic dates while true; do date | /kafka/bin/kafka-console-producer.sh \ --broker-list kafka1:9092,kafka2:9092,kafka3:9092 \ --topic dates; sleep 1; done |
Что в переводе на наш означает:
- подожди 4 секунды (решать race condition через задержки — плохой тон, но с воспитанием в нашу эпоху действительно не очень),
- создай топик под названием ‘dates’, раздели его на два раздела (partitions) и храни по три копии каждого,
- раз в секунду отправляй в топик сообщение — текущую дату.
Просто ведь. Но если ужать это в одну строчку, то получится вот что:
1 2 3 4 5 6 7 8 |
producer: build: . command: bash -c "sleep 4 && /kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 3 --partitions 2 --topic dates && while true; do date | /kafka/bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic dates; sleep 1; done " depends_on: - zookeeper - kafka1 - kafka2 - kafka3 |
Дурдом.
Консьюмер
По сравнению с продюсером, консьюмер практически не уродлив:
1 2 3 4 5 6 7 8 |
consumer: build: . command: bash -c "sleep 6 && /kafka/bin/kafka-console-consumer.sh --topic dates --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092" depends_on: - zookeeper - kafka1 - kafka2 - kafka3 |
И продюсер, и консюмер получили в качестве параметров списки всех Kafka серверов. Это нужно для того, чтобы когда кого-то из брокеров не станет, у сервисов был запасной вариант.
Результат
Вот на что похож docker-compose.yml целиком:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
version: '2' services: zookeeper: build: . command: /kafka/bin/zookeeper-server-start.sh /kafka/config/zookeeper.properties ports: - 2181:2181 kafka1: build: context: . args: brokerId: 1 command: /kafka/bin/kafka-server-start.sh /kafka/config/server.properties depends_on: - zookeeper kafka2: build: context: . args: brokerId: 2 command: /kafka/bin/kafka-server-start.sh /kafka/config/server.properties depends_on: - zookeeper kafka3: build: context: . args: brokerId: 3 command: /kafka/bin/kafka-server-start.sh /kafka/config/server.properties depends_on: - zookeeper producer: build: . command: bash -c "sleep 4 && /kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 3 --partitions 2 --topic dates && while true; do date | /kafka/bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic dates; sleep 1; done " depends_on: - zookeeper - kafka1 - kafka2 - kafka3 consumer: build: . command: bash -c "sleep 6 && /kafka/bin/kafka-console-consumer.sh --topic dates --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092" depends_on: - zookeeper - kafka1 - kafka2 - kafka3 |
Запускаем кластер
Всё просто. Запускаем docker-compose up и через десяток секунд монологов от очень разговорчивых контейнеров наступит тишина, иногда прерываемая сообщениями от консьюмера:
1 2 3 4 5 6 |
# consumer_1 | Tue Dec 13 05:23:31 UTC 2016 # consumer_1 | Tue Dec 13 05:23:34 UTC 2016 # consumer_1 | Tue Dec 13 05:23:37 UTC 2016 # consumer_1 | Tue Dec 13 05:23:39 UTC 2016 # consumer_1 | Tue Dec 13 05:23:43 UTC 2016 # consumer_1 | Tue Dec 13 05:23:46 UTC 2016 |
Кажется, оно действительно запустилось.
Так как 2181-й порт у ZooKeeper остался открыт, можно запросить у него статистику по топикам:
1 2 3 |
./bin/kafka-topics.sh --describe --topic dates --zookeeper 127.0.0.1:2181 # Topic: dates Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2 # Topic: dates Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3 |
Ответ ZooKeeper’а весьма интересный: у топика ‘dates’ есть два раздела — 0 и 1, а их лидеры — брокеры 3 и 1 соответственно. Лидер раздела — это брокер, через который походит чтение и запись, и который отвечает за синхронизацию его реплик. В нашем случае реплики разделов хранятся у брокеров 3,1,2 и 1,2,3, и все они синхронизированы (Isr — in-sync replica), то есть все резервные копии на месте.
Так как обычно я пишу эти посты ближе к часу ночи, то будет справедливо, если кластер пострадает вместе со мной.
Издеваемся над Kafka кластером
Когда все брокеры в кластере работают, кластеру, разумеется, хорошо. Что будет, если какого-то из брокеров не станет? Выясняем ID контейнера kafka2 (docker-compose ведь изменил имя) через docker ps , останавливаем его, и смотрим, что получилось:
1 2 |
docker stop 12bda0311443 # 12bda0311443 |
Что здорово, consumer-контейнер как писал в консоль свои сообщения, так и продолжил без паузы. Но если посмотреть статистику топика, то кластер изменился:
1 2 3 4 |
./kafka/bin/kafka-topics.sh --describe --topic dates --zookeeper 127.0.0.1:2181 #Topic:dates PartitionCount:2 ReplicationFactor:3 Configs: # Topic: dates Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1 # Topic: dates Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,3 |
Лидеры разделов остались теми же, но реплика с номером 2 пропала.
А что будет, если ещё кого-нибудь прибить?
Если остановить kafka3, то с потоком сообщений всё останется в порядке, но лидер нулевого раздела поменяется:
1 2 3 4 |
./bin/kafka-topics.sh --describe --topic dates --zookeeper 127.0.0.1:2181 #Topic:dates PartitionCount:2 ReplicationFactor:3 Configs: # Topic: dates Partition: 0 Leader: 1 Replicas: 3,1,2 Isr: 1 # Topic: dates Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1 |
Теперь оставшийся брокер kafka1 — лидер всего. Реплика 3, разумеется, пропала.
Наконец, что будет, если снова вернуть kafka2 и kafka3 в строй?
1 2 3 4 |
./bin/kafka-topics.sh --describe --topic dates --zookeeper 127.0.0.1:2181 #Topic:dates PartitionCount:2 ReplicationFactor:3 Configs: # Topic: dates Partition: 0 Leader: 1 Replicas: 3,1,2 Isr: 1,3,2 # Topic: dates Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,3,2 |
ZooKeeper их заметил. Лидер разделов остался старым, но пропущенные реплики пересоздались и уютно расположились в «новых» брокерах. Кластер вернулся в сбалансированное состояние.
Мораль
Только что произошло немного чёрной уличной магии. Мы создали кластер из трёх хостов с фрагментированным топиком, а затем убили два хоста из трёх, и это никак не повлияло на нашу возможность публиковать и получать сообщения. Когда хосты вернулись назад, кластер перегруппировался и продолжил работать, как ни в чём не бывало.
Сегодняшный пример получился намного сложнее, чем предыдущий, но вся сложность пришла от Докера. Единственное изменение в конфигурации Кафка-сервера по сравнению с кластером из одного хоста — добавился broker id. То есть изменив всего один численный параметр мы можем добавлять сервера в кластер пачками. Невероятно.
Большое спасибо! Весьма познавательно и интересно!
Пожалуйста
Век живи — век учись, спасибо за статью
Пожалуйста
Отличный блог, очень интересно. Спасибо!
Спасибо!
Павел, спасибо за интересный пост!
Попробуйте убрать broker.id вообще из конфигов. Я случайно наткнулся на то, что kafka будет их генерировать начиная с максимального и, если следующий брокер увидит занятый идентификатор в зукипере, то будет его уменьшать на -1 до свободного.
Блин, здорово, не знал. Спасибо!
Спасибо. Мы планируем поднимать отказоустойчивый стек на разных нодах в AWS и у меня возник интересный вопрос. Брокеры мы то плодим, а зукипер остается один. Что будет если завалить зукипер? Как это скажется на отказоустойчивости? Я знаю что можно поднять zookeeper quorum и kafka умеет конектится к кворуму, а не только к одному зукиперу. Но в оф доке как то не особо этот момент описан.
Мой ответ будет очень теоретическим, поэтому свой продакшен на его основе лучше не строить 🙂
Без зукипира кафка жить не может, но это не значит, что он тут же и рухнет. Товарищи на Quora (https://www.quora.com/What-is-the-actual-role-of-Zookeeper-in-Kafka-What-benefits-will-I-miss-out-on-if-I-don%E2%80%99t-use-Zookeeper-and-Kafka-together) могли писать в и читать из существующих топиков и после того, как зукипер ушёл. И это, в принципе, объяснимо. В памяти у брокеров должна оставаться последняя копия состояния кластера, и до тех пор, пока ничего не поменялось, некоторое время можно пожить и так. Но не долго.
В настройки брокера можно передать больше, чем один хост с зукипером — через запятую. В моём Dockerfile это бы выглядело как-нибудь так:
sed -i s/localhost:2181/zookeeper1:2181,zookeeper2:2181/g /kafka/config/server.properties
. Вот тут есть более развёрнутый пример — https://richardstechnotes.wordpress.com/2015/12/19/setting-up-apache-kafka-for-use-with-an-apache-zookeeper-quorum-on-ubuntu/Спасибо, классный блог!
Я хочу стать как ты когда вырасту. Всё время когда ищу какуюто инфу по работе не твои посты натыкаюсь 🙂 Спасибо за инфу и за етот блог 🙂