W dzisiejszym świecie bardzo często spotykamy się z wymaganiami dotyczącymi przetwarzania danych w czasie rzeczywistym. Na rynku istnieje dość sporo narzędzi, które nam to pozwalają osiągnąć. W czołówce możemy wyróżnić: Apache Kafka oraz Apache Flink. Często w tym samym “worku” można spotkać jeszcze Spark Structured Streaming lub Spark Streaming, lecz jest to błąd, ponieważ Spark reprezentuję podejście, które nazywamy “micro-batch” – czyli przetwarzanie danych w małych paczkach.
W tym poście naszym celem będzie stworzenie przetwarzania strumieniowego w czasie rzeczywistym przy wykorzystaniu architektury stworzonej przy użyciu docker-compose. Logikę przetwarzania zaimplementujemy w Talend Open Studio for Big Data (TOSBD).
Przygotowujemy środowisko na Docker’ach
Na początku zajmiemy się stworzeniem środowiska, którę będzie się składać z:
- 3x Zookeeper
- 3x Kafka Broker
- 1x MongoDB
Dzięki Docker-compose jesteśmy wstanie w bardzo szybki sposób przygotować każde środowisko. Ogranicza nas tylko wyobraźnia lub aktualne wymagania 🙂
Poniższy diagram przedstawia naszą architekturę, którą zadeklarujemy w pliku “docker-compose.yml”.

Docker-compose.yml
Zakładam, że masz już zainstalowany docker-compose u siebie na komputerze. Jeśli nie to odsyłam Cie na chwilę do mojego innego posta, gdzie opisałem jak zainstalować Docker-Compose.
Tworzymy plik docker-compose.yml. Postarajmy się zdefiniować powyższy diagram na zapis zrozumiały dla docker-compose.
Zookeeper Cluster
Zdefiniujmy trzy kontenery odpowiadające za 3 niezależne instancje Zookeeper’a.
version: "2" services: zookeeper-1: image: confluentinc/cp-zookeeper:latest hostname: zookeeper-1 ports: - "12181:12181" environment: ZOOKEEPER_SERVER_ID: 1 ZOOKEEPER_CLIENT_PORT: 12181 ZOOKEEPER_TICK_TIME: 2000 ZOOKEEPER_INIT_LIMIT: 5 ZOOKEEPER_SYNC_LIMIT: 2 ZOOKEEPER_SERVERS: zookeeper-1:12888:13888;zookeeper-2:22888:23888;zookeeper-3:32888:33888 zookeeper-2: image: confluentinc/cp-zookeeper:latest hostname: zookeeper-2 ports: - "22181:22181" environment: ZOOKEEPER_SERVER_ID: 2 ZOOKEEPER_CLIENT_PORT: 22181 ZOOKEEPER_TICK_TIME: 2000 ZOOKEEPER_INIT_LIMIT: 5 ZOOKEEPER_SYNC_LIMIT: 2 ZOOKEEPER_SERVERS: zookeeper-1:12888:13888;zookeeper-2:22888:23888;zookeeper-3:32888:33888 zookeeper-3: image: confluentinc/cp-zookeeper:latest hostname: zookeeper-3 ports: - "32181:32181" environment: ZOOKEEPER_SERVER_ID: 3 ZOOKEEPER_CLIENT_PORT: 32181 ZOOKEEPER_TICK_TIME: 2000 ZOOKEEPER_INIT_LIMIT: 5 ZOOKEEPER_SYNC_LIMIT: 2 ZOOKEEPER_SERVERS: zookeeper-1:12888:13888;zookeeper-2:22888:23888;zookeeper-3:32888:33888
Kafka Cluster
Następnie dodajmy do istniejącego pliku docker-compose.yml część odpowiedzialną za utowrzenie trzech kontenerów (broker’ów), które razem stworzą nam nasz klaster Kafki. Jak widzimy w poniższym zapisie “depends_on” – borker’y zależą od Zookeeper’a. Dzięki temu zapisowi docker-compose najpierw poczeka, aż wszystkie instancje Zookeep’a będą uruchomione zanim będzie się starał postawić brokery.
kafka-1: image: confluentinc/cp-kafka:latest hostname: kafka-1 ports: - "19092:19092" depends_on: - zookeeper-1 - zookeeper-2 - zookeeper-3 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:12181,zookeeper-3:12181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:19092 kafka-2: image: confluentinc/cp-kafka:latest hostname: kafka-2 ports: - "29092:29092" depends_on: - zookeeper-1 - zookeeper-2 - zookeeper-3 environment: KAFKA_BROKER_ID: 2 KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:12181,zookeeper-3:12181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:29092 kafka-3: image: confluentinc/cp-kafka:latest hostname: kafka-3 ports: - "39092:39092" depends_on: - zookeeper-1 - zookeeper-2 - zookeeper-3 environment: KAFKA_BROKER_ID: 3 KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:12181,zookeeper-3:12181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:39092
MongoDB
Na samym końcu zdefiniujmy naszą receptę na bazę MongoDB. W tym przypadku stworzymy jeden kontener, bez router’ów oraz nodów konfiguracyjnych.
mongodb: image: mongo:latest hostname: mongodb environment: - MONGO_DATA_DIR=/data/db - MONGO_LOG_DIR=/dev/null - MONGODB_USER=test - MONGODB_DATABASE=test_db - MONGODB_PASS=bigdata_test volumes: - ./data/db:/data/db ports: - 27017:27017 command: mongod --smallfiles --logpath=/dev/null
Edycja /etc/hosts
Aby móc połączyć się po nazwach hostów do naszego środowiska zbudowanego na kontenerach musimy wcześniej je dodać do pliku /etc/hosts. Ja używam do edycji programu “vim”.
sudo vim /etc/hosts
Następnie dodaj w dowolnej linii nazwy hostów:
127.0.0.1 kafka-1 kafka-2 kafka-3 mongodb
Uruchomienie środowiska
Przejdzmy teraz do katalogu, gdzie wcześniej utworzyliśmy plik “docker-compose.yml” i uruchommy skrypt poniżej. Przy pierwszym uruchomieniu potrwa to trochę dłużej, ponieważ prawdopodobnie nie będziesz miał u siebie lokalnie obrazów, (docker images), które zdefiniowaliśmy. Z tego powodu przy pierwszym uruchomieniu całość bedzie musiała zostać pobrana ze zdalnego repozytorium.
pawel@pawel:~/Desktop/Blog$ docker-compose up -d Creating network "blog_default" with the default driver Creating blog_zookeeper-2_1 ... done Creating blog_mongodb_1 ... done Creating blog_zookeeper-3_1 ... done Creating blog_zookeeper-1_1 ... done Creating blog_kafka-3_1 ... done Creating blog_kafka-2_1 ... done Creating blog_kafka-1_1 ... done
Następnie sprawdźmy status kontenerów uruchamiając komnde “docker-compose ps”. Jak widzimy poniżej status wszystkich jest “Up” – czyli wszystko jest OK.
pawel@pawel:~/Desktop/Blog$ docker-compose ps Name Command State Ports -------------------------------------------------------------------------------------------------------------------- blog_kafka-1_1 /etc/confluent/docker/run Up 0.0.0.0:19092->19092/tcp, 9092/tcp blog_kafka-2_1 /etc/confluent/docker/run Up 0.0.0.0:29092->29092/tcp, 9092/tcp blog_kafka-3_1 /etc/confluent/docker/run Up 0.0.0.0:39092->39092/tcp, 9092/tcp blog_mongodb_1 docker-entrypoint.sh mongo ... Up 0.0.0.0:27017->27017/tcp blog_zookeeper-1_1 /etc/confluent/docker/run Up 0.0.0.0:12181->12181/tcp, 2181/tcp, 2888/tcp, 3888/tcp blog_zookeeper-2_1 /etc/confluent/docker/run Up 2181/tcp, 0.0.0.0:22181->22181/tcp, 2888/tcp, 3888/tcp blog_zookeeper-3_1 /etc/confluent/docker/run Up 2181/tcp, 2888/tcp, 0.0.0.0:32181->32181/tcp, 3888/tcp
Talend
Skoro już mamy działające środowisko, przejdżmy do budowania job’ów w Talendzie. Na potrzeby tego postaz budujemy dwa joby”
- Zasilający naszą Kafkę sztucznie generowanymi danymi.
- Czytający z Kafki i zapisujący dane do bazy Mongo.
Jak spojrzymy w Paletę to Talend w wersji Open Studio for Big Data udostępnia 5 komponentów związanych z Kafką. My skupimy się na dwóch ostatnich: tKafkaInput i tKafkaOutput.
- tKafkaCommit
- tKafkaConnection
- tKafkaCreateTopic
- tKafkaInput
- tKafkaOutput
Producer – tKafkaOutput
Pierwszym naszym celem będzie stworzenie producenta (ang. producer/sender). Aby zachować pewną losowość wiadomości użyjemy:
- tRowGenerator – do generowania losowych ciągów znaków
- tSleep – żeby zasymulować “spływanie” zdarzeń w czasie – bardziej je “rozciągnąć” w czasie. W przeciwnym razie wygenerowalibyśmy paczkę rekordów i cięzko byłoby nam zaobserwować, że są one również konsumowane w czasie rzeczywistym.
- tMap – w celu przemapowania pól
- tKafkaOutput – zapis do Kafki
Dodatkowo należy zdefiniować:
- Broker list – lista Kafka brokerów oddzielonych przecinkiem wraz z portami(String)
- Topic name – nazwę topic’u na który będziemy wysyłać dane (String)

Consumer – tKafkaInput
W przypadku konsumenta również należy zdefiniować listę borker’ów oraz topic. Dodatkowym parametrem jest podanie “Consumer group id”. Możemy tu wpisać cokolwiek. W przypadku produkcyjnych rozwiązań ten parametr pełni bardzo ważną funkcję, dzięki któremu nasi konsumenci “więdzą”, które wiadomości (offsety) już zostały przez nich odczytane. (Ten temat jest o wiele bardziej obszerniejszy i teraz nie będę opisywał szczegółów).

tMongoDBOutput
Ostatnim elementem naszej układanki jest przekierowanie strumienia danych w stronę kolecji (ang. collection) w MongoDB. W tym celu użyjemy komponentu tMongoDBOutput, gdzie musimy podać:
- server naszej bazy. W docker-compose.yml w sekcji mongodb -> hostname: nadaliśmy nazwe “mongodb” i taką samą zdefiniowaliśmy w plku /etc/hosts, więc podajmy w tym polu nazwę naszego hosta “mogodb”.
- To samo dotyczy się portu. Użyliśmy standardowego wieć zostawiamy 27017.
- Jako bazę ustawiamy zdefiowaną w pliku YAML, czyli “test_db”.

Testy
Nadszedł czas, żeby uruchomić naszą aplikację i sprawdzić jak to działa!
- Na początek uruchom konsumenta – niech już czeka w gotowośći na wiadomości.
- Następnie uruchom producenta i obserwuj jak rekordy są przetwarzane zaraz po tym jak zostały zapisane na Kafkę.

Na koniec sprawdźmy jeszcze czy dane rzeczywiście zostały zapisane do MongoDB. W tym celu zrobimy połączenie ssh do kontenera z bazą Mongo.
# Nazwa mojego kontenera to: blog_mongodb_1 docker exec -it blog_mongodb_1 /bin/bash # Następnie uruchamiamy klienta "mongo" root@mongodb:/# mongo MongoDB shell version v4.0.10 connecting to: mongodb://127.0.0.1:27017/?gssapiServiceName=mongodb Implicit session: session { "id" : UUID("a68fbc26-daab-4d15-8c26-f2d5ec45ebb5") } MongoDB server version: 4.0.10 Welcome to the MongoDB shell. For interactive help, type "help". For more comprehensive documentation, see http://docs.mongodb.org/ Questions? Try the support group http://groups.google.com/group/mongodb-user Server has startup warnings: 2019-09-30T20:13:39.852+0000 I STORAGE [initandlisten] 2019-09-30T20:13:39.852+0000 I STORAGE [initandlisten] ** WARNING: Using the XFS filesystem is strongly recommended with the WiredTiger storage engine 2019-09-30T20:13:39.852+0000 I STORAGE [initandlisten] ** See http://dochub.mongodb.org/core/prodnotes-filesystem 2019-09-30T20:13:41.191+0000 I CONTROL [initandlisten] 2019-09-30T20:13:41.192+0000 I CONTROL [initandlisten] ** WARNING: Access control is not enabled for the database. 2019-09-30T20:13:41.192+0000 I CONTROL [initandlisten] ** Read and write access to data and configuration is unrestricted. 2019-09-30T20:13:41.192+0000 I CONTROL [initandlisten] --- Enable MongoDB's free cloud-based monitoring service, which will then receive and display metrics about your deployment (disk utilization, CPU, operation statistics, etc). The monitoring data will be available on a MongoDB website with a unique URL accessible to you and anyone you share the URL with. MongoDB may use this information to make product improvements and to suggest MongoDB products and deployment options to you. To enable free monitoring, run the following command: db.enableFreeMonitoring() To permanently disable this reminder, run the following command: db.disableFreeMonitoring() ---
Przechodzimy do bazy “test_db”, sprawdzamy czy nasza kolekcja istnieje i na koniec pobieramy wszystkie dane z kolecji “testEvents“.
> use test_db switched to db test_db > show collections testEvents > db.testEvents.find() { "_id" : ObjectId("5d9276696deec244a5b83d19"), "payload" : "gnOiI7" } { "_id" : ObjectId("5d92766a6deec244a5b83d1a"), "payload" : "rM4Mlc" } { "_id" : ObjectId("5d92766b6deec244a5b83d1b"), "payload" : "U85SZM" } { "_id" : ObjectId("5d92766c6deec244a5b83d1c"), "payload" : "17BkQ4" } { "_id" : ObjectId("5d92766d6deec244a5b83d1d"), "payload" : "IV56wq" } { "_id" : ObjectId("5d92766e6deec244a5b83d1e"), "payload" : "wnkplA" } { "_id" : ObjectId("5d92766f6deec244a5b83d1f"), "payload" : "OUlelH" } { "_id" : ObjectId("5d9276706deec244a5b83d20"), "payload" : "2OuDnN" }
Podsumowanie
Tworzenie przetwarzania strumieniowego z wykorzystaniem Talenda oraz Kafki jak widać nie jest skomplikowane. W dość krótkim czasie jesteśmy wstanie zaimplementować prostą logikę.
Narzędzia ETL typu drag&drop jak Talend generują de facto kod Java, który potem jest uruchamiany jak każda inna aplikacja napisana w języku Java. Osobiście jestem własnego kodu, ponieważ wiele rzeczy można napisać w łatwiejszy i bardziej optymalny sposób. Z drugiej strony narzędzia ETL umożliwiają szybszy dewelopment, łatwiejszą analizę tego co się dzieje w przetwarzaniu i na pewno łatwiej jest znaleźć ludzi lub szybciej ich nauczyć takiego narzędzia niż w przypadku pisania aplikacji od zera.
Kod możesz znaleść na naszym GitHubie.