Практическое руководство по Rust. Бонус
Arc — атомарный счетчик ссылок на T
: обрабатывает передачу между потоками и освобождает T
при уничтожении последней ссылки на нееMutex — обеспечивает взаимоисключающий доступ к значению T
Arc
Arc<T>
предоставляет общий доступ только для чтения к T
через Arc::clone()
:
use std::sync::Arc;
use std::thread;
fn main() {
let v = Arc::new(vec![10, 20, 30]);
let mut handles = Vec::new();
for _ in 1..5 {
let v = Arc::clone(&v);
handles.push(thread::spawn(move || {
let thread_id = thread::current().id();
println!("{thread_id:?}: {v:?}");
}));
}
handles.into_iter().for_each(|h| h.join().unwrap());
println!("v: {v:?}");
}
Ремарки:
Arc
означает Atomic Reference Counter (атомарный счетчик ссылок) и является потокобезопасной версиейRc
, в которой используются атомарные операцииArc<T>
реализуетClone
независимо от того, делает ли этоT
. Он реализуетSend
иSync
, только еслиT
их реализуетArc::clone()
имеет некоторую цену за счет выполнения атомарных операций, но после этого использованиеT
является бесплатным- остерегайтесь ссылочных циклов,
Arc
не использует сборщик мусора для их обнаружения
- в этом может помочь
std::sync::Weak
- в этом может помочь
Mutex
Mutex<T>
обеспечивает взаимное исключение и предоставляет мутабельный доступ к T
через доступный только для чтения интерфейс (форма внутренней изменчивости):
use std::sync::Mutex;
fn main() {
let v = Mutex::new(vec![10, 20, 30]);
println!("v: {:?}", v.lock().unwrap());
{
let mut guard = v.lock().unwrap();
guard.push(40);
}
println!("v: {:?}", v.lock().unwrap());
}
Обратите внимание на неявную реализацию (impl<T: Send> Sync for Mutex<T>
)https://doc.rust-lang.org/std/sync/struct.Mutex.html#impl-Sync-for-Mutex%3CT%3E.
Ремарки:
Mutex
вRust
похож на коллекцию, состоящую из одного элемента — защищенных данных
- невозможно забыть получить (acquire) мьютекс перед доступом к защищенным данным
- из
&Mutex<T>
через блокировку (lock) можно получить&mut T
.MutexGuard
гарантирует, что&mut T
не живет дольше удерживаемой (held) блокировки Mutex<T>
реализуетSend
иSync
, только еслиT
реализуетSend
RwLock
является блокировкой, доступной как для чтения, так и для записи- почему
lock()
возвращаетResult
?
- Если поток, в котором находится мьютекс, паникует, мьютекс становится "отравленным" (poisoned), сигнализируя о том, что защищенные данные могут находиться в несогласованном состоянии. Вызов
lock()
на отравленном мьютексе проваливается с PoisonError. Для восстановления данных можно вызватьinto_iter()
на ошибке
- Если поток, в котором находится мьютекс, паникует, мьютекс становится "отравленным" (poisoned), сигнализируя о том, что защищенные данные могут находиться в несогласованном состоянии. Вызов
Пример
Рассмотрим Arc
и Mutex
в действии:
use std::thread;
// use std::sync::{Arc, Mutex};
fn main() {
let v = vec![10, 20, 30];
let handle = thread::spawn(|| {
v.push(10);
});
v.push(1000);
handle.join().unwrap();
println!("v: {v:?}");
}
Возможное решение:
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let v = Arc::new(Mutex::new(vec![10, 20, 30]));
let v2 = Arc::clone(&v);
let handle = thread::spawn(move || {
let mut v2 = v2.lock().unwrap();
v2.push(10);
});
{
let mut v = v.lock().unwrap();
v.push(1000);
}
handle.join().unwrap();
println!("v: {v:?}");
}
Ремарки:
v
обернут вArc
иMutex
, поскольку их зоны ответственности ортогональны
- оборачивание
Mutex
вArc
является распространенным паттерном для передачи мутабельного состояния между потоками
- оборачивание
v: Arc<_>
должен быть клонирован какv2
для передачи в другой поток. Обратите внимание наmove
в сигнатуре лямбды- блоки предназначены для максимального сужения области видимости
LockGuard
Упражнения
Попрактикуемся применять новые знания на двух упражнениях:
- обедающие философы — классическая задача параллелизма
- многопоточная проверка ссылок
Обедающие философы
Условия задачи:
Пять безмолвных философов сидят вокруг круглого стола, перед каждым философом стоит тарелка спагетти. На столе между каждой парой ближайших философов лежит по одной вилке.
Каждый философ может либо есть, либо размышлять. Прием пищи не ограничен количеством оставшихся спагетти — подразумевается бесконечный запас. Тем не менее, философ может есть только тогда, когда держит две вилки — взятую справа и слева.
Каждый философ может взять ближайшую вилку (если она доступна) или положить — если он уже держит ее. Взятие каждой вилки и возвращение ее на стол являются раздельными действиями, которые должны выполняться одно за другим.
Задача заключается в том, чтобы разработать модель (параллельный алгоритм), при которой ни один из философов не будет голодать, то есть будет чередовать прием пищи и размышления 100 раз.
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;
struct Fork;
struct Philosopher {
name: String,
// left_fork: ...
// right_fork: ...
// thoughts: ...
}
impl Philosopher {
fn think(&self) {
self.thoughts
.send(format!("Эврика! {} сгенерировал(а) новую идею!", &self.name))
.unwrap();
}
fn eat(&self) {
// Берем вилки
println!("{} ест...", &self.name);
thread::sleep(Duration::from_millis(10));
}
}
static PHILOSOPHERS: &[&str] =
&["Сократ", "Гипатия", "Платон", "Аристотель", "Пифагор"];
fn main() {
// Создаем вилки
// Создаем философов
// Каждый философ размышляет и ест 100 раз
// Выводим размышления философов
}
Подсказка: рассмотрите возможность использования std::mem::swap для решения проблемы взаимной блокировки (deadlock).
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;
struct Fork;
struct Philosopher {
name: String,
left_fork: Arc<Mutex<Fork>>,
right_fork: Arc<Mutex<Fork>>,
thoughts: mpsc::SyncSender<String>,
}
impl Philosopher {
fn think(&self) {
self.thoughts
.send(format!("Эврика! {} сгенерировал(а) новую идею!", &self.name))
.unwrap();
}
fn eat(&self) {
println!("{} пытается есть", &self.name);
let _left = self.left_fork.lock().unwrap();
let _right = self.right_fork.lock().unwrap();
println!("{} ест...", &self.name);
thread::sleep(Duration::from_millis(10));
}
}
static PHILOSOPHERS: &[&str] =
&["Сократ", "Гипатия", "Платон", "Аристотель", "Пифагор"];
fn main() {
let (tx, rx) = mpsc::sync_channel(10);
let forks = (0..PHILOSOPHERS.len())
.map(|_| Arc::new(Mutex::new(Fork)))
.collect::<Vec<_>>();
for i in 0..forks.len() {
let tx = tx.clone();
let mut left_fork = Arc::clone(&forks[i]);
let mut right_fork = Arc::clone(&forks[(i + 1) % forks.len()]);
// Во избежание взаимной блокировки нам необходимо где-то нарушить симметрию.
// Меняем вилки местами без их повторной инициализации
if i == forks.len() - 1 {
std::mem::swap(&mut left_fork, &mut right_fork);
}
let philosopher = Philosopher {
name: PHILOSOPHERS[i].to_string(),
thoughts: tx,
left_fork,
right_fork,
};
thread::spawn(move || {
for _ in 0..100 {
philosopher.eat();
philosopher.think();
}
});
}
drop(tx);
for thought in rx {
println!("{thought}");
}
}
Многопоточная проверка ссылок
Создадим инструмент для многопоточной проверки ссылок. Он должен начинать с основной веб-страницы и проверять корректность ссылок на ней. Затем он должен рекурсивно проверять другие страницы в том же домене и продолжать делать это до тех пор, пока все страницы не будут проверены.
Для создания такого инструмента вам потребуется какой-нибудь клиент HTTP
, например, reqwest:
cargo add reqwest --features blocking,rustls-tls
Для обнаружения ссылок можно воспользоваться scraper:
cargo add scraper
Наконец, для обработки ошибок пригодится thiserror:
cargo add thiserror
Cargo.toml
:
[package]
name = "link-checker"
version = "0.1.0"
edition = "2021"
publish = false
[dependencies]
reqwest = { version = "0.11.12", features = ["blocking", "rustls-tls"] }
scraper = "0.13.0"
thiserror = "1.0.37"
Начните с небольшого сайта, такого как https://www.google.org
.
src/main.rs
:
use reqwest::blocking::Client;
use reqwest::Url;
use scraper::{Html, Selector};
use thiserror::Error;
#[derive(Error, Debug)]
enum Error {
#[error("ошибка запроса: {0}")]
ReqwestError(#[from] reqwest::Error),
#[error("плохой ответ HTTP: {0}")]
BadResponse(String),
}
#[derive(Debug)]
struct CrawlCommand {
url: Url,
extract_links: bool,
}
fn visit_page(client: &Client, command: &CrawlCommand) -> Result<Vec<Url>, Error> {
println!("проверка {:#}", command.url);
let response = client.get(command.url.clone()).send()?;
if !response.status().is_success() {
return Err(Error::BadResponse(response.status().to_string()));
}
let mut link_urls = Vec::new();
if !command.extract_links {
return Ok(link_urls);
}
let base_url = response.url().to_owned();
let body_text = response.text()?;
let document = Html::parse_document(&body_text);
let selector = Selector::parse("a").unwrap();
let href_values = document
.select(&selector)
.filter_map(|element| element.value().attr("href"));
for href in href_values {
match base_url.join(href) {
Ok(link_url) => {
link_urls.push(link_url);
}
Err(err) => {
println!("в {base_url:#} не поддается разбору {href:?}: {err}");
}
}
}
Ok(link_urls)
}
fn main() {
let client = Client::new();
let start_url = Url::parse("https://www.google.org").unwrap();
let crawl_command = CrawlCommand{ url: start_url, extract_links: true };
match visit_page(&client, &crawl_command) {
Ok(links) => println!("ссылки: {links:#?}"),
Err(err) => println!("невозможно извлечь ссылки: {err:#}"),
}
}
Задачи:
- используйте потоки для параллельной проверки ссылок: отправьте проверяемые URL-адреса в канал и позвольте нескольким потокам проверять URL-адреса параллельно
- реализуйте рекурсивное извлечение ссылок со всех страниц домена
www.google.org
. Установите верхний предел в 100 страниц или около того, чтобы сайт вас не заблокировал
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::collections::HashSet;
use reqwest::blocking::Client;
use reqwest::Url;
use scraper::{Html, Selector};
use thiserror::Error;
#[derive(Error, Debug)]
enum Error {
#[error("ошибка запроса: {0}")]
ReqwestError(#[from] reqwest::Error),
#[error("плохой ответ HTTP: {0}")]
BadResponse(String),
}
#[derive(Debug)]
struct CrawlCommand {
url: Url,
extract_links: bool,
}
fn visit_page(client: &Client, command: &CrawlCommand) -> Result<Vec<Url>, Error> {
println!("проверка {:#}", command.url);
let response = client.get(command.url.clone()).send()?;
if !response.status().is_success() {
return Err(Error::BadResponse(response.status().to_string()));
}
let mut link_urls = Vec::new();
if !command.extract_links {
return Ok(link_urls);
}
let base_url = response.url().to_owned();
let body_text = response.text()?;
let document = Html::parse_document(&body_text);
let selector = Selector::parse("a").unwrap();
let href_values = document
.select(&selector)
.filter_map(|element| element.value().attr("href"));
for href in href_values {
match base_url.join(href) {
Ok(link_url) => {
link_urls.push(link_url);
}
Err(err) => {
println!("в {base_url:#} не поддается разбору {href:?}: {err}");
}
}
}
Ok(link_urls)
}
struct CrawlState {
domain: String,
visited_pages: HashSet<String>,
}
impl CrawlState {
fn new(start_url: &Url) -> CrawlState {
let mut visited_pages = HashSet::new();
visited_pages.insert(start_url.as_str().to_string());
CrawlState {
domain: start_url.domain().unwrap().to_string(),
visited_pages
}
}
/// Определяет, должны ли извлекаться ссылки на указанной странице
fn should_extract_links(&self, url: &Url) -> bool {
let Some(url_domain) = url.domain() else {
return false;
};
url_domain == self.domain
}
/// Помечает указанную страницу как посещенную,
/// возвращает `false`, если страница уже посещалась
fn mark_visited(&mut self, url: &Url) -> bool {
self.visited_pages.insert(url.as_str().to_string())
}
}
type CrawlResult = Result<Vec<Url>, (Url, Error)>;
fn spawn_crawler_threads(
command_receiver: mpsc::Receiver<CrawlCommand>,
result_sender: mpsc::Sender<CrawlResult>,
thread_count: u32,
) {
let command_receiver = Arc::new(Mutex::new(command_receiver));
for _ in 0..thread_count {
let result_sender = result_sender.clone();
let command_receiver = command_receiver.clone();
thread::spawn(move || {
let client = Client::new();
loop {
let command_result = {
let receiver_guard = command_receiver.lock().unwrap();
receiver_guard.recv()
};
let Ok(crawl_command) = command_result else {
// Отправитель уничтожен, команд больше не будет
break;
};
let crawl_result = match visit_page(&client, &crawl_command) {
Ok(link_urls) => Ok(link_urls),
Err(error) => Err((crawl_command.url, error)),
};
result_sender.send(crawl_result).unwrap();
}
});
}
}
fn control_crawl(
start_url: Url,
command_sender: mpsc::Sender<CrawlCommand>,
result_receiver: mpsc::Receiver<CrawlResult>,
) -> Vec<Url> {
let mut crawl_state = CrawlState::new(&start_url);
let start_command = CrawlCommand { url: start_url, extract_links: true };
command_sender.send(start_command).unwrap();
let mut pending_urls = 1;
let mut bad_urls = Vec::new();
while pending_urls > 0 {
let crawl_result = result_receiver.recv().unwrap();
pending_urls -= 1;
match crawl_result {
Ok(link_urls) => {
for url in link_urls {
if crawl_state.mark_visited(&url) {
let extract_links = crawl_state.should_extract_links(&url);
let crawl_command = CrawlCommand { url, extract_links };
command_sender.send(crawl_command).unwrap();
pending_urls += 1;
}
}
}
Err((url, error)) => {
bad_urls.push(url);
println!("при извлечении ссылок возникла ошибка: {:#}", error);
continue;
}
}
}
bad_urls
}
fn check_links(start_url: Url) -> Vec<Url> {
let (result_sender, result_receiver) = mpsc::channel::<CrawlResult>();
let (command_sender, command_receiver) = mpsc::channel::<CrawlCommand>();
spawn_crawler_threads(command_receiver, result_sender, 16);
control_crawl(start_url, command_sender, result_receiver)
}
fn main() {
let start_url = reqwest::Url::parse("https://www.google.org").unwrap();
let bad_urls = check_links(start_url);
println!("плохие URL: {:#?}", bad_urls);
}
Асинхронный Rust
"Асинхронность" (async) — это модель параллелизма, в которой несколько задач выполняются одновременно. Каждая задача выполняется до тех пор, пока не завершится или не заблокируется, затем выполняется следующая (готовая к выполнению) задача и т.д. Такая модель позволяет выполнять большое количество задач с помощью небольшого числа потоков. Это связано с тем, что накладные расходы на выполнение каждой задачи обычно очень низкие, а операционные системы предоставляют примитивы для эффективного переключения между задачами.
Асинхронные операции Rust
основаны на фьючерсах (futures, от "future" — будущее), представляющих собой работу, которая может быть завершена в будущем. Фьючерсы "опрашиваются" (polled) до тех пор, пока не сообщат о завершении.
Фьючерсы опрашиваются асинхронной средой выполнения (async runtime). Доступно несколько таких сред. Одной из самых популярных является Tokio.
Сравнения:
- в
Python
используется похожая модель вasyncio
. Однако, его типFuture
основан на функциях обратного вызова (callbacks), а не на опросах. Асинхронные программыPython
должны выполняться в цикле, как и асинхронные программыRust
Promise
вJavaScript
похож на фьючерс, но также основан на колбэках. Среды выполнения реализует цикл событий (event loop), поэтому многие детали разрешения промиса являются скрытыми
Основы асинхронности
async/await
На высоком уровне асинхронный код Rust
выглядит очень похоже на обычный синхронный код:
use futures::executor::block_on;
async fn count_to(count: i32) {
for i in 1..=count {
println!("Значение счетчика: {i}!");
}
}
async fn async_main(count: i32) {
count_to(count).await;
}
fn main() {
block_on(async_main(10));
}
Ремарки:
- это упрощенный пример для демонстрации синтаксиса. В нем отсутствуют долгие операции или параллелизм
- какой тип возвращает асинхронная функция?
- используйте
let future: () = async_main(10);
вmain()
и посмотрите
- используйте
- ключевое слово
async
— это синтаксический сахар. Компилятор заменяет возвращаемый тип фьючерсом - мы не можем сделать функцию
main
асинхронной без предоставления компилятору дополнительных инструкций о том, как использовать возвращаемый фьючерс - для запуска асинхронного кода требуется исполнитель (executor).
block_on()
блокирует текущий поток до завершения фьючерса .await
асинхронно ждет завершения другой операции. В отличие отblock_on()
,await
не блокирует текущий потокawait
может использоваться только внутри асинхронной функции (или блока, о чем мы поговорим позже)
Фьючерсы
Future — это трейт, реализуемый объектами, представляющими операцию, которая пока не может быть завершена. Фьючерсы могут опрашиваться, poll()
возвращает Poll.
use std::pin::Pin;
use std::task::Context;
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T),
Pending,
}
Асинхронная функция возвращает impl Future
. Также возможно (но редко применяется) реализовать Future
для собственных типов. Например, JoinHandle
, возвращаемый tokio::spawn
реализует Future
, что позволяет присоединяться (join) к нему.
Ключевое слово await
приостанавливает выполнение асинхронной функции, пока фьючерс не будет готов.
Ремарки:
- типы
Future
иPoll
реализованы в точности, как показано - мы не будем рассматривать
Pin
иContext
, поскольку не будет создавать новые асинхронные примитивы. Коротко:
Context
позволяет фьючерсу планировать повторный опрос при возникновении событияPin
гарантирует, что фьючерс не перемещается в памяти, поэтому ссылки в этом фьючерсе остаются валидными. Это необходимо, чтобы ссылки оставались валидными послеawait
Среда выполнения
Среда выполнения (runtime) предоставляет поддержку для асинхронного выполнения операций (reactor) и отвечает за выполнение фьючерсов (executor). Rust
не имеет "встроенной" среды выполнения, но доступно несколько вариантов:
- Tokio — производительный, с хорошим набором инструментов, таких как Hyper для
HTTP
или Tonic дляgRPC
- async-std — стремится быть "стандартной библиотекой для асинхронного кода" и включает стандартную среду выполнения в
async::task
- smol — простой и легковесный
Несколько больших приложений имеют собственные среды выполнения. Одним из таких приложений является Fuchsia.
Фьючерсы являются "инертными" в том смысле, что они ничего не делают, пока не будут опрошены исполнителем. Это отличается от промисов JS
, например, которые запускаются, даже если никогда не используются.
Tokio
Tokio
предоставляет:
- многопоточную среду выполнения для выполнения асинхронного кода
- асинхронную версию стандартной библиотеки
- большую экосистему библиотек
use tokio::time;
async fn count_to(count: i32) {
for i in 1..=count {
println!("Значение счетчика в задаче: {i}!");
time::sleep(time::Duration::from_millis(5)).await;
}
}
#[tokio::main]
async fn main() {
tokio::spawn(count_to(10));
for i in 1..5 {
println!("Значение счетчика в основной задаче: {i}");
time::sleep(time::Duration::from_millis(5)).await;
}
}
Ремарки:
- макрос
tokio::main
позволяет делать функциюmain
асинхронной - функция
spawn
создает новую параллельную "задачу" - обратите внимание:
spawn
принимаетFuture
, мы не вызывает.await
наcount_to()
- почему
count_to()
обычно не доходит до 10? Это пример отмены асинхронной операции.tokio::spawn()
возвращает обработчик (handle), который может заставить поток ждать его завершения - попробуйте заменить
tokio::spawn
наcount_to(10).await
- попробуйте добавить ожидание завершения задачи, возвращаемой
tokio::spawn()
Задачи
Rust
имеет систему задач (task system), которая является формой легковесного трейдинга (threading).
Задача имеет один верхнеуровневый фьючерс, который опрашивается исполнителем. Этот фьючерс может иметь несколько вложенных фьючерсов, которые он опрашивает методом poll
, что приблизительно соответствует стеку вызовов (call stack). Параллелизм внутри задачи возможен путем опроса нескольких дочерних фьючерсов, например, запуск таймера и выполнение операции ввода-вывода.
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:0").await?;
println!("запросы принимаются на порту {}", listener.local_addr()?.port());
loop {
let (mut socket, addr) = listener.accept().await?;
println!("запрос из {addr:?}");
tokio::spawn(async move {
socket.write_all(b"Кто ты?\n").await.expect("ошибка сокета");
let mut buf = vec![0; 1024];
let name_size = socket.read(&mut buf).await.expect("ошибка сокета");
let name = std::str::from_utf8(&buf[..name_size]).unwrap().trim();
let reply = format!("Привет, {name}!\n");
socket.write_all(reply.as_bytes()).await.expect("ошибка сокета");
});
}
}
- Перед нами
async
блок. Такие блоки похожи на замыкания, но не принимают параметров. Их возвращаемым значением является фьючерс, как уasync fn
- переделайте асинхронный блок в функцию и улучшите обработку ошибок с помощью оператора
?
Асинхронные каналы
Некоторые крейты поддерживают асинхронные каналы, например, tokio
:
use tokio::sync::mpsc::{self, Receiver};
async fn ping_handler(mut input: Receiver<()>) {
let mut count: usize = 0;
while let Some(_) = input.recv().await {
count += 1;
println!("получено {count} пингов");
}
println!("ping_handler завершен");
}
#[tokio::main]
async fn main() {
let (sender, receiver) = mpsc::channel(32);
let ping_handler_task = tokio::spawn(ping_handler(receiver));
for i in 0..10 {
sender.send(()).await.expect("провал отправки пинга");
println!("отправлено {} пингов", i + 1);
}
drop(sender);
ping_handler_task.await.expect("что-то пошло не так");
}
- измените размер канала на
3
и посмотрите, как это повлияет на выполнение - интерфейс асинхронных каналов аналогичен интерфейсу
sync
каналов, о которых мы говорили ранее - попробуйте удалить
std::mem::drop
. Что произойдет? Почему? - крейт Flume предоставляет каналы, которые реализуют как
sync
, так иasync send
, иrecv
. Это может быть полезным для сложных приложений с задачами обработки ввода-вывода и тяжелыми для ЦП задачами async
каналы могут быть использованы вместе с другимиfuture
для создания сложного потока управления (control flow)
Поток управления фьючерсов
Фьючерсы могут объединяться вместе для создания графов потоков параллельных вычислений. Мы уже видели задачи, которые функционируют как автономные потоки выполнения.
Join
Метод join_all
ждет, когда все фьючерсы будут готовы, и возвращает их результаты. Это похоже на Promise.all
в JavaScript
или asyncio.gather
в Python
.
use anyhow::Result;
use futures::future;
use reqwest;
use std::collections::HashMap;
async fn size_of_page(url: &str) -> Result<usize> {
let resp = reqwest::get(url).await?;
Ok(resp.text().await?.len())
}
#[tokio::main]
async fn main() {
let urls: [&str; 4] = [
"https://google.com",
"https://httpbin.org/ip",
"https://play.rust-lang.org/",
"BAD_URL",
];
let futures_iter = urls.into_iter().map(size_of_page);
let results = future::join_all(futures_iter).await;
let page_sizes_dict: HashMap<&str, Result<usize>> =
urls.into_iter().zip(results.into_iter()).collect();
println!("{:?}", page_sizes_dict);
}
Ремарки:
- для нескольких фьючерсов непересекающихся типов можно использовать
std::future::join!
но мы должны знать, сколько фьючерсов у нас будет во время компиляции. В настоящее времяjoin_all()
находится в крейтеfutures
, но скоро будет стабилизирован вstd::future
- риск соединения заключается в том, что какой-нибудь фьючерс может никогда не разрешиться, что приведет к остановке программы
- мы можем комбинировать
join_all()
сjoin!
, например, чтобы объединить все запросы к службеHTTP
, а также запрос к базе данных. Попробуйте добавитьtokio::time::sleep()
во фьючерс, используяfuture::join!
. Это не таймаут (для которого требуетсяselect!
, как описано в следующем разделе), а демонстрация работыjoin!
Select
Операция выбора (select) ждет готовности любого фьючерса из набора и реагирует на его результат. В JavaScript
это похоже на Promise.race
. В Python
это похоже на asyncio.wait(task_set, return_when=asyncio.FIRST_COMPLETED)
.
Подобно оператору match
, тело select!
имеет несколько ветвей (arms), каждая из которых имеет форму pattern = future => statement
. Когда future
готов, его возвращаемое значение деструктурируется pattern
. Затем statement
запускается с итоговыми переменными. Результат statement
становится результатом макроса select!
.
use tokio::sync::mpsc::{self, Receiver};
use tokio::time::{sleep, Duration};
#[derive(Debug, PartialEq)]
enum Animal {
Cat { name: String },
Dog { name: String },
}
async fn first_animal_to_finish_race(
mut cat_rcv: Receiver<String>,
mut dog_rcv: Receiver<String>,
) -> Option<Animal> {
tokio::select! {
cat_name = cat_rcv.recv() => Some(Animal::Cat { name: cat_name? }),
dog_name = dog_rcv.recv() => Some(Animal::Dog { name: dog_name? })
}
}
#[tokio::main]
async fn main() {
let (cat_sender, cat_receiver) = mpsc::channel(32);
let (dog_sender, dog_receiver) = mpsc::channel(32);
tokio::spawn(async move {
sleep(Duration::from_millis(500)).await;
cat_sender.send(String::from("Феликс")).await.expect("ошибка отправки имени кота");
});
tokio::spawn(async move {
sleep(Duration::from_millis(50)).await;
dog_sender.send(String::from("Рекс")).await.expect("ошибка отправки имени собаки");
});
let winner = first_animal_to_finish_race(cat_receiver, dog_receiver)
.await
.expect("ошибка получения победителя");
println!("Победителем является {winner:?}");
}
Ремарки:
- в примере у нас имеется гонка между кошкой и собакой.
first_animal_to_finish_race()
"слушает" (listening) оба канала и возвращает первый по времени результат. Поскольку имя собаки прибывает через 50 мс, собака выигрывает у кошки, имя которой прибывает через 500 мс - в примере вместо
channel
можно использоватьoneshot
, поскольку предполагается однократный вызов методаsend
- попробуйте добавить к гонке дедлайн, демонстрируя выбор разных фьючерсов
- обратите внимание, что
select!
уничтожает не совпавшие ветви, что отменяет их фьючерсы.select!
легче всего использовать, когда каждое выполнение этого макроса создает новые фьючерсы
- альтернативой является передача
&mut future
вместо самого фьючерса, но это может привести к проблемам, о котором мы поговорим позже
- альтернативой является передача
Ловушки async/await
async/await
предоставляет удобную и эффективную абстракцию для параллельного асинхронного программирования. Однако модель async/await
в Rust
также имеет свои подводные камни и ловушки, о которых мы поговорим в этом разделе.
Блокировка исполнителя
Большинство асинхронных сред выполнения допускают одновременное выполнение только задач ввода-вывода. Это означает, что задачи блокировки ЦП будут блокировать исполнителя (executor) и препятствовать выполнению других задач. Простой обходной путь — использовать эквивалентные асинхронные методы там, где это возможно.
use futures::future::join_all;
use std::time::Instant;
async fn sleep_ms(start: &Instant, id: u64, duration_ms: u64) {
std::thread::sleep(std::time::Duration::from_millis(duration_ms));
println!(
"фьючерс {id} спал в течение {duration_ms} мс, завершился после {} мс",
start.elapsed().as_millis()
);
}
#[tokio::main(flavor = "current_thread")]
async fn main() {
let start = Instant::now();
let sleep_futures = (1..=10).map(|t| sleep_ms(&start, t, t * 10));
join_all(sleep_futures).await;
}
Ремарки:
- запустите код и убедитесь, что переходы в режим сна происходят последовательно, а не одновременно
flavor = "current_thread"
помещает все задачи в один поток. Это делает эффект более очевидным, но рассмотренная ошибка присутствует и в многопоточной версии- замените
std::thread::sleep
наtokio::time::sleep
и дождитесь результата - другим решением может быть
tokio::task::spawn_blocking
, который порождает реальный поток и преобразует его дескриптор вfuture
, не блокируя исполнителя - о задачах не следует думать как о потоках ОС. Они не совпадают 1 к 1, и большинство исполнителей позволяют выполнять множество задач в одном потоке ОС. Это особенно проблематично при взаимодействии с другими библиотеками через
FFI
, где эта библиотека может зависеть от локального хранилища потоков или сопоставляться с конкретными потоками ОС (например,CUDA
). В таких ситуациях отдавайте предпочтениеtokio::task::spawn_blocking
- используйте синхронные мьютексы осторожно. Удержание мьютекса над
.await
может привести к блокировке другой задачи, которая может выполняться в том же потоке
Pin
Асинхронные блоки и функции возвращают типы, реализующие трейт Future
. Возвращаемый тип является результатом преобразования компилятора, который превращает локальные переменные в данные, хранящиеся внутри фьючерса.
Некоторые из этих переменных могут содержать указатели на другие локальные переменные. По этой причине фьючерсы никогда не должны перемещаться в другую ячейку памяти, поскольку это сделает такие указатели недействительными.
Чтобы предотвратить перемещение фьючерса в памяти, его можно опрашивать только через закрепленный (pinned) указатель. Pin
— это оболочка ссылки, которая запрещает все операции, которые могли бы переместить экземпляр, на который она указывает, в другую ячейку памяти.
use tokio::sync::{mpsc, oneshot};
use tokio::task::spawn;
use tokio::time::{sleep, Duration};
// Рабочая единица. В данном случае она просто спит в течение определенного времени
// и отвечает сообщением в канал `respond_on`
#[derive(Debug)]
struct Work {
input: u32,
respond_on: oneshot::Sender<u32>,
}
// Воркер, который ищет работу в очереди (queue) и выполняет ее
async fn worker(mut work_queue: mpsc::Receiver<Work>) {
let mut iterations = 0;
loop {
tokio::select! {
Some(work) = work_queue.recv() => {
sleep(Duration::from_millis(10)).await; // выполняем "работу"
work.respond_on
.send(work.input * 1000)
.expect("провал отправки ответа");
iterations += 1;
}
// TODO: сообщать о количестве итераций каждый 100 мс
}
}
}
// "Запрашиватель" (requester), который запрашивает работу и ждет ее выполнения
async fn do_work(work_queue: &mpsc::Sender<Work>, input: u32) -> u32 {
let (tx, rx) = oneshot::channel();
work_queue
.send(Work { input, respond_on: tx })
.await
.expect("провал отправки работы в очередь");
rx.await.expect("провал ожидания ответа")
}
#[tokio::main]
async fn main() {
let (tx, rx) = mpsc::channel(10);
spawn(worker(rx));
for i in 0..100 {
let resp = do_work(&tx, i).await;
println!("результат работы для итерации {i}: {resp}");
}
}
Ремарки:
- в примере вы могли распознать шаблон актора (actor pattern). Акторы, как правило, вызывают
select!
в цикле - это обобщение нескольких предыдущих уроков, так что не торопитесь
- добавьте
_ = sleep(Duration::from_millis(100)) => { println!(..) }
вselect!
. Это никогда не выполнится. Почему? - теперь добавьте
timeout_fut
, содержащий этот фьючерс за пределамиloop
:
- добавьте
let mut timeout_fut = sleep(Duration::from_millis(100));
loop {
select! {
..,
_ = timeout_fut => { println!(..); },
}
}
- это также не будет работать. Изучите ошибки компилятора, добавьте
&mut
вtimeout_fut
вselect!
для решения проблемы перемещения, затем используйтеBox::pin
:
let mut timeout_fut = Box::pin(sleep(Duration::from_millis(100)));
loop {
select! {
..,
_ = &mut timeout_fut => { println!(..); },
}
}
- это компилируется, но по истечении тайм-аута на каждой итерации происходит
Poll::Ready
(для решения этой проблемы может помочь объединенный фьючерс). Обновите код, чтобы сбрасыватьtimeout_fut
каждый раз, когда он истекает
Box
выделяет память в куче. В некоторых случаяхstd::pin::pin!
— это тоже вариант, но его сложно использовать для фьючерса, которой переназначается- другая альтернатива — вообще не использовать
pin
, а создать другую задачу, которая будет отправляться в каналoneshot
каждые 100 мс - данные, содержащие указатели на себя, называются самоссылающимися (self-referential). Обычно средство проверки заимствований (borrow checker) в
Rust
предотвращает перемещение таких данных, поскольку ссылки не могут жить дольше данных, на которые они указывают. Однако преобразование кода для асинхронных блоков и функций не проверяется средством проверки заимствований Pin
— это обертка над ссылкой. Объект не может перемещаться с помощью закрепленного указателя. Однако он может перемещаться с помощью незакрепленного указателя- метод
poll
трейтаFuture
используетPin<&mut Self>
вместо&mut Self
для ссылки на экземпляр. Вот почему он может вызываться только на закрепленном указателе
Асинхронные трейты
Асинхронные методы в трейтах пока не поддерживаются в стабильной версии Rust
.
Крейт async_trait предоставляет макрос для решения этой задачи:
use async_trait::async_trait;
use std::time::Instant;
use tokio::time::{sleep, Duration};
#[async_trait]
trait Sleeper {
async fn sleep(&self);
}
struct FixedSleeper {
sleep_ms: u64,
}
#[async_trait]
impl Sleeper for FixedSleeper {
async fn sleep(&self) {
sleep(Duration::from_millis(self.sleep_ms)).await;
}
}
async fn run_all_sleepers_multiple_times(
sleepers: Vec<Box<dyn Sleeper>>,
n_times: usize,
) {
for _ in 0..n_times {
println!("running all sleepers..");
for sleeper in &sleepers {
let start = Instant::now();
sleeper.sleep().await;
println!("slept for {}ms", start.elapsed().as_millis());
}
}
}
#[tokio::main]
async fn main() {
let sleepers: Vec<Box<dyn Sleeper>> = vec![
Box::new(FixedSleeper { sleep_ms: 50 }),
Box::new(FixedSleeper { sleep_ms: 100 }),
];
run_all_sleepers_multiple_times(sleepers, 5).await;
}
Ремарки:
async_trait
прост в использовании, но учтите, что для работы он использует выделение памяти в куче. Это влечет издержки производительности- попробуйте создать новую "спящую" структуру, которая будет спать случайное время, и добавить ее в вектор
Отмена
Удаление фьючерса означает, что его больше никогда нельзя будет опросить. Это называется отменой (cancellation) и может произойти в любой момент ожидания. Необходимо позаботиться о том, чтобы система работала правильно даже в случае отмены фьючерса. Например, он не должен блокироваться или терять данные.
use std::io::{self, ErrorKind};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};
struct LinesReader {
stream: DuplexStream,
}
impl LinesReader {
fn new(stream: DuplexStream) -> Self {
Self { stream }
}
async fn next(&mut self) -> io::Result<Option<String>> {
let mut bytes = Vec::new();
let mut buf = [0];
while self.stream.read(&mut buf[..]).await? != 0 {
bytes.push(buf[0]);
if buf[0] == b'\n' {
break;
}
}
if bytes.is_empty() {
return Ok(None);
}
let s = String::from_utf8(bytes)
.map_err(|_| io::Error::new(ErrorKind::InvalidData, "не UTF-8"))?;
Ok(Some(s))
}
}
async fn slow_copy(source: String, mut dest: DuplexStream) -> std::io::Result<()> {
for b in source.bytes() {
dest.write_u8(b).await?;
tokio::time::sleep(Duration::from_millis(10)).await
}
Ok(())
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
let (client, server) = tokio::io::duplex(5);
let handle = tokio::spawn(slow_copy("привет\nпривет\n".to_owned(), client));
let mut lines = LinesReader::new(server);
let mut interval = tokio::time::interval(Duration::from_millis(60));
loop {
tokio::select! {
_ = interval.tick() => println!("тик!"),
line = lines.next() => if let Some(l) = line? {
print!("{}", l)
} else {
break
},
}
}
handle.await.unwrap()?;
Ok(())
}
Ремарки:
- компилятор не помогает с обеспечением безопасности отмены. Необходимо читать документацию API и понимать, каким состоянием владеет ваша
async fn
- в отличие от
panic!
и?
, отмена — это часть нормального управления потоком выполнения (а не обработка ошибок) - в примере теряется часть строки
- если ветвь
tick()
выполняется первой,next()
и егоbuf
уничтожаются LinesReader
можно сделать безопасным для отмены путем включенияbuf
в структуру:
- если ветвь
struct LinesReader {
stream: DuplexStream,
bytes: Vec<u8>,
buf: [u8; 1],
}
impl LinesReader {
fn new(stream: DuplexStream) -> Self {
Self { stream, bytes: Vec::new(), buf: [0] }
}
async fn next(&mut self) -> io::Result<Option<String>> {
// ...
let raw = std::mem::take(&mut self.bytes);
let s = String::from_utf8(raw)
// ...
}
}
- Interval::tick безопасен для отмены, поскольку он отслеживает, был ли "доставлен" (delivered) тик
- AsyncReadExt::read безопасен для отмены, поскольку он либо возвращается, либо не читает данные
- AsyncBufReadExt::read_line, как и пример, не является безопасным для отмены. Подробности и альтернативы см. в документации
Упражнения
Для тренировки навыков работы с асинхронным Rust
, есть еще два упражнения:
- обедающие философы — на этот раз вам нужно решить эту задачу с помощью асинхронного
Rust
- приложение для чата
Обедающие философы
use std::sync::Arc;
use tokio::sync::mpsc::{self, Sender};
use tokio::sync::Mutex;
use tokio::time;
struct Fork;
struct Philosopher {
name: String,
// left_fork: ...
// right_fork: ...
// thoughts: ...
}
impl Philosopher {
async fn think(&self) {
self.thoughts
.send(format!("Эврика! {} сгенерировал(а) новую идею!", &self.name))
.await
.unwrap();
}
async fn eat(&self) {
// Пытаемся до тех пор, пока не получим обе вилки
println!("{} ест...", &self.name);
time::sleep(time::Duration::from_millis(5)).await;
}
}
static PHILOSOPHERS: &[&str] =
&["Сократ", "Гипатия", "Платон", "Аристотель", "Пифагор"];
#[tokio::main]
async fn main() {
// Создаем вилки
// Создаем философов
// Каждый философ размышляет и ест 100 раз
// Выводим размышления философов
}
Для работы с асинхронным Rust
рекомендуется использовать tokio
:
[package]
name = "dining-philosophers-async"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1.26.0", features = ["sync", "time", "macros", "rt-multi-thread"] }
Подсказка: на этот раз вам придется использовать Mutex
и модуль mpsc
из tokio
.
use std::sync::Arc;
use tokio::sync::mpsc::{self, Sender};
use tokio::sync::Mutex;
use tokio::time;
struct Fork;
struct Philosopher {
name: String,
left_fork: Arc<Mutex<Fork>>,
right_fork: Arc<Mutex<Fork>>,
thoughts: Sender<String>,
}
impl Philosopher {
async fn think(&self) {
self.thoughts
.send(format!("Эврика! {} сгенерировал(а) новую идею!", &self.name))
.await
.unwrap();
}
async fn eat(&self) {
// Пытаемся до тех пор, пока не получим обе вилки
let (_left_fork, _right_fork) = loop {
// Берем вилки...
let left_fork = self.left_fork.try_lock();
let right_fork = self.right_fork.try_lock();
let Ok(left_fork) = left_fork else {
// Если мы не получили левую вилку, удаляем правую вилку,
// если она у нас была, позволяя выполняться другим задачам
drop(right_fork);
time::sleep(time::Duration::from_millis(1)).await;
continue;
};
let Ok(right_fork) = right_fork else {
// Если мы не получили правую вилку, удаляем левую вилку,
// если она у нас была, позволяя выполняться другим задачам
drop(left_fork);
time::sleep(time::Duration::from_millis(1)).await;
continue;
};
break (left_fork, right_fork);
};
println!("{} ест...", &self.name);
time::sleep(time::Duration::from_millis(5)).await;
// Блокировки уничтожаются здесь
}
}
static PHILOSOPHERS: &[&str] =
&["Сократ", "Гипатия", "Платон", "Аристотель", "Пифагор"];
#[tokio::main]
async fn main() {
// Создаем вилки
let mut forks = vec![];
(0..PHILOSOPHERS.len()).for_each(|_| forks.push(Arc::new(Mutex::new(Fork))));
// Создаем философов
let (philosophers, mut rx) = {
let mut philosophers = vec![];
let (tx, rx) = mpsc::channel(10);
for (i, name) in PHILOSOPHERS.iter().enumerate() {
let left_fork = Arc::clone(&forks[i]);
let right_fork = Arc::clone(&forks[(i + 1) % PHILOSOPHERS.len()]);
philosophers.push(Philosopher {
name: name.to_string(),
left_fork,
right_fork,
thoughts: tx.clone(),
});
}
(philosophers, rx)
// `tx` уничтожается здесь, поэтому нам не нужно явно удалять его позже
};
// Каждый философ думает и ест 100 раз
for phil in philosophers {
tokio::spawn(async move {
for _ in 0..100 {
phil.think().await;
phil.eat().await;
}
});
}
// Выводим размышления философов
while let Some(thought) = rx.recv().await {
println!("{thought}");
}
}
Чат
В этом упражнении мы используем новые знания для разработки приложения чата. У нас есть сервер, к которому подключаются клиенты и в котором они публикуют свои сообщения. Клиент читает пользовательские сообщения через стандартный ввод и отправляет их на сервер. Сервер передает (broadcast) сообщение всем клиентам.
Для реализации этого функционала мы будем использовать широковещательный канал на сервере и tokio_websockets для взаимодействия между клиентом и сервером.
Создайте новый проект и добавьте следующие зависимости в Cargo.toml
:
[package]
name = "chat-async"
version = "0.1.0"
edition = "2021"
[dependencies]
futures-util = { version = "0.3.30", features = ["sink"] }
http = "1.0.0"
tokio = { version = "1.28.1", features = ["full"] }
tokio-websockets = { version = "0.5.1", features = ["client", "fastrand", "server", "sha1_smol"] }
Необходимые API
Вам потребуются следующие функции из tokio
и tokio_websockets
. Потратьте несколько минут для ознакомления со следующими API:
- StreamExt::next(), реализуемый
WebSocketStream
— для асинхронного чтения сообщений из потока веб-сокетов - SinkExt::send(), реализуемый
WebSocketStream
— для асинхронной отправки сообщений в поток веб-сокетов - Lines::next_line() — для асинхронного чтения сообщений пользователя через стандартный ввод
- Sender::subscribe() — для подписки на широковещательный канал
Два бинарника
Как правило, в проекте может быть только один исполняемый файл (binary) и один файл src/main.rs
. Нам требуется два бинарника. Один для клиента и еще один для сервера. Теоретически их можно сделать двумя отдельными проектами, но мы поместим оба бинарника в один проект. Для того, чтобы это работало, клиент и сервер должны находиться в директории src/bin
(см. документацию).
Скопируйте следующий код сервера и клиента в src/bin/server.rs
и src/bin/client.rs
, соответственно.
// src/bin/server.rs
use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
use std::error::Error;
use std::net::SocketAddr;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::broadcast::{channel, Sender};
use tokio_websockets::{Message, ServerBuilder, WebSocketStream};
async fn handle_connection(
addr: SocketAddr,
mut ws_stream: WebSocketStream<TcpStream>,
bcast_tx: Sender<String>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
todo!("реализуй меня")
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let (bcast_tx, _) = channel(16);
let listener = TcpListener::bind("127.0.0.1:2000").await?;
println!("Запросы принимаются на порту 2000");
loop {
let (socket, addr) = listener.accept().await?;
println!("Запрос от {addr:?}");
let bcast_tx = bcast_tx.clone();
tokio::spawn(async move {
// Оборачиваем сырой поток TCP в веб-сокет
let ws_stream = ServerBuilder::new().accept(socket).await?;
handle_connection(addr, ws_stream, bcast_tx).await
});
}
}
// src/bin/client.rs
use futures_util::stream::StreamExt;
use futures_util::SinkExt;
use http::Uri;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_websockets::{ClientBuilder, Message};
#[tokio::main]
async fn main() -> Result<(), tokio_websockets::Error> {
let (mut ws_stream, _) =
ClientBuilder::from_uri(Uri::from_static("ws://127.0.0.1:2000"))
.connect()
.await?;
let stdin = tokio::io::stdin();
let mut stdin = BufReader::new(stdin).lines();
todo!("реализуй меня")
}
Запуск бинарников
Команда для запуска сервера:
cargo run --bin server
Команда для запуска клиента:
cargo run --bin client
Задачи
- реализовать функцию
handle_connection
вsrc/bin/server.rs
- подсказка: используйте
tokio::select!
для параллельного выполнения двух задач в бесконечном цикле. Одна задача получает сообщения от клиента и передает их другим клиентам. Другая — отправляет клиенту сообщения, полученные от сервера
- подсказка: используйте
- завершите функцию
main
вsrc/bin/client.rs
- подсказка: также используйте
tokio::select!
в бесконечном цикле для параллельного выполнения двух задач: 1) чтение сообщений пользователя из стандартного ввода и их отправка серверу; 2) получение сообщений от сервера и их отображение
- подсказка: также используйте
- опционально: измените код для передачи сообщений всем клиентам, кроме отправителя
// src/bin/server.rs
use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
use std::error::Error;
use std::net::SocketAddr;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::broadcast::{channel, Sender};
use tokio_websockets::{Message, ServerBuilder, WebSocketStream};
async fn handle_connection(
addr: SocketAddr,
mut ws_stream: WebSocketStream<TcpStream>,
bcast_tx: Sender<String>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
ws_stream
.send(Message::text("Добро пожаловать в чат! Отправьте сообщение".to_string()))
.await?;
let mut bcast_rx = bcast_tx.subscribe();
// Бесконечный цикл для параллельного выполнения двух задач:
// 1) получение сообщений из `ws_stream` и их передача клиентам
// 2) получение сообщений в `bcast_rx` и их отправка клиенту
loop {
tokio::select! {
incoming = ws_stream.next() => {
match incoming {
Some(Ok(msg)) => {
if let Some(text) = msg.as_text() {
println!("{addr:?}: {text:?}");
bcast_tx.send(text.into())?;
}
}
Some(Err(err)) => return Err(err.into()),
None => return Ok(()),
}
}
msg = bcast_rx.recv() => {
ws_stream.send(Message::text(msg?)).await?;
}
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let (bcast_tx, _) = channel(16);
let listener = TcpListener::bind("127.0.0.1:2000").await?;
println!("Запросы принимаются на порту 2000");
loop {
let (socket, addr) = listener.accept().await?;
println!("Запрос от {addr:?}");
let bcast_tx = bcast_tx.clone();
tokio::spawn(async move {
// Оборачиваем сырой поток TCP в веб-сокет
let ws_stream = ServerBuilder::new().accept(socket).await?;
handle_connection(addr, ws_stream, bcast_tx).await
});
}
}
// src/bin/client.rs
use futures_util::stream::StreamExt;
use futures_util::SinkExt;
use http::Uri;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_websockets::{ClientBuilder, Message};
#[tokio::main]
async fn main() -> Result<(), tokio_websockets::Error> {
let (mut ws_stream, _) =
ClientBuilder::from_uri(Uri::from_static("ws://127.0.0.1:2000"))
.connect()
.await?;
let stdin = tokio::io::stdin();
let mut stdin = BufReader::new(stdin).lines();
// Бесконечный цикл для параллельной отправки и получения сообщений
loop {
tokio::select! {
incoming = ws_stream.next() => {
match incoming {
Some(Ok(msg)) => {
if let Some(text) = msg.as_text() {
println!("От сервера: {}", text);
}
},
Some(Err(err)) => return Err(err.into()),
None => return Ok(()),
}
}
res = stdin.next_line() => {
match res {
Ok(None) => return Ok(()),
Ok(Some(line)) => ws_stream.send(Message::text(line.to_string())).await?,
Err(err) => return Err(err.into()),
}
}
}
}
}
Это конец бонусной части руководства.
Материалы для более глубокого изучения рассмотренных тем:
- Книга/учебник по Rust (на русском языке) — главы 16 и 20
- rustlings — упражнение 20
- Rust на примерах (на русском языке) — примеры 20.1 и 20.2
Happy coding!
Новости, обзоры продуктов и конкурсы от команды Timeweb.Cloud — в нашем Telegram-канале ↩