Use stream::try_unfold from futures 0.3.2
This commit is contained in:
parent
e4cf0fbf1e
commit
aee5693fd8
34
src/lib.rs
34
src/lib.rs
@ -301,14 +301,6 @@ where
|
|||||||
+ TryStream<Ok = api::r0::sync::sync_events::IncomingResponse, Error = Error> {
|
+ TryStream<Ok = api::r0::sync::sync_events::IncomingResponse, Error = Error> {
|
||||||
use api::r0::sync::sync_events;
|
use api::r0::sync::sync_events;
|
||||||
|
|
||||||
// TODO: Is this really the way TryStreams are supposed to work?
|
|
||||||
#[derive(Debug, PartialEq, Eq)]
|
|
||||||
enum State {
|
|
||||||
InitialSync,
|
|
||||||
Since(String),
|
|
||||||
Errored,
|
|
||||||
}
|
|
||||||
|
|
||||||
let client = self.clone();
|
let client = self.clone();
|
||||||
let set_presence = if set_presence {
|
let set_presence = if set_presence {
|
||||||
None
|
None
|
||||||
@ -316,23 +308,12 @@ where
|
|||||||
Some(sync_events::SetPresence::Offline)
|
Some(sync_events::SetPresence::Offline)
|
||||||
};
|
};
|
||||||
|
|
||||||
let initial_state = match since {
|
stream::try_unfold(since, move |since| {
|
||||||
Some(s) => State::Since(s),
|
|
||||||
None => State::InitialSync,
|
|
||||||
};
|
|
||||||
|
|
||||||
stream::unfold(initial_state, move |state| {
|
|
||||||
let client = client.clone();
|
let client = client.clone();
|
||||||
let filter = filter.clone();
|
let filter = filter.clone();
|
||||||
|
|
||||||
async move {
|
async move {
|
||||||
let since = match state {
|
let response = client
|
||||||
State::Errored => return None,
|
|
||||||
State::Since(s) => Some(s),
|
|
||||||
State::InitialSync => None,
|
|
||||||
};
|
|
||||||
|
|
||||||
let res = client
|
|
||||||
.request(sync_events::Request {
|
.request(sync_events::Request {
|
||||||
filter,
|
filter,
|
||||||
since,
|
since,
|
||||||
@ -340,15 +321,10 @@ where
|
|||||||
set_presence,
|
set_presence,
|
||||||
timeout: None,
|
timeout: None,
|
||||||
})
|
})
|
||||||
.await;
|
.await?;
|
||||||
|
|
||||||
match res {
|
let next_batch_clone = response.next_batch.clone();
|
||||||
Ok(response) => {
|
Ok(Some((response, Some(next_batch_clone))))
|
||||||
let next_batch_clone = response.next_batch.clone();
|
|
||||||
Some((Ok(response), State::Since(next_batch_clone)))
|
|
||||||
}
|
|
||||||
Err(e) => Some((Err(e), State::Errored)),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user