Start work on db less state-res
This commit is contained in:
parent
611d1a3d9c
commit
282270ed4f
@ -21,9 +21,9 @@ thiserror = "1.0.22"
|
||||
|
||||
[dependencies.ruma]
|
||||
git = "https://github.com/ruma/ruma"
|
||||
# branch = "dev/unstable-join"
|
||||
# branch = "verified-export"
|
||||
# path = "../__forks__/ruma/ruma"
|
||||
rev = "ee814aa84934530d76f5e4b275d739805b49bdef"
|
||||
rev = "45d01011554f9d07739e9a5edf5498d8ac16f273"
|
||||
features = ["client-api", "federation-api", "appservice-api", "unstable-pre-spec", "unstable-synapse-quirks"]
|
||||
|
||||
#[dependencies.ruma]
|
||||
|
139
src/lib.rs
139
src/lib.rs
@ -83,141 +83,6 @@ impl StateResolution {
|
||||
event_auth::auth_check(room_version, &ev, prev_event, auth_events, None)
|
||||
}
|
||||
|
||||
/// Resolve the conflicted events. This function is similar to
|
||||
/// `StateResolution::resolve` by sorting and iteratively auth checking
|
||||
/// events, but it will not separate state sets into "clean" and "conflicting".
|
||||
///
|
||||
/// The `conflicted` events can be an incoming pdu and the events needed to fill
|
||||
/// the gap between the incoming event and the last known parent on the receiving server.
|
||||
pub fn resolve_incoming(
|
||||
room_id: &RoomId,
|
||||
room_version: &RoomVersionId,
|
||||
unconflicted: &StateMap<EventId>,
|
||||
conflicted: Vec<((EventType, String), EventId)>,
|
||||
event_map: Option<EventMap<Arc<StateEvent>>>,
|
||||
store: &dyn StateStore,
|
||||
) -> Result<StateMap<EventId>> {
|
||||
let mut event_map = if let Some(ev_map) = event_map {
|
||||
ev_map
|
||||
} else {
|
||||
BTreeMap::new()
|
||||
};
|
||||
|
||||
let all_ids = vec![
|
||||
unconflicted.values().cloned().collect(),
|
||||
// conflicted can contain duplicates no BTreeMaps
|
||||
conflicted.iter().map(|(_, v)| v).cloned().collect(),
|
||||
];
|
||||
// We use the store here since it takes a Vec<Vec> instead of Vec<Maps> like
|
||||
// `StateResolution::get_auth_chain_diff`
|
||||
let mut auth_diff = store.auth_chain_diff(room_id, all_ids)?;
|
||||
|
||||
tracing::debug!("auth diff size {}", auth_diff.len());
|
||||
|
||||
// add the auth_diff to conflicting now we have a full set of conflicting events
|
||||
auth_diff.extend(conflicted.iter().map(|(_, v)| v).cloned());
|
||||
let mut all_conflicted = auth_diff
|
||||
.into_iter()
|
||||
.collect::<BTreeSet<_>>()
|
||||
.into_iter()
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
tracing::info!("full conflicted set is {} events", all_conflicted.len());
|
||||
|
||||
// gather missing events for the event_map
|
||||
let events = store
|
||||
.get_events(
|
||||
room_id,
|
||||
&all_conflicted
|
||||
.iter()
|
||||
// we only want the events we don't know about yet
|
||||
.filter(|id| !event_map.contains_key(id))
|
||||
.cloned()
|
||||
.collect::<Vec<_>>(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// update event_map to include the fetched events
|
||||
event_map.extend(events.into_iter().map(|ev| (ev.event_id(), ev)));
|
||||
|
||||
// Don't honor events we cannot verify.
|
||||
all_conflicted.retain(|id| event_map.contains_key(id));
|
||||
|
||||
// get only the control events with a state_key: "" or ban/kick event (sender != state_key)
|
||||
let control_events = all_conflicted
|
||||
.iter()
|
||||
.filter(|id| is_power_event(id, &event_map))
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// sort the control events based on power_level/clock/event_id and outgoing/incoming edges
|
||||
let mut sorted_control_levels = StateResolution::reverse_topological_power_sort(
|
||||
room_id,
|
||||
&control_events,
|
||||
&mut event_map,
|
||||
store,
|
||||
&all_conflicted,
|
||||
);
|
||||
|
||||
// sequentially auth check each control event.
|
||||
let resolved_control = StateResolution::iterative_auth_check(
|
||||
room_id,
|
||||
room_version,
|
||||
&sorted_control_levels,
|
||||
unconflicted,
|
||||
&mut event_map,
|
||||
store,
|
||||
)?;
|
||||
|
||||
// At this point the control_events have been resolved we now have to
|
||||
// sort the remaining events using the mainline of the resolved power level.
|
||||
sorted_control_levels.dedup();
|
||||
let deduped_power_ev = sorted_control_levels;
|
||||
|
||||
// This removes the control events that passed auth and more importantly those that failed auth
|
||||
let events_to_resolve = all_conflicted
|
||||
.iter()
|
||||
.filter(|id| !deduped_power_ev.contains(id))
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// This "epochs" power level event
|
||||
let power_event = resolved_control.get(&(EventType::RoomPowerLevels, "".into()));
|
||||
|
||||
tracing::debug!("PL {:?}", power_event);
|
||||
|
||||
let sorted_left_events = StateResolution::mainline_sort(
|
||||
room_id,
|
||||
&events_to_resolve,
|
||||
power_event,
|
||||
&mut event_map,
|
||||
store,
|
||||
);
|
||||
|
||||
tracing::debug!(
|
||||
"SORTED LEFT {:?}",
|
||||
sorted_left_events
|
||||
.iter()
|
||||
.map(ToString::to_string)
|
||||
.collect::<Vec<_>>()
|
||||
);
|
||||
|
||||
let resolved_state = StateResolution::iterative_auth_check(
|
||||
room_id,
|
||||
room_version,
|
||||
&sorted_left_events,
|
||||
&resolved_control, // The control events are added to the final resolved state
|
||||
&mut event_map,
|
||||
store,
|
||||
)?;
|
||||
|
||||
// Add unconflicted state to the resolved state
|
||||
// We do it this way so any resolved_state would overwrite unconflicted
|
||||
let mut clean = unconflicted.clone();
|
||||
clean.extend(resolved_state);
|
||||
Ok(clean)
|
||||
}
|
||||
|
||||
/// Resolve sets of state events as they come in. Internally `StateResolution` builds a graph
|
||||
/// and an auth chain to allow for state conflict resolution.
|
||||
///
|
||||
@ -240,6 +105,7 @@ impl StateResolution {
|
||||
room_id: &RoomId,
|
||||
room_version: &RoomVersionId,
|
||||
state_sets: &[StateMap<EventId>],
|
||||
// TODO: make the `Option<&mut EventMap<Arc<StateEvent>>>`
|
||||
event_map: Option<EventMap<Arc<StateEvent>>>,
|
||||
store: &dyn StateStore,
|
||||
) -> Result<StateMap<EventId>> {
|
||||
@ -397,8 +263,7 @@ impl StateResolution {
|
||||
)?;
|
||||
|
||||
// add unconflicted state to the resolved state
|
||||
// TODO:
|
||||
// CLEAN OVERWRITES ANY DUPLICATE KEYS FROM RESOLVED STATE
|
||||
// We priorities the unconflicting state
|
||||
resolved_state.extend(clean);
|
||||
Ok(resolved_state)
|
||||
}
|
||||
|
@ -1,188 +0,0 @@
|
||||
use std::{collections::BTreeMap, sync::Arc};
|
||||
|
||||
use ruma::{
|
||||
events::EventType,
|
||||
identifiers::{EventId, RoomVersionId},
|
||||
};
|
||||
use serde_json::json;
|
||||
use state_res::{StateEvent, StateResolution};
|
||||
use tracing_subscriber as tracer;
|
||||
|
||||
mod utils;
|
||||
use utils::{
|
||||
alice, bob, ella, member_content_ban, member_content_join, room_id, to_pdu_event, zara,
|
||||
TestStore, INITIAL_EVENTS, LOGGER,
|
||||
};
|
||||
|
||||
#[test]
|
||||
fn resolve_incoming_jr() {
|
||||
let _ = LOGGER.call_once(|| {
|
||||
tracer::fmt()
|
||||
.with_env_filter(tracer::EnvFilter::from_default_env())
|
||||
.init()
|
||||
});
|
||||
|
||||
let events = INITIAL_EVENTS();
|
||||
let conflicted = JOIN_RULE();
|
||||
let store = TestStore(
|
||||
events
|
||||
.clone()
|
||||
.into_iter()
|
||||
.chain(conflicted.clone())
|
||||
.collect(),
|
||||
);
|
||||
|
||||
let res = match StateResolution::resolve_incoming(
|
||||
&room_id(),
|
||||
&RoomVersionId::Version6,
|
||||
&events
|
||||
.iter()
|
||||
.map(|(_, ev)| ((ev.kind(), ev.state_key()), ev.event_id()))
|
||||
.collect(),
|
||||
conflicted
|
||||
.iter()
|
||||
.map(|(_, ev)| ((ev.kind(), ev.state_key()), ev.event_id()))
|
||||
.collect(),
|
||||
None,
|
||||
&store,
|
||||
) {
|
||||
Ok(state) => state,
|
||||
Err(e) => panic!("{:?}", e),
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
vec![
|
||||
"$CREATE:foo",
|
||||
"$JR:foo",
|
||||
"$IMA:foo",
|
||||
"$IMB:foo",
|
||||
"$IMC:foo",
|
||||
"$IPOWER:foo",
|
||||
"$START:foo"
|
||||
],
|
||||
res.values().map(|id| id.to_string()).collect::<Vec<_>>()
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn resolve_incoming_ban() {
|
||||
let _ = LOGGER.call_once(|| {
|
||||
tracer::fmt()
|
||||
.with_env_filter(tracer::EnvFilter::from_default_env())
|
||||
.init()
|
||||
});
|
||||
|
||||
let events = INITIAL_EVENTS();
|
||||
let conflicted = BAN_STATE_SET();
|
||||
let store = TestStore(
|
||||
events
|
||||
.clone()
|
||||
.into_iter()
|
||||
.chain(conflicted.clone())
|
||||
.collect(),
|
||||
);
|
||||
|
||||
let res = match StateResolution::resolve_incoming(
|
||||
&room_id(),
|
||||
&RoomVersionId::Version6,
|
||||
&events
|
||||
.iter()
|
||||
.map(|(_, ev)| ((ev.kind(), ev.state_key()), ev.event_id()))
|
||||
.collect(),
|
||||
conflicted
|
||||
.iter()
|
||||
.map(|(_, ev)| ((ev.kind(), ev.state_key()), ev.event_id()))
|
||||
.collect(),
|
||||
None,
|
||||
&store,
|
||||
) {
|
||||
Ok(state) => state,
|
||||
Err(e) => panic!("{:?}", e),
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
vec![
|
||||
"$CREATE:foo",
|
||||
"$IJR:foo",
|
||||
"$IMA:foo",
|
||||
"$IMB:foo",
|
||||
"$IMC:foo",
|
||||
"$MB:foo",
|
||||
"$PB:foo",
|
||||
"$START:foo"
|
||||
],
|
||||
res.values().map(|id| id.to_string()).collect::<Vec<_>>()
|
||||
)
|
||||
}
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
fn JOIN_RULE() -> BTreeMap<EventId, Arc<StateEvent>> {
|
||||
vec![
|
||||
to_pdu_event(
|
||||
"JR",
|
||||
alice(),
|
||||
EventType::RoomJoinRules,
|
||||
Some(""),
|
||||
json!({"join_rule": "invite"}),
|
||||
&["CREATE", "IMA", "IPOWER"],
|
||||
&["START"],
|
||||
),
|
||||
to_pdu_event(
|
||||
"IMZ",
|
||||
zara(),
|
||||
EventType::RoomPowerLevels,
|
||||
Some(zara().as_str()),
|
||||
member_content_join(),
|
||||
&["CREATE", "JR", "IPOWER"],
|
||||
&["START"],
|
||||
),
|
||||
]
|
||||
.into_iter()
|
||||
.map(|ev| (ev.event_id(), ev))
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
fn BAN_STATE_SET() -> BTreeMap<EventId, Arc<StateEvent>> {
|
||||
vec![
|
||||
to_pdu_event(
|
||||
"PA",
|
||||
alice(),
|
||||
EventType::RoomPowerLevels,
|
||||
Some(""),
|
||||
json!({"users": {alice(): 100, bob(): 50}}),
|
||||
&["CREATE", "IMA", "IPOWER"], // auth_events
|
||||
&["START"], // prev_events
|
||||
),
|
||||
to_pdu_event(
|
||||
"PB",
|
||||
alice(),
|
||||
EventType::RoomPowerLevels,
|
||||
Some(""),
|
||||
json!({"users": {alice(): 100, bob(): 50}}),
|
||||
&["CREATE", "IMA", "IPOWER"],
|
||||
&["END"],
|
||||
),
|
||||
to_pdu_event(
|
||||
"MB",
|
||||
alice(),
|
||||
EventType::RoomMember,
|
||||
Some(ella().as_str()),
|
||||
member_content_ban(),
|
||||
&["CREATE", "IMA", "PB"],
|
||||
&["PA"],
|
||||
),
|
||||
to_pdu_event(
|
||||
"IME",
|
||||
ella(),
|
||||
EventType::RoomMember,
|
||||
Some(ella().as_str()),
|
||||
member_content_join(),
|
||||
&["CREATE", "IJR", "PA"],
|
||||
&["MB"],
|
||||
),
|
||||
]
|
||||
.into_iter()
|
||||
.map(|ev| (ev.event_id(), ev))
|
||||
.collect()
|
||||
}
|
@ -71,6 +71,8 @@ pub fn do_check(
|
||||
}
|
||||
}
|
||||
|
||||
panic!("{}", serde_json::to_string_pretty(&graph).unwrap());
|
||||
|
||||
// event_id -> StateEvent
|
||||
let mut event_map: BTreeMap<EventId, Arc<StateEvent>> = BTreeMap::new();
|
||||
// event_id -> StateMap<EventId>
|
||||
@ -125,7 +127,6 @@ pub fn do_check(
|
||||
|
||||
// if fake_event.state_key().is_some() {
|
||||
let ty = fake_event.kind().clone();
|
||||
// we know there is a state_key unwrap OK
|
||||
let key = fake_event.state_key().clone();
|
||||
state_after.insert((ty, key), event_id.clone());
|
||||
// }
|
||||
|
Loading…
x
Reference in New Issue
Block a user