state-res: Remove StateResolution type
… making its associated functions free instead.
This commit is contained in:
		
							parent
							
								
									cc9336e444
								
							
						
					
					
						commit
						4bea59caef
					
				| @ -30,7 +30,7 @@ use ruma_events::{ | ||||
|     EventType, | ||||
| }; | ||||
| use ruma_identifiers::{EventId, RoomId, RoomVersionId, UserId}; | ||||
| use ruma_state_res::{Error, Event, EventMap, Result, StateMap, StateResolution}; | ||||
| use ruma_state_res::{self as state_res, Error, Event, EventMap, Result, StateMap}; | ||||
| use serde_json::{json, Value as JsonValue}; | ||||
| 
 | ||||
| static SERVER_TIMESTAMP: AtomicU64 = AtomicU64::new(0); | ||||
| @ -45,7 +45,7 @@ fn lexico_topo_sort(c: &mut Criterion) { | ||||
|             event_id("p") => hashset![event_id("o")], | ||||
|         }; | ||||
|         b.iter(|| { | ||||
|             let _ = StateResolution::lexicographical_topological_sort(&graph, |id| { | ||||
|             let _ = state_res::lexicographical_topological_sort(&graph, |id| { | ||||
|                 Ok((0, MilliSecondsSinceUnixEpoch(uint!(0)), id.clone())) | ||||
|             }); | ||||
|         }) | ||||
| @ -62,7 +62,7 @@ fn resolution_shallow_auth_chain(c: &mut Criterion) { | ||||
|         b.iter(|| { | ||||
|             let ev_map: EventMap<Arc<StateEvent>> = store.0.clone(); | ||||
|             let state_sets = vec![state_at_bob.clone(), state_at_charlie.clone()]; | ||||
|             let _ = match StateResolution::resolve::<StateEvent, _>( | ||||
|             let _ = match state_res::resolve::<StateEvent, _>( | ||||
|                 &room_id(), | ||||
|                 &RoomVersionId::Version6, | ||||
|                 &state_sets, | ||||
| @ -119,7 +119,7 @@ fn resolve_deeper_event_set(c: &mut Criterion) { | ||||
| 
 | ||||
|         b.iter(|| { | ||||
|             let state_sets = vec![state_set_a.clone(), state_set_b.clone()]; | ||||
|             let _ = match StateResolution::resolve::<StateEvent, _>( | ||||
|             let _ = match state_res::resolve::<StateEvent, _>( | ||||
|                 &room_id(), | ||||
|                 &RoomVersionId::Version6, | ||||
|                 &state_sets, | ||||
|  | ||||
| @ -32,30 +32,24 @@ pub type StateMap<T> = HashMap<(EventType, String), T>; | ||||
| /// A mapping of `EventId` to `T`, usually a `ServerPdu`.
 | ||||
| pub type EventMap<T> = HashMap<EventId, T>; | ||||
| 
 | ||||
| #[derive(Default)] | ||||
| #[allow(clippy::exhaustive_structs)] | ||||
| pub struct StateResolution; | ||||
| 
 | ||||
| impl StateResolution { | ||||
|     /// Resolve sets of state events as they come in. Internally `StateResolution` builds a graph
 | ||||
|     /// and an auth chain to allow for state conflict resolution.
 | ||||
| /// Resolve sets of state events as they come in. Internally `StateResolution` builds a graph and an
 | ||||
| /// auth chain to allow for state conflict resolution.
 | ||||
| ///
 | ||||
| /// ## Arguments
 | ||||
| ///
 | ||||
|     /// * `state_sets` - The incoming state to resolve. Each `StateMap` represents a possible fork
 | ||||
|     /// in the state of a room.
 | ||||
| /// * `state_sets` - The incoming state to resolve. Each `StateMap` represents a possible fork in
 | ||||
| ///   the state of a room.
 | ||||
| ///
 | ||||
| /// * `auth_chain_sets` - The full recursive set of `auth_events` for each event in the
 | ||||
| ///   `state_sets`.
 | ||||
| ///
 | ||||
|     /// * `fetch_event` - Any event not found in the `event_map` will defer to this closure to find
 | ||||
|     /// the event.
 | ||||
| /// * `fetch_event` - Any event not found in the `event_map` will defer to this closure to find the
 | ||||
| ///   event.
 | ||||
| ///
 | ||||
| /// ## Invariants
 | ||||
| ///
 | ||||
|     /// The caller of `StateResolution::resolve` must ensure that all the events are from the same
 | ||||
|     /// room. Although this function takes a `RoomId` it does not check that each event is part
 | ||||
|     /// of the same room.
 | ||||
| /// The caller of `resolve` must ensure that all the events are from the same room. Although this
 | ||||
| /// function takes a `RoomId` it does not check that each event is part of the same room.
 | ||||
| pub fn resolve<E, F>( | ||||
|     room_id: &RoomId, | ||||
|     room_version: &RoomVersionId, | ||||
| @ -70,7 +64,7 @@ impl StateResolution { | ||||
|     info!("State resolution starting"); | ||||
| 
 | ||||
|     // Split non-conflicting and conflicting state
 | ||||
|         let (clean, conflicting) = StateResolution::separate(state_sets); | ||||
|     let (clean, conflicting) = separate(state_sets); | ||||
| 
 | ||||
|     info!("non conflicting events: {}", clean.len()); | ||||
|     trace!("{:?}", clean); | ||||
| @ -84,7 +78,7 @@ impl StateResolution { | ||||
|     debug!("{:?}", conflicting); | ||||
| 
 | ||||
|     // The set of auth events that are not common across server forks
 | ||||
|         let mut auth_diff = StateResolution::get_auth_chain_diff(room_id, auth_chain_sets)?; | ||||
|     let mut auth_diff = get_auth_chain_diff(room_id, auth_chain_sets)?; | ||||
| 
 | ||||
|     // Add the auth_diff to conflicting now we have a full set of conflicting events
 | ||||
|     auth_diff.extend(conflicting.values().cloned().flatten().flatten()); | ||||
| @ -114,23 +108,16 @@ impl StateResolution { | ||||
|         .collect::<Vec<_>>(); | ||||
| 
 | ||||
|     // Sort the control events based on power_level/clock/event_id and outgoing/incoming edges
 | ||||
|         let sorted_control_levels = StateResolution::reverse_topological_power_sort( | ||||
|             control_events, | ||||
|             &all_conflicted, | ||||
|             &fetch_event, | ||||
|         )?; | ||||
|     let sorted_control_levels = | ||||
|         reverse_topological_power_sort(control_events, &all_conflicted, &fetch_event)?; | ||||
| 
 | ||||
|     debug!("sorted control events: {}", sorted_control_levels.len()); | ||||
|     trace!("{:?}", sorted_control_levels); | ||||
| 
 | ||||
|     let room_version = RoomVersion::new(room_version)?; | ||||
|     // Sequentially auth check each control event.
 | ||||
|         let resolved_control = StateResolution::iterative_auth_check( | ||||
|             &room_version, | ||||
|             &sorted_control_levels, | ||||
|             &clean, | ||||
|             &fetch_event, | ||||
|         )?; | ||||
|     let resolved_control = | ||||
|         iterative_auth_check(&room_version, &sorted_control_levels, &clean, &fetch_event)?; | ||||
| 
 | ||||
|     debug!("resolved control events: {}", resolved_control.len()); | ||||
|     trace!("{:?}", resolved_control); | ||||
| @ -155,12 +142,11 @@ impl StateResolution { | ||||
| 
 | ||||
|     debug!("power event: {:?}", power_event); | ||||
| 
 | ||||
|         let sorted_left_events = | ||||
|             StateResolution::mainline_sort(&events_to_resolve, power_event, &fetch_event)?; | ||||
|     let sorted_left_events = mainline_sort(&events_to_resolve, power_event, &fetch_event)?; | ||||
| 
 | ||||
|     trace!("events left, sorted: {:?}", sorted_left_events.iter().collect::<Vec<_>>()); | ||||
| 
 | ||||
|         let mut resolved_state = StateResolution::iterative_auth_check( | ||||
|     let mut resolved_state = iterative_auth_check( | ||||
|         &room_version, | ||||
|         &sorted_left_events, | ||||
|         &resolved_control, // The control events are added to the final resolved state
 | ||||
| @ -174,11 +160,12 @@ impl StateResolution { | ||||
| } | ||||
| 
 | ||||
| /// Split the events that have no conflicts from those that are conflicting.
 | ||||
| ///
 | ||||
| /// The return tuple looks like `(unconflicted, conflicted)`.
 | ||||
| ///
 | ||||
|     /// State is determined to be conflicting if for the given key (EventType, StateKey) there
 | ||||
|     /// is not exactly one eventId. This includes missing events, if one state_set includes an event
 | ||||
|     /// that none of the other have this is a conflicting event.
 | ||||
| /// State is determined to be conflicting if for the given key (EventType, StateKey) there is not
 | ||||
| /// exactly one eventId. This includes missing events, if one state_set includes an event that none
 | ||||
| /// of the other have this is a conflicting event.
 | ||||
| pub fn separate( | ||||
|     state_sets: &[StateMap<EventId>], | ||||
| ) -> (StateMap<EventId>, StateMap<Vec<Option<EventId>>>) { | ||||
| @ -223,13 +210,13 @@ impl StateResolution { | ||||
|     } | ||||
| } | ||||
| 
 | ||||
|     /// Events are sorted from "earliest" to "latest". They are compared using
 | ||||
|     /// the negative power level (reverse topological ordering), the
 | ||||
|     /// origin server timestamp and incase of a tie the `EventId`s
 | ||||
|     /// are compared lexicographically.
 | ||||
| /// Events are sorted from "earliest" to "latest".
 | ||||
| ///
 | ||||
|     /// The power level is negative because a higher power level is equated to an
 | ||||
|     /// earlier (further back in time) origin server timestamp.
 | ||||
| /// They are compared using the negative power level (reverse topological ordering), the origin
 | ||||
| /// server timestamp and in case of a tie the `EventId`s are compared lexicographically.
 | ||||
| ///
 | ||||
| /// The power level is negative because a higher power level is equated to an earlier (further back
 | ||||
| /// in time) origin server timestamp.
 | ||||
| pub fn reverse_topological_power_sort<E, F>( | ||||
|     events_to_sort: Vec<EventId>, | ||||
|     auth_diff: &HashSet<EventId>, | ||||
| @ -243,12 +230,7 @@ impl StateResolution { | ||||
| 
 | ||||
|     let mut graph = HashMap::new(); | ||||
|     for event_id in events_to_sort { | ||||
|             StateResolution::add_event_and_auth_chain_to_graph( | ||||
|                 &mut graph, | ||||
|                 event_id, | ||||
|                 auth_diff, | ||||
|                 &fetch_event, | ||||
|             ); | ||||
|         add_event_and_auth_chain_to_graph(&mut graph, event_id, auth_diff, &fetch_event); | ||||
| 
 | ||||
|         // TODO: if these functions are ever made async here
 | ||||
|         // is a good place to yield every once in a while so other
 | ||||
| @ -258,7 +240,7 @@ impl StateResolution { | ||||
|     // This is used in the `key_fn` passed to the lexico_topo_sort fn
 | ||||
|     let mut event_to_pl = HashMap::new(); | ||||
|     for event_id in graph.keys() { | ||||
|             let pl = StateResolution::get_power_level_for_sender(event_id, &fetch_event); | ||||
|         let pl = get_power_level_for_sender(event_id, &fetch_event); | ||||
|         info!("{} power level {}", event_id, pl); | ||||
| 
 | ||||
|         event_to_pl.insert(event_id.clone(), pl); | ||||
| @ -268,7 +250,7 @@ impl StateResolution { | ||||
|         // tasks can make progress
 | ||||
|     } | ||||
| 
 | ||||
|         StateResolution::lexicographical_topological_sort(&graph, |event_id| { | ||||
|     lexicographical_topological_sort(&graph, |event_id| { | ||||
|         let ev = fetch_event(event_id).ok_or_else(|| Error::NotFound("".into()))?; | ||||
|         let pl = event_to_pl.get(event_id).ok_or_else(|| Error::NotFound("".into()))?; | ||||
| 
 | ||||
| @ -281,9 +263,10 @@ impl StateResolution { | ||||
|     }) | ||||
| } | ||||
| 
 | ||||
|     /// Sorts the event graph based on number of outgoing/incoming edges, where
 | ||||
|     /// `key_fn` is used as a tie breaker. The tie breaker happens based on
 | ||||
|     /// power level, age, and event_id.
 | ||||
| /// Sorts the event graph based on number of outgoing/incoming edges.
 | ||||
| ///
 | ||||
| /// `key_fn` is used as a tie breaker. The tie breaker happens based on power level, age, and
 | ||||
| /// event_id.
 | ||||
| pub fn lexicographical_topological_sort<F>( | ||||
|     graph: &HashMap<EventId, HashSet<EventId>>, | ||||
|     key_fn: F, | ||||
| @ -330,8 +313,7 @@ impl StateResolution { | ||||
|     // Destructure the `Reverse` and take the smallest `node` each time
 | ||||
|     while let Some(Reverse((_, node))) = heap.pop() { | ||||
|         let node: &EventId = node; | ||||
|             for parent in reverse_graph.get(node).expect("EventId in heap is also in reverse_graph") | ||||
|             { | ||||
|         for parent in reverse_graph.get(node).expect("EventId in heap is also in reverse_graph") { | ||||
|             // The number of outgoing edges this node has
 | ||||
|             let out = outdegree_map | ||||
|                 .get_mut(parent) | ||||
| @ -394,12 +376,11 @@ impl StateResolution { | ||||
| ///
 | ||||
| /// ## Returns
 | ||||
| ///
 | ||||
|     /// The `unconflicted_state` combined with the newly auth'ed events. So any event that
 | ||||
|     /// fails the `event_auth::auth_check` will be excluded from the returned `StateMap<EventId>`.
 | ||||
| /// The `unconflicted_state` combined with the newly auth'ed events. So any event that fails the
 | ||||
| /// `event_auth::auth_check` will be excluded from the returned `StateMap<EventId>`.
 | ||||
| ///
 | ||||
|     /// For each `events_to_check` event we gather the events needed to auth it from the
 | ||||
|     /// the `fetch_event` closure and verify each event using the `event_auth::auth_check`
 | ||||
|     /// function.
 | ||||
| /// For each `events_to_check` event we gather the events needed to auth it from the the
 | ||||
| /// `fetch_event` closure and verify each event using the `event_auth::auth_check` function.
 | ||||
| pub fn iterative_auth_check<E, F>( | ||||
|     room_version: &RoomVersion, | ||||
|     events_to_check: &[EventId], | ||||
| @ -491,13 +472,13 @@ impl StateResolution { | ||||
|     Ok(resolved_state) | ||||
| } | ||||
| 
 | ||||
|     /// Returns the sorted `to_sort` list of `EventId`s based on a mainline sort using
 | ||||
|     /// the depth of `resolved_power_level`, the server timestamp, and the eventId.
 | ||||
| /// Returns the sorted `to_sort` list of `EventId`s based on a mainline sort using the depth of
 | ||||
| /// `resolved_power_level`, the server timestamp, and the eventId.
 | ||||
| ///
 | ||||
| /// The depth of the given event is calculated based on the depth of it's closest "parent"
 | ||||
|     /// power_level event. If there have been two power events the after the most recent are
 | ||||
|     /// depth 0, the events before (with the first power level as a parent) will be marked
 | ||||
|     /// as depth 1. depth 1 is "older" than depth 0.
 | ||||
| /// power_level event. If there have been two power events the after the most recent are depth 0,
 | ||||
| /// the events before (with the first power level as a parent) will be marked as depth 1. depth 1 is
 | ||||
| /// "older" than depth 0.
 | ||||
| pub fn mainline_sort<E, F>( | ||||
|     to_sort: &[EventId], | ||||
|     resolved_power_level: Option<&EventId>, | ||||
| @ -546,9 +527,7 @@ impl StateResolution { | ||||
|     let mut order_map = HashMap::new(); | ||||
|     for ev_id in to_sort.iter() { | ||||
|         if let Some(event) = fetch_event(ev_id) { | ||||
|                 if let Ok(depth) = | ||||
|                     StateResolution::get_mainline_depth(Some(event), &mainline_map, &fetch_event) | ||||
|                 { | ||||
|             if let Ok(depth) = get_mainline_depth(Some(event), &mainline_map, &fetch_event) { | ||||
|                 order_map.insert( | ||||
|                     ev_id, | ||||
|                     ( | ||||
| @ -573,8 +552,8 @@ impl StateResolution { | ||||
|     Ok(sort_event_ids) | ||||
| } | ||||
| 
 | ||||
|     /// Get the mainline depth from the `mainline_map` or finds a power_level event
 | ||||
|     /// that has an associated mainline depth.
 | ||||
| /// Get the mainline depth from the `mainline_map` or finds a power_level event that has an
 | ||||
| /// associated mainline depth.
 | ||||
| fn get_mainline_depth<E, F>( | ||||
|     mut event: Option<Arc<E>>, | ||||
|     mainline_map: &EventMap<usize>, | ||||
| @ -631,7 +610,6 @@ impl StateResolution { | ||||
|         } | ||||
|     } | ||||
| } | ||||
| } | ||||
| 
 | ||||
| pub fn is_power_event_id<E, F>(event_id: &EventId, fetch: F) -> bool | ||||
| where | ||||
|  | ||||
| @ -5,7 +5,7 @@ use std::{ | ||||
| 
 | ||||
| use rand::seq::SliceRandom; | ||||
| use ruma_events::EventType; | ||||
| use ruma_state_res::{is_power_event, room_version::RoomVersion, StateMap, StateResolution}; | ||||
| use ruma_state_res::{self as state_res, is_power_event, room_version::RoomVersion, StateMap}; | ||||
| 
 | ||||
| mod utils; | ||||
| use utils::INITIAL_EVENTS; | ||||
| @ -27,12 +27,12 @@ fn test_event_sort() { | ||||
|         .collect::<Vec<_>>(); | ||||
| 
 | ||||
|     let sorted_power_events = | ||||
|         StateResolution::reverse_topological_power_sort(power_events, &auth_chain, |id| { | ||||
|         state_res::reverse_topological_power_sort(power_events, &auth_chain, |id| { | ||||
|             events.get(id).map(Arc::clone) | ||||
|         }) | ||||
|         .unwrap(); | ||||
| 
 | ||||
|     let resolved_power = StateResolution::iterative_auth_check( | ||||
|     let resolved_power = state_res::iterative_auth_check( | ||||
|         &RoomVersion::version_6(), | ||||
|         &sorted_power_events, | ||||
|         &HashMap::new(), // unconflicted events
 | ||||
| @ -47,9 +47,8 @@ fn test_event_sort() { | ||||
| 
 | ||||
|     let power_level = resolved_power.get(&(EventType::RoomPowerLevels, "".to_owned())); | ||||
| 
 | ||||
|     let sorted_event_ids = StateResolution::mainline_sort(&events_to_sort, power_level, |id| { | ||||
|         events.get(id).map(Arc::clone) | ||||
|     }) | ||||
|     let sorted_event_ids = | ||||
|         state_res::mainline_sort(&events_to_sort, power_level, |id| events.get(id).map(Arc::clone)) | ||||
|             .unwrap(); | ||||
| 
 | ||||
|     assert_eq!( | ||||
|  | ||||
| @ -4,7 +4,7 @@ use std::{collections::HashMap, sync::Arc}; | ||||
| 
 | ||||
| use ruma_events::EventType; | ||||
| use ruma_identifiers::{EventId, RoomVersionId}; | ||||
| use ruma_state_res::{EventMap, StateMap, StateResolution}; | ||||
| use ruma_state_res::{self as state_res, EventMap, StateMap}; | ||||
| use serde_json::json; | ||||
| use tracing::debug; | ||||
| 
 | ||||
| @ -65,7 +65,7 @@ fn ban_with_auth_chains2() { | ||||
| 
 | ||||
|     let ev_map: EventMap<Arc<StateEvent>> = store.0.clone(); | ||||
|     let state_sets = vec![state_set_a, state_set_b]; | ||||
|     let resolved = match StateResolution::resolve::<StateEvent, _>( | ||||
|     let resolved = match state_res::resolve::<StateEvent, _>( | ||||
|         &room_id(), | ||||
|         &RoomVersionId::Version6, | ||||
|         &state_sets, | ||||
|  | ||||
| @ -5,7 +5,7 @@ use maplit::{hashmap, hashset}; | ||||
| use ruma_common::MilliSecondsSinceUnixEpoch; | ||||
| use ruma_events::{room::join_rules::JoinRule, EventType}; | ||||
| use ruma_identifiers::{EventId, RoomVersionId}; | ||||
| use ruma_state_res::{EventMap, StateMap, StateResolution}; | ||||
| use ruma_state_res::{self as state_res, EventMap, StateMap}; | ||||
| use serde_json::json; | ||||
| use tracing_subscriber as tracer; | ||||
| 
 | ||||
| @ -254,7 +254,7 @@ fn test_event_map_none() { | ||||
| 
 | ||||
|     let ev_map: EventMap<Arc<StateEvent>> = store.0.clone(); | ||||
|     let state_sets = vec![state_at_bob, state_at_charlie]; | ||||
|     let resolved = match StateResolution::resolve::<StateEvent, _>( | ||||
|     let resolved = match state_res::resolve::<StateEvent, _>( | ||||
|         &room_id(), | ||||
|         &RoomVersionId::Version2, | ||||
|         &state_sets, | ||||
| @ -285,7 +285,7 @@ fn test_lexicographical_sort() { | ||||
|         event_id("p") => hashset![event_id("o")], | ||||
|     }; | ||||
| 
 | ||||
|     let res = StateResolution::lexicographical_topological_sort(&graph, |id| { | ||||
|     let res = state_res::lexicographical_topological_sort(&graph, |id| { | ||||
|         Ok((0, MilliSecondsSinceUnixEpoch(uint!(0)), id.clone())) | ||||
|     }) | ||||
|     .unwrap(); | ||||
|  | ||||
| @ -21,7 +21,7 @@ use ruma_events::{ | ||||
|     EventType, | ||||
| }; | ||||
| use ruma_identifiers::{EventId, RoomId, RoomVersionId, UserId}; | ||||
| use ruma_state_res::{auth_types_for_event, Error, Event, Result, StateMap, StateResolution}; | ||||
| use ruma_state_res::{self as state_res, auth_types_for_event, Error, Event, Result, StateMap}; | ||||
| use serde_json::{json, Value as JsonValue}; | ||||
| use tracing::info; | ||||
| use tracing_subscriber as tracer; | ||||
| @ -79,7 +79,7 @@ pub fn do_check( | ||||
| 
 | ||||
|     // Resolve the current state and add it to the state_at_event map then continue
 | ||||
|     // on in "time"
 | ||||
|     for node in StateResolution::lexicographical_topological_sort(&graph, |id| { | ||||
|     for node in state_res::lexicographical_topological_sort(&graph, |id| { | ||||
|         Ok((0, MilliSecondsSinceUnixEpoch(uint!(0)), id.clone())) | ||||
|     }) | ||||
|     .unwrap() | ||||
| @ -111,7 +111,7 @@ pub fn do_check( | ||||
|                     .collect::<Vec<_>>() | ||||
|             ); | ||||
| 
 | ||||
|             let resolved = StateResolution::resolve( | ||||
|             let resolved = state_res::resolve( | ||||
|                 &room_id(), | ||||
|                 &RoomVersionId::Version6, | ||||
|                 &state_sets, | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user