Talend Kafka MongoDB Docker-Compose – Strumień Danych

You are currently viewing Talend Kafka MongoDB Docker-Compose – Strumień Danych
4.8
(204)

W tym poście naszym celem będzie stworzenie przetwarzania strumieniowego składającego się z: Talend Kafka MongoDB Docker-Compose – w czasie rzeczywistym przy wykorzystaniu architektury stworzonej przy użyciu docker-compose. Logikę przetwarzania zaimplementujemy w Talend Open Studio for Big Data (TOSBD).

Wstęp

W dzisiejszym świecie bardzo często spotykamy się z wymaganiami dotyczącymi przetwarzania danych w czasie rzeczywistym. Na rynku istnieje dość sporo narzędzi, które nam to pozwalają osiągnąć.

W czołówce możemy wyróżnić: Apache Flink oraz Apache Kafka. Często w tym samym „worku” można spotkać jeszcze Spark Structured Streaming lub Spark Streaming, lecz jest to błąd, ponieważ Spark reprezentuję podejście, które nazywamy „micro-batch” – czyli przetwarzanie danych w małych paczkach.

Komponenty

Na początku zajmiemy się stworzeniem środowiska, które będzie się składać z: 3x Zookeeper, 3x Kafka Broker, 1x MongoDB.

Dzięki Docker-compose jesteśmy wstanie w bardzo szybki sposób przygotować każde środowisko. Ogranicza nas tylko wyobraźnia lub aktualne wymagania. Poniższy diagram przedstawia naszą architekturę, którą zadeklarujemy w pliku YML.

Talend: Kafka i MongoDB - strumień danych
Talend Kafka MongoDB

Plik YML

Zakładam, że masz już zainstalowany docker-compose u siebie na komputerze. Jeśli nie to odsyłam Cie na chwilę do mojego innego posta, gdzie opisałem jak zainstalować Docker-Compose. Tworzymy plik docker-compose.yml. Postarajmy się zdefiniować powyższy diagram na zapis zrozumiały dla docker-compose.

Zookeeper Cluster

Zdefiniujmy trzy kontenery odpowiadające za 3 niezależne instancje Zookeeper’a.

version: "2"
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper-1
    ports:
      - "12181:12181"
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 12181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:12888:13888;zookeeper-2:22888:23888;zookeeper-3:32888:33888

  zookeeper-2:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper-2
    ports:
      - "22181:22181"
    environment:
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_CLIENT_PORT: 22181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:12888:13888;zookeeper-2:22888:23888;zookeeper-3:32888:33888

  zookeeper-3:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper-3
    ports:
      - "32181:32181"
    environment:
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:12888:13888;zookeeper-2:22888:23888;zookeeper-3:32888:33888

Kafka Cluster

Następnie dodajmy do istniejącego pliku docker-compose.yml część odpowiedzialną za utworzenie trzech kontenerów (broker), które razem stworzą nam nasz klaster Kafki. Jak widzimy w poniższym zapisie „depends_on” – broker zależą od Zookeeper.

Dzięki temu zapisowi docker-compose najpierw poczeka, aż wszystkie instancje Zookeeper będą uruchomione zanim będzie się starał postawić brokery.

  kafka-1:
    image: confluentinc/cp-kafka:latest
    hostname: kafka-1
    ports:
      - "19092:19092"
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:12181,zookeeper-3:12181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:19092

  kafka-2:
    image: confluentinc/cp-kafka:latest
    hostname: kafka-2
    ports:
      - "29092:29092"
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:12181,zookeeper-3:12181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:29092

  kafka-3:
    image: confluentinc/cp-kafka:latest
    hostname: kafka-3
    ports:
      - "39092:39092"
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:12181,zookeeper-3:12181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:39092

MongoDB

Na samym końcu zdefiniujmy naszą receptę na bazę MongoDB. W tym przypadku stworzymy jeden kontener, bez router’ów oraz nodów konfiguracyjnych.

  mongodb:
    image: mongo:latest
    hostname: mongodb
    environment:
      - MONGO_DATA_DIR=/data/db
      - MONGO_LOG_DIR=/dev/null
      - MONGODB_USER=test
      - MONGODB_DATABASE=test_db
      - MONGODB_PASS=bigdata_test
    volumes:
      - ./data/db:/data/db
    ports:
      - 27017:27017
    command: mongod --smallfiles --logpath=/dev/null

Edycja etc hosts

Aby móc połączyć się po nazwach hostów do naszego środowiska zbudowanego na kontenerach musimy wcześniej je dodać do pliku /etc/hosts. Ja używam do edycji programu „vim”.

sudo vim /etc/hosts

Następnie dodaj w dowolnej linii nazwy hostów:

127.0.0.1 kafka-1 kafka-2 kafka-3 mongodb

Uruchomienie Środowiska

Przejdźmy teraz do katalogu, gdzie wcześniej utworzyliśmy plik „docker-compose.yml” i uruchommy skrypt poniżej. Przy pierwszym uruchomieniu potrwa to trochę dłużej, ponieważ prawdopodobnie nie będziesz miał u siebie lokalnie obrazów, (docker images), które zdefiniowaliśmy.

Z tego powodu przy pierwszym uruchomieniu całość będzie musiała zostać pobrana ze zdalnego repozytorium.

pawel@pawel:~/Desktop/Blog$ docker-compose up -d

Creating network "blog_default" with the default driver
Creating blog_zookeeper-2_1 ... done
Creating blog_mongodb_1     ... done
Creating blog_zookeeper-3_1 ... done
Creating blog_zookeeper-1_1 ... done
Creating blog_kafka-3_1     ... done
Creating blog_kafka-2_1     ... done
Creating blog_kafka-1_1     ... done

Następnie sprawdźmy status kontenerów uruchamiając komendę „docker-compose ps„. Jak widzimy poniżej status wszystkich jest „Up” – czyli wszystko jest OK.

pawel@pawel:~/Desktop/Blog$ docker-compose ps
       Name                     Command               State                           Ports                         
--------------------------------------------------------------------------------------------------------------------
blog_kafka-1_1       /etc/confluent/docker/run        Up      0.0.0.0:19092->19092/tcp, 9092/tcp                    
blog_kafka-2_1       /etc/confluent/docker/run        Up      0.0.0.0:29092->29092/tcp, 9092/tcp                    
blog_kafka-3_1       /etc/confluent/docker/run        Up      0.0.0.0:39092->39092/tcp, 9092/tcp                    
blog_mongodb_1       docker-entrypoint.sh mongo ...   Up      0.0.0.0:27017->27017/tcp                              
blog_zookeeper-1_1   /etc/confluent/docker/run        Up      0.0.0.0:12181->12181/tcp, 2181/tcp, 2888/tcp, 3888/tcp
blog_zookeeper-2_1   /etc/confluent/docker/run        Up      2181/tcp, 0.0.0.0:22181->22181/tcp, 2888/tcp, 3888/tcp
blog_zookeeper-3_1   /etc/confluent/docker/run        Up      2181/tcp, 2888/tcp, 0.0.0.0:32181->32181/tcp, 3888/tcp

Talend

Skoro już mamy działające środowisko, przejdźmy do budowania job’ów w Talend. Na potrzeby tego posta budujemy dwa joby”

  • Zasilający naszą Kafkę sztucznie generowanymi danymi.
  • Czytający z Kafki i zapisujący dane do bazy Mongo.

Jak spojrzymy w Paletę to Talend w wersji Open Studio for Big Data udostępnia 5 komponentów związanych z Kafką (tKafkaCommit, tKafkaConnection, tKafkaCreateTopic, tKafkaInput, tKafkaOutput). My skupimy się na dwóch ostatnich.

Producer – tKafkaOutput

Pierwszym naszym celem będzie stworzenie producenta (ang. producer/sender). Aby zachować pewną losowość wiadomości użyjemy:

tRowGenerator – do generowania losowych ciągów znaków

tSleep – żeby zasymulować „spływanie” zdarzeń w czasie – bardziej je „rozciągnąć” w czasie. W przeciwnym razie wygenerowalibyśmy paczkę rekordów i cieżo byłoby nam zaobserwować, że są one również konsumowane w czasie rzeczywistym.

tMap – w celu przemapowania pól

tKafkaOutput – zapis do Kafki

Dodatkowo należy zdefiniować:

Broker list – lista Kafka brokerów oddzielonych przecinkiem wraz z portami(String)

Topic name – nazwę topic’u na który będziemy wysyłać dane (String)

BigData-ETL: tKafkaOutput
Talend Kafka MongoDB Docker-Compose

Consumer – tKafkaInput

W przypadku konsumenta również należy zdefiniować listę borker’ów oraz topic. Dodatkowym parametrem jest podanie Consumer group id. Możemy tu wpisać cokolwiek.

W przypadku produkcyjnych rozwiązań ten parametr pełni bardzo ważną funkcję, dzięki któremu nasi konsumenci „wiedzą”, które wiadomości (offsety) już zostały przez nich odczytane. (Ten temat jest o wiele bardziej obszerniejszy i teraz nie będę opisywał szczegółów).

Talend: Kafka i MongoDB - strumień danych
Talend Kafka MongoDB Docker-Compose

tMongoDBOutput

Ostatnim elementem naszej układanki jest przekierowanie strumienia danych w stronę kolecji (ang. collection) w MongoDB. W tym celu użyjemy komponentu tMongoDBOutput, gdzie musimy podać:

  • server naszej bazy. W docker-compose.yml w sekcji mongodb – hostname: nadaliśmy nazwe „mongodb” i taką samą zdefiniowaliśmy w plku /etc/hosts, więc podajmy w tym polu nazwę naszego hosta „mogodb”.
  • To samo dotyczy się portu. Użyliśmy standardowego wieć zostawiamy 27017.
  • Jako bazę ustawiamy zdefiowaną w pliku YAML, czyli „test_db”.
Talend: Kafka i MongoDB - strumień danych
Talend Kafka MongoDB Docker-Compose

Testy

Nadszedł czas, żeby uruchomić naszą aplikację i sprawdzić jak to działa!

Na początek uruchom konsumenta – niech już czeka w gotowości na wiadomości.

Następnie uruchom producenta i obserwuj jak rekordy są przetwarzane zaraz po tym jak zostały zapisane na Kafkę.

Talend: Kafka i MongoDB - strumień danych

Na koniec sprawdźmy jeszcze czy dane rzeczywiście zostały zapisane do MongoDB. W tym celu zrobimy połączenie ssh do kontenera z bazą Mongo.

# Nazwa mojego kontenera to: blog_mongodb_1
docker exec -it blog_mongodb_1 /bin/bash

# Następnie uruchamiamy klienta "mongo"
root@mongodb:/# mongo

MongoDB shell version v4.0.10
connecting to: mongodb://127.0.0.1:27017/?gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("a68fbc26-daab-4d15-8c26-f2d5ec45ebb5") }
MongoDB server version: 4.0.10
Welcome to the MongoDB shell.
For interactive help, type "help".
For more comprehensive documentation, see
	http://docs.mongodb.org/
Questions? Try the support group
	http://groups.google.com/group/mongodb-user
Server has startup warnings: 
2019-09-30T20:13:39.852+0000 I STORAGE  [initandlisten] 
2019-09-30T20:13:39.852+0000 I STORAGE  [initandlisten] ** WARNING: Using the XFS filesystem is strongly recommended with the WiredTiger storage engine
2019-09-30T20:13:39.852+0000 I STORAGE  [initandlisten] **          See http://dochub.mongodb.org/core/prodnotes-filesystem
2019-09-30T20:13:41.191+0000 I CONTROL  [initandlisten] 
2019-09-30T20:13:41.192+0000 I CONTROL  [initandlisten] ** WARNING: Access control is not enabled for the database.
2019-09-30T20:13:41.192+0000 I CONTROL  [initandlisten] **          Read and write access to data and configuration is unrestricted.
2019-09-30T20:13:41.192+0000 I CONTROL  [initandlisten] 
---
Enable MongoDB's free cloud-based monitoring service, which will then receive and display
metrics about your deployment (disk utilization, CPU, operation statistics, etc).

The monitoring data will be available on a MongoDB website with a unique URL accessible to you
and anyone you share the URL with. MongoDB may use this information to make product
improvements and to suggest MongoDB products and deployment options to you.

To enable free monitoring, run the following command: db.enableFreeMonitoring()
To permanently disable this reminder, run the following command: db.disableFreeMonitoring()
---

Przechodzimy do bazy „test_db”, sprawdzamy czy nasza kolekcja istnieje i na koniec pobieramy wszystkie dane z kolecji „testEvents„.

> use test_db
switched to db test_db
> show collections
testEvents
> db.testEvents.find()
{ "_id" : ObjectId("5d9276696deec244a5b83d19"), "payload" : "gnOiI7" }
{ "_id" : ObjectId("5d92766a6deec244a5b83d1a"), "payload" : "rM4Mlc" }
{ "_id" : ObjectId("5d92766b6deec244a5b83d1b"), "payload" : "U85SZM" }
{ "_id" : ObjectId("5d92766c6deec244a5b83d1c"), "payload" : "17BkQ4" }
{ "_id" : ObjectId("5d92766d6deec244a5b83d1d"), "payload" : "IV56wq" }
{ "_id" : ObjectId("5d92766e6deec244a5b83d1e"), "payload" : "wnkplA" }
{ "_id" : ObjectId("5d92766f6deec244a5b83d1f"), "payload" : "OUlelH" }
{ "_id" : ObjectId("5d9276706deec244a5b83d20"), "payload" : "2OuDnN" }

Podsumowanie

Tworzenie przetwarzania strumieniowego z wykorzystaniem Talend oraz Kafki jak widać nie jest skomplikowane. W dość krótkim czasie jesteśmy wstanie zaimplementować prostą logikę.

Narzędzia ETL typu drag&drop jak Talend generują de facto kod Java, który potem jest uruchamiany jak każda inna aplikacja napisana w języku Java. Osobiście jestem własnego kodu, ponieważ wiele rzeczy można napisać w łatwiejszy i bardziej optymalny sposób. Kod możesz znaleść na naszym GitHubie.

Z drugiej strony narzędzia ETL umożliwiają szybszy dewelopment, łatwiejszą analizę tego co się dzieje w przetwarzaniu i na pewno łatwiej jest znaleźć ludzi lub szybciej ich nauczyć takiego narzędzia niż w przypadku pisania aplikacji od zera.

How useful was this post?

Click on a star to rate it!

Average rating 4.8 / 5. Vote count: 204

No votes so far! Be the first to rate this post.

Subscribe
Powiadom o
guest
0 Comments
Inline Feedbacks
View all comments