From 07ed3da86a14c83e5a35c11db00000f0fb598443 Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Sat, 12 Feb 2022 18:48:06 +0100 Subject: [PATCH] joke_bot: Introduce some concurrency --- examples/joke_bot/Cargo.toml | 3 ++- examples/joke_bot/src/main.rs | 23 +++++++++++++++++------ 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/examples/joke_bot/Cargo.toml b/examples/joke_bot/Cargo.toml index 8a6e9970..0c9392cb 100644 --- a/examples/joke_bot/Cargo.toml +++ b/examples/joke_bot/Cargo.toml @@ -6,13 +6,14 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -ruma = { version = "0.4.0", path = "../../crates/ruma", features = ["client-api-c", "client", "client-hyper-native-tls", "events"] } +ruma = { version = "0.4.0", path = "../../crates/ruma", features = ["client-api-c", "client", "client-hyper-native-tls", "events", "rand"] } ruma-client = { version = "0.7.0", path = "../../crates/ruma-client", features = ["client-api"]} # For building locally: use the git dependencies below. # Browse the source at this revision here: https://github.com/ruma/ruma/tree/f161c8117c706fc52089999e1f406cf34276ec9d # ruma = { git = "https://github.com/ruma/ruma", rev = "f161c8117c706fc52089999e1f406cf34276ec9d", features = ["client-api-c", "client", "client-hyper-native-tls", "events"] } # ruma-client = { git = "https://github.com/ruma/ruma", rev = "f161c8117c706fc52089999e1f406cf34276ec9d", features = ["client-api"] } +futures-util = { version = "0.3.21", default-features = false, features = ["std"] } http = "0.2.2" hyper = "0.14.2" hyper-tls = "0.5.0" diff --git a/examples/joke_bot/src/main.rs b/examples/joke_bot/src/main.rs index 82fea723..9badae8d 100644 --- a/examples/joke_bot/src/main.rs +++ b/examples/joke_bot/src/main.rs @@ -1,5 +1,6 @@ use std::{convert::TryInto, error::Error, io, process::exit, time::Duration}; +use futures_util::future::{join, join_all}; use ruma::{ api::{ client::r0::{ @@ -84,26 +85,36 @@ async fn run() -> Result<(), Box> { &PresenceState::Online, Some(Duration::from_secs(30)), )); + + // Prevent the clients being moved by `async move` blocks + let http_client = &http_client; + let matrix_client = &matrix_client; + println!("Listening..."); while let Some(response) = sync_stream.try_next().await? { - for (room_id, room_info) in response.rooms.join { + let message_futures = response.rooms.join.iter().map(|(room_id, room_info)| async move { + // Use a regular for loop for the messages within one room to handle them sequentially for e in &room_info.timeline.events { - match handle_messages(&http_client, &matrix_client, e, &room_id, user_id).await { + match handle_messages(http_client, matrix_client, e, room_id, user_id).await { Ok(_) => {} Err(err) => { eprintln!("failed to respond to message: {}", err) } } } - } + }); - for (room_id, _) in response.rooms.invite { - match handle_invitations(&http_client, &matrix_client, &room_id).await { + let invite_futures = response.rooms.invite.iter().map(|(room_id, _)| async move { + match handle_invitations(http_client, matrix_client, room_id).await { Ok(_) => {} Err(err) => eprintln!("failed to accept invitation for room {}: {}", &room_id, err), } - } + }); + + // Handle messages from different rooms as well as invites concurrently + join(join_all(message_futures), join_all(invite_futures)).await; } + Ok(()) }