Взаимодействие между задачами в Rust

Модель памяти Rust, в общем случае, не допускает совместного обращения к одной и той же памяти (shared model) предлагая вместо этого обмениваться сообщениями (mailbox model). При этом существует возможность работать с общей памятью в режимах “только для чтения” и “один писатель много читателей”. На данный момент в Rust существует несколько способов организации взаимодействия между задачами:

  • Низкоуровневые каналы и порты из модуля core::comm;
  • Высокоуровневая абстракция над каналами и портами std::comm;
  • Каналы предназначенные для передачи бинарных данных из std::flatpipes;
  • Новая инфраструктура для обмена сообщениями core::pipes.

Обмен сообщениями на низком уровне

Простейший вариант использования core::comm

Самым широко используемым на данный момент механизмом взаимодействия между задачами является core::comm. Данный механизм хорошо отлажен, неплохо задокументирован и довольно прост в использовании. Основой механизма обмена сообщениями core::comm являются потоки, манипуляция с которыми происходит посредством каналов и портов. Поток представляют собой однонаправленный механизм связи, в котором порт используется для отправки сообщения, а канал для приема отправленной информации. Простейший пример использования потока выглядит следующим образом:

let (chan, port) = stream();                  // (1)

port.send("data");                            // (2)
// port.send(1);                              // (3)
io::println(chan.recv());                     // (4)

В данном примере создается пара (1), состоящая из канала и порта, которые используются для отправки (2) строкового типа данных. Отдельное внимание стоит удалить пототипу функции stream(), который выглядит следующим образом: fn stream() -> (Port, Chan). Как видно из пототипа, канал и порт являются шаблонными типами, что на первый взгляд не очевидно из кода, приведенного выше. В данном случае тип передаваемых данных выводится автоматически основываясь на первом использовании. Так, если раскоментировать строку отправляющую в поток единицу (3), компилятор выдаст сообщение об ошибку:

error: mismatched types: expected `&'static str` but found `<VI0>` (expected &'static str but found integral variable

Так же стоит обратить внимание на класс шаблонного параметра: Owned, что означает возможность передачи при помощи потока только уникальных объектов.
Для получения данных из потока можно воспользоваться функцией recv(), которая либо вернет данные, либо заблокирует задачу до их появления. Смотря на пример выше, в голову приходит мысль о том, что он совершенно бесполезен, так как какого-то практического смысла в отправке сообщений при помощи потоков в рамках одной задачи нет. Так что, стоит переходить к более практичным вещам, таким как использование потоков для передачи информации между задачами.

let value = vec::from_fn(5, |x| x + 1);                    // (1)
let (server_chan, server_port) = stream();                 // (2)
let (client_chan, client_port) = stream();                 // (3)

do task::spawn {
    let val: ~[uint] = server_chan.recv();                 // (4)
    let res = val.map(|v| {v+1});
    client_port.send(res)                                  // (5)
}

server_port.send(value);                                   // (6)
io::println(fmt!("Result: %?", client_chan.recv()));       // (7)

Первое, на что стоит обратить внимание при работе с потоками – это необходимость передавать значения адресуемые уникальными указателями, а функция from_fn() (1) как раз создает такой массив. Так как поток является однонаправленным, то для передачи запроса (2) и получения ответа (3) понадобятся два потока. При помощи функции recv() данные считываются из потока (4), а при отсутствии таковых, поток заблокирует задачу до их появления. Для отправки результата клиенту используется функция send() (5), принадлежащая не серверному, а клиентскому потоку; аналогичным образом необходимо поступить с данными для отправки серверной задаче, они записываются (6) при помощи функции send() относящейся к серверному порту. И в заключении результат переданный серверной задачей считывается (7) из клиентского потока.
Таким образом, для отправки сообщений серверу и приема сообщений на стороне сервера используется поток server_chan, server_port. В силу однонаправленности потока, для получения результата вычислений сервера, был создан клиентский поток, состоящий из пары client_chan, client_port.

Совместное использование потока

Несмотря на то, что поток является однонаправленным механизмом передачи данных, это не приводит к необходимости создавать потоки для каждого из желающих отправить при помощи него данные, т.к, существует механизм благодаря которому возможна работа с в режиме “один получатель много отправителей”.

let (server_chan, server_port) = stream();             // (1)
let (client_chan, client_port) = stream();             // (2)

let clients = 5;
do task::spawn {                                       // (3)
    let mut result = 0;
    for uint::range(0, clients) |_i| {
        let val: uint = server_chan.recv();            // (4)
        result = val + result;
    }
    client_port.send(result);                          // (5)
}

let server_port = SharedChan(server_port);             // (6)
for uint::range(0, clients) |i| {
    let server_port = server_port.clone();             // (7)
    do task::spawn {
        let gen = rand::Rng();
        server_port.send(gen.gen_uint_range(0, i+1));  // (8)
    }
}
io::println(fmt!("Result: %?", client_chan.recv()));

Для этого, как и для схемы один читатель – один писатель необходимо создать серверный (1), клиентский (2) потоки и запустить серверную задачу (3). Логика серверной задачи предельно проста, считать (4) данные из серверного канала, переданные клиентом (8) и отправить (5) результат в клиентский поток. Так как писателей несколько, то необходимо внести изменения в тип серверного порта, преобразовав (6) его к SharedChan, вместо Chan и для каждого из писателей создать уникальную копию порта (7) посредствам метода clone(). Дальнейшая работа с портом ни в чем не отличается от предыдущего примера: метод send() используется для отправки данных серверу, с той лишь разницей, что теперь данные отправляются из нескольких задач одновременно.

Пересылка объектов

В тех случаях, когда необходимость пересылать значения адресуемые исключительно уникальными указателями становится проблемой, на помощь приходит модуль flatpipes. Данный модуль позволяет отправлять и принимать любые бинарные данные в виде массива, либо объекты поддерживающие сериализацию.

#[auto_encode]                                          // (1)
#[auto_decode]                                          // (2)
struct EncTest { val1: uint, val2: @str, val3: ~str }
...
let (server_chan, server_port) =
        flatpipes::serial::pipe_stream();               // (3)

do task::spawn {
    let val = server_chan.recv();                       // (4)
    io::println(fmt!("Value: %?", val));
}

let value = @EncTest{val1: 1u, val2: @"test string 1",
        val3: ~"test string 2"};
server_port.send(value);                                // (5)

Как видно из примера, работать с flatpipes предельно просто. Структура, объекты которой будут передаваться посредством flatpipes, должна быть объявлена сериализуемой (1) и десериализуемой (2). Создание flatpipes (3) технически ничем не отличается от создания обычных потоков, так же как прием (4) и отправка (5) сообщений при помощи канала и порта. Главным же отличием flatpipes от потока является создание глубокой копии объекта на отправляющей стороне и построение нового объекта на принимающей стороне. Благодаря такому подходу, накладные расходы при работе с flatpipes, по сравнению с обычными потоками возрастают, но и возможности по пересылке данных между задачами увеличиваются.

Высокоуровневая абстракция обмена сообщениями

В большинстве приведенных выше примеров создаются два потока: один для отправки данных на сервер, второй для получения данных с сервера. Подобный подход не привносит какой-то ощутимой пользы, да и просто замусоривает код. В связи с этим был создан модуль std::comm, являющийся высокоуровневой абстракцией над core::comm и содержащий в себе DuplexStream, позволяющий организовать двунаправленное общение в рамках одного потока. Само собой, если заглянуть в исходный код DuplexStream, станет ясно, что это не более чем удобная надстройка над парой стандартных потоков.

let value = ~[1, 2, 3, 4, 5];
let (server, client) = DuplexStream();            // (1)

do task::spawn {
let val: ~[uint] = server.recv();                 // (2)
io::println(fmt!("Value: %?", val));
let res = val.map(|v| {v+1});
server.send(res)                                  // (3)
}

client.send(value);                               // (4)
io::println(fmt!("Resilt: %?", client.recv()));   // (5)

При работе с DuplexStream создается (1) единственная пара из двух двунаправленных потоков, оба из которых могут использоваться как для отправки так и для получения сообщений. Объект server захватывается контекстом задачи и используется для получения (2) и отправки (3) сообщения в задаче сервера, а объект client в задаче клиента (4,5). Таким образом, принцип работы с DuplexStream ничем не отличается от работы с обычными потоками, но позволяет сократить количество вспомогательных объектов.

Модуль ARC

Несмотря на все прелести отправки сообщений рано или поздно возникает вопрос: а что делать с большой структурой данных, доступ к которой нужен из нескольких задач одновременно? Конечно, ее можно пересылать в виде уникального указателя между потоками, но такой подход сильно затруднит разработку приложения, а его сопровождение превратится в кошмар. Именно для таких случаев и существует модуль ARC позволяющий организовать совместный доступ из нескольких задач к одному и тому же объекту.

Совместное использование уникальных указателей с доступом только на чтение

Сначала стоит разобраться с самым простым случаем – совместным доступом к неизменяемым данным из нескольких задач. Для этого необходимо воспользоваться модулем ARC, который реализует механизм автоматического подсчета ссылок (Atomically Reference-Counter) на разделяемый объект. В прототипе функции создания ARC-объекта pub fn ARC(data: T) -> ARC стоит обратить внимание на налагаемые на тип T ограничения. Теперь объект должен относиться не только к классу Owned, как это было в случае с потоком, но еще и к классу Const, что гарантирует отсутствие каких бы то ни было изменяемых полей или указателей на изменяемые поля внутри объекта T (такие объекты в Rust носят название deeply immutable объекты).

let data = arc::ARC(~[1, 2, 3, 4, 5]);                // (1)

let shared_data = arc::clone(&amp;data);              // (2)
do task::spawn {
    let val = arc::get(&amp;shared_data);             // (3)
    io::println(fmt!("Value: %?", val[1]));
}

io::println(fmt!("Value: %?", *arc::get(&amp;data))); // (4)

Несмотря на то, что в данном примере нет работы с потоками, он вполне достаточен для иллюстрации работы с ARC, так как хорошо показывает основной функционал этого модуля – возможность одновременно обращаться к одним и тем же данным из разных задач. Так, для совместного использования одного и того же массива, обернутого в ARC (1), надо создать его клон (2), что сделает возможным обращаться к данным как из новой (3), так и из основной (4) задач.

R/W доступ к уникальным указателям

Модуль RWARC лично у меня вызывает очень двоякие эмоции. С одной стороны, благодаря RWARC можно реализовать широко распространенную и хорошо известную большинству разработчиков концепцию “много читателей один писатель”, что, наверное, хорошо, концепция-то широко известная. С другой стороны, концепция совместного доступа к памяти, причем не RO доступ, который был описан чуть ранее, а RW доступ, чреват проблемами с взаимоблокировками, от которых Rust как раз и должен защитить разработчиков. Лично для себя я пришел к следующему выводу: о модуле знать надо, но использовать, без крайней необходимости, не стоит.

let data = arc::RWARC(~[1, 2, 3, 4, 5]);            // (1)

for 5.times {
    let reader = data.clone();                      // (2)
    do task::spawn {
        do reader.read() |data| {                   // (3)
            io::println(fmt!("Value: %?", data));   // (4)
        }
    }
}

do spawn {
    do data.write() |data| {                        // (5)
        for data.each_mut |x| { *x = *x * 2 }       // (6)
    }
}

В приведенном выше примере, создается (1) массив обернутый в RWARC, благодаря чему к нему можно обращаться как на чтение (4), так и на запись (6). Кардинальное отличие работы с RWARC от всех предыдущих примеров – использование замыканий в функциях read() (3) и write() (5) в качестве аргумента. Чтение и запись данных, обернутых в RWARC, можно производить только в этих функциях. Ну и, как обычно, необходимо создать копию (2) объекта, для доступа к нему из замыкания, так как в противном случае оригинал станет недоступным.

pipes

В составе библиотеки core появился новый модуль pipes. На данный момент модуль производит ощущение очень сырого, ожидающего не мало изменений. Так что, описание работы с ним я оставлю на потом, когда появится уверенность в стабильности интерфейсов.

Leave a Reply