Talend: Kafka and MongoDB – real-time streaming

In today’s world, we often meet requirements for real-time data processing. Quite a few tools on the market address this need, with Apache Kafka and Apache Flink at the forefront. Spark Structured Streaming and Spark Streaming are often put in the same “bag”, but that is a mistake: Spark takes the approach we call “micro-batch”, that is, it processes data in small packages.

In this post, our goal is to build a real-time streaming pipeline on an environment defined with docker-compose. We implement the processing logic in Talend Open Studio for Big Data (TOSBD).

Prepare the environment in Docker

At the beginning, we create an environment that will consist of:

  • 3x Zookeeper
  • 3x Kafka Broker
  • 1x MongoDB

With docker-compose we can quickly prepare any environment. We are limited only by imagination or current requirements 🙂

The diagram below presents our architecture, which we will declare in the file “docker-compose.yml”.

https://better-coding.com/building-apache-kafka-cluster-using-docker-compose-and-virtualbox/

Docker-compose.yml

I assume you already have docker-compose installed on your computer. If not, see the post where I describe how to install Docker-Compose.

We create the file docker-compose.yml and translate the diagram above into a form that docker-compose understands.

Zookeeper Cluster

We define three containers responsible for three independent Zookeeper instances.

version: "2"
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper-1
    ports:
      - "12181:12181"
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 12181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:12888:13888;zookeeper-2:22888:23888;zookeeper-3:32888:33888

  zookeeper-2:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper-2
    ports:
      - "22181:22181"
    environment:
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_CLIENT_PORT: 22181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:12888:13888;zookeeper-2:22888:23888;zookeeper-3:32888:33888

  zookeeper-3:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper-3
    ports:
      - "32181:32181"
    environment:
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:12888:13888;zookeeper-2:22888:23888;zookeeper-3:32888:33888
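
The port numbers in this configuration follow a simple pattern: instance N uses N2181 for clients, N2888 for peer traffic and N3888 for leader election. As a small illustration, the ZOOKEEPER_SERVERS value could be generated as in the Python sketch below. The helper names client_port and zookeeper_servers are my own and are not part of docker-compose or Zookeeper.

```python
def client_port(server_id):
    """Client port of instance N in this post's numbering scheme (N2181)."""
    return server_id * 10000 + 2181


def zookeeper_servers(count):
    """Build a ZOOKEEPER_SERVERS value for `count` instances named zookeeper-N."""
    entries = []
    for server_id in range(1, count + 1):
        peer_port = server_id * 10000 + 2888      # followers connect to the leader here
        election_port = server_id * 10000 + 3888  # used for leader election
        entries.append(f"zookeeper-{server_id}:{peer_port}:{election_port}")
    return ";".join(entries)


if __name__ == "__main__":
    print(zookeeper_servers(3))
    print([client_port(n) for n in (1, 2, 3)])
```

The same string has to appear in every Zookeeper service, because each instance needs to know the full list of its peers.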

Kafka Cluster

Next, add to the existing docker-compose.yml file the part responsible for creating three containers (brokers) that together form our Kafka cluster. As the “depends_on” entries below show, the brokers depend on Zookeeper. Because of this, docker-compose starts all Zookeeper containers before it starts the brokers. Note that “depends_on” only controls the start order; it does not wait until Zookeeper is actually ready to accept connections.

  kafka-1:
    image: confluentinc/cp-kafka:latest
    hostname: kafka-1
    ports:
      - "19092:19092"
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:22181,zookeeper-3:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:19092

  kafka-2:
    image: confluentinc/cp-kafka:latest
    hostname: kafka-2
    ports:
      - "29092:29092"
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:22181,zookeeper-3:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:29092

  kafka-3:
    image: confluentinc/cp-kafka:latest
    hostname: kafka-3
    ports:
      - "39092:39092"
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:22181,zookeeper-3:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:39092
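
Each entry in KAFKA_ZOOKEEPER_CONNECT should point at the client port on which the given Zookeeper instance listens (its ZOOKEEPER_CLIENT_PORT). Below is a minimal Python sketch of such a consistency check. The function name and the dictionary are assumptions made for illustration; the port values are copied from the Zookeeper section.

```python
# Client ports as declared by ZOOKEEPER_CLIENT_PORT in the Zookeeper services.
ZOOKEEPER_CLIENT_PORTS = {
    "zookeeper-1": 12181,
    "zookeeper-2": 22181,
    "zookeeper-3": 32181,
}


def mismatched_entries(connect_string, client_ports):
    """Return the host:port entries whose port differs from the declared client port."""
    bad = []
    for entry in connect_string.split(","):
        entry = entry.strip()
        host, _, port = entry.rpartition(":")
        if client_ports.get(host) != int(port):
            bad.append(entry)
    return bad


if __name__ == "__main__":
    value = "zookeeper-1:12181,zookeeper-2:22181,zookeeper-3:32181"
    print(mismatched_entries(value, ZOOKEEPER_CLIENT_PORTS))
```

An empty list means the connect string agrees with the Zookeeper definitions. An entry that reuses port 12181 for zookeeper-2 or zookeeper-3 would be reported, because those instances listen on 22181 and 32181.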

MongoDB

Finally, let’s define our recipe for the MongoDB database. In this case we create a single container, without routers or config servers.

  mongodb:
    image: mongo:4.0  # pinned to the 4.0 line seen in the shell output; newer images reject --smallfiles
    hostname: mongodb
    environment:
      - MONGO_DATA_DIR=/data/db
      - MONGO_LOG_DIR=/dev/null
      - MONGODB_USER=test
      - MONGODB_DATABASE=test_db
      - MONGODB_PASS=bigdata_test
    volumes:
      - ./data/db:/data/db
    ports:
      - 27017:27017
    command: mongod --smallfiles --logpath=/dev/null

Edit /etc/hosts file

To connect to our containers by host name, we must first add those names to the /etc/hosts file. I use “vim” to edit it.

sudo vim /etc/hosts

Then add the host names on a new line:

127.0.0.1 kafka-1 kafka-2 kafka-3 mongodb
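
To double-check the entry without opening the editor again, something like the following Python sketch could be used. It only parses text in the /etc/hosts format; the function name missing_hosts is hypothetical.

```python
WANTED = ["kafka-1", "kafka-2", "kafka-3", "mongodb"]


def missing_hosts(hosts_text, wanted):
    """Return the names from `wanted` that no line of `hosts_text` maps to an address."""
    known = set()
    for line in hosts_text.splitlines():
        line = line.split("#", 1)[0].strip()  # drop comments and surrounding whitespace
        if not line:
            continue
        _address, *names = line.split()
        known.update(names)
    return [name for name in wanted if name not in known]


# Typical use on a real machine (not executed here):
#   with open("/etc/hosts") as hosts_file:
#       print(missing_hosts(hosts_file.read(), WANTED))
```

An empty result means all four names are present in the file.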

Starting the environment

Go to the directory where we created the “docker-compose.yml” file and run the command below. The first start takes a little longer, because you probably don’t have the Docker images we defined available locally, so they first have to be downloaded from the remote repository.

pawel@pawel:~/Desktop/Blog$ docker-compose up -d

Creating network "blog_default" with the default driver
Creating blog_zookeeper-2_1 ... done
Creating blog_mongodb_1     ... done
Creating blog_zookeeper-3_1 ... done
Creating blog_zookeeper-1_1 ... done
Creating blog_kafka-3_1     ... done
Creating blog_kafka-2_1     ... done
Creating blog_kafka-1_1     ... done

Next, check the status of the containers by running the “docker-compose ps” command. As we can see below, every container has the state “Up”, meaning all of them are running.

pawel@pawel:~/Desktop/Blog$ docker-compose ps
       Name                     Command               State                           Ports                         
--------------------------------------------------------------------------------------------------------------------
blog_kafka-1_1       /etc/confluent/docker/run        Up      0.0.0.0:19092->19092/tcp, 9092/tcp                    
blog_kafka-2_1       /etc/confluent/docker/run        Up      0.0.0.0:29092->29092/tcp, 9092/tcp                    
blog_kafka-3_1       /etc/confluent/docker/run        Up      0.0.0.0:39092->39092/tcp, 9092/tcp                    
blog_mongodb_1       docker-entrypoint.sh mongo ...   Up      0.0.0.0:27017->27017/tcp                              
blog_zookeeper-1_1   /etc/confluent/docker/run        Up      0.0.0.0:12181->12181/tcp, 2181/tcp, 2888/tcp, 3888/tcp
blog_zookeeper-2_1   /etc/confluent/docker/run        Up      2181/tcp, 0.0.0.0:22181->22181/tcp, 2888/tcp, 3888/tcp
blog_zookeeper-3_1   /etc/confluent/docker/run        Up      2181/tcp, 2888/tcp, 0.0.0.0:32181->32181/tcp, 3888/tcp
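
The “Up” state only tells us that the container process is running, not that the service inside already accepts connections. A rough way to verify the latter is to try opening a TCP connection to each published port. The sketch below uses only the Python standard library. The host names assume the /etc/hosts entries added earlier, and the helper names is_port_open and report are my own.

```python
import socket

# Published ports taken from the "docker-compose ps" output.
ENDPOINTS = [
    ("kafka-1", 19092),
    ("kafka-2", 29092),
    ("kafka-3", 39092),
    ("mongodb", 27017),
]


def is_port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, timed out, or host name not resolvable
        return False


def report(endpoints=ENDPOINTS):
    """Map "host:port" to True/False for every endpoint."""
    return {f"{host}:{port}": is_port_open(host, port) for host, port in endpoints}
```

Calling report() after “docker-compose up -d” gives a quick overview. An open port still does not prove that a broker has finished registering in Zookeeper, so treat it as a first sanity check only.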

Talend

Now that we have a working environment, let’s move on to building jobs in Talend. We will build two jobs for this purpose:

  1. Supplying our Kafka with artificially generated data.
  2. Reading from Kafka and writing data to the Mongo database.

Looking at the Palette, Talend Open Studio for Big Data provides five components related to Kafka. We will focus on the last two: tKafkaInput and tKafkaOutput.

  1. tKafkaCommit
  2. tKafkaConnection
  3. tKafkaCreateTopic
  4. tKafkaInput
  5. tKafkaOutput

Producer – tKafkaOutput

Our first goal is to create a producer (sender). To generate random messages, we will use:

  • tRowGenerator – to generate random strings
  • tSleep – to spread the events out over time. Otherwise we would generate a single packet of records, and it would be hard to observe that they are consumed in real time.
  • tMap – to remap the fields
  • tKafkaOutput – to write the records to Kafka

In addition, you must define:

  • Broker list – list of Kafka brokers with ports separated by commas (String)
  • Topic name – name of the topic to which we will send data (String)
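
To make the data flow of this job more tangible, here is a rough Python imitation of what the components do. It does not talk to Kafka at all. The field name “payload” and the six-character length are taken from the MongoDB output shown later in this post; the function names and the one-second default pause are my assumptions.

```python
import random
import string
import time

BROKERS = ["kafka-1:19092", "kafka-2:29092", "kafka-3:39092"]


def broker_list(brokers):
    """Join brokers into the comma-separated String expected by the "Broker list" field."""
    return ",".join(brokers)


def random_payload(length=6):
    """Stand-in for tRowGenerator: a random alphanumeric string."""
    alphabet = string.ascii_letters + string.digits
    return "".join(random.choice(alphabet) for _ in range(length))


def generate_events(count, delay_seconds=1.0):
    """Yield `count` records, pausing after each one like tSleep does."""
    for _ in range(count):
        yield {"payload": random_payload()}  # stand-in for the tMap field mapping
        time.sleep(delay_seconds)


if __name__ == "__main__":
    print(broker_list(BROKERS))
    for event in generate_events(3, delay_seconds=0.1):
        print(event)
```

The pause is what turns one burst of records into a steady trickle, which makes it easy to watch the consumer react to each message.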

Consumer – tKafkaInput

For the consumer, a broker list and a topic must be defined as well. An additional parameter is the “Consumer group id”. For this demo we can enter anything here. In production solutions this parameter plays a very important role: thanks to it, our consumers “know” which messages (offsets) they have already read. (This topic is much more extensive and I will not describe the details now.)
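
The idea behind the group id can be shown with a toy model. The Python sketch below is not Kafka code; the class OffsetStore is invented for illustration and handles a single partition only. It shows that read progress is remembered per group id, so a consumer in a group that has already committed an offset continues from there, while a group with a different id has its own, independent position.

```python
from collections import defaultdict


class OffsetStore:
    """Toy stand-in for committed offsets, keyed by consumer group id."""

    def __init__(self):
        self._committed = defaultdict(int)  # group id -> next offset to read

    def poll(self, group_id, log):
        """Return the messages of `log` that this group has not committed yet."""
        return log[self._committed[group_id]:]

    def commit(self, group_id, next_offset):
        """Remember that the group has processed everything before `next_offset`."""
        self._committed[group_id] = next_offset


if __name__ == "__main__":
    log = ["a", "b", "c"]           # messages of one partition
    store = OffsetStore()
    store.commit("talend-demo", 2)  # group processed "a" and "b"
    print(store.poll("talend-demo", log))  # only what is left for this group
    print(store.poll("other-group", log))  # a different group has its own position
```

In this simplified model a new group starts from the beginning of the log. In real Kafka the starting point of a group without committed offsets depends on the consumer configuration.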

tMongoDBOutput

The last element of our puzzle is directing the data stream to a collection in MongoDB. For this we use the tMongoDBOutput component, where we must provide:

  • The database server. In docker-compose.yml, under mongodb -> hostname, we set the name “mongodb” and defined the same name in /etc/hosts, so we enter “mongodb” as the host name in this field.
  • The port. We used the standard one, so we leave 27017.
  • The database. We set the one defined in the YAML file: “test_db”.
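
tMongoDBOutput takes these values as separate fields. Put together, they carry the same information as a MongoDB connection string. The Python sketch below only builds that string and shows the shape of the document we expect to end up in the collection. Both function names are hypothetical, and the “payload” field name is taken from the query output later in this post.

```python
def mongo_uri(host="mongodb", port=27017, database="test_db"):
    """Combine the three component settings into a MongoDB connection string."""
    return f"mongodb://{host}:{port}/{database}"


def to_document(message):
    """Turn the raw bytes of one Kafka message into the document to be stored."""
    return {"payload": message.decode("utf-8")}


if __name__ == "__main__":
    print(mongo_uri())
    print(to_document(b"gnOiI7"))
```

MongoDB adds the “_id” field on its own when a document is inserted without one, which is why the job only has to supply the payload.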

Run and test the application

It’s time to run our application and check how it works!

  1. First, start the consumer so that it waits, ready for messages.
  2. Then start the producer and watch how the records are processed right after they are saved to Kafka.

Finally, let’s check whether the data has actually been saved to MongoDB. For this purpose, we open a shell inside the container running the Mongo database using “docker exec”.

# The name of my container is blog_mongodb_1
docker exec -it blog_mongodb_1 /bin/bash

# Then start the "mongo" client
root@mongodb:/# mongo

MongoDB shell version v4.0.10
connecting to: mongodb://127.0.0.1:27017/?gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("a68fbc26-daab-4d15-8c26-f2d5ec45ebb5") }
MongoDB server version: 4.0.10
Welcome to the MongoDB shell.
For interactive help, type "help".
For more comprehensive documentation, see
	http://docs.mongodb.org/
Questions? Try the support group
	http://groups.google.com/group/mongodb-user
Server has startup warnings: 
2019-09-30T20:13:39.852+0000 I STORAGE  [initandlisten] 
2019-09-30T20:13:39.852+0000 I STORAGE  [initandlisten] ** WARNING: Using the XFS filesystem is strongly recommended with the WiredTiger storage engine
2019-09-30T20:13:39.852+0000 I STORAGE  [initandlisten] **          See http://dochub.mongodb.org/core/prodnotes-filesystem
2019-09-30T20:13:41.191+0000 I CONTROL  [initandlisten] 
2019-09-30T20:13:41.192+0000 I CONTROL  [initandlisten] ** WARNING: Access control is not enabled for the database.
2019-09-30T20:13:41.192+0000 I CONTROL  [initandlisten] **          Read and write access to data and configuration is unrestricted.
2019-09-30T20:13:41.192+0000 I CONTROL  [initandlisten] 
---
Enable MongoDB's free cloud-based monitoring service, which will then receive and display
metrics about your deployment (disk utilization, CPU, operation statistics, etc).

The monitoring data will be available on a MongoDB website with a unique URL accessible to you
and anyone you share the URL with. MongoDB may use this information to make product
improvements and to suggest MongoDB products and deployment options to you.

To enable free monitoring, run the following command: db.enableFreeMonitoring()
To permanently disable this reminder, run the following command: db.disableFreeMonitoring()
---

We switch to the “test_db” database, check that our collection exists, and finally fetch all documents from the “testEvents” collection.

> use test_db
switched to db test_db
> show collections
testEvents
> db.testEvents.find()
{ "_id" : ObjectId("5d9276696deec244a5b83d19"), "payload" : "gnOiI7" }
{ "_id" : ObjectId("5d92766a6deec244a5b83d1a"), "payload" : "rM4Mlc" }
{ "_id" : ObjectId("5d92766b6deec244a5b83d1b"), "payload" : "U85SZM" }
{ "_id" : ObjectId("5d92766c6deec244a5b83d1c"), "payload" : "17BkQ4" }
{ "_id" : ObjectId("5d92766d6deec244a5b83d1d"), "payload" : "IV56wq" }
{ "_id" : ObjectId("5d92766e6deec244a5b83d1e"), "payload" : "wnkplA" }
{ "_id" : ObjectId("5d92766f6deec244a5b83d1f"), "payload" : "OUlelH" }
{ "_id" : ObjectId("5d9276706deec244a5b83d20"), "payload" : "2OuDnN" }
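
The “_id” values carry a small bonus: the first four bytes of an ObjectId store its creation time in seconds since the Unix epoch. The Python sketch below decodes the first three ids from the output above using the standard library only. The function names are my own.

```python
from datetime import datetime, timezone

# First three _id values copied from the find() output above.
OBJECT_IDS = [
    "5d9276696deec244a5b83d19",
    "5d92766a6deec244a5b83d1a",
    "5d92766b6deec244a5b83d1b",
]


def objectid_timestamp(object_id_hex):
    """Decode the creation time held in the first 8 hex digits of an ObjectId."""
    seconds = int(object_id_hex[:8], 16)
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


def gaps_in_seconds(object_ids):
    """Return the time difference between each pair of consecutive ids."""
    times = [objectid_timestamp(oid) for oid in object_ids]
    return [(later - earlier).total_seconds() for earlier, later in zip(times, times[1:])]


if __name__ == "__main__":
    for oid in OBJECT_IDS:
        print(oid, objectid_timestamp(oid).isoformat())
    print(gaps_in_seconds(OBJECT_IDS))
```

For these documents the ids are one second apart, which fits a producer that pauses for about a second between records (the exact tSleep setting is my inference; it is not stated in the post). It also suggests that each record reached MongoDB shortly after it was produced.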

Summary

As you can see, creating stream processing using Talend and Kafka is not complicated. In a fairly short time, we are able to implement simple logic.

Drag & drop ETL tools like Talend in fact generate Java code, which is then run like any other Java application. Personally, I prefer custom code, because many things can be written in an easier and more optimal way. On the other hand, ETL tools enable faster development and easier analysis of what is happening in the processing, and it is certainly easier to find people who know such a tool, or to teach it to them, than to find people who write applications from scratch.

You can find the code on GitHub.
