Use ruma's PDU events not the sent events

This commit is contained in:
Devin R 2020-07-18 16:14:57 -04:00
parent c043b10d64
commit d3c3c95985
6 changed files with 179 additions and 89 deletions

View File

@ -9,8 +9,11 @@ edition = "2018"
petgraph = "0.5.1" petgraph = "0.5.1"
serde = { version = "1.0.114", features = ["derive"] } serde = { version = "1.0.114", features = ["derive"] }
serde_json = "1.0.56" serde_json = "1.0.56"
maplit = "1.0.2" tracing = "0.1.16"
[dependencies.ruma] [dependencies.ruma]
git = "https://github.com/ruma/ruma" git = "https://github.com/ruma/ruma"
features = ["client-api", "federation-api", "appservice-api"] features = ["client-api", "federation-api", "appservice-api"]
[dev-dependencies]
maplit = "1.0.2"

38
README.md Normal file
View File

@ -0,0 +1,38 @@
Would it be possible to abstract state res into a `ruma-state-res` crate? I've been thinking about something along the lines of
```rust
// The would need to be Serialize/Deserialize to save state
struct StateResV2 {
// Should any information be kept or should all of it be fetched from the
// StateStore trait?,
state_graph: Something,
// fields for temp storage during resolution??
conflicting_events: StateMap<Vec<EventId>>,
}
impl StateResV2 {
/// The point of this all add nonconflicting events to the graph
/// and resolve and add conflicting events.
fn resolve(&mut self, events: Vec<StateMap<EventId>>) -> StateMap<EventId> { }
}
// The tricky part to making this a good abstraction
trait StateStore {
fn get_event(&self, event_id: &EventId) -> Pdu/AnyStateEvent;
fn get_events(&self, event_ids: &[EventId]) -> Pdu/AnyStateEvent;
fn auth_event_ids(&self, room_id: &RoomId, event_id: &EventId) -> Vec<EventId>;
fn get_remote_state_for_room(
&self,
room_id: &RoomId,
version: &RoomVersionId,
event_id: &EventId,
) -> (Vec<StateEvent>, Vec<StateEvent>);
}
```
Now to be totally fair I have no real understanding of state reso

View File

@ -57,28 +57,55 @@ impl StateResolution {
&mut self, &mut self,
room_id: &RoomId, room_id: &RoomId,
room_version: &RoomVersionId, room_version: &RoomVersionId,
state_sets: Vec<StateMap<EventId>>, state_sets: &[StateMap<EventId>],
store: &dyn StateStore, store: &dyn StateStore,
// TODO actual error handling (`thiserror`??) // TODO actual error handling (`thiserror`??)
) -> Result<ResolutionResult, serde_json::Error> { ) -> Result<ResolutionResult, String> {
tracing::debug!("State resolution starting");
let mut event_map = EventMap::new(); let mut event_map = EventMap::new();
// split non-conflicting and conflicting state // split non-conflicting and conflicting state
let (clean, conflicting) = self.seperate(&state_sets); let (clean, conflicting) = self.seperate(&state_sets);
if conflicting.is_empty() { if conflicting.is_empty() {
return Ok(ResolutionResult::Resolved( return Ok(ResolutionResult::Resolved(clean));
clean.into_iter().flat_map(|map| map.into_iter()).collect(),
));
} }
tracing::debug!("computing {} conflicting events", conflicting.len());
// the set of auth events that are not common across server forks // the set of auth events that are not common across server forks
let mut auth_diff = self.get_auth_chain_diff(&state_sets, &mut event_map, store)?; let mut auth_diff = self.get_auth_chain_diff(&state_sets, &mut event_map, store)?;
// add the auth_diff to conflicting now we have a full set of conflicting events // add the auth_diff to conflicting now we have a full set of conflicting events
auth_diff.extend(conflicting.iter().flat_map(|map| map.values().cloned())); auth_diff.extend(conflicting.values().cloned().flatten());
let all_conflicted = auth_diff; let all_conflicted = auth_diff;
// TODO get events and add to event_map tracing::debug!("full conflicted set is {} events", all_conflicted.len());
let events = store
.get_events(
&all_conflicted
.iter()
// we only want the events we don't know about yet
.filter(|id| !event_map.contains_key(id))
.cloned()
.collect::<Vec<_>>(),
)
.unwrap();
for event in event_map.values() {
if event.room_id() != Some(room_id) {
return Err(format!(
"resolving event {} in room {}, when correct room is {}",
event
.event_id()
.map(|id| id.as_str())
.unwrap_or("`unknown`"),
event.room_id().map(|id| id.as_str()).unwrap_or("`unknown`"),
room_id.as_str()
));
}
}
// TODO throw error if event is not for this room // TODO throw error if event is not for this room
// TODO make sure each conflicting event is in?? event_map `{eid for eid in full_conflicted_set if eid in event_map}` // TODO make sure each conflicting event is in?? event_map `{eid for eid in full_conflicted_set if eid in event_map}`
@ -109,7 +136,6 @@ impl StateResolution {
// At this point the power_events have been resolved we now have to // At this point the power_events have been resolved we now have to
// sort the remaining events using the mainline of the resolved power level. // sort the remaining events using the mainline of the resolved power level.
sorted_power_levels.dedup(); sorted_power_levels.dedup();
let deduped_power_ev = sorted_power_levels; let deduped_power_ev = sorted_power_levels;
@ -128,13 +154,13 @@ impl StateResolution {
room_id, room_id,
room_version, room_version,
&sorted_left_events, &sorted_left_events,
&[resolved], &resolved,
&mut event_map, &mut event_map,
store, store,
); );
// add unconflicted state to the resolved state // add unconflicted state to the resolved state
resolved_state.extend(clean.into_iter().flat_map(|map| map.into_iter())); resolved_state.extend(clean);
// TODO return something not a place holder // TODO return something not a place holder
Ok(ResolutionResult::Resolved(resolved_state)) Ok(ResolutionResult::Resolved(resolved_state))
@ -146,8 +172,25 @@ impl StateResolution {
fn seperate( fn seperate(
&mut self, &mut self,
state_sets: &[StateMap<EventId>], state_sets: &[StateMap<EventId>],
) -> (Vec<StateMap<EventId>>, Vec<StateMap<EventId>>) { ) -> (StateMap<EventId>, StateMap<Vec<EventId>>) {
panic!() let mut unconflicted_state = StateMap::new();
let mut conflicted_state = StateMap::new();
for key in state_sets.iter().flat_map(|map| map.keys()) {
let mut event_ids = state_sets
.iter()
.flat_map(|map| map.get(key).cloned())
.collect::<Vec<EventId>>();
if event_ids.len() == 1 {
// unwrap is ok since we know the len is 1
unconflicted_state.insert(key.clone(), event_ids.pop().unwrap());
} else {
conflicted_state.insert(key.clone(), event_ids);
}
}
(unconflicted_state, conflicted_state)
} }
/// Returns a Vec of deduped EventIds that appear in some chains but no others. /// Returns a Vec of deduped EventIds that appear in some chains but no others.
@ -156,7 +199,8 @@ impl StateResolution {
state_sets: &[StateMap<EventId>], state_sets: &[StateMap<EventId>],
event_map: &EventMap<StateEvent>, event_map: &EventMap<StateEvent>,
store: &dyn StateStore, store: &dyn StateStore,
) -> Result<Vec<EventId>, serde_json::Error> { ) -> Result<Vec<EventId>, String> {
tracing::debug!("calculating auth chain difference");
panic!() panic!()
} }
@ -168,6 +212,7 @@ impl StateResolution {
store: &dyn StateStore, store: &dyn StateStore,
conflicted_set: &[EventId], conflicted_set: &[EventId],
) -> Vec<EventId> { ) -> Vec<EventId> {
tracing::debug!("reverse topological sort of power events");
panic!() panic!()
} }
@ -176,10 +221,11 @@ impl StateResolution {
room_id: &RoomId, room_id: &RoomId,
room_version: &RoomVersionId, room_version: &RoomVersionId,
power_events: &[EventId], power_events: &[EventId],
unconflicted_state: &[StateMap<EventId>], unconflicted_state: &StateMap<EventId>,
event_map: &EventMap<StateEvent>, event_map: &EventMap<StateEvent>,
store: &dyn StateStore, store: &dyn StateStore,
) -> StateMap<EventId> { ) -> StateMap<EventId> {
tracing::debug!("starting iter auth check");
panic!() panic!()
} }
@ -193,6 +239,7 @@ impl StateResolution {
event_map: &EventMap<StateEvent>, event_map: &EventMap<StateEvent>,
store: &dyn StateStore, store: &dyn StateStore,
) -> Vec<EventId> { ) -> Vec<EventId> {
tracing::debug!("mainline sort of remaining events");
// There can be no EventId's to sort, bail. // There can be no EventId's to sort, bail.
if to_sort.is_empty() { if to_sort.is_empty() {
return vec![]; return vec![];

View File

@ -1,7 +1,9 @@
use ruma::{ use ruma::{
events::{ events::{
from_raw_json_value, room::member::MembershipState, AnyStateEvent, AnyStrippedStateEvent, from_raw_json_value,
AnySyncStateEvent, EventDeHelper, EventType, pdu::{Pdu, PduStub, RoomV1Pdu, RoomV1PduStub, RoomV3Pdu, RoomV3PduStub},
room::member::{MemberEventContent, MembershipState},
AnyStateEvent, AnyStrippedStateEvent, AnySyncStateEvent, EventDeHelper, EventType,
}, },
identifiers::{EventId, RoomId}, identifiers::{EventId, RoomId},
}; };
@ -12,71 +14,93 @@ use std::{convert::TryFrom, time::SystemTime};
#[derive(Clone, Debug, Serialize)] #[derive(Clone, Debug, Serialize)]
#[serde(untagged)] #[serde(untagged)]
pub enum StateEvent { pub enum StateEvent {
Full(AnyStateEvent), Full(Pdu),
Sync(AnySyncStateEvent), Sync(PduStub),
Stripped(AnyStrippedStateEvent),
} }
impl StateEvent { impl StateEvent {
pub fn is_power_event(&self) -> bool { pub fn is_power_event(&self) -> bool {
match self { match self {
Self::Full(any_event) => match any_event { Self::Full(any_event) => match any_event {
AnyStateEvent::RoomPowerLevels(event) => event.state_key == "", Pdu::RoomV1Pdu(event) => match event.kind {
AnyStateEvent::RoomJoinRules(event) => event.state_key == "", EventType::RoomPowerLevels
AnyStateEvent::RoomCreate(event) => event.state_key == "", | EventType::RoomJoinRules
AnyStateEvent::RoomMember(event) => { | EventType::RoomCreate => event.state_key == Some("".into()),
if [MembershipState::Leave, MembershipState::Ban] EventType::RoomMember => {
.contains(&event.content.membership) if let Ok(content) =
{ // TODO fix clone
return event.sender.as_str() != event.state_key; serde_json::from_value::<MemberEventContent>(event.content.clone())
{
if [MembershipState::Leave, MembershipState::Ban]
.contains(&content.membership)
{
return event.sender.as_str()
// TODO does None here mean the same as state_key = ""
!= event.state_key.as_deref().unwrap_or("");
}
}
false
} }
false _ => false,
} },
_ => false, Pdu::RoomV3Pdu(event) => event.state_key == Some("".into()),
}, },
Self::Sync(any_event) => match any_event { Self::Sync(any_event) => match any_event {
AnySyncStateEvent::RoomPowerLevels(event) => event.state_key == "", PduStub::RoomV1PduStub(event) => match event.kind {
AnySyncStateEvent::RoomJoinRules(event) => event.state_key == "", EventType::RoomPowerLevels
AnySyncStateEvent::RoomCreate(event) => event.state_key == "", | EventType::RoomJoinRules
AnySyncStateEvent::RoomMember(event) => { | EventType::RoomCreate => event.state_key == Some("".into()),
if [MembershipState::Leave, MembershipState::Ban] EventType::RoomMember => {
.contains(&event.content.membership) if let Ok(content) =
{ serde_json::from_value::<MemberEventContent>(event.content.clone())
return event.sender.as_str() != event.state_key; {
if [MembershipState::Leave, MembershipState::Ban]
.contains(&content.membership)
{
return event.sender.as_str()
// TODO does None here mean the same as state_key = ""
!= event.state_key.as_deref().unwrap_or("");
}
}
false
} }
false _ => false,
} },
_ => false, PduStub::RoomV3PduStub(event) => event.state_key == Some("".into()),
}, },
Self::Stripped(any_event) => match any_event {
AnyStrippedStateEvent::RoomPowerLevels(event) => event.state_key == "",
AnyStrippedStateEvent::RoomJoinRules(event) => event.state_key == "",
AnyStrippedStateEvent::RoomCreate(event) => event.state_key == "",
AnyStrippedStateEvent::RoomMember(event) => {
if [MembershipState::Leave, MembershipState::Ban]
.contains(&event.content.membership)
{
return event.sender.as_str() != event.state_key;
}
false
}
_ => false,
},
_ => false,
} }
} }
pub fn origin_server_ts(&self) -> Option<&SystemTime> { pub fn origin_server_ts(&self) -> &SystemTime {
match self { match self {
Self::Full(ev) => Some(ev.origin_server_ts()), Self::Full(ev) => match ev {
Self::Sync(ev) => Some(ev.origin_server_ts()), Pdu::RoomV1Pdu(ev) => &ev.origin_server_ts,
Self::Stripped(ev) => None, Pdu::RoomV3Pdu(ev) => &ev.origin_server_ts,
},
Self::Sync(ev) => match ev {
PduStub::RoomV1PduStub(ev) => &ev.origin_server_ts,
PduStub::RoomV3PduStub(ev) => &ev.origin_server_ts,
},
} }
} }
pub fn event_id(&self) -> Option<&EventId> { pub fn event_id(&self) -> Option<&EventId> {
match self { match self {
Self::Full(ev) => Some(ev.event_id()), Self::Full(ev) => match ev {
Self::Sync(ev) => Some(ev.event_id()), Pdu::RoomV1Pdu(ev) => Some(&ev.event_id),
Self::Stripped(ev) => None, Pdu::RoomV3Pdu(ev) => None,
},
Self::Sync(ev) => None,
}
}
pub fn room_id(&self) -> Option<&RoomId> {
match self {
Self::Full(ev) => match ev {
Pdu::RoomV1Pdu(ev) => Some(&ev.room_id),
Pdu::RoomV3Pdu(ev) => Some(&ev.room_id),
},
Self::Sync(ev) => None,
} }
} }
@ -108,15 +132,13 @@ impl<'de> de::Deserialize<'de> for StateEvent {
} }
_ => StateEvent::Full(from_raw_json_value(&json)?), _ => StateEvent::Full(from_raw_json_value(&json)?),
}) })
} else if event_id.is_some() { } else {
Ok(match unsigned { Ok(match unsigned {
Some(unsigned) if unsigned.redacted_because.is_some() => { Some(unsigned) if unsigned.redacted_because.is_some() => {
panic!("TODO deal with redacted events") panic!("TODO deal with redacted events")
} }
_ => StateEvent::Sync(from_raw_json_value(&json)?), _ => StateEvent::Sync(from_raw_json_value(&json)?),
}) })
} else {
Ok(StateEvent::Stripped(from_raw_json_value(&json)?))
} }
} }
} }

View File

@ -1,20 +0,0 @@
Would it be possible to abstract state res into a `ruma-state-res` crate? I've been thinking about something along the lines of
```rust
// The would need to be Serialize/Deserialize to save state
struct StateResV2 {
resolved_events: Vec<Event>,
state_graph: of indexes into the events field?,
most_recent_resolved: index or ptr into the graph?,
// fields for temp storage during resolution
conflicting_events: Vec<Event>,
}
impl StateResV2 {
/// The point of this all add nonconflicting events to the graph
/// and resolve and add conflicting events.
fn resolve(&mut self, events: Vec<Event>) -> Vec<Event> { }
}
```
Now to be totally fair I have no real understanding of state res

View File

@ -186,7 +186,7 @@ fn it_works() {
let mut resolver = StateResolution::default(); let mut resolver = StateResolution::default();
let res = resolver let res = resolver
.resolve(&room_id, &room_version, vec![initial_state], &mut store) .resolve(&room_id, &room_version, &[initial_state], &mut store)
.unwrap(); .unwrap();
assert!(if let ResolutionResult::Resolved(_) = res { assert!(if let ResolutionResult::Resolved(_) = res {
true true
@ -195,7 +195,7 @@ fn it_works() {
}); });
let resolved = resolver let resolved = resolver
.resolve(&room_id, &room_version, vec![state_to_resolve], &mut store) .resolve(&room_id, &room_version, &[state_to_resolve], &mut store)
.unwrap(); .unwrap();
assert!(resolver.conflicting_events.is_empty()); assert!(resolver.conflicting_events.is_empty());