Talend: Kafka i MongoDB – strumień danych

Talend: Kafka i MongoDB – strumień danych

W dzisiejszym świecie bardzo często spotykamy się z wymaganiami dotyczącymi przetwarzania danych w czasie rzeczywistym. Na rynku istnieje dość sporo narzędzi, które nam to pozwalają osiągnąć. W czołówce możemy wyróżnić: Apache Kafka oraz Apache Flink. Często w tym samym “worku” można spotkać jeszcze Spark Structured Streaming lub Spark Streaming, lecz jest to błąd, ponieważ Spark reprezentuję podejście, które nazywamy “micro-batch” – czyli przetwarzanie danych w małych paczkach.

W tym poście naszym celem będzie stworzenie przetwarzania strumieniowego w czasie rzeczywistym przy wykorzystaniu architektury stworzonej przy użyciu docker-compose. Logikę przetwarzania zaimplementujemy w Talend Open Studio for Big Data (TOSBD).

Przygotowujemy środowisko na Docker’ach

Na początku zajmiemy się stworzeniem środowiska, którę będzie się składać z:

  • 3x Zookeeper
  • 3x Kafka Broker
  • 1x MongoDB

Dzięki Docker-compose jesteśmy wstanie w bardzo szybki sposób przygotować każde środowisko. Ogranicza nas tylko wyobraźnia lub aktualne wymagania 🙂

Poniższy diagram przedstawia naszą architekturę, którą zadeklarujemy w pliku “docker-compose.yml”.

https://better-coding.com/building-apache-kafka-cluster-using-docker-compose-and-virtualbox/

Docker-compose.yml

Zakładam, że masz już zainstalowany docker-compose u siebie na komputerze. Jeśli nie to odsyłam Cie na chwilę do mojego innego posta, gdzie opisałem jak zainstalować Docker-Compose.

Tworzymy plik docker-compose.yml. Postarajmy się zdefiniować powyższy diagram na zapis zrozumiały dla docker-compose.

Zookeeper Cluster

Zdefiniujmy trzy kontenery odpowiadające za 3 niezależne instancje Zookeeper’a.

version: "2"
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper-1
    ports:
      - "12181:12181"
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 12181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:12888:13888;zookeeper-2:22888:23888;zookeeper-3:32888:33888

  zookeeper-2:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper-2
    ports:
      - "22181:22181"
    environment:
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_CLIENT_PORT: 22181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:12888:13888;zookeeper-2:22888:23888;zookeeper-3:32888:33888

  zookeeper-3:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper-3
    ports:
      - "32181:32181"
    environment:
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:12888:13888;zookeeper-2:22888:23888;zookeeper-3:32888:33888

Kafka Cluster

Następnie dodajmy do istniejącego pliku docker-compose.yml część odpowiedzialną za utowrzenie trzech kontenerów (broker’ów), które razem stworzą nam nasz klaster Kafki. Jak widzimy w poniższym zapisie “depends_on” – borker’y zależą od Zookeeper’a. Dzięki temu zapisowi docker-compose najpierw poczeka, aż wszystkie instancje Zookeep’a będą uruchomione zanim będzie się starał postawić brokery.

  kafka-1:
    image: confluentinc/cp-kafka:latest
    hostname: kafka-1
    ports:
      - "19092:19092"
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:12181,zookeeper-3:12181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:19092

  kafka-2:
    image: confluentinc/cp-kafka:latest
    hostname: kafka-2
    ports:
      - "29092:29092"
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:12181,zookeeper-3:12181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:29092

  kafka-3:
    image: confluentinc/cp-kafka:latest
    hostname: kafka-3
    ports:
      - "39092:39092"
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:12181,zookeeper-3:12181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:39092

MongoDB

Na samym końcu zdefiniujmy naszą receptę na bazę MongoDB. W tym przypadku stworzymy jeden kontener, bez router’ów oraz nodów konfiguracyjnych.

  mongodb:
    image: mongo:latest
    hostname: mongodb
    environment:
      - MONGO_DATA_DIR=/data/db
      - MONGO_LOG_DIR=/dev/null
      - MONGODB_USER=test
      - MONGODB_DATABASE=test_db
      - MONGODB_PASS=bigdata_test
    volumes:
      - ./data/db:/data/db
    ports:
      - 27017:27017
    command: mongod --smallfiles --logpath=/dev/null

Edycja /etc/hosts

Aby móc połączyć się po nazwach hostów do naszego środowiska zbudowanego na kontenerach musimy wcześniej je dodać do pliku /etc/hosts. Ja używam do edycji programu “vim”.

sudo vim /etc/hosts

Następnie dodaj w dowolnej linii nazwy hostów:

127.0.0.1 kafka-1 kafka-2 kafka-3 mongodb

Uruchomienie środowiska

Przejdzmy teraz do katalogu, gdzie wcześniej utworzyliśmy plik “docker-compose.yml” i uruchommy skrypt poniżej. Przy pierwszym uruchomieniu potrwa to trochę dłużej, ponieważ prawdopodobnie nie będziesz miał u siebie lokalnie obrazów, (docker images), które zdefiniowaliśmy. Z tego powodu przy pierwszym uruchomieniu całość bedzie musiała zostać pobrana ze zdalnego repozytorium.

pawel@pawel:~/Desktop/Blog$ docker-compose up -d

Creating network "blog_default" with the default driver
Creating blog_zookeeper-2_1 ... done
Creating blog_mongodb_1     ... done
Creating blog_zookeeper-3_1 ... done
Creating blog_zookeeper-1_1 ... done
Creating blog_kafka-3_1     ... done
Creating blog_kafka-2_1     ... done
Creating blog_kafka-1_1     ... done

Następnie sprawdźmy status kontenerów uruchamiając komnde “docker-compose ps”. Jak widzimy poniżej status wszystkich jest “Up” – czyli wszystko jest OK.

pawel@pawel:~/Desktop/Blog$ docker-compose ps
       Name                     Command               State                           Ports                         
--------------------------------------------------------------------------------------------------------------------
blog_kafka-1_1       /etc/confluent/docker/run        Up      0.0.0.0:19092->19092/tcp, 9092/tcp                    
blog_kafka-2_1       /etc/confluent/docker/run        Up      0.0.0.0:29092->29092/tcp, 9092/tcp                    
blog_kafka-3_1       /etc/confluent/docker/run        Up      0.0.0.0:39092->39092/tcp, 9092/tcp                    
blog_mongodb_1       docker-entrypoint.sh mongo ...   Up      0.0.0.0:27017->27017/tcp                              
blog_zookeeper-1_1   /etc/confluent/docker/run        Up      0.0.0.0:12181->12181/tcp, 2181/tcp, 2888/tcp, 3888/tcp
blog_zookeeper-2_1   /etc/confluent/docker/run        Up      2181/tcp, 0.0.0.0:22181->22181/tcp, 2888/tcp, 3888/tcp
blog_zookeeper-3_1   /etc/confluent/docker/run        Up      2181/tcp, 2888/tcp, 0.0.0.0:32181->32181/tcp, 3888/tcp

Talend

Skoro już mamy działające środowisko, przejdżmy do budowania job’ów w Talendzie. Na potrzeby tego postaz budujemy dwa joby”

  1. Zasilający naszą Kafkę sztucznie generowanymi danymi.
  2. Czytający z Kafki i zapisujący dane do bazy Mongo.

Jak spojrzymy w Paletę to Talend w wersji Open Studio for Big Data udostępnia 5 komponentów związanych z Kafką. My skupimy się na dwóch ostatnich: tKafkaInput i tKafkaOutput.

  1. tKafkaCommit
  2. tKafkaConnection
  3. tKafkaCreateTopic
  4. tKafkaInput
  5. tKafkaOutput

Producer – tKafkaOutput

Pierwszym naszym celem będzie stworzenie producenta (ang. producer/sender). Aby zachować pewną losowość wiadomości użyjemy:

  • tRowGenerator – do generowania losowych ciągów znaków
  • tSleep – żeby zasymulować “spływanie” zdarzeń w czasie – bardziej je “rozciągnąć” w czasie. W przeciwnym razie wygenerowalibyśmy paczkę rekordów i cięzko byłoby nam zaobserwować, że są one również konsumowane w czasie rzeczywistym.
  • tMap – w celu przemapowania pól
  • tKafkaOutput – zapis do Kafki

Dodatkowo należy zdefiniować:

  • Broker list – lista Kafka brokerów oddzielonych przecinkiem wraz z portami(String)
  • Topic name – nazwę topic’u na który będziemy wysyłać dane (String)

Consumer – tKafkaInput

W przypadku konsumenta również należy zdefiniować listę borker’ów oraz topic. Dodatkowym parametrem jest podanie “Consumer group id”. Możemy tu wpisać cokolwiek. W przypadku produkcyjnych rozwiązań ten parametr pełni bardzo ważną funkcję, dzięki któremu nasi konsumenci “więdzą”, które wiadomości (offsety) już zostały przez nich odczytane. (Ten temat jest o wiele bardziej obszerniejszy i teraz nie będę opisywał szczegółów).

tMongoDBOutput

Ostatnim elementem naszej układanki jest przekierowanie strumienia danych w stronę kolecji (ang. collection) w MongoDB. W tym celu użyjemy komponentu tMongoDBOutput, gdzie musimy podać:

  • server naszej bazy. W docker-compose.yml w sekcji mongodb -> hostname: nadaliśmy nazwe “mongodb” i taką samą zdefiniowaliśmy w plku /etc/hosts, więc podajmy w tym polu nazwę naszego hosta “mogodb”.
  • To samo dotyczy się portu. Użyliśmy standardowego wieć zostawiamy 27017.
  • Jako bazę ustawiamy zdefiowaną w pliku YAML, czyli “test_db”.

Testy

Nadszedł czas, żeby uruchomić naszą aplikację i sprawdzić jak to działa!

  1. Na początek uruchom konsumenta – niech już czeka w gotowośći na wiadomości.
  2. Następnie uruchom producenta i obserwuj jak rekordy są przetwarzane zaraz po tym jak zostały zapisane na Kafkę.

Na koniec sprawdźmy jeszcze czy dane rzeczywiście zostały zapisane do MongoDB. W tym celu zrobimy połączenie ssh do kontenera z bazą Mongo.

# Nazwa mojego kontenera to: blog_mongodb_1
docker exec -it blog_mongodb_1 /bin/bash

# Następnie uruchamiamy klienta "mongo"
root@mongodb:/# mongo

MongoDB shell version v4.0.10
connecting to: mongodb://127.0.0.1:27017/?gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("a68fbc26-daab-4d15-8c26-f2d5ec45ebb5") }
MongoDB server version: 4.0.10
Welcome to the MongoDB shell.
For interactive help, type "help".
For more comprehensive documentation, see
	http://docs.mongodb.org/
Questions? Try the support group
	http://groups.google.com/group/mongodb-user
Server has startup warnings: 
2019-09-30T20:13:39.852+0000 I STORAGE  [initandlisten] 
2019-09-30T20:13:39.852+0000 I STORAGE  [initandlisten] ** WARNING: Using the XFS filesystem is strongly recommended with the WiredTiger storage engine
2019-09-30T20:13:39.852+0000 I STORAGE  [initandlisten] **          See http://dochub.mongodb.org/core/prodnotes-filesystem
2019-09-30T20:13:41.191+0000 I CONTROL  [initandlisten] 
2019-09-30T20:13:41.192+0000 I CONTROL  [initandlisten] ** WARNING: Access control is not enabled for the database.
2019-09-30T20:13:41.192+0000 I CONTROL  [initandlisten] **          Read and write access to data and configuration is unrestricted.
2019-09-30T20:13:41.192+0000 I CONTROL  [initandlisten] 
---
Enable MongoDB's free cloud-based monitoring service, which will then receive and display
metrics about your deployment (disk utilization, CPU, operation statistics, etc).

The monitoring data will be available on a MongoDB website with a unique URL accessible to you
and anyone you share the URL with. MongoDB may use this information to make product
improvements and to suggest MongoDB products and deployment options to you.

To enable free monitoring, run the following command: db.enableFreeMonitoring()
To permanently disable this reminder, run the following command: db.disableFreeMonitoring()
---

Przechodzimy do bazy “test_db”, sprawdzamy czy nasza kolekcja istnieje i na koniec pobieramy wszystkie dane z kolecji “testEvents“.

> use test_db
switched to db test_db
> show collections
testEvents
> db.testEvents.find()
{ "_id" : ObjectId("5d9276696deec244a5b83d19"), "payload" : "gnOiI7" }
{ "_id" : ObjectId("5d92766a6deec244a5b83d1a"), "payload" : "rM4Mlc" }
{ "_id" : ObjectId("5d92766b6deec244a5b83d1b"), "payload" : "U85SZM" }
{ "_id" : ObjectId("5d92766c6deec244a5b83d1c"), "payload" : "17BkQ4" }
{ "_id" : ObjectId("5d92766d6deec244a5b83d1d"), "payload" : "IV56wq" }
{ "_id" : ObjectId("5d92766e6deec244a5b83d1e"), "payload" : "wnkplA" }
{ "_id" : ObjectId("5d92766f6deec244a5b83d1f"), "payload" : "OUlelH" }
{ "_id" : ObjectId("5d9276706deec244a5b83d20"), "payload" : "2OuDnN" }

Podsumowanie

Tworzenie przetwarzania strumieniowego z wykorzystaniem Talenda oraz Kafki jak widać nie jest skomplikowane. W dość krótkim czasie jesteśmy wstanie zaimplementować prostą logikę.

Narzędzia ETL typu drag&drop jak Talend generują de facto kod Java, który potem jest uruchamiany jak każda inna aplikacja napisana w języku Java. Osobiście jestem własnego kodu, ponieważ wiele rzeczy można napisać w łatwiejszy i bardziej optymalny sposób. Z drugiej strony narzędzia ETL umożliwiają szybszy dewelopment, łatwiejszą analizę tego co się dzieje w przetwarzaniu i na pewno łatwiej jest znaleźć ludzi lub szybciej ich nauczyć takiego narzędzia niż w przypadku pisania aplikacji od zera.

Kod możesz znaleść na naszym GitHubie.

Jeśli spodobał Ci się ten post to zostaw proszę komentarz poniżej oraz udostępnij ten post na swoim Facebook’u, Twitter’ze, LinkedIn lub innej stronie z mediami społecznościowymi.
Dzięki!

Leave a Reply

avatar
  Subscribe  
Powiadom o
Close Menu