In today’s world, we often meet requirements for real-time data processing. There are quite a few tools on the market that allow us to achieve this, with Apache Kafka and Apache Flink at the forefront. Spark Structured Streaming and Spark Streaming are often mentioned in the same breath, but this is a mistake, because Spark represents an approach called “micro-batch” – that is, processing data in small packages.
Introduction
In this post, our goal will be to create real-time streaming processing on top of an environment defined with docker-compose. We will implement the processing logic in Talend Open Studio for Big Data (TOSBD).
Talend Kafka MongoDB Docker-Compose Real-Time
At the beginning, we create an environment that will consist of:
- 3x Zookeeper
- 3x Kafka Broker
- 1x MongoDB
Using Docker-Compose, we are able to quickly prepare almost any environment. We are limited only by our imagination or current requirements 🙂
The diagram below presents our architecture, which we will declare in the file “docker-compose.yml”.

YML File
I assume you already have docker-compose installed on your computer. If not, I refer you for a moment to the post where I describe how to install docker-compose.
We create the file docker-compose.yml and try to translate the above diagram into a language that docker-compose understands.
Zookeeper Cluster
We define three containers responsible for three independent Zookeeper instances.
version: "2" services: zookeeper-1: image: confluentinc/cp-zookeeper:latest hostname: zookeeper-1 ports: - "12181:12181" environment: ZOOKEEPER_SERVER_ID: 1 ZOOKEEPER_CLIENT_PORT: 12181 ZOOKEEPER_TICK_TIME: 2000 ZOOKEEPER_INIT_LIMIT: 5 ZOOKEEPER_SYNC_LIMIT: 2 ZOOKEEPER_SERVERS: zookeeper-1:12888:13888;zookeeper-2:22888:23888;zookeeper-3:32888:33888 zookeeper-2: image: confluentinc/cp-zookeeper:latest hostname: zookeeper-2 ports: - "22181:22181" environment: ZOOKEEPER_SERVER_ID: 2 ZOOKEEPER_CLIENT_PORT: 22181 ZOOKEEPER_TICK_TIME: 2000 ZOOKEEPER_INIT_LIMIT: 5 ZOOKEEPER_SYNC_LIMIT: 2 ZOOKEEPER_SERVERS: zookeeper-1:12888:13888;zookeeper-2:22888:23888;zookeeper-3:32888:33888 zookeeper-3: image: confluentinc/cp-zookeeper:latest hostname: zookeeper-3 ports: - "32181:32181" environment: ZOOKEEPER_SERVER_ID: 3 ZOOKEEPER_CLIENT_PORT: 32181 ZOOKEEPER_TICK_TIME: 2000 ZOOKEEPER_INIT_LIMIT: 5 ZOOKEEPER_SYNC_LIMIT: 2 ZOOKEEPER_SERVERS: zookeeper-1:12888:13888;zookeeper-2:22888:23888;zookeeper-3:32888:33888
Kafka Cluster
Then we add to the existing docker-compose.yml file the part responsible for creating three containers (brokers) that together will form our Kafka cluster. As we can see in the “depends_on” entry below, the brokers depend on Zookeeper. Because of this, docker-compose will first wait for all Zookeeper instances to be started before attempting to bring up the brokers.
  kafka-1:
    image: confluentinc/cp-kafka:latest
    hostname: kafka-1
    ports:
      - "19092:19092"
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:22181,zookeeper-3:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:19092
  kafka-2:
    image: confluentinc/cp-kafka:latest
    hostname: kafka-2
    ports:
      - "29092:29092"
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:22181,zookeeper-3:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:29092
  kafka-3:
    image: confluentinc/cp-kafka:latest
    hostname: kafka-3
    ports:
      - "39092:39092"
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:22181,zookeeper-3:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:39092
MongoDB
Finally, let’s define the recipe for the MongoDB database. In this case, we will create a single container, without routers and configuration nodes.
  mongodb:
    image: mongo:latest
    hostname: mongodb
    environment:
      - MONGO_DATA_DIR=/data/db
      - MONGO_LOG_DIR=/dev/null
      - MONGODB_USER=test
      - MONGODB_DATABASE=test_db
      - MONGODB_PASS=bigdata_test
    volumes:
      - ./data/db:/data/db
    ports:
      - 27017:27017
    # Note: --smallfiles was removed in MongoDB 4.2; if mongod refuses to start
    # with mongo:latest, pin an older image (e.g. mongo:4.0) or drop the flag.
    command: mongod --smallfiles --logpath=/dev/null
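With all three fragments in place, it is worth making sure the assembled file is valid before starting anything. A quick sanity check, run from the directory containing docker-compose.yml:

# Validates docker-compose.yml and prints the fully resolved configuration;
# any syntax or indentation error is reported here instead of at startup.
docker-compose config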
Edit /etc/hosts file
To be able to connect by host name to the containers of our environment, we must first add their names to the /etc/hosts file. I use “vim” to edit it.
sudo vim /etc/hosts
Then add the host names on a new line (a quick resolution check follows):
127.0.0.1 kafka-1 kafka-2 kafka-3 mongodb
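To confirm the aliases resolve as expected, you can ping them from the host; each one should answer from 127.0.0.1 (the containers do not need to be running for this check):

ping -c 1 kafka-1
ping -c 1 mongodb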
Starting the environment
Let’s go to the directory where we previously created the “docker-compose.yml” file and run the command below. The first start will take a little longer, because you probably do not have the Docker images we defined available locally, so they will first have to be downloaded from the remote repository.
pawel@pawel:~/Desktop/Blog$ docker-compose up -d
Creating network "blog_default" with the default driver
Creating blog_zookeeper-2_1 ... done
Creating blog_mongodb_1     ... done
Creating blog_zookeeper-3_1 ... done
Creating blog_zookeeper-1_1 ... done
Creating blog_kafka-3_1     ... done
Creating blog_kafka-2_1     ... done
Creating blog_kafka-1_1     ... done
Next, check the status of the containers by running the “docker-compose ps” command. As we can see below, every container is in the “Up” state, which means everything is OK.
pawel@pawel:~/Desktop/Blog$ docker-compose ps
       Name                    Command             State                          Ports
--------------------------------------------------------------------------------------------------------------------
blog_kafka-1_1       /etc/confluent/docker/run       Up      0.0.0.0:19092->19092/tcp, 9092/tcp
blog_kafka-2_1       /etc/confluent/docker/run       Up      0.0.0.0:29092->29092/tcp, 9092/tcp
blog_kafka-3_1       /etc/confluent/docker/run       Up      0.0.0.0:39092->39092/tcp, 9092/tcp
blog_mongodb_1       docker-entrypoint.sh mongo ...  Up      0.0.0.0:27017->27017/tcp
blog_zookeeper-1_1   /etc/confluent/docker/run       Up      0.0.0.0:12181->12181/tcp, 2181/tcp, 2888/tcp, 3888/tcp
blog_zookeeper-2_1   /etc/confluent/docker/run       Up      2181/tcp, 0.0.0.0:22181->22181/tcp, 2888/tcp, 3888/tcp
blog_zookeeper-3_1   /etc/confluent/docker/run       Up      2181/tcp, 2888/tcp, 0.0.0.0:32181->32181/tcp, 3888/tcp
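With the cluster up, we can optionally create the topic that the Talend jobs will use, instead of relying on automatic topic creation. A sketch using the kafka-topics tool bundled in the Confluent image – the topic name “testEvents” is my assumption (any name works, as long as both jobs use the same one), and on older images you may need --zookeeper zookeeper-1:12181 instead of --bootstrap-server:

# Create a topic with 3 partitions, replicated across all three brokers
docker exec -it blog_kafka-1_1 kafka-topics --create \
  --bootstrap-server kafka-1:19092 \
  --replication-factor 3 \
  --partitions 3 \
  --topic testEvents

# Confirm the topic exists
docker exec -it blog_kafka-1_1 kafka-topics --list --bootstrap-server kafka-1:19092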
Kafka Talend
Now that we have a working environment, let’s move on to building jobs in Talend. We will build two jobs:
- Supplying our Kafka cluster with artificially generated data.
- Reading from Kafka and writing the data to the MongoDB database.
Looking at the Palette, Talend Open Studio for Big Data provides 5 components related to Kafka. We will focus on the last two: tKafkaInput and tKafkaOutput.
- tKafkaCommit
- tKafkaConnection
- tKafkaCreateTopic
- tKafkaInput
- tKafkaOutput
Producer – tKafkaOutput
Our first goal will be to create a producer (sender). To make the messages random, we will use the components below (a sketch of the key settings follows the list):
- tRowGenerator – to generate random strings
- tSleep – to simulate the “flow” of events over time and spread them out a little. Otherwise, we would generate a burst of records all at once, and it would be hard to observe that they are also consumed in real time.
- tMap" – to remap the fields
- tKafkaOutput – writing to Kafka
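A rough sketch of the values I would put into these components – the column name, string length and sleep interval are my assumptions, based on the 6-character payloads we will see later in MongoDB:

- tRowGenerator: one column “payload” of type String, filled with Talend’s built-in routine TalendString.getAsciiRandomString(6)
- tSleep: a pause of 1 second between rows, so roughly one event per second
- tMap: maps the generated column onto the single message column that tKafkaOutput expects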
In addition, in tKafkaOutput you must define the following (example values after the list):
- Broker list – list of Kafka brokers with ports separated by commas (String)
- Topic name – name of the topic to which we will send data (String)
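For the environment defined above, the two fields could look like this (Talend expects Java string literals; the topic name “testEvents” is my assumption):

- Broker list: "kafka-1:19092,kafka-2:29092,kafka-3:39092"
- Topic name: "testEvents"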

Consumer – tKafkaInput
In the case of the consumer, the broker list and topic should also be defined. An additional parameter is the “Consumer group id” – we can enter anything here. In production solutions this parameter has a very important function: thanks to it, our consumers “know” which messages (offsets) they have already read. (This topic is much more extensive and I will not describe the details now.)
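Independently of the Talend consumer, you can peek at the topic straight from a broker container using the console consumer shipped in the Confluent image (again, the topic name “testEvents” is my assumption):

# Print everything published so far and keep listening for new messages
docker exec -it blog_kafka-1_1 kafka-console-consumer \
  --bootstrap-server kafka-1:19092 \
  --topic testEvents \
  --from-beginning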

tMongoDBOutput
The last element of our puzzle is redirecting the data stream towards a collection in MongoDB. For this purpose, we will use the tMongoDBOutput component, where we must provide the values listed below (summarised after the list):
- our database server. In docker-compose.yml, in the MongoDB section, we set hostname: to “mongodb” and defined the same name in /etc/hosts, so we enter the host name “mongodb” in this field.
- The same applies to the port. We used the standard one, so we leave 27017.
- As the database we set the one defined in the YAML file – “test_db”.
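Putting it together, a sketch of the tMongoDBOutput values used here (the collection name matches what we will query at the end; the rest mirrors docker-compose.yml):

- Server: "mongodb"
- Port: 27017
- Database: "test_db"
- Collection: "testEvents"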

Run and test the application
It’s time to run our application and check how it works!
- To start, launch the consumer – let it wait, ready for incoming messages.
- Then start the producer and watch how the records are processed right after they are written to Kafka. (An optional lag check follows below.)
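If you would like to verify that the consumer group is keeping up, the Confluent image also ships the kafka-consumer-groups tool. A sketch, assuming “talend-consumer” was entered as the Consumer group id:

# Show partitions, committed offsets and lag for our consumer group
docker exec -it blog_kafka-1_1 kafka-consumer-groups \
  --bootstrap-server kafka-1:19092 \
  --describe \
  --group talend-consumer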

Finally, let’s check whether the data has actually been saved to MongoDB. For this purpose, we will open a shell inside the container running the Mongo database.
# The name of my container is blog_mongodb_1
docker exec -it blog_mongodb_1 /bin/bash

# Then start the "mongo" client
root@mongodb:/# mongo
MongoDB shell version v4.0.10
connecting to: mongodb://127.0.0.1:27017/?gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("a68fbc26-daab-4d15-8c26-f2d5ec45ebb5") }
MongoDB server version: 4.0.10
Welcome to the MongoDB shell.
For interactive help, type "help".
For more comprehensive documentation, see
        http://docs.mongodb.org/
Questions? Try the support group
        http://groups.google.com/group/mongodb-user
Server has startup warnings:
2019-09-30T20:13:39.852+0000 I STORAGE  [initandlisten]
2019-09-30T20:13:39.852+0000 I STORAGE  [initandlisten] ** WARNING: Using the XFS filesystem is strongly recommended with the WiredTiger storage engine
2019-09-30T20:13:39.852+0000 I STORAGE  [initandlisten] **          See http://dochub.mongodb.org/core/prodnotes-filesystem
2019-09-30T20:13:41.191+0000 I CONTROL  [initandlisten]
2019-09-30T20:13:41.192+0000 I CONTROL  [initandlisten] ** WARNING: Access control is not enabled for the database.
2019-09-30T20:13:41.192+0000 I CONTROL  [initandlisten] **          Read and write access to data and configuration is unrestricted.
2019-09-30T20:13:41.192+0000 I CONTROL  [initandlisten]
---
Enable MongoDB's free cloud-based monitoring service, which will then receive and display
metrics about your deployment (disk utilization, CPU, operation statistics, etc).

The monitoring data will be available on a MongoDB website with a unique URL accessible to you
and anyone you share the URL with. MongoDB may use this information to make product
improvements and to suggest MongoDB products and deployment options to you.

To enable free monitoring, run the following command: db.enableFreeMonitoring()
To permanently disable this reminder, run the following command: db.disableFreeMonitoring()
---
We switch to the “test_db” database, check whether our collection exists and finally fetch all the data from the “testEvents” collection.
> use test_db
switched to db test_db
> show collections
testEvents
> db.testEvents.find()
{ "_id" : ObjectId("5d9276696deec244a5b83d19"), "payload" : "gnOiI7" }
{ "_id" : ObjectId("5d92766a6deec244a5b83d1a"), "payload" : "rM4Mlc" }
{ "_id" : ObjectId("5d92766b6deec244a5b83d1b"), "payload" : "U85SZM" }
{ "_id" : ObjectId("5d92766c6deec244a5b83d1c"), "payload" : "17BkQ4" }
{ "_id" : ObjectId("5d92766d6deec244a5b83d1d"), "payload" : "IV56wq" }
{ "_id" : ObjectId("5d92766e6deec244a5b83d1e"), "payload" : "wnkplA" }
{ "_id" : ObjectId("5d92766f6deec244a5b83d1f"), "payload" : "OUlelH" }
{ "_id" : ObjectId("5d9276706deec244a5b83d20"), "payload" : "2OuDnN" }
Summary
As you can see, creating stream processing with Talend and Kafka is not complicated. In a fairly short time, we are able to implement simple logic.
Drag & drop ETL tools like Talend de facto generate Java code, which is then run like any other Java application. Personally, I prefer custom code, because many things can be written in a simpler and more optimal way. On the other hand, ETL tools enable faster development and easier analysis of what is happening in the processing, and it is certainly easier to find people for such a tool, or to teach them one, than when writing applications from scratch.
You can find the code on GitHub.
Could You Please Share This Post?
I appreciate It And Thank YOU! :)
Have A Nice Day!