Akka.io: Szybki start [Java]

Akka.io: Szybki start [Java]

Bardzo często możesz spotkać się z wymaganiem stworzenia aplikacji, której zadaniem będzie gromadzenie danych z zewnętrznego systemu poprzez jego API. Może się okazać, że “wąskim gardłem” takiej aplikacji będzie czas jej wykonania.

Załóżmy, że co 15 min należy odświeżyć dane o klientach. API z jakim mamy do czynienia nie pozwala na wysłanie żądania, które zwróci informacje o wszystkich klientach lub grupie klientów. Możemy jedynie wysłać żądanie , gdzie możemy podać ID klienta. Klientów mamy około 10 tysięcy. Średni czas od wysłania zapytania do czasu odpowiedzi wynosi 0.4 sekundy. Jak łatwo policzyć, gdy uruchomimy aplikację to czas w jakim będziemy w stanie odebrać odświeżyć dane będzie wynosił: 66,66 min, co przekracza okno odświeżania ponad 4 razy!. W skrócie, nie spełnimy stawianych przed nami wymagań.

Liczba klientów 10 000
Średni czas odpowiedzi 0.4 sek
Czas pobierania danych  (jeden wątek) 66,66 min
Czas pobierania danych z wykorzystaniem 5 aktorów (Akka) 13,33 min

Jednym z rozwiązań możemy być uruchomienie tej samej aplikacji kilkanaście razy, gdzie każda instancja otrzyma inny zbiór ID klientów. Lecz jest to mało eleganckie rozwiązanie i przysparza więcej problemów, ponieważ musimy dodać logikę dzielenia klientów na paczki, potem uruchamiać kilkanaście instancji, monitorować je itd.

Akka.io

Z pomocą przychodzi nam Akka. Oparta na koncepcji aktorów, gdzie każdy aktor jest uruchamiany jako nie zależny wątek. Implementacja tej biblioteki jest bardzo prosta.

Definiujemy aktorów i przydzielamy im zadania. Nie interesuje nas obsługa współbieżności, dzięki czemu możemy spojrzeć na ten problem jeszcze z wyższego poziomu. 

1. Koncepcja i komunikacja

Na poniższym rysunku przedstawiona została koncepcja aktorów oraz komunikacja pomiędzy nimi, gdzie przedstawiona jest ona jako dwukierunkowa, lecz oczywiście można ona być również jednokierunkowa (taka też została zaimplementowana w przykładnie zawartym w tym poście).

Aplikacja składa się z jednego aktora, który jest odpowiedzialny tylko i wyłącznie za wyświetlanie wiadomości w konsol (WorkersAdministrator), które będą wysyłane przez aktorów (workerów).

ActorSystem system = ActorSystem.create("Actor-api-collector");

// Create Actor, which will be responsible only for printing messages from workers
final ActorRef workersAdministrator = system.actorOf(Printer.props(), "workersAdministrator");

// Create set of workers
ActorRef worker1 = system.actorOf(DataCollector.props(workersAdministrator));
ActorRef worker2 = system.actorOf(DataCollector.props(workersAdministrator));
ActorRef worker3 = system.actorOf(DataCollector.props(workersAdministrator));
ActorRef worker4 = system.actorOf(DataCollector.props(workersAdministrator));
ActorRef worker5 = system.actorOf(DataCollector.props(workersAdministrator));

 

Każdy Worker będzie wysyłał informacje do WorkersAdministratora o tym, że:

  1. Otrzymał zadanie, aby pobrać informacje o kliencie o przekazanym ID.
    workersAdministrator.tell("--> Getting data for ID=[" + id + "] ... ", getSelf());
  2. Pobieranie zostało zakończone. Dodatkowo przekaże w wiadomości czas jaki upłynął od otrzymania zadania do zakończenia pobierania.
    workersAdministrator.tell("<-- Data collected! ID=[" + id + "] in time=[" + sleepInt + "] milliseconds.", getSelf());

     

.match(Integer.class, id -> {
    // Tell the WorkersAdministrator that this Actor sent the request and is waiting for response.
    workersAdministrator.tell("--> Getting data for ID=[" + id + "] ... ", getSelf());

    // Get the response time and tell the WorkersAdministrator that data was received.
    int sleepInt = collectData();
    workersAdministrator.tell("<-- Data collected! ID=[" + id + "] in time=[" + sleepInt + "] milliseconds.", getSelf());
})

2. WorkersAdministrtor

Każda klasa, która ma być aktorem musi rozszerzać klasę AbstractActor. Wymusi to na użytkowniku zaimplementowanie metody: createReceive(). 

W tej metodzie zostaje zaimplementowana logika wykorzystująca matching pattern (bardziej jest on znany ze Scali, niż z języka Java). Definiujemy jakiej klasy oczekujemy obiektu, który będzie przekazywany. W naszym przypadku aktorzy (workerzy) będą przekazywać wiadomość tekstową, dlatego jest ona klasy String.class, a “message” jest naszą referencją do tego obiektu.

package com.bigdataetl.akka;

import akka.actor.AbstractActor;
import akka.actor.Props;

public class WorkersAdministrator extends AbstractActor {

    static public Props props() {
        return Props.create(WorkersAdministrator.class, () -> new WorkersAdministrator());
    }

    public WorkersAdministrator() {
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(String.class, message -> {
                    // Just print the incoming messages from the Actors
                    System.out.println(message);
                })
                .build();
    }

}

3. Worker (DataCollector)

Nasz worker, czyli instancja klasy DataCollector również musi rozszerzać klase AcstractActor.

W przypadku Workera przekazywany jest ID klienta, który w naszym przypadku jest wartością typu Integer.

 Aby zasymulować oczekiwanie na odpowiedź serwera na wysłane żądanie, została stworzna prywatna metoda collectData(), która zwraca losową wartość milisekund, które wstrzymają dany wątek (aktora), a następnie zwróci tą wartość po to, aby można ją było przekazać w wiadomości do administratora.

package com.bigdataetl.akka;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;

import java.util.Random;

public class DataCollector extends AbstractActor {

    private int actorId;
    private ActorRef workersAdministrator;
    private Random rnd = new Random();

    public DataCollector(int actorId, ActorRef workersAdministrator) {
        this.actorId = actorId;
        this.workersAdministrator = workersAdministrator;
    }

    static public Props props(int actorId, ActorRef printerActor) {
        return Props.create(DataCollector.class, () -> new DataCollector(actorId, printerActor));
    }

    // This method is a API mock-up response.
    // We sent the request and we are waiting for response.
    // I used the random class to demonstrate the random time of response
    private int collectData() {
        int sleepTime = rnd.nextInt(300) + 300;
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return sleepTime;
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(Integer.class, clientId -> {
                    // Tell the WorkersAdministrator that this Actor sent the request and is waiting for response.
                    workersAdministrator.tell("--> {Actor " + actorId + "} Sending request for data. ID=[" + clientId + "] ... ", getSelf());

                    // Get the response time and tell the WorkersAdministrator that data was received.
                    int sleepInt = collectData();
                    workersAdministrator.tell("<-- {Actor " + actorId + "} Data collected! ID=[" + clientId + "] in time=[" + sleepInt + "] milliseconds.", getSelf());
                })
                .build();
    }
}

 

4. Klasa Main

Na początek rejestrujemy AktorSystem. Następnie w ramach “systemu” rejestrowani będą aktorzy.

Zwróć uwagę, że to od Ciebie zależy jakie będą relacje pomiędzy aktorami. Z punktu widzenia biblioteki, wszyscy aktorzy są sobie równi. To deweloper wyznacza drzewo relacji pomiędzy nimi.

Następnie kolejno rejestrujemy WorkerAdministratora oraz 5 aktorów (DataCollector).

W pętli zawarta jest logika, która deleguje pracę do poszczególnych workerów. Dla uproszczenia założyłem, że ID klientów będą zaczynały się od 1 do 100.

package com.bigdataetl;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import com.bigdataetl.akka.DataCollector;
import com.bigdataetl.akka.WorkersAdministrator;

public class Main {

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("Actor-api-collector");

        // Create Actor, which will be responsible only for printing messages from workers
        final ActorRef workersAdministrator = system.actorOf(WorkersAdministrator.props(), "workersAdministrator");

        // Create set of workers
        ActorRef worker1 = system.actorOf(DataCollector.props(1,workersAdministrator));
        ActorRef worker2 = system.actorOf(DataCollector.props(2,workersAdministrator));
        ActorRef worker3 = system.actorOf(DataCollector.props(3,workersAdministrator));
        ActorRef worker4 = system.actorOf(DataCollector.props(4,workersAdministrator));
        ActorRef worker5 = system.actorOf(DataCollector.props(5, workersAdministrator));

        // This loop demonstrate how we can delegate jobs to Actors.
        // We would like to sent the requests to API to get data for each requested ID.
        for (int id = 1; id <= 100; id++) {

            switch (id % 5) {
                case 0:
                    worker1.tell(id, ActorRef.noSender());
                    break;
                case 1:
                    worker2.tell(id, ActorRef.noSender());
                    break;
                case 2:
                    worker3.tell(id, ActorRef.noSender());
                    break;
                case 3:
                    worker4.tell(id, ActorRef.noSender());
                    break;
                case 4:
                    worker5.tell(id, ActorRef.noSender());
                    break;
            }

        }
    }

}

 

5. Wynik

Po uruchomieniu programu powinieneś otrzymać podobny rezultat jak poniżej.

Jak widać, na początku widzimy tylko informacje o tym, że praca została przydzielona do poszczególnych aktorów, po czym w późniejszych wpisach widzimy jak zaczynają się pojawiać informację o tym, że nastąpiła odpowiedź z serwera i dane zostały pobrane. Gdy dany aktor skończy pobieranie danych natychmiast otrzymuje kolejne zadanie do wykonania.

--> {Actor 2} Sending request for data. ID=[1] ... 
--> {Actor 3} Sending request for data. ID=[2] ... 
--> {Actor 5} Sending request for data. ID=[4] ... 
--> {Actor 4} Sending request for data. ID=[3] ... 
--> {Actor 1} Sending request for data. ID=[5] ... 
<-- {Actor 5} Data collected! ID=[4] in time=[400] milliseconds.
--> {Actor 5} Sending request for data. ID=[9] ... 
<-- {Actor 4} Data collected! ID=[3] in time=[517] milliseconds.
--> {Actor 4} Sending request for data. ID=[8] ... 
<-- {Actor 2} Data collected! ID=[1] in time=[530] milliseconds.
--> {Actor 2} Sending request for data. ID=[6] ... 
<-- {Actor 3} Data collected! ID=[2] in time=[563] milliseconds.
--> {Actor 3} Sending request for data. ID=[7] ... 
<-- {Actor 1} Data collected! ID=[5] in time=[578] milliseconds.
--> {Actor 1} Sending request for data. ID=[10] ... 
<-- {Actor 5} Data collected! ID=[9] in time=[485] milliseconds.
--> {Actor 5} Sending request for data. ID=[14] ... 
<-- {Actor 3} Data collected! ID=[7] in time=[378] milliseconds.
--> {Actor 3} Sending request for data. ID=[12] ... 
<-- {Actor 1} Data collected! ID=[10] in time=[397] milliseconds.
--> {Actor 1} Sending request for data. ID=[15] ... 
<-- {Actor 4} Data collected! ID=[8] in time=[508] milliseconds.
--> {Actor 4} Sending request for data. ID=[13] ... 
<-- {Actor 2} Data collected! ID=[6] in time=[550] milliseconds.
--> {Actor 2} Sending request for data. ID=[11] ... 
<-- {Actor 1} Data collected! ID=[15] in time=[330] milliseconds.
--> {Actor 1} Sending request for data. ID=[20] ... 
<-- {Actor 5} Data collected! ID=[14] in time=[424] milliseconds.
--> {Actor 5} Sending request for data. ID=[19] ... 
<-- {Actor 3} Data collected! ID=[12] in time=[504] milliseconds.
--> {Actor 3} Sending request for data. ID=[17] ... 
<-- {Actor 4} Data collected! ID=[13] in time=[484] milliseconds.
--> {Actor 4} Sending request for data. ID=[18] ... 
<-- {Actor 2} Data collected! ID=[11] in time=[509] milliseconds.
--> {Actor 2} Sending request for data. ID=[16] ... 
<-- {Actor 1} Data collected! ID=[20] in time=[379] milliseconds.
--> {Actor 1} Sending request for data. ID=[25] ... 
<-- {Actor 5} Data collected! ID=[19] in time=[419] milliseconds.
--> {Actor 5} Sending request for data. ID=[24] ... 
<-- {Actor 4} Data collected! ID=[18] in time=[315] milliseconds.
--> {Actor 4} Sending request for data. ID=[23] ... 
<-- {Actor 3} Data collected! ID=[17] in time=[496] milliseconds.
--> {Actor 3} Sending request for data. ID=[22] ... 
<-- {Actor 2} Data collected! ID=[16] in time=[494] milliseconds.
--> {Actor 2} Sending request for data. ID=[21] ... 
<-- {Actor 1} Data collected! ID=[25] in time=[417] milliseconds.
--> {Actor 1} Sending request for data. ID=[30] ... 
<-- {Actor 5} Data collected! ID=[24] in time=[410] milliseconds.
--> {Actor 5} Sending request for data. ID=[29] ... 
<-- {Actor 4} Data collected! ID=[23] in time=[321] milliseconds.
...

Podsumowanie

Biblioteka Akka jak widać jest prosta w użyciu, a daje przy tym nie widoczny na dewelopera system zarządzania wątkami. Zastosowanie tej biblioteki może pomóc w rozwiązaniu wielu problemów, które z natury w jednowątkowym trybie mogą być nie do wykonania.

Jeśli spodobał Ci się ten post to napisz proszę komentarz poniżej oraz udostępnij ten post na swoim Facebook’u, Twitter’ze, LinkedIn lub innej stronie z mediami społecznościowymi.
Z góry dzięki!

Please follow and like us:

Dodaj komentarz

Close Menu
Social media & sharing icons powered by UltimatelySocial

Enjoy this blog? Please spread the word :)