state-res: parallelize fetches within some loops

Signed-off-by: Jason Volk <jason@zemos.net>

commit 1a550585bf
parent 9c84a3be3e
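
Every hunk in this commit applies the same transformation: a sequential fetch-and-await loop becomes a `futures_util` stream that keeps up to `PARALLEL_FETCHES` requests in flight via `buffer_unordered`. A minimal sketch of that pattern, assuming a Tokio runtime and an invented `fetch` callback (neither appears in this commit):

use futures_util::{future, stream, FutureExt, StreamExt};

const PARALLEL_FETCHES: usize = 16;

// Hypothetical fetch callback standing in for `event_exists`/`event_fetch`.
async fn fetch(id: u64) -> Option<u64> {
    (id % 2 == 0).then_some(id * 10)
}

#[tokio::main]
async fn main() {
    // Old shape: `for id in ids { if let Some(v) = fetch(id).await { .. } }`,
    // one request at a time. New shape: up to PARALLEL_FETCHES in flight.
    let fetched: Vec<u64> = stream::iter(1u64..=6)
        .map(|id| fetch(id).map(move |res| (id, res)))
        .buffer_unordered(PARALLEL_FETCHES)
        .filter_map(|(_id, res)| future::ready(res))
        .collect()
        .await;

    assert_eq!(fetched.len(), 3); // only the even ids resolved
}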
@@ -5,7 +5,7 @@ use std::{
     hash::Hash,
 };
 
-use futures_util::{future, stream, Future, StreamExt};
+use futures_util::{future, stream, Future, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
 use js_int::{int, Int};
 use ruma_common::{EventId, MilliSecondsSinceUnixEpoch, RoomVersionId};
 use ruma_events::{
@@ -30,7 +30,14 @@ pub use room_version::RoomVersion;
 pub use state_event::Event;
 
 /// A mapping of event type and state_key to some value `T`, usually an `EventId`.
-pub type StateMap<T> = HashMap<(StateEventType, String), T>;
+pub type StateMap<T> = HashMap<TypeStateKey, T>;
+pub type StateMapItem<T> = (TypeStateKey, T);
+pub type TypeStateKey = (StateEventType, String);
 
+/// Limit the number of asynchronous fetch requests in-flight for any given operation. This is a
+/// local maximum which could be multiplied over several macro-operations, therefore the total
+/// number of requests demanded from the callbacks could be far greater.
+const PARALLEL_FETCHES: usize = 16;
+
 /// Resolve sets of state events as they come in.
 ///
@@ -63,10 +70,10 @@ pub async fn resolve<'a, E, SetIter, Fetch, FetchFut, Exists, ExistsFut>(
 where
     Fetch: Fn(E::Id) -> FetchFut + Sync,
     FetchFut: Future<Output = Option<E>> + Send,
-    Exists: Fn(E::Id) -> ExistsFut,
+    Exists: Fn(E::Id) -> ExistsFut + Sync,
     ExistsFut: Future<Output = bool> + Send,
     SetIter: Iterator<Item = &'a StateMap<E::Id>> + Clone + Send,
-    E: Event + Send,
+    E: Event + Clone + Send + Sync,
     E::Id: Borrow<EventId> + Send + Sync,
     for<'b> &'b E: Send,
 {
@@ -91,9 +98,11 @@ where
 
     // `all_conflicted` contains unique items
     // synapse says `full_set = {eid for eid in full_conflicted_set if eid in event_map}`
-    let all_conflicted: HashSet<E::Id> = stream::iter(auth_chain_diff)
+    let all_conflicted: HashSet<_> = stream::iter(auth_chain_diff)
         // Don't honor events we cannot "verify"
-        .filter(|id| event_exists(id.clone()))
+        .map(|id| event_exists(id.clone()).map(move |exists| (id, exists)))
+        .buffer_unordered(PARALLEL_FETCHES)
+        .filter_map(|(id, exists)| future::ready(exists.then_some(id.clone())))
         .collect()
         .await;
 
@@ -104,10 +113,12 @@ where
     // this is now a check the caller of `resolve` must make.
 
     // Get only the control events with a state_key: "" or ban/kick event (sender != state_key)
-    let control_events = stream::iter(all_conflicted.iter())
-        .filter(|&id| is_power_event_id(id, &event_fetch))
-        .map(Clone::clone)
-        .collect::<Vec<_>>()
+    let control_events: Vec<_> = stream::iter(all_conflicted.iter())
+        .map(|id| is_power_event_id(id, &event_fetch).map(move |is| (id, is)))
+        .buffer_unordered(PARALLEL_FETCHES)
+        .filter_map(|(id, is)| future::ready(is.then_some(id.clone())))
+        .collect()
+        .boxed()
         .await;
 
     // Sort the control events based on power_level/clock/event_id and outgoing/incoming edges
@@ -209,9 +220,9 @@ where
 }
 
 /// Returns a Vec of deduped EventIds that appear in some chains but not others.
-fn get_auth_chain_diff<Id>(auth_chain_sets: &Vec<HashSet<Id>>) -> impl Iterator<Item = Id>
+fn get_auth_chain_diff<Id>(auth_chain_sets: &Vec<HashSet<Id>>) -> impl Iterator<Item = Id> + Send
 where
-    Id: Clone + Eq + Hash,
+    Id: Clone + Eq + Hash + Send,
 {
     let num_sets = auth_chain_sets.len();
     let mut id_counts: HashMap<Id, usize> = HashMap::new();
@@ -246,28 +257,27 @@ where
     let mut graph = HashMap::new();
     for event_id in events_to_sort {
         add_event_and_auth_chain_to_graph(&mut graph, event_id, auth_diff, fetch_event).await;
-
-        // TODO: if these functions are ever made async here
-        // is a good place to yield every once in a while so other
-        // tasks can make progress
     }
 
     // This is used in the `key_fn` passed to the lexico_topo_sort fn
-    let mut event_to_pl = HashMap::new();
-    for event_id in graph.keys() {
-        let pl = get_power_level_for_sender(event_id, fetch_event).await?;
-        debug!(
-            event_id = event_id.borrow().as_str(),
-            power_level = i64::from(pl),
-            "found the power level of an event's sender",
-        );
+    let event_to_pl = stream::iter(graph.keys())
+        .map(|event_id| {
+            get_power_level_for_sender(event_id.clone(), fetch_event)
+                .map(move |res| res.map(|pl| (event_id, pl)))
+        })
+        .buffer_unordered(PARALLEL_FETCHES)
+        .try_fold(HashMap::new(), |mut event_to_pl, (event_id, pl)| {
+            debug!(
+                event_id = event_id.borrow().as_str(),
+                power_level = i64::from(pl),
+                "found the power level of an event's sender",
+            );
 
-        event_to_pl.insert(event_id.clone(), pl);
-
-        // TODO: if these functions are ever made async here
-        // is a good place to yield every once in a while so other
-        // tasks can make progress
-    }
+            event_to_pl.insert(event_id.clone(), pl);
+            future::ok(event_to_pl)
+        })
+        .boxed()
+        .await?;
 
     let event_to_pl = &event_to_pl;
     let fetcher = |event_id: E::Id| async move {
||||||
@ -289,7 +299,7 @@ pub async fn lexicographical_topological_sort<Id, F, Fut>(
|
|||||||
key_fn: &F,
|
key_fn: &F,
|
||||||
) -> Result<Vec<Id>>
|
) -> Result<Vec<Id>>
|
||||||
where
|
where
|
||||||
F: Fn(Id) -> Fut,
|
F: Fn(Id) -> Fut + Sync,
|
||||||
Fut: Future<Output = Result<(Int, MilliSecondsSinceUnixEpoch)>> + Send,
|
Fut: Future<Output = Result<(Int, MilliSecondsSinceUnixEpoch)>> + Send,
|
||||||
Id: Borrow<EventId> + Clone + Eq + Hash + Ord + Send,
|
Id: Borrow<EventId> + Clone + Eq + Hash + Ord + Send,
|
||||||
{
|
{
|
||||||
@@ -402,11 +412,11 @@ where
 /// at the eventId's generation (we walk backwards to `EventId`s most recent previous power level
 /// event).
 async fn get_power_level_for_sender<E, F, Fut>(
-    event_id: &E::Id,
+    event_id: E::Id,
     fetch_event: &F,
 ) -> serde_json::Result<Int>
 where
-    F: Fn(E::Id) -> Fut,
+    F: Fn(E::Id) -> Fut + Sync,
     Fut: Future<Output = Option<E>> + Send,
     E: Event + Send,
     E::Id: Borrow<EventId> + Send,
||||||
@ -456,9 +466,9 @@ async fn iterative_auth_check<E, F, Fut>(
|
|||||||
fetch_event: &F,
|
fetch_event: &F,
|
||||||
) -> Result<StateMap<E::Id>>
|
) -> Result<StateMap<E::Id>>
|
||||||
where
|
where
|
||||||
F: Fn(E::Id) -> Fut,
|
F: Fn(E::Id) -> Fut + Sync,
|
||||||
Fut: Future<Output = Option<E>> + Send,
|
Fut: Future<Output = Option<E>> + Send,
|
||||||
E: Event + Send,
|
E: Event + Send + Sync,
|
||||||
E::Id: Borrow<EventId> + Clone + Send,
|
E::Id: Borrow<EventId> + Clone + Send,
|
||||||
for<'a> &'a E: Send,
|
for<'a> &'a E: Send,
|
||||||
{
|
{
|
||||||
@ -479,8 +489,8 @@ where
|
|||||||
let mut auth_events = StateMap::new();
|
let mut auth_events = StateMap::new();
|
||||||
for aid in event.auth_events() {
|
for aid in event.auth_events() {
|
||||||
if let Some(ev) = fetch_event(aid.clone()).await {
|
if let Some(ev) = fetch_event(aid.clone()).await {
|
||||||
// TODO synapse check "rejected_reason" which is most likely
|
//TODO: synapse checks "rejected_reason" which is most likely related to
|
||||||
// related to soft-failing
|
// soft-failing
|
||||||
auth_events.insert(
|
auth_events.insert(
|
||||||
ev.event_type().with_state_key(ev.state_key().ok_or_else(|| {
|
ev.event_type().with_state_key(ev.state_key().ok_or_else(|| {
|
||||||
Error::InvalidPdu("State event had no state key".to_owned())
|
Error::InvalidPdu("State event had no state key".to_owned())
|
||||||
@@ -492,19 +502,26 @@ where
             }
         }
 
-        for key in auth_types_for_event(
+        let auth_types = auth_types_for_event(
             event.event_type(),
             event.sender(),
             Some(state_key),
             event.content(),
-        )? {
-            if let Some(ev_id) = resolved_state.get(&key) {
-                if let Some(event) = fetch_event(ev_id.clone()).await {
-                    // TODO synapse checks `rejected_reason` is None here
-                    auth_events.insert(key.to_owned(), event);
-                }
-            }
-        }
+        )?;
+
+        let auth_types =
+            auth_types.iter().filter_map(|key| Some((key, resolved_state.get(key)?))).into_iter();
+
+        stream::iter(auth_types)
+            .filter_map(|(key, ev_id)| {
+                fetch_event(ev_id.clone()).map(move |event| event.map(|event| (key, event)))
+            })
+            .for_each(|(key, event)| {
+                //TODO: synapse checks "rejected_reason" is None here
+                auth_events.insert(key.to_owned(), event);
+                future::ready(())
+            })
+            .await;
 
         debug!("event to check {:?}", event.event_id());
 
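
The replacement in `iterative_auth_check` pairs each auth type with its fetched event in `filter_map` and inserts the hits in `for_each`; unlike the other loops, this chain has no `buffer_unordered` stage, so each fetch is awaited before the next item is pulled. A toy version of the same shape (invented `fetch`, assumed types):

use futures_util::{future, stream, FutureExt, StreamExt};
use std::collections::HashMap;

// Invented fetch standing in for `fetch_event`; id 2 "fails".
async fn fetch(id: u32) -> Option<&'static str> {
    (id != 2).then_some("event")
}

#[tokio::main]
async fn main() {
    let mut auth_events: HashMap<u32, &'static str> = HashMap::new();

    stream::iter([1u32, 2, 3])
        // Pair each key with its fetched event, dropping misses.
        .filter_map(|id| fetch(id).map(move |event| event.map(|event| (id, event))))
        // Insert the hits; the closure must return a Future, hence ready(()).
        .for_each(|(id, event)| {
            auth_events.insert(id, event);
            future::ready(())
        })
        .await;

    assert_eq!(auth_events.len(), 2); // id 2 did not resolve
}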
@@ -525,11 +542,8 @@ where
             // synapse passes here on AuthError. We do not add this event to resolved_state.
             warn!("event {event_id} failed the authentication check");
         }
-
-        // TODO: if these functions are ever made async here
-        // is a good place to yield every once in a while so other
-        // tasks can make progress
     }
 
     Ok(resolved_state)
 }
@@ -546,10 +560,10 @@ async fn mainline_sort<E, F, Fut>(
     fetch_event: &F,
 ) -> Result<Vec<E::Id>>
 where
-    F: Fn(E::Id) -> Fut,
+    F: Fn(E::Id) -> Fut + Sync,
     Fut: Future<Output = Option<E>> + Send,
-    E: Event + Send,
-    E::Id: Borrow<EventId> + Clone + Send,
+    E: Event + Clone + Send + Sync,
+    E::Id: Borrow<EventId> + Clone + Send + Sync,
 {
     debug!("mainline sort of events");
 
@@ -576,9 +590,6 @@ where
                 break;
             }
         }
-        // TODO: if these functions are ever made async here
-        // is a good place to yield every once in a while so other
-        // tasks can make progress
     }
 
     let mainline_map = mainline
@@ -588,25 +599,23 @@ where
         .map(|(idx, eid)| ((*eid).clone(), idx))
         .collect::<HashMap<_, _>>();
 
-    let mut order_map = HashMap::new();
-    for ev_id in to_sort.iter() {
-        if let Some(event) = fetch_event(ev_id.clone()).await {
-            if let Ok(depth) = get_mainline_depth(Some(event), &mainline_map, fetch_event).await {
-                order_map.insert(
-                    ev_id,
-                    (
-                        depth,
-                        fetch_event(ev_id.clone()).await.map(|ev| ev.origin_server_ts()),
-                        ev_id,
-                    ),
-                );
-            }
-        }
-
-        // TODO: if these functions are ever made async here
-        // is a good place to yield every once in a while so other
-        // tasks can make progress
-    }
+    let order_map = stream::iter(to_sort.into_iter())
+        .map(|ev_id| fetch_event(ev_id.clone()).map(move |event| event.map(|event| (event, ev_id))))
+        .buffer_unordered(PARALLEL_FETCHES)
+        .filter_map(|result| future::ready(result))
+        .map(|(event, ev_id)| {
+            get_mainline_depth(Some(event.clone()), &mainline_map, fetch_event)
+                .map_ok(move |depth| (depth, event, ev_id))
+                .map(Result::ok)
+        })
+        .buffer_unordered(PARALLEL_FETCHES)
+        .filter_map(|result| future::ready(result))
+        .fold(HashMap::new(), |mut order_map, (depth, event, ev_id)| {
+            order_map.insert(ev_id, (depth, event.origin_server_ts(), ev_id));
+            future::ready(order_map)
+        })
+        .boxed()
+        .await;
 
     // Sort the event_ids by their depth, timestamp and EventId
     // unwrap is OK order map and sort_event_ids are from to_sort (the same Vec)
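
The mainline sort above chains two buffered stages: one fetches events, the next resolves each event's mainline depth, and `.map(Result::ok)` plus `filter_map` drop failures instead of aborting. A two-stage sketch in the same shape, with invented `fetch` and `depth` helpers:

use futures_util::{future, stream, FutureExt, StreamExt};
use std::collections::HashMap;

const PARALLEL_FETCHES: usize = 16;

// Invented helpers standing in for `fetch_event` and `get_mainline_depth`.
async fn fetch(id: u32) -> Option<u32> { Some(id * 2) }
async fn depth(ev: u32) -> Result<usize, &'static str> { Ok((ev / 2) as usize) }

#[tokio::main]
async fn main() {
    let order_map: HashMap<u32, usize> = stream::iter([1u32, 2, 3])
        // Stage 1: fetch events, up to PARALLEL_FETCHES at once.
        .map(|id| fetch(id).map(move |ev| ev.map(|ev| (ev, id))))
        .buffer_unordered(PARALLEL_FETCHES)
        .filter_map(future::ready)
        // Stage 2: compute depths concurrently; failures are dropped.
        .map(|(ev, id)| depth(ev).map(move |res| res.ok().map(|d| (id, d))))
        .buffer_unordered(PARALLEL_FETCHES)
        .filter_map(future::ready)
        .fold(HashMap::new(), |mut map, (id, d)| {
            map.insert(id, d);
            future::ready(map)
        })
        .await;

    assert_eq!(order_map.len(), 3);
}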
@@ -624,7 +633,7 @@ async fn get_mainline_depth<E, F, Fut>(
     fetch_event: &F,
 ) -> Result<usize>
 where
-    F: Fn(E::Id) -> Fut,
+    F: Fn(E::Id) -> Fut + Sync,
     Fut: Future<Output = Option<E>> + Send,
     E: Event + Send,
     E::Id: Borrow<EventId> + Send,
@@ -665,10 +674,11 @@ async fn add_event_and_auth_chain_to_graph<E, F, Fut>(
     let mut state = vec![event_id];
     while let Some(eid) = state.pop() {
         graph.entry(eid.clone()).or_default();
+        let event = fetch_event(eid.clone()).await;
+        let auth_events = event.as_ref().map(|ev| ev.auth_events()).into_iter().flatten();
+
         // Prefer the store to event as the store filters dedups the events
-        for aid in
-            fetch_event(eid.clone()).await.as_ref().map(|ev| ev.auth_events()).into_iter().flatten()
-        {
+        for aid in auth_events {
             if auth_diff.contains(aid.borrow()) {
                 if !graph.contains_key(aid.borrow()) {
                     state.push(aid.to_owned());
@@ -683,7 +693,7 @@ async fn add_event_and_auth_chain_to_graph<E, F, Fut>(
 
 async fn is_power_event_id<E, F, Fut>(event_id: &E::Id, fetch: &F) -> bool
 where
-    F: Fn(E::Id) -> Fut,
+    F: Fn(E::Id) -> Fut + Sync,
     Fut: Future<Output = Option<E>> + Send,
     E: Event + Send,
     E::Id: Borrow<EventId> + Send,