diff --git a/src/lib.rs b/src/lib.rs index 72eefd2e..5b4c1034 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,6 +25,7 @@ use std::rc::Rc; use std::str::FromStr; use futures::future::{Future, FutureFrom, IntoFuture}; +use futures::stream::{self, Stream}; use hyper::{Client as HyperClient, Uri}; use hyper::client::{Connect, HttpConnector}; #[cfg(feature = "hyper-tls")] @@ -179,6 +180,45 @@ where }) } + /// Convenience method that represents repeated calls to the sync_events endpoint as a stream. + /// + /// If the since parameter is None, the first Item might take a significant time to arrive and + /// be deserialized, because it contains all events that have occured in the whole lifetime of + /// the logged-in users account and are visible to them. + pub fn sync( + &self, + filter: Option, + since: Option, + set_presence: bool, + ) -> impl Stream { + use api::r0::sync::sync_events; + + let client = self.clone(); + let set_presence = if set_presence { + None + } else { + Some(sync_events::SetPresence::Offline) + }; + + stream::unfold(since, move |since| { + Some( + sync_events::call( + client.clone(), + sync_events::Request { + filter: filter.clone(), + since, + full_state: None, + set_presence: set_presence.clone(), + timeout: None, + }, + ).map(|res| { + let next_batch_clone = res.next_batch.clone(); + (res, Some(next_batch_clone)) + }) + ) + }) + } + /// Makes a request to a Matrix API endpoint. pub(crate) fn request( self,