joke_bot: Introduce some concurrency

This commit is contained in:
Jonas Platte 2022-02-12 18:48:06 +01:00
parent 3da7e53156
commit 07ed3da86a
No known key found for this signature in database
GPG Key ID: CC154DE0E30B7C67
2 changed files with 19 additions and 7 deletions

View File

@ -6,13 +6,14 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
ruma = { version = "0.4.0", path = "../../crates/ruma", features = ["client-api-c", "client", "client-hyper-native-tls", "events"] }
ruma = { version = "0.4.0", path = "../../crates/ruma", features = ["client-api-c", "client", "client-hyper-native-tls", "events", "rand"] }
ruma-client = { version = "0.7.0", path = "../../crates/ruma-client", features = ["client-api"]}
# For building locally: use the git dependencies below.
# Browse the source at this revision here: https://github.com/ruma/ruma/tree/f161c8117c706fc52089999e1f406cf34276ec9d
# ruma = { git = "https://github.com/ruma/ruma", rev = "f161c8117c706fc52089999e1f406cf34276ec9d", features = ["client-api-c", "client", "client-hyper-native-tls", "events"] }
# ruma-client = { git = "https://github.com/ruma/ruma", rev = "f161c8117c706fc52089999e1f406cf34276ec9d", features = ["client-api"] }
futures-util = { version = "0.3.21", default-features = false, features = ["std"] }
http = "0.2.2"
hyper = "0.14.2"
hyper-tls = "0.5.0"

View File

@ -1,5 +1,6 @@
use std::{convert::TryInto, error::Error, io, process::exit, time::Duration};
use futures_util::future::{join, join_all};
use ruma::{
api::{
client::r0::{
@ -84,26 +85,36 @@ async fn run() -> Result<(), Box<dyn Error>> {
&PresenceState::Online,
Some(Duration::from_secs(30)),
));
// Prevent the clients being moved by `async move` blocks
let http_client = &http_client;
let matrix_client = &matrix_client;
println!("Listening...");
while let Some(response) = sync_stream.try_next().await? {
for (room_id, room_info) in response.rooms.join {
let message_futures = response.rooms.join.iter().map(|(room_id, room_info)| async move {
// Use a regular for loop for the messages within one room to handle them sequentially
for e in &room_info.timeline.events {
match handle_messages(&http_client, &matrix_client, e, &room_id, user_id).await {
match handle_messages(http_client, matrix_client, e, room_id, user_id).await {
Ok(_) => {}
Err(err) => {
eprintln!("failed to respond to message: {}", err)
}
}
}
}
});
for (room_id, _) in response.rooms.invite {
match handle_invitations(&http_client, &matrix_client, &room_id).await {
let invite_futures = response.rooms.invite.iter().map(|(room_id, _)| async move {
match handle_invitations(http_client, matrix_client, room_id).await {
Ok(_) => {}
Err(err) => eprintln!("failed to accept invitation for room {}: {}", &room_id, err),
}
}
});
// Handle messages from different rooms as well as invites concurrently
join(join_all(message_futures), join_all(invite_futures)).await;
}
Ok(())
}