Модель памяти Rust, в общем случае, не допускает совместного обращения к одной и той же памяти (shared model) предлагая вместо этого обмениваться сообщениями (mailbox model). При этом существует возможность работать с общей памятью в режимах “только для чтения” и “один писатель много читателей”. На данный момент в Rust существует несколько способов организации взаимодействия между задачами:
- Низкоуровневые каналы и порты из модуля core::comm;
- Высокоуровневая абстракция над каналами и портами std::comm;
- Каналы предназначенные для передачи бинарных данных из std::flatpipes;
- Новая инфраструктура для обмена сообщениями core::pipes.
Обмен сообщениями на низком уровне
Простейший вариант использования core::comm
Самым широко используемым на данный момент механизмом взаимодействия между задачами является core::comm. Данный механизм хорошо отлажен, неплохо задокументирован и довольно прост в использовании. Основой механизма обмена сообщениями core::comm являются потоки, манипуляция с которыми происходит посредством каналов и портов. Поток представляют собой однонаправленный механизм связи, в котором порт используется для отправки сообщения, а канал для приема отправленной информации. Простейший пример использования потока выглядит следующим образом:
port.send("data"); // (2)
// port.send(1); // (3)
io::println(chan.recv()); // (4)
В данном примере создается пара (1), состоящая из канала и порта, которые используются для отправки (2) строкового типа данных. Отдельное внимание стоит удалить пототипу функции
Так же стоит обратить внимание на класс шаблонного параметра:
Для получения данных из потока можно воспользоваться функцией
let (server_chan, server_port) = stream(); // (2)
let (client_chan, client_port) = stream(); // (3)
do task::spawn {
let val: ~[uint] = server_chan.recv(); // (4)
let res = val.map(|v| {v+1});
client_port.send(res) // (5)
}
server_port.send(value); // (6)
io::println(fmt!("Result: %?", client_chan.recv())); // (7)
Первое, на что стоит обратить внимание при работе с потоками – это необходимость передавать значения адресуемые уникальными указателями, а функция
Таким образом, для отправки сообщений серверу и приема сообщений на стороне сервера используется поток server_chan, server_port. В силу однонаправленности потока, для получения результата вычислений сервера, был создан клиентский поток, состоящий из пары client_chan, client_port.
Совместное использование потока
Несмотря на то, что поток является однонаправленным механизмом передачи данных, это не приводит к необходимости создавать потоки для каждого из желающих отправить при помощи него данные, т.к, существует механизм благодаря которому возможна работа с в режиме “один получатель много отправителей”.
let (client_chan, client_port) = stream(); // (2)
let clients = 5;
do task::spawn { // (3)
let mut result = 0;
for uint::range(0, clients) |_i| {
let val: uint = server_chan.recv(); // (4)
result = val + result;
}
client_port.send(result); // (5)
}
let server_port = SharedChan(server_port); // (6)
for uint::range(0, clients) |i| {
let server_port = server_port.clone(); // (7)
do task::spawn {
let gen = rand::Rng();
server_port.send(gen.gen_uint_range(0, i+1)); // (8)
}
}
io::println(fmt!("Result: %?", client_chan.recv()));
Для этого, как и для схемы один читатель – один писатель необходимо создать серверный (1), клиентский (2) потоки и запустить серверную задачу (3). Логика серверной задачи предельно проста, считать (4) данные из серверного канала, переданные клиентом (8) и отправить (5) результат в клиентский поток. Так как писателей несколько, то необходимо внести изменения в тип серверного порта, преобразовав (6) его к
Пересылка объектов
В тех случаях, когда необходимость пересылать значения адресуемые исключительно уникальными указателями становится проблемой, на помощь приходит модуль flatpipes. Данный модуль позволяет отправлять и принимать любые бинарные данные в виде массива, либо объекты поддерживающие сериализацию.
#[auto_decode] // (2)
struct EncTest { val1: uint, val2: @str, val3: ~str }
...
let (server_chan, server_port) =
flatpipes::serial::pipe_stream(); // (3)
do task::spawn {
let val = server_chan.recv(); // (4)
io::println(fmt!("Value: %?", val));
}
let value = @EncTest{val1: 1u, val2: @"test string 1",
val3: ~"test string 2"};
server_port.send(value); // (5)
Как видно из примера, работать с flatpipes предельно просто. Структура, объекты которой будут передаваться посредством flatpipes, должна быть объявлена сериализуемой (1) и десериализуемой (2). Создание flatpipes (3) технически ничем не отличается от создания обычных потоков, так же как прием (4) и отправка (5) сообщений при помощи канала и порта. Главным же отличием flatpipes от потока является создание глубокой копии объекта на отправляющей стороне и построение нового объекта на принимающей стороне. Благодаря такому подходу, накладные расходы при работе с flatpipes, по сравнению с обычными потоками возрастают, но и возможности по пересылке данных между задачами увеличиваются.
Высокоуровневая абстракция обмена сообщениями
В большинстве приведенных выше примеров создаются два потока: один для отправки данных на сервер, второй для получения данных с сервера. Подобный подход не привносит какой-то ощутимой пользы, да и просто замусоривает код. В связи с этим был создан модуль std::comm, являющийся высокоуровневой абстракцией над core::comm и содержащий в себе
let (server, client) = DuplexStream(); // (1)
do task::spawn {
let val: ~[uint] = server.recv(); // (2)
io::println(fmt!("Value: %?", val));
let res = val.map(|v| {v+1});
server.send(res) // (3)
}
client.send(value); // (4)
io::println(fmt!("Resilt: %?", client.recv())); // (5)
При работе с
Модуль ARC
Несмотря на все прелести отправки сообщений рано или поздно возникает вопрос: а что делать с большой структурой данных, доступ к которой нужен из нескольких задач одновременно? Конечно, ее можно пересылать в виде уникального указателя между потоками, но такой подход сильно затруднит разработку приложения, а его сопровождение превратится в кошмар. Именно для таких случаев и существует модуль
Совместное использование уникальных указателей с доступом только на чтение
Сначала стоит разобраться с самым простым случаем – совместным доступом к неизменяемым данным из нескольких задач. Для этого необходимо воспользоваться модулем
let shared_data = arc::clone(&data); // (2)
do task::spawn {
let val = arc::get(&shared_data); // (3)
io::println(fmt!("Value: %?", val[1]));
}
io::println(fmt!("Value: %?", *arc::get(&data))); // (4)
Несмотря на то, что в данном примере нет работы с потоками, он вполне достаточен для иллюстрации работы с
R/W доступ к уникальным указателям
Модуль
for 5.times {
let reader = data.clone(); // (2)
do task::spawn {
do reader.read() |data| { // (3)
io::println(fmt!("Value: %?", data)); // (4)
}
}
}
do spawn {
do data.write() |data| { // (5)
for data.each_mut |x| { *x = *x * 2 } // (6)
}
}
В приведенном выше примере, создается (1) массив обернутый в
pipes
В составе библиотеки core появился новый модуль pipes. На данный момент модуль производит ощущение очень сырого, ожидающего не мало изменений. Так что, описание работы с ним я оставлю на потом, когда появится уверенность в стабильности интерфейсов.