client: Replace futures_util::stream::try_unfold with async_stream::try_stream!

This commit is contained in:
Jonas Platte 2021-01-04 16:12:12 +01:00
parent 6f4d883e88
commit 1db0ad1bea
No known key found for this signature in database
GPG Key ID: CC154DE0E30B7C67
2 changed files with 11 additions and 14 deletions

View File

@ -17,8 +17,8 @@ version = "0.5.0-alpha.1"
[dependencies] [dependencies]
assign = "1.1.1" assign = "1.1.1"
async-stream = "0.3.0"
futures-core = "0.3.8" futures-core = "0.3.8"
futures-util = "0.3.8"
http = "0.2.2" http = "0.2.2"
hyper = { version = "0.14.2", features = ["client", "tcp"] } hyper = { version = "0.14.2", features = ["client", "tcp"] }
hyper-tls = { version = "0.5.0", optional = true } hyper-tls = { version = "0.5.0", optional = true }
@ -34,6 +34,7 @@ serde_json = "1.0.61"
[dev-dependencies] [dev-dependencies]
anyhow = "1.0.37" anyhow = "1.0.37"
futures-util = "0.3.8"
ruma = { version = "0.0.2", path = "../ruma", features = ["client-api"] } ruma = { version = "0.0.2", path = "../ruma", features = ["client-api"] }
tokio = { version = "1.0.1", features = ["macros", "rt"] } tokio = { version = "1.0.1", features = ["macros", "rt"] }

View File

@ -107,8 +107,8 @@ use std::{
}; };
use assign::assign; use assign::assign;
use futures_core::stream::{Stream, TryStream}; use async_stream::try_stream;
use futures_util::stream; use futures_core::stream::Stream;
use http::{uri::Uri, Response as HttpResponse}; use http::{uri::Uri, Response as HttpResponse};
use hyper::client::{Client as HyperClient, HttpConnector}; use hyper::client::{Client as HyperClient, HttpConnector};
use ruma_api::{AuthScheme, OutgoingRequest}; use ruma_api::{AuthScheme, OutgoingRequest};
@ -302,17 +302,13 @@ impl Client {
pub fn sync<'a>( pub fn sync<'a>(
&self, &self,
filter: Option<&'a SyncFilter<'a>>, filter: Option<&'a SyncFilter<'a>>,
since: String, mut since: String,
set_presence: &'a PresenceState, set_presence: &'a PresenceState,
timeout: Option<Duration>, timeout: Option<Duration>,
) -> impl Stream<Item = Result<SyncResponse, Error<ruma_client_api::Error>>> ) -> impl Stream<Item = Result<SyncResponse, Error<ruma_client_api::Error>>> + 'a {
+ TryStream<Ok = SyncResponse, Error = Error<ruma_client_api::Error>>
+ 'a {
let client = self.clone(); let client = self.clone();
stream::try_unfold(since, move |since| { try_stream! {
let client = client.clone(); loop {
async move {
let response = client let response = client
.request(assign!(SyncRequest::new(), { .request(assign!(SyncRequest::new(), {
filter, filter,
@ -322,10 +318,10 @@ impl Client {
})) }))
.await?; .await?;
let next_batch_clone = response.next_batch.clone(); since = response.next_batch.clone();
Ok(Some((response, next_batch_clone))) yield response;
}
} }
})
} }
/// Makes a request to a Matrix API endpoint. /// Makes a request to a Matrix API endpoint.