state-res: Use closure to fetch unknown events during state-res

state-res: Remove event_map arg from all functions

state-res: Remove get_or_load_event helper func and fix resolve docs
This commit is contained in:
Devin Ragotzy 2021-06-29 16:13:08 -04:00 committed by Jonas Platte
parent 96567a295e
commit a4e6cc7c42
No known key found for this signature in database
GPG Key ID: CC154DE0E30B7C67
8 changed files with 147 additions and 118 deletions

View File

@ -5,6 +5,7 @@
Breaking changes:
* Replace `Vec` by `BTreeSet` in parts of the API
* Replace `event_map` argument with a closure to fetch events on demand
# 0.1.0

View File

@ -1,5 +1,5 @@
11/29/2020 BRANCH: timo-spec-comp REV: d2a85669cc6056679ce6ca0fde4658a879ad2b08
lexicographical topological sort
lexicographical topological sort
time: [1.7123 us 1.7157 us 1.7199 us]
change: [-1.7584% -1.5433% -1.3205%] (p = 0.00 < 0.05)
Performance has improved.
@ -8,30 +8,30 @@ Found 8 outliers among 100 measurements (8.00%)
5 (5.00%) high mild
1 (1.00%) high severe
resolve state of 5 events one fork
resolve state of 5 events one fork
time: [10.981 us 10.998 us 11.020 us]
Found 3 outliers among 100 measurements (3.00%)
3 (3.00%) high mild
resolve state of 10 events 3 conflicting
resolve state of 10 events 3 conflicting
time: [26.858 us 26.946 us 27.037 us]
11/29/2020 BRANCH: event-trait REV: f0eb1310efd49d722979f57f20bd1ac3592b0479
lexicographical topological sort
lexicographical topological sort
time: [1.7686 us 1.7738 us 1.7810 us]
change: [-3.2752% -2.4634% -1.7635%] (p = 0.00 < 0.05)
Performance has improved.
Found 1 outliers among 100 measurements (1.00%)
1 (1.00%) high severe
resolve state of 5 events one fork
resolve state of 5 events one fork
time: [10.643 us 10.656 us 10.669 us]
change: [-4.9990% -3.8078% -2.8319%] (p = 0.00 < 0.05)
Performance has improved.
Found 1 outliers among 100 measurements (1.00%)
1 (1.00%) high severe
resolve state of 10 events 3 conflicting
resolve state of 10 events 3 conflicting
time: [29.149 us 29.252 us 29.375 us]
change: [-0.8433% -0.3270% +0.2656%] (p = 0.25 > 0.05)
No change in performance detected.
@ -39,21 +39,40 @@ Found 1 outliers among 100 measurements (1.00%)
1 (1.00%) high mild
4/26/2020 BRANCH: fix-test-serde REV:
lexicographical topological sort
lexicographical topological sort
time: [1.6793 us 1.6823 us 1.6857 us]
Found 9 outliers among 100 measurements (9.00%)
1 (1.00%) low mild
4 (4.00%) high mild
4 (4.00%) high severe
resolve state of 5 events one fork
resolve state of 5 events one fork
time: [9.9993 us 10.062 us 10.159 us]
Found 9 outliers among 100 measurements (9.00%)
7 (7.00%) high mild
2 (2.00%) high severe
resolve state of 10 events 3 conflicting
resolve state of 10 events 3 conflicting
time: [26.004 us 26.092 us 26.195 us]
Found 16 outliers among 100 measurements (16.00%)
11 (11.00%) high mild
5 (5.00%) high severe
5 (5.00%) high severe
6/30/2021 BRANCH: state-closure REV: 174c3e2a72232ad75b3fb14b3551f5f746f4fe84
lexicographical topological sort
time: [1.5496 us 1.5536 us 1.5586 us]
Found 9 outliers among 100 measurements (9.00%)
1 (1.00%) low mild
1 (1.00%) high mild
7 (7.00%) high severe
resolve state of 5 events one fork
time: [10.319 us 10.333 us 10.347 us]
Found 2 outliers among 100 measurements (2.00%)
2 (2.00%) high severe
resolve state of 10 events 3 conflicting
time: [25.770 us 25.805 us 25.839 us]
Found 7 outliers among 100 measurements (7.00%)
5 (5.00%) high mild
2 (2.00%) high severe

View File

@ -60,9 +60,9 @@ fn resolution_shallow_auth_chain(c: &mut Criterion) {
let (state_at_bob, state_at_charlie, _) = store.set_up();
b.iter(|| {
let mut ev_map: EventMap<Arc<StateEvent>> = store.0.clone();
let ev_map: EventMap<Arc<StateEvent>> = store.0.clone();
let state_sets = vec![state_at_bob.clone(), state_at_charlie.clone()];
let _ = match StateResolution::resolve::<StateEvent>(
let _ = match StateResolution::resolve::<StateEvent, _>(
&room_id(),
&RoomVersionId::Version6,
&state_sets,
@ -74,7 +74,7 @@ fn resolution_shallow_auth_chain(c: &mut Criterion) {
.unwrap()
})
.collect(),
&mut ev_map,
&|id| ev_map.get(id).map(Arc::clone),
) {
Ok(state) => state,
Err(e) => panic!("{}", e),
@ -119,7 +119,7 @@ fn resolve_deeper_event_set(c: &mut Criterion) {
b.iter(|| {
let state_sets = vec![state_set_a.clone(), state_set_b.clone()];
let _ = match StateResolution::resolve::<StateEvent>(
let _ = match StateResolution::resolve::<StateEvent, _>(
&room_id(),
&RoomVersionId::Version6,
&state_sets,
@ -131,7 +131,7 @@ fn resolve_deeper_event_set(c: &mut Criterion) {
.unwrap()
})
.collect(),
&mut inner,
&|id| inner.get(id).map(Arc::clone),
) {
Ok(state) => state,
Err(_) => panic!("resolution failed during benchmarking"),

View File

@ -48,15 +48,25 @@ impl StateResolution {
/// * `auth_events` - The full recursive set of `auth_events` for each event in the
/// `state_sets`.
///
/// * `event_map` - The `EventMap` acts as a local cache of state, any event that is not found
/// in the `event_map` will cause an unrecoverable `Error` in `resolve`.
pub fn resolve<E: Event>(
/// * `fetch_event` - A closure used to look up an event by its `EventId`; it returns `None` when
/// the event cannot be found.
///
/// ## Invariants
///
/// The caller of `StateResolution::resolve` must ensure that all the events are from the same
/// room. Although this function takes a `RoomId` it does not check that each event is part
/// of the same room.
pub fn resolve<E, F>(
room_id: &RoomId,
room_version: &RoomVersionId,
state_sets: &[StateMap<EventId>],
auth_events: Vec<Vec<EventId>>,
event_map: &mut EventMap<Arc<E>>,
) -> Result<StateMap<EventId>> {
fetch_event: &F,
) -> Result<StateMap<EventId>>
where
E: Event,
F: Fn(&EventId) -> Option<Arc<E>>,
{
info!("State resolution starting");
// Split non-conflicting and conflicting state
@ -85,7 +95,7 @@ impl StateResolution {
// Don't honor events we cannot "verify"
// TODO: BTreeSet::retain() when stable 1.53
let all_conflicted =
auth_diff.into_iter().filter(|id| event_map.contains_key(id)).collect::<BTreeSet<_>>();
auth_diff.into_iter().filter(|id| fetch_event(id).is_some()).collect::<BTreeSet<_>>();
info!("full conflicted set is {} events", all_conflicted.len());
@ -95,16 +105,15 @@ impl StateResolution {
// Get only the control events with a state_key: "" or ban/kick event (sender != state_key)
let control_events = all_conflicted
.iter()
.filter(|id| is_power_event_id(id, event_map))
.filter(|id| is_power_event_id(id, fetch_event))
.cloned()
.collect::<Vec<_>>();
// Sort the control events based on power_level/clock/event_id and outgoing/incoming edges
let sorted_control_levels = StateResolution::reverse_topological_power_sort(
room_id,
&control_events,
event_map,
&all_conflicted,
fetch_event,
);
debug!("SRTD {:?}", sorted_control_levels);
@ -112,11 +121,10 @@ impl StateResolution {
let room_version = RoomVersion::new(room_version)?;
// Sequentially auth check each control event.
let resolved_control = StateResolution::iterative_auth_check(
room_id,
&room_version,
&sorted_control_levels,
&clean,
event_map,
fetch_event,
)?;
debug!("AUTHED {:?}", resolved_control.iter().collect::<Vec<_>>());
@ -141,16 +149,15 @@ impl StateResolution {
debug!("PL {:?}", power_event);
let sorted_left_events =
StateResolution::mainline_sort(room_id, &events_to_resolve, power_event, event_map);
StateResolution::mainline_sort(&events_to_resolve, power_event, fetch_event);
debug!("SORTED LEFT {:?}", sorted_left_events.iter().collect::<Vec<_>>());
let mut resolved_state = StateResolution::iterative_auth_check(
room_id,
&room_version,
&sorted_left_events,
&resolved_control, // The control events are added to the final resolved state
event_map,
fetch_event,
)?;
// Add unconflicted state to the resolved state
@ -224,18 +231,24 @@ impl StateResolution {
///
/// The power level is negative because a higher power level is equated to an
/// earlier (further back in time) origin server timestamp.
pub fn reverse_topological_power_sort<E: Event>(
room_id: &RoomId,
pub fn reverse_topological_power_sort<E, F>(
events_to_sort: &[EventId],
event_map: &mut EventMap<Arc<E>>,
auth_diff: &BTreeSet<EventId>,
) -> Vec<EventId> {
fetch_event: &F,
) -> Vec<EventId>
where
E: Event,
F: Fn(&EventId) -> Option<Arc<E>>,
{
debug!("reverse topological sort of power events");
let mut graph = BTreeMap::new();
for event_id in events_to_sort.iter() {
StateResolution::add_event_and_auth_chain_to_graph(
room_id, &mut graph, event_id, event_map, auth_diff,
&mut graph,
event_id,
auth_diff,
fetch_event,
);
// TODO: if these functions are ever made async here
@ -246,7 +259,7 @@ impl StateResolution {
// This is used in the `key_fn` passed to the lexico_topo_sort fn
let mut event_to_pl = BTreeMap::new();
for event_id in graph.keys() {
let pl = StateResolution::get_power_level_for_sender(room_id, event_id, event_map);
let pl = StateResolution::get_power_level_for_sender(event_id, fetch_event);
info!("{} power level {}", event_id, pl);
event_to_pl.insert(event_id.clone(), pl);
@ -257,7 +270,7 @@ impl StateResolution {
}
StateResolution::lexicographical_topological_sort(&graph, |event_id| {
let ev = event_map.get(event_id).unwrap();
let ev = fetch_event(event_id).unwrap();
let pl = event_to_pl.get(event_id).unwrap();
debug!("{:?}", (-*pl, ev.origin_server_ts(), &ev.event_id()));
@ -337,20 +350,20 @@ impl StateResolution {
}
/// Find the power level for the sender of `event_id` or return a default value of zero.
fn get_power_level_for_sender<E: Event>(
room_id: &RoomId,
event_id: &EventId,
event_map: &mut EventMap<Arc<E>>,
) -> i64 {
fn get_power_level_for_sender<E, F>(event_id: &EventId, fetch_event: &F) -> i64
where
E: Event,
F: Fn(&EventId) -> Option<Arc<E>>,
{
info!("fetch event ({}) senders power level", event_id);
let event = StateResolution::get_or_load_event(room_id, event_id, event_map);
let event = fetch_event(event_id);
let mut pl = None;
// TODO store.auth_event_ids returns "self" with the event ids is this ok
// event.auth_event_ids does not include its own event id ?
for aid in event.as_ref().map(|pdu| pdu.auth_events()).unwrap_or_default() {
if let Ok(aev) = StateResolution::get_or_load_event(room_id, &aid, event_map) {
if let Some(aev) = fetch_event(&aid) {
if is_type_and_key(&aev, EventType::RoomPowerLevels, "") {
pl = Some(aev);
break;
@ -365,7 +378,7 @@ impl StateResolution {
if let Some(content) =
pl.and_then(|pl| serde_json::from_value::<PowerLevelsEventContent>(pl.content()).ok())
{
if let Ok(ev) = event {
if let Some(ev) = event {
if let Some(user) = content.users.get(ev.sender()) {
debug!("found {} at power_level {}", ev.sender(), user);
return (*user).into();
@ -387,13 +400,16 @@ impl StateResolution {
/// For each `events_to_check` event we gather the events needed to auth it using the
/// `fetch_event` closure and verify each event using the `event_auth::auth_check`
/// function.
pub fn iterative_auth_check<E: Event>(
room_id: &RoomId,
pub fn iterative_auth_check<E, F>(
room_version: &RoomVersion,
events_to_check: &[EventId],
unconflicted_state: &StateMap<EventId>,
event_map: &mut EventMap<Arc<E>>,
) -> Result<StateMap<EventId>> {
fetch_event: &F,
) -> Result<StateMap<EventId>>
where
E: Event,
F: Fn(&EventId) -> Option<Arc<E>>,
{
info!("starting iterative auth check");
debug!("performing auth checks on {:?}", events_to_check.iter().collect::<Vec<_>>());
@ -401,14 +417,15 @@ impl StateResolution {
let mut resolved_state = unconflicted_state.clone();
for event_id in events_to_check.iter() {
let event = StateResolution::get_or_load_event(room_id, event_id, event_map)?;
let event = fetch_event(event_id)
.ok_or_else(|| Error::NotFound(format!("Failed to find {}", event_id)))?;
let state_key = event
.state_key()
.ok_or_else(|| Error::InvalidPdu("State event had no state key".to_owned()))?;
let mut auth_events = BTreeMap::new();
for aid in &event.auth_events() {
if let Ok(ev) = StateResolution::get_or_load_event(room_id, aid, event_map) {
if let Some(ev) = fetch_event(aid) {
// TODO synapse check "rejected_reason", I'm guessing this is redacted_because
// in ruma ??
auth_events.insert(
@ -432,8 +449,7 @@ impl StateResolution {
event.content(),
) {
if let Some(ev_id) = resolved_state.get(&key) {
if let Ok(event) = StateResolution::get_or_load_event(room_id, ev_id, event_map)
{
if let Some(event) = fetch_event(ev_id) {
// TODO synapse checks `rejected_reason` is None here
auth_events.insert(key.clone(), event);
}
@ -442,11 +458,8 @@ impl StateResolution {
debug!("event to check {:?}", event.event_id());
let most_recent_prev_event = event
.prev_events()
.iter()
.filter_map(|id| StateResolution::get_or_load_event(room_id, id, event_map).ok())
.next_back();
let most_recent_prev_event =
event.prev_events().iter().filter_map(|id| fetch_event(id)).next_back();
// The key for this is (eventType + a state_key of the signed token not sender) so
// search for it
@ -485,12 +498,15 @@ impl StateResolution {
/// power_level event. If there have been two power events, the events after the most recent
/// one are depth 0, and the events before it (with the first power level as a parent) will be
/// marked as depth 1. Depth 1 is "older" than depth 0.
pub fn mainline_sort<E: Event>(
room_id: &RoomId,
pub fn mainline_sort<E, F>(
to_sort: &[EventId],
resolved_power_level: Option<&EventId>,
event_map: &mut EventMap<Arc<E>>,
) -> Vec<EventId> {
fetch_event: &F,
) -> Vec<EventId>
where
E: Event,
F: Fn(&EventId) -> Option<Arc<E>>,
{
debug!("mainline sort of events");
// There are no EventId's to sort, bail.
@ -503,11 +519,11 @@ impl StateResolution {
while let Some(p) = pl {
mainline.push(p.clone());
let event = StateResolution::get_or_load_event(room_id, &p, event_map).unwrap();
let event = fetch_event(&p).unwrap();
let auth_events = &event.auth_events();
pl = None;
for aid in auth_events {
let ev = StateResolution::get_or_load_event(room_id, aid, event_map).unwrap();
let ev = fetch_event(aid).unwrap();
if is_type_and_key(&ev, EventType::RoomPowerLevels, "") {
pl = Some(aid.clone());
break;
@ -527,18 +543,15 @@ impl StateResolution {
let mut order_map = BTreeMap::new();
for ev_id in to_sort.iter() {
if let Ok(event) = StateResolution::get_or_load_event(room_id, ev_id, event_map) {
if let Ok(depth) = StateResolution::get_mainline_depth(
room_id,
Some(event),
&mainline_map,
event_map,
) {
if let Some(event) = fetch_event(ev_id) {
if let Ok(depth) =
StateResolution::get_mainline_depth(Some(event), &mainline_map, fetch_event)
{
order_map.insert(
ev_id,
(
depth,
event_map.get(ev_id).map(|ev| ev.origin_server_ts()),
fetch_event(ev_id).map(|ev| ev.origin_server_ts()),
ev_id, // TODO should this be a &str to sort lexically??
),
);
@ -560,12 +573,15 @@ impl StateResolution {
/// Gets the mainline depth from the `mainline_map` or finds a power_level event
/// that has an associated mainline depth.
fn get_mainline_depth<E: Event>(
room_id: &RoomId,
fn get_mainline_depth<E, F>(
mut event: Option<Arc<E>>,
mainline_map: &EventMap<usize>,
event_map: &mut EventMap<Arc<E>>,
) -> Result<usize> {
fetch_event: &F,
) -> Result<usize>
where
E: Event,
F: Fn(&EventId) -> Option<Arc<E>>,
{
while let Some(sort_ev) = event {
debug!("mainline event_id {}", sort_ev.event_id());
let id = &sort_ev.event_id();
@ -576,7 +592,8 @@ impl StateResolution {
let auth_events = &sort_ev.auth_events();
event = None;
for aid in auth_events {
let aev = StateResolution::get_or_load_event(room_id, aid, event_map)?;
let aev = fetch_event(aid)
.ok_or_else(|| Error::NotFound(format!("Failed to find {}", aid)))?;
if is_type_and_key(&aev, EventType::RoomPowerLevels, "") {
event = Some(aev);
break;
@ -587,23 +604,22 @@ impl StateResolution {
Ok(0)
}
fn add_event_and_auth_chain_to_graph<E: Event>(
room_id: &RoomId,
fn add_event_and_auth_chain_to_graph<E, F>(
graph: &mut BTreeMap<EventId, BTreeSet<EventId>>,
event_id: &EventId,
event_map: &mut EventMap<Arc<E>>,
auth_diff: &BTreeSet<EventId>,
) {
fetch_event: &F,
) where
E: Event,
F: Fn(&EventId) -> Option<Arc<E>>,
{
let mut state = vec![event_id.clone()];
while !state.is_empty() {
// We just checked if it was empty so unwrap is fine
let eid = state.pop().unwrap();
graph.entry(eid.clone()).or_insert(btreeset![]);
// Prefer the store to event as the store filters dedups the events
for aid in &StateResolution::get_or_load_event(room_id, &eid, event_map)
.map(|ev| ev.auth_events())
.unwrap_or_default()
{
for aid in &fetch_event(&eid).map(|ev| ev.auth_events()).unwrap_or_default() {
if auth_diff.contains(aid) {
if !graph.contains_key(aid) {
state.push(aid.clone());
@ -615,22 +631,14 @@ impl StateResolution {
}
}
}
/// Uses the `event_map` to return the full PDU or fails.
fn get_or_load_event<E: Event>(
_room_id: &RoomId,
ev_id: &EventId,
event_map: &EventMap<Arc<E>>,
) -> Result<Arc<E>> {
event_map.get(ev_id).map_or_else(
|| Err(Error::NotFound(format!("EventId: {:?} not found", ev_id))),
|e| Ok(Arc::clone(e)),
)
}
}
pub fn is_power_event_id<E: Event>(event_id: &EventId, event_map: &EventMap<Arc<E>>) -> bool {
match event_map.get(event_id) {
pub fn is_power_event_id<E, F>(event_id: &EventId, fetch: &F) -> bool
where
E: Event,
F: Fn(&EventId) -> Option<Arc<E>>,
{
match fetch(event_id).as_ref() {
Some(state) => is_power_event(state),
_ => false,
}

View File

@ -1,14 +1,17 @@
use std::collections::{BTreeMap, BTreeSet};
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
};
use rand::seq::SliceRandom;
use ruma_events::EventType;
use ruma_state_res::{is_power_event, room_version::RoomVersion, StateMap, StateResolution};
mod utils;
use utils::{room_id, INITIAL_EVENTS};
use utils::INITIAL_EVENTS;
fn test_event_sort() {
let mut events = INITIAL_EVENTS();
let events = INITIAL_EVENTS();
let event_map = events
.values()
@ -26,21 +29,18 @@ fn test_event_sort() {
// This is a TODO in conduit
// TODO these events are not guaranteed to be sorted but they are resolved, do
// we need the auth_chain
let sorted_power_events = StateResolution::reverse_topological_power_sort(
&room_id(),
&power_events,
&mut events,
&auth_chain,
);
let sorted_power_events =
StateResolution::reverse_topological_power_sort(&power_events, &auth_chain, &|id| {
events.get(id).map(Arc::clone)
});
// This is a TODO in conduit
// TODO we may be able to skip this since they are resolved according to spec
let resolved_power = StateResolution::iterative_auth_check(
&room_id(),
&RoomVersion::version_6(),
&sorted_power_events,
&BTreeMap::new(), // unconflicted events
&mut events,
&|id| events.get(id).map(Arc::clone),
)
.expect("iterative auth check failed on resolved events");
@ -51,8 +51,9 @@ fn test_event_sort() {
let power_level = resolved_power.get(&(EventType::RoomPowerLevels, "".to_owned()));
let sorted_event_ids =
StateResolution::mainline_sort(&room_id(), &events_to_sort, power_level, &mut events);
let sorted_event_ids = StateResolution::mainline_sort(&events_to_sort, power_level, &|id| {
events.get(id).map(Arc::clone)
});
assert_eq!(
vec![

View File

@ -63,9 +63,9 @@ fn ban_with_auth_chains2() {
.map(|ev| ((ev.kind(), ev.state_key()), ev.event_id().clone()))
.collect::<StateMap<_>>();
let mut ev_map: EventMap<Arc<StateEvent>> = store.0.clone();
let ev_map: EventMap<Arc<StateEvent>> = store.0.clone();
let state_sets = vec![state_set_a, state_set_b];
let resolved = match StateResolution::resolve::<StateEvent>(
let resolved = match StateResolution::resolve::<StateEvent, _>(
&room_id(),
&RoomVersionId::Version6,
&state_sets,
@ -77,7 +77,7 @@ fn ban_with_auth_chains2() {
.unwrap()
})
.collect(),
&mut ev_map,
&|id| ev_map.get(id).map(Arc::clone),
) {
Ok(state) => state,
Err(e) => panic!("{}", e),

View File

@ -252,9 +252,9 @@ fn test_event_map_none() {
// build up the DAG
let (state_at_bob, state_at_charlie, expected) = store.set_up();
let mut ev_map: EventMap<Arc<StateEvent>> = store.0.clone();
let ev_map: EventMap<Arc<StateEvent>> = store.0.clone();
let state_sets = vec![state_at_bob, state_at_charlie];
let resolved = match StateResolution::resolve::<StateEvent>(
let resolved = match StateResolution::resolve::<StateEvent, _>(
&room_id(),
&RoomVersionId::Version2,
&state_sets,
@ -266,7 +266,7 @@ fn test_event_map_none() {
.unwrap()
})
.collect(),
&mut ev_map,
&|id| ev_map.get(id).map(Arc::clone),
) {
Ok(state) => state,
Err(e) => panic!("{}", e),

View File

@ -121,7 +121,7 @@ pub fn do_check(
.unwrap()
})
.collect(),
&mut event_map,
&|id| event_map.get(id).map(Arc::clone),
);
match resolved {
Ok(state) => state,