Поговорим об акторах

Лично мне очень нравится концепция акторов. Что интересно, познакомился я с ней куда раньше повальной моды на функциональщину, в году так 2003, когда начал плотно работать с библиотекой ACE (это та, которая The ADAPTIVE Communication Environment). Ну а сейчас акторами никого не удивишь, все про них только и говорят. И это хорошо, так как данная модель сильно упрощает отладку и разработку, при относительно не большой просадке по производительности и памяти.

В последнее время я присматриваюсь у относительно экзотическим языкам программирования, таким как Rust и Scala, а для обоих языков модель акторов является родной. При этом, на данный момент, Rust ничего не может предложить сравнимого с библиотекой AKKA, хотя даже в текущем своем состоянии его представление модели акторов не безынтересно.

В качестве примера я решил взять фрагмент из задачи, которую решал на Scala и написать эту задачу на двух языках, Scala и Rust, настолько схожей, насколько это возможно.

Задача: Необходимо выполнить ряд запросов к серверу, обработать ответы, полученные с серверов и единовременно отдать результат.

Scala

import akka.actor.{ActorRef, ActorSystem, Props, Actor}
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Await
import akka.pattern
import scala.concurrent.duration._
import akka.util.Timeout

object App {
  case class Request(id: Int)                        // 1
  case class Response(response: String)
  case class Start()

  class Requester extends Actor {                    // 2
    def receive = {
      case Request(id) =>
        sender ! Response("Client #" + id.toString)  // 3
    }
  }

  class Supervisor extends Actor {                   // 4
    private val num = 100
    private val responses = new ArrayBuffer[String](num)
    private var requester:ActorRef = null            // 5
    def receive = {
      case Start => {
        val randomRouter = context.actorOf(          // 6
                Props[Requester].withRouter(RandomRouter(12)), "router")
        for (idx <- 1 to num) {
          randomRouter ! Request(idx)                // 7
        }
        requester = sender                           // 8
      }
      case Response(resp) => {                       // 9
        responses += resp
        if (responses.size == num) {
          requester ! responses                      // 10
          context.stop(self)                         // 11
          context.system.shutdown()                  // 12
        }
      }
    }
  }


  def main(args: Array[String]) {
    val supervisor = ActorSystem().actorOf(Props(new Supervisor)) // 13
    implicit val timeout = Timeout(60 seconds)

    val result = pattern.ask(supervisor, Start)                   // 14
    val responses = Await.result(result, 1.minute).asInstanceOf[ArrayBuffer[String]]
    println("Aggregated value: " + responses)
  }
}

Конечно, это сугубо индивидуально, но мне работа с библиотекой AKKA приносит удовольствие. Я не так давно закончил читать документацию относящуюся к этой библиотеке, и могу точно сказать, ее возможности и архитектурная продуманность удивляют. Наверное, это одна из лучших библиотек реализующих концепцию акторов на данный момент. Так, хватит хвалить, пора переходить к сути

Система акторов в AKKA представляет собой древовидную структуру, поэтому, довольно удобно иметь некий управляющий актор – супервизор 4, который, в свою очередь, организует раздачу задач подчиненным акторам 2. Актор запускается на выполнение сразу после создания 13 и начинает ожидать сообщение-команду (нашем случае все команды представляют собой case classes и объявлены в самом начале примера 1). Так как по условию задачи, необходимо получить обработанные результаты разом, то запрос на обработку в актор-супервизор необходимо отправить синхронно 14.

После того, как запрос был отправлен, начинается самое интересное. Дело в том, что я хотел что бы одновременное количество запросов к серверу находилось в неких разумных пределах, например, при необходимостм получить 20К ответов не должно было возникать 20К одновременных запросов. И тут на помощь приходят планировщики AKKA, которые умеют раздавать запросы строго определенному количеству акторов. В моем примере я решил создать планировщик 6 раздающий задачи 12 акторам одновременно. Так как все запросы выполняются асинхронно, то нам не надо организовывать какого-то дополнительного ожидания результата, как это было при старте актора-супервизора и мы просто передадим все запросы планировщику 7 последовательно, один за другим.

При работе с акторами довольно важным моментом является грамотное использование переменной sender, которая на момент вызова метода receive содержит в себе информацию об акторе, приславшем текущий запрос. Таким образом, при получении команды Start, данная переменная будет содержать информацию об отправителе, которому мы и должны вернуть окончательный результат. В то же время, тот же актор-супервизор будет получать ответы с результатами обработки запросов 9 что означает изменение значения переменной sender в процессе работы. Вывод: переменная sender должна быть сохранена для дальнейшего использования 5, 8.

Осталось не так уж и много: получить результаты, собрать их воедино и отдать все разом. Результаты работы акторов Requester мы будем получать в обработчике актора-супервизора 9 и накапливать для дальнейшего использования; после того как все ответы будут получены, их необходимо вернуть 10. В качестве последнего шага выполняется остановка актора-супервизора 11, что могло бы повлечь за собой автоматическую остановку всех дочерних акторов, если бы они небыли завершены к тому времени и саму систему акторов в целом 12.

Rust

Данный код гарантированно компилируется Rust 0.7. Поддержка более поздних версий компилятора возможна, но не гарантируется. Сборка более ранними версиями компилятора не возможна.
extern mod std;

use std::{task, io, uint};
use std::comm::{SharedChan};

struct TestInfo {
    mode: task::SchedMode,
    tasks_count: uint,
}

impl TestInfo {
    fn new(m: task::SchedMode) -> TestInfo {
        TestInfo {
            mode: m,
            tasks_count: 100u,
        }
    }

    fn process(&self) -> ~[~str] {
        let mut result:~[~str] = ~[];

        let (server_chan, server_port) = stream();                     // 1

        let server_port = SharedChan::new(server_port);                // 2
        for uint::range(0, self.tasks_count) |i| {
            let server_port: SharedChan<~str> = server_port.clone();

            do task::spawn_sched(self.mode) {                          // 3
                server_port.send(fmt!("Client #%u", i));               // 4
            }
        }

        for self.tasks_count.times {                                   // 5
            result.push(server_chan.recv());                           // 6
        }

        result
    }
}

fn main() {
    let supervisor = TestInfo::new(task::DefaultScheduler);            // 7
    let res = supervisor.process();
    io::println(fmt!("Aggregated value: %?", res));
}

Решение на Rust я старался сделать максимально близким к “эталонной” версии на Scala, но в силу отсутствия развитой библиотеки акторов, поведение получилось похожим, но не идентичным, а более простой и короткий код не более чем следствие отсутствия ряда возможностей имеющихся в AKKA. Только не подумайте, что я критикую Rust, на самом деле, я его очень люблю, просто он еще совсем молодой и зеленый

Хотя в составе библиотек Rust нет ничего, что бы носило имя Актор, де-факто задачи Rust и являются акторами. Задача так же как и акторы в AKKA имеют предков и потомков, задачам можно отправить сообщение и получить от них ответ. Задачи так же выполняются планировщиком, за исключением того, что прямо сейчас с планировщиками все очень печально.

Принципы работы с задачами в Rust отличаются от работы с акторами в AKKA приблизительно на столько, на сколько отличается написание сетевого приложения на C++ с использованием ACE от написания того же приложения с Berkeley sockets на прямую. Для обеспечения взаимодействия с задачами в Rust нам будет необходимо создать поток, состоящий из порта и канала 1 и преобразовать порт в SharedChan 2, так как один и тот же порт будет использоваться для отправки сообщений несколькими задачами, что не возможно сделать при использовании канала по-умолчанию.

Запуск задач на выполнение в Rust будет кардинально отличаться от запуска акторов в Scala. Дело в том, что в Scala мы создавали строго заданное количество задач и при помощи планировщика раздавали им задания. Реализовать схожее поведение в Rust не возможно по причине отсутствия такого функционала в библиотеках. Поэтому, для каждого из запросов будет создана 3 отдельная задача, которая будет отдана на выполнение указанному 7 планировщику. На данный момент, наиболее подходящим под ограничения задачи планировщиком является планировщик DefaultScheduler, который, в принципе, можно и не указывать, но для большей схожести с кодом на Scala я задал явно. Детали о работе планировщиков в Rust я не так давно описывал, но если в коротко, то, планировщик DefaultScheduler создаст по одному потоку на процессор и будет выполнять все задачи в рамках этих потоков.

С отправкой и сбором результатов все значительно проще чем в случае с примером на Scala. Каждая из дочерних задач формирует и отправляет 4 результаты в задачу-родитель. Задача-родитель, в цикле 5, из серверного канала, вычитывает 6 результаты в ожидаемом количестве и помещает их в контейнер.

Вот и все, на данный момент. После того, как планировщики Rust будут приведены в порядок, можно будет доработать Rust версию и, возможно, подумать о написании чего-то похожего на AKKA. Время покажет.

Дополнения, исправления, комментарии приветствуются

5 Comments Поговорим об акторах

  1. Alex

    Почему бы не сделать Request вложенным в Requester, а также, возможно, Response?

    А Start в Supervisor.

    Reply

Leave a Reply