client: Require a next_batch token for Client::sync

This commit is contained in:
Jonas Platte 2020-08-18 00:25:41 +02:00
parent 277800b980
commit a9757b13ae
No known key found for this signature in database
GPG Key ID: CC154DE0E30B7C67
2 changed files with 19 additions and 16 deletions

View File

@ -1,8 +1,10 @@
use std::{env, process::exit, time::Duration};
use futures_util::stream::{StreamExt as _, TryStreamExt as _};
use assign::assign;
use futures_util::stream::TryStreamExt as _;
use http::Uri;
use ruma::{
api::client::r0::{filter::FilterDefinition, sync::sync_events},
events::{
room::message::{MessageEventContent, TextMessageEventContent},
AnySyncMessageEvent, AnySyncRoomEvent, SyncMessageEvent,
@ -16,13 +18,18 @@ async fn log_messages(homeserver_url: Uri, username: &str, password: &str) -> an
client.log_in(username, password, None, None).await?;
let mut sync_stream = Box::pin(
client
.sync(None, None, PresenceState::Online, Some(Duration::from_secs(30)))
// TODO: This is a horrible way to obtain an initial next_batch token that generates way
// too much server load and network traffic. Fix this!
.skip(1),
);
let initial_sync_response = client
.request(assign!(sync_events::Request::new(), {
filter: Some(FilterDefinition::ignore_all().into()),
}))
.await?;
let mut sync_stream = Box::pin(client.sync(
None,
initial_sync_response.next_batch,
PresenceState::Online,
Some(Duration::from_secs(30)),
));
while let Some(res) = sync_stream.try_next().await? {
// Only look at rooms the user hasn't left yet

View File

@ -53,7 +53,7 @@
//! # async {
//! let mut sync_stream = Box::pin(client.sync(
//! None,
//! Some(next_batch_token),
//! next_batch_token,
//! PresenceState::Online,
//! Some(Duration::from_secs(30)),
//! ));
@ -315,14 +315,10 @@ where
}
/// Convenience method that represents repeated calls to the sync_events endpoint as a stream.
///
/// If the since parameter is None, the first Item might take a significant time to arrive and
/// be deserialized, because it contains all events that have occurred in the whole lifetime of
/// the logged-in users account and are visible to them.
pub fn sync<'a>(
&self,
filter: Option<SyncFilter<'a>>,
since: Option<String>,
since: String,
set_presence: ruma_common::presence::PresenceState,
timeout: Option<Duration>,
) -> impl Stream<Item = Result<SyncResponse, Error<ruma_client_api::Error>>>
@ -336,14 +332,14 @@ where
let response = client
.request(assign!(SyncRequest::new(), {
filter,
since: since.as_deref(),
since: Some(&since),
set_presence,
timeout,
}))
.await?;
let next_batch_clone = response.next_batch.clone();
Ok(Some((response, Some(next_batch_clone))))
Ok(Some((response, next_batch_clone)))
}
})
}