Akka.io: Quickstart with Java

A common requirement is to build an application that collects data from an external system through its API. The "bottleneck" of such an application often turns out to be its execution time.

Let's assume you need to refresh customer data every 15 minutes. The API we are working with does not let us request data for all customers, or for a group of customers, in a single call: each request accepts exactly one customer ID. We have about 10,000 customers, and the average time from sending a request to receiving the response is 0.4 seconds. Fetching the customers one after another therefore takes 10,000 × 0.4 s = 4,000 s, or about 66.67 minutes, which is more than 4 times the 15-minute refresh window. In short, a sequential application will not meet the requirements.

Number of customers                          10,000
Average response time                        0.4 s
Data download time (one thread)              66.67 min
Data download time with 5 actors (Akka)      13.33 min
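The figures in the table follow from simple arithmetic. The plain-Java sketch below (DownloadTime, minutes and minWorkers are hypothetical names) reproduces them and also computes how many parallel workers are needed to fit the 15-minute window. It assumes every request takes exactly the average 0.4 s and that the API answers parallel requests as fast as sequential ones, which a real API may not do (for example, because of rate limiting).

```java
// Back-of-the-envelope estimate of one full refresh. Assumes every request takes
// exactly the average response time and that parallel requests are answered as
// fast as sequential ones (no rate limiting). Names are hypothetical.
class DownloadTime {

    // Minutes needed to fetch `clients` records with `workers` requests in flight at once.
    static double minutes(int clients, double avgResponseSeconds, int workers) {
        return clients * avgResponseSeconds / workers / 60.0;
    }

    // Smallest number of parallel workers that fits the refresh into the window.
    static int minWorkers(int clients, double avgResponseSeconds, double windowMinutes) {
        return (int) Math.ceil(minutes(clients, avgResponseSeconds, 1) / windowMinutes);
    }

    public static void main(String[] args) {
        System.out.printf("1 worker:  %.2f min%n", minutes(10_000, 0.4, 1));
        System.out.printf("5 workers: %.2f min%n", minutes(10_000, 0.4, 5));
        System.out.println("Workers needed for a 15-minute window: " + minWorkers(10_000, 0.4, 15));
    }
}
```

Under these assumptions, five workers are the minimum that fits into the 15-minute window, which is consistent with the five actors used in this post.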

One solution could be to run the same application several times, with each instance receiving a different set of customer IDs. However, this is not a very elegant solution, and it creates new problems: we have to add logic that splits the customers into batches, start several instances, monitor them, and so on.

Akka.io

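Akka comes to the rescue. It is built on the actor model: an actor is an independent unit that communicates only by exchanging messages and processes its messages one at a time, while Akka runs many actors concurrently on a shared pool of threads. Using the library is very simple.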

We define the actors and assign tasks to them. We do not have to manage threads, locks, or synchronization ourselves, so we can look at the problem from a higher level.
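To make the model concrete, here is a toy, plain-Java illustration of the idea (ToyActor, tell and stopAndJoin are hypothetical names, not Akka API): an actor is a mailbox plus logic that takes messages out one at a time. Unlike this sketch, Akka does not dedicate a thread to each actor; it runs many actors over a shared thread pool. The point is only that the code inside one actor never runs concurrently with itself, so its private state needs no locks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Toy model of an actor: a mailbox plus a single thread that handles one message
// at a time. NOT how Akka is implemented; it only illustrates the programming model.
class ToyActor<T> {
    private static final Object STOP = new Object();   // poison pill that ends the loop

    private final BlockingQueue<Object> mailbox = new LinkedBlockingQueue<>();
    private final Thread thread;

    ToyActor(Consumer<T> behavior) {
        thread = new Thread(() -> {
            try {
                while (true) {
                    Object msg = mailbox.take();       // wait for the next message
                    if (msg == STOP) return;
                    @SuppressWarnings("unchecked") T typed = (T) msg;
                    behavior.accept(typed);             // messages are handled strictly one by one
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        thread.start();
    }

    // Fire-and-forget send, similar in spirit to ActorRef.tell.
    void tell(T message) {
        mailbox.add(message);
    }

    // Let the actor process everything already queued, then wait for it to stop.
    void stopAndJoin() {
        mailbox.add(STOP);
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        List<Integer> handled = new ArrayList<>();     // only touched by the actor's thread
        ToyActor<Integer> actor = new ToyActor<>(handled::add);
        for (int id = 1; id <= 5; id++) actor.tell(id);
        actor.stopAndJoin();                           // join() makes the list safe to read here
        System.out.println(handled);
    }
}
```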

1. Concept and communication

The following figure shows the concept of actors and the communication between them. The figure presents the communication as two-way, but it can of course also be one-way (which is how it is implemented in the example included in this post).

The application consists of one actor, the WorkersAdministrator, which is responsible only for printing to the console the messages sent to it by the worker actors (DataCollector), plus a set of workers:

ActorSystem system = ActorSystem.create("Actor-api-collector");

// Create the actor that is responsible only for printing messages from the workers
final ActorRef workersAdministrator = system.actorOf(WorkersAdministrator.props(), "workersAdministrator");

// Create a set of workers; each gets its own number and a reference to the administrator
ActorRef worker1 = system.actorOf(DataCollector.props(1, workersAdministrator));
ActorRef worker2 = system.actorOf(DataCollector.props(2, workersAdministrator));
ActorRef worker3 = system.actorOf(DataCollector.props(3, workersAdministrator));
ActorRef worker4 = system.actorOf(DataCollector.props(4, workersAdministrator));
ActorRef worker5 = system.actorOf(DataCollector.props(5, workersAdministrator));

 

Each worker sends a message to the WorkersAdministrator when:

  1. It receives a task to retrieve data for the customer with the given ID.
    workersAdministrator.tell("--> Getting data for ID=[" + id + "] ... ", getSelf());
  2. The download is complete. The message also includes the time that elapsed between receiving the task and finishing the download.
    workersAdministrator.tell("<-- Data collected! ID=[" + id + "] in time=[" + sleepInt + "] milliseconds.", getSelf());

2. WorkersAdministrator

Each class that is to be an actor must extend the AbstractActor class, which requires implementing the createReceive() method.

This method defines the actor's behavior using pattern matching (better known from Scala than from Java): for every type of message we expect, we specify its class and a handler. In our case, the workers send text messages, so we match on String.class, and “message” is the reference to the received object.

package com.bigdataetl.akka;

import akka.actor.AbstractActor;
import akka.actor.Props;

public class WorkersAdministrator extends AbstractActor {

    static public Props props() {
        return Props.create(WorkersAdministrator.class, () -> new WorkersAdministrator());
    }

    public WorkersAdministrator() {
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(String.class, message -> {
                    // Just print the incoming messages from the Actors
                    System.out.println(message);
                })
                .build();
    }

}
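Conceptually, the chain receiveBuilder().match(...).build() is a list of (class, handler) pairs checked in order. The plain-Java toy below (ToyReceive is a hypothetical class, not Akka's ReceiveBuilder) shows that idea: the first handler whose class matches the message runs, and receive() reports whether any handler matched. In Akka itself, a message that no case matches is passed to the actor's unhandled() method instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy analogue of receiveBuilder().match(Class, handler).build(): handlers are
// tried in registration order and the first one whose class matches wins.
// Hypothetical helper for illustration only, not Akka's ReceiveBuilder.
class ToyReceive {
    private final List<Class<?>> types = new ArrayList<>();
    private final List<Consumer<Object>> handlers = new ArrayList<>();

    <T> ToyReceive match(Class<T> type, Consumer<T> handler) {
        types.add(type);
        // type.cast is safe because receive() only calls this after isInstance succeeds
        handlers.add(msg -> handler.accept(type.cast(msg)));
        return this;
    }

    // Returns true if some handler accepted the message, false if none matched.
    boolean receive(Object message) {
        for (int i = 0; i < types.size(); i++) {
            if (types.get(i).isInstance(message)) {
                handlers.get(i).accept(message);
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        ToyReceive receive = new ToyReceive()
                .match(String.class, text -> System.out.println("text: " + text))
                .match(Integer.class, id -> System.out.println("client id: " + id));
        receive.receive("hello");
        receive.receive(42);
        System.out.println(receive.receive(3.14));  // no handler registered for Double
    }
}
```

Because the first match wins, the order of the match(...) calls matters: a handler for a broad type such as Object.class registered first would catch every message.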

3. Worker (DataCollector)

Our worker, i.e. an instance of the DataCollector class, must also extend the AbstractActor class.

A worker receives a customer ID, which in our case is a value of type Integer.

To simulate waiting for the server's response to a request, the private collectData() method picks a random duration between 300 and 599 milliseconds, blocks the thread running the actor for that long, and then returns the value so that it can be reported to the administrator.

package com.bigdataetl.akka;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;

import java.util.Random;

public class DataCollector extends AbstractActor {

    private int actorId;
    private ActorRef workersAdministrator;
    private Random rnd = new Random();

    public DataCollector(int actorId, ActorRef workersAdministrator) {
        this.actorId = actorId;
        this.workersAdministrator = workersAdministrator;
    }

    static public Props props(int actorId, ActorRef printerActor) {
        return Props.create(DataCollector.class, () -> new DataCollector(actorId, printerActor));
    }

    // This method is a mock-up of the API call.
    // We send the request and wait for the response.
    // The Random class simulates a random response time (300-599 ms).
    private int collectData() {
        int sleepTime = rnd.nextInt(300) + 300;
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption
            Thread.currentThread().interrupt();
        }
        return sleepTime;
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(Integer.class, clientId -> {
                    // Tell the WorkersAdministrator that this Actor sent the request and is waiting for response.
                    workersAdministrator.tell("--> {Actor " + actorId + "} Sending request for data. ID=[" + clientId + "] ... ", getSelf());

                    // Get the response time and tell the WorkersAdministrator that data was received.
                    int sleepInt = collectData();
                    workersAdministrator.tell("<-- {Actor " + actorId + "} Data collected! ID=[" + clientId + "] in time=[" + sleepInt + "] milliseconds.", getSelf());
                })
                .build();
    }
}
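Because collectData() calls Thread.sleep, each worker blocks the thread it runs on for the whole simulated request. That is acceptable for a demo, but Akka's documentation advises against blocking inside actors, since a blocked actor holds on to a dispatcher thread that other actors could use. A common alternative is to have the API client return a future and deliver its result back to the actor as a message (Akka provides akka.pattern.Patterns.pipe for this). The plain-Java sketch below shows only the non-blocking mock; fetchCustomer is a hypothetical stand-in for a real client call, and it assumes Java 9+ for CompletableFuture.delayedExecutor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

// Non-blocking variant of the collectData() mock (Java 9+).
// Instead of sleeping on the caller's thread, the "response" completes later on a
// timer, and the caller gets a CompletableFuture back immediately.
class AsyncApiMock {

    // Hypothetical API call: completes with the simulated response time in ms
    // (300-599, the same range as collectData()). clientId is unused by the mock;
    // a real client would send it to the API.
    static CompletableFuture<Integer> fetchCustomer(int clientId) {
        int responseTimeMs = ThreadLocalRandom.current().nextInt(300, 600);
        return CompletableFuture.supplyAsync(
                () -> responseTimeMs,
                CompletableFuture.delayedExecutor(responseTimeMs, TimeUnit.MILLISECONDS));
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        // Start five "requests" at once; none of them blocks the main thread.
        CompletableFuture<?>[] calls = new CompletableFuture<?>[5];
        for (int i = 0; i < calls.length; i++) {
            int id = i + 1;
            calls[i] = fetchCustomer(id).thenAccept(ms ->
                    System.out.println("ID=[" + id + "] in time=[" + ms + "] milliseconds."));
        }
        CompletableFuture.allOf(calls).join();   // wait only here, at the very end
        System.out.println("Total wall time: " + (System.nanoTime() - start) / 1_000_000 + " ms");
    }
}
```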

 

 

4. Main class

At the beginning, we create the ActorSystem. All actors are then created as part of this “system”.

Please note that it is up to you how the actors relate to one another. Akka does arrange actors in a hierarchy (actors created with system.actorOf, as here, are top-level actors), but which actors create, supervise, and send messages to which other actors is a design decision made by the developer.

Next, we create the WorkersAdministrator and five workers (DataCollector).

The loop contains the logic that delegates work to the individual workers. For simplicity, I assumed that the customer IDs range from 1 to 100.

package com.bigdataetl;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import com.bigdataetl.akka.DataCollector;
import com.bigdataetl.akka.WorkersAdministrator;

public class Main {

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("Actor-api-collector");

        // Create Actor, which will be responsible only for printing messages from workers
        final ActorRef workersAdministrator = system.actorOf(WorkersAdministrator.props(), "workersAdministrator");

        // Create set of workers
        ActorRef worker1 = system.actorOf(DataCollector.props(1, workersAdministrator));
        ActorRef worker2 = system.actorOf(DataCollector.props(2, workersAdministrator));
        ActorRef worker3 = system.actorOf(DataCollector.props(3, workersAdministrator));
        ActorRef worker4 = system.actorOf(DataCollector.props(4, workersAdministrator));
        ActorRef worker5 = system.actorOf(DataCollector.props(5, workersAdministrator));

        // This loop demonstrates how we can delegate jobs to the actors.
        // We want to send one API request for each requested ID.
        for (int id = 1; id <= 100; id++) {

            switch (id % 5) {
                case 0:
                    worker1.tell(id, ActorRef.noSender());
                    break;
                case 1:
                    worker2.tell(id, ActorRef.noSender());
                    break;
                case 2:
                    worker3.tell(id, ActorRef.noSender());
                    break;
                case 3:
                    worker4.tell(id, ActorRef.noSender());
                    break;
                case 4:
                    worker5.tell(id, ActorRef.noSender());
                    break;
            }

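        }

        // Note: the ActorSystem's threads keep the JVM running after main() returns.
        // In a real application, call system.terminate() once all work is confirmed
        // as finished; terminating right after this loop would stop the workers
        // before they have processed the messages waiting in their mailboxes.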
    }

}
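The switch statement in Main sends client ID n to worker number (n % 5) + 1, so ID 1 goes to Actor 2, ID 4 to Actor 5, and ID 5 to Actor 1, exactly as in the log in the next section. The plain-Java sketch below (RoundRobin and workerFor are hypothetical names) captures that mapping and checks that each worker gets 20 of the 100 IDs; the comment shows how the switch could collapse to a single line if the workers were kept in an array.

```java
// Sketch of the assignment used by the switch in Main (plain Java, no Akka needed).
// With the workers stored in an array, the switch could become:
//     ActorRef[] workers = new ActorRef[5];
//     for (int i = 0; i < workers.length; i++)
//         workers[i] = system.actorOf(DataCollector.props(i + 1, workersAdministrator));
//     for (int id = 1; id <= 100; id++)
//         workers[id % workers.length].tell(id, ActorRef.noSender());
class RoundRobin {

    // 1-based number of the worker that receives clientId, matching the switch in Main.
    static int workerFor(int clientId, int workerCount) {
        return clientId % workerCount + 1;
    }

    public static void main(String[] args) {
        int[] jobsPerWorker = new int[5];
        for (int id = 1; id <= 100; id++) {
            jobsPerWorker[workerFor(id, 5) - 1]++;
        }
        System.out.println(java.util.Arrays.toString(jobsPerWorker));  // 20 jobs each
    }
}
```

Akka also ships ready-made routers (for example, akka.routing.RoundRobinPool) that distribute messages across a pool of routees. With a pool, however, every routee is created from the same Props, so the per-worker actorId used in this post would have to be handled differently.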

 

5. Results

After starting the program, you should see output similar to the following.

As you can see, at first we only see that work was assigned to the individual actors; in later entries, messages about responses from the server and retrieved data begin to appear. All 100 IDs are placed in the workers' mailboxes right away, so as soon as an actor finishes one download, it immediately picks up the next ID from its mailbox.

--> {Actor 2} Sending request for data. ID=[1] ... 
--> {Actor 3} Sending request for data. ID=[2] ... 
--> {Actor 5} Sending request for data. ID=[4] ... 
--> {Actor 4} Sending request for data. ID=[3] ... 
--> {Actor 1} Sending request for data. ID=[5] ... 
<-- {Actor 5} Data collected! ID=[4] in time=[400] milliseconds.
--> {Actor 5} Sending request for data. ID=[9] ... 
<-- {Actor 4} Data collected! ID=[3] in time=[517] milliseconds.
--> {Actor 4} Sending request for data. ID=[8] ... 
<-- {Actor 2} Data collected! ID=[1] in time=[530] milliseconds.
--> {Actor 2} Sending request for data. ID=[6] ... 
<-- {Actor 3} Data collected! ID=[2] in time=[563] milliseconds.
--> {Actor 3} Sending request for data. ID=[7] ... 
<-- {Actor 1} Data collected! ID=[5] in time=[578] milliseconds.
--> {Actor 1} Sending request for data. ID=[10] ... 
<-- {Actor 5} Data collected! ID=[9] in time=[485] milliseconds.
--> {Actor 5} Sending request for data. ID=[14] ... 
<-- {Actor 3} Data collected! ID=[7] in time=[378] milliseconds.
--> {Actor 3} Sending request for data. ID=[12] ... 
<-- {Actor 1} Data collected! ID=[10] in time=[397] milliseconds.
--> {Actor 1} Sending request for data. ID=[15] ... 
<-- {Actor 4} Data collected! ID=[8] in time=[508] milliseconds.
--> {Actor 4} Sending request for data. ID=[13] ... 
<-- {Actor 2} Data collected! ID=[6] in time=[550] milliseconds.
--> {Actor 2} Sending request for data. ID=[11] ... 
<-- {Actor 1} Data collected! ID=[15] in time=[330] milliseconds.
--> {Actor 1} Sending request for data. ID=[20] ... 
<-- {Actor 5} Data collected! ID=[14] in time=[424] milliseconds.
--> {Actor 5} Sending request for data. ID=[19] ... 
<-- {Actor 3} Data collected! ID=[12] in time=[504] milliseconds.
--> {Actor 3} Sending request for data. ID=[17] ... 
<-- {Actor 4} Data collected! ID=[13] in time=[484] milliseconds.
--> {Actor 4} Sending request for data. ID=[18] ... 
<-- {Actor 2} Data collected! ID=[11] in time=[509] milliseconds.
--> {Actor 2} Sending request for data. ID=[16] ... 
<-- {Actor 1} Data collected! ID=[20] in time=[379] milliseconds.
--> {Actor 1} Sending request for data. ID=[25] ... 
<-- {Actor 5} Data collected! ID=[19] in time=[419] milliseconds.
--> {Actor 5} Sending request for data. ID=[24] ... 
<-- {Actor 4} Data collected! ID=[18] in time=[315] milliseconds.
--> {Actor 4} Sending request for data. ID=[23] ... 
<-- {Actor 3} Data collected! ID=[17] in time=[496] milliseconds.
--> {Actor 3} Sending request for data. ID=[22] ... 
<-- {Actor 2} Data collected! ID=[16] in time=[494] milliseconds.
--> {Actor 2} Sending request for data. ID=[21] ... 
<-- {Actor 1} Data collected! ID=[25] in time=[417] milliseconds.
--> {Actor 1} Sending request for data. ID=[30] ... 
<-- {Actor 5} Data collected! ID=[24] in time=[410] milliseconds.
--> {Actor 5} Sending request for data. ID=[29] ... 
<-- {Actor 4} Data collected! ID=[23] in time=[321] milliseconds.
...
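The log shows each actor starting its next ID as soon as the previous one completes. The plain-Java sketch below (MailboxTimeline and wallTimeMs are hypothetical names) models that timeline without threads: IDs are assigned with the same id % 5 mapping as Main, each worker handles its own IDs one after another, and the run ends when the busiest worker finishes. It assumes the API's response time does not grow with the number of parallel requests.

```java
import java.util.Random;

// Thread-free model of the run: every ID is queued up front, each worker drains
// its own mailbox sequentially, and the total wall time is set by the busiest worker.
// Assumes round-robin assignment (as in Main) and no slowdown at the API.
class MailboxTimeline {

    // responseTimesMs[i] is the response time for client ID i + 1.
    static long wallTimeMs(int[] responseTimesMs, int workerCount) {
        long[] busyUntil = new long[workerCount];
        for (int i = 0; i < responseTimesMs.length; i++) {
            int clientId = i + 1;
            busyUntil[clientId % workerCount] += responseTimesMs[i];  // same mapping as Main
        }
        long max = 0;
        for (long t : busyUntil) max = Math.max(max, t);
        return max;
    }

    public static void main(String[] args) {
        Random rnd = new Random(42);                  // fixed seed for a repeatable run
        int[] times = new int[100];
        for (int i = 0; i < times.length; i++) times[i] = rnd.nextInt(300) + 300;  // like collectData()
        System.out.println("1 worker:  " + wallTimeMs(times, 1) + " ms");
        System.out.println("5 workers: " + wallTimeMs(times, 5) + " ms");
    }
}
```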

Summary

As you can see, the Akka library is easy to use and handles thread management for you, behind the scenes. It can help solve many problems that would be impractical to handle in a single-threaded application.

If you enjoyed this post, please leave a comment below or share it on Facebook, Twitter, LinkedIn, or another social media site.
Thanks in advance!
