@@ -329,7 +329,7 @@ where
|
||||
target_user_member_event.as_ref(),
|
||||
sender,
|
||||
sender_member_event.as_ref(),
|
||||
&incoming_event,
|
||||
incoming_event,
|
||||
current_third_party_invite,
|
||||
power_levels_event.as_ref(),
|
||||
join_rules_event.as_ref(),
|
||||
@@ -412,7 +412,7 @@ where
|
||||
|
||||
// If the event type's required power level is greater than the sender's power level, reject
|
||||
// If the event has a state_key that starts with an @ and does not match the sender, reject.
|
||||
if !can_send_event(&incoming_event, power_levels_event.as_ref(), sender_power_level) {
|
||||
if !can_send_event(incoming_event, power_levels_event.as_ref(), sender_power_level) {
|
||||
warn!("user cannot send event");
|
||||
return Ok(false);
|
||||
}
|
||||
@@ -423,7 +423,7 @@ where
|
||||
|
||||
if let Some(required_pwr_lvl) = check_power_levels(
|
||||
room_version,
|
||||
&incoming_event,
|
||||
incoming_event,
|
||||
power_levels_event.as_ref(),
|
||||
sender_power_level,
|
||||
) {
|
||||
|
||||
@@ -62,7 +62,7 @@ pub type TypeStateKey = (StateEventType, String);
|
||||
pub async fn resolve<'a, E, SetIter, Fetch, FetchFut, Exists, ExistsFut>(
|
||||
room_version: &RoomVersionId,
|
||||
state_sets: impl IntoIterator<IntoIter = SetIter> + Send,
|
||||
auth_chain_sets: &'a Vec<HashSet<E::Id>>,
|
||||
auth_chain_sets: &'a [HashSet<E::Id>],
|
||||
event_fetch: &Fetch,
|
||||
event_exists: &Exists,
|
||||
parallel_fetches: usize,
|
||||
@@ -94,7 +94,7 @@ where
|
||||
trace!(map = ?conflicting, "conflicting events");
|
||||
|
||||
let auth_chain_diff =
|
||||
get_auth_chain_diff(&auth_chain_sets).chain(conflicting.into_values().flatten());
|
||||
get_auth_chain_diff(auth_chain_sets).chain(conflicting.into_values().flatten());
|
||||
|
||||
// `all_conflicted` contains unique items
|
||||
// synapse says `full_set = {eid for eid in full_conflicted_set if eid in event_map}`
|
||||
@@ -237,13 +237,13 @@ where
|
||||
}
|
||||
|
||||
/// Returns a Vec of deduped EventIds that appear in some chains but not others.
|
||||
fn get_auth_chain_diff<Id>(auth_chain_sets: &Vec<HashSet<Id>>) -> impl Iterator<Item = Id> + Send
|
||||
fn get_auth_chain_diff<Id>(auth_chain_sets: &[HashSet<Id>]) -> impl Iterator<Item = Id> + Send
|
||||
where
|
||||
Id: Clone + Eq + Hash + Send,
|
||||
{
|
||||
let num_sets = auth_chain_sets.len();
|
||||
let mut id_counts: HashMap<Id, usize> = HashMap::new();
|
||||
for id in auth_chain_sets.into_iter().flatten() {
|
||||
for id in auth_chain_sets.iter().flatten() {
|
||||
*id_counts.entry(id.clone()).or_default() += 1;
|
||||
}
|
||||
|
||||
@@ -449,12 +449,12 @@ where
|
||||
let pl = stream::iter(auth_events)
|
||||
.map(|aid| fetch_event(aid.clone()))
|
||||
.buffer_unordered(parallel_fetches.min(5))
|
||||
.filter_map(|aev| future::ready(aev))
|
||||
.filter_map(future::ready)
|
||||
.collect::<Vec<_>>()
|
||||
.boxed()
|
||||
.await
|
||||
.into_iter()
|
||||
.find(|aev| is_type_and_key(&aev, &TimelineEventType::RoomPowerLevels, ""));
|
||||
.find(|aev| is_type_and_key(aev, &TimelineEventType::RoomPowerLevels, ""));
|
||||
|
||||
let content: PowerLevelsContentFields = match pl {
|
||||
None => return Ok(int!(0)),
|
||||
@@ -514,14 +514,13 @@ where
|
||||
|
||||
let auth_event_ids: HashSet<E::Id> = events_to_check
|
||||
.iter()
|
||||
.map(|event: &E| event.auth_events().map(Clone::clone))
|
||||
.flatten()
|
||||
.flat_map(|event: &E| event.auth_events().map(Clone::clone))
|
||||
.collect();
|
||||
|
||||
let auth_events: HashMap<E::Id, E> = stream::iter(auth_event_ids.into_iter())
|
||||
.map(|event_id| fetch_event(event_id))
|
||||
.map(fetch_event)
|
||||
.buffer_unordered(parallel_fetches)
|
||||
.filter_map(|result| future::ready(result))
|
||||
.filter_map(future::ready)
|
||||
.map(|auth_event| (auth_event.event_id().clone(), auth_event))
|
||||
.collect()
|
||||
.boxed()
|
||||
@@ -544,7 +543,7 @@ where
|
||||
|
||||
let mut auth_state = StateMap::new();
|
||||
for aid in event.auth_events() {
|
||||
if let Some(&ref ev) = auth_events.get(aid.borrow()) {
|
||||
if let Some(ev) = auth_events.get(aid.borrow()) {
|
||||
//TODO: synapse checks "rejected_reason" which is most likely related to
|
||||
// soft-failing
|
||||
auth_state.insert(
|
||||
@@ -558,22 +557,20 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
stream::iter(
|
||||
auth_types.iter().filter_map(|key| Some((key, resolved_state.get(key)?))).into_iter(),
|
||||
)
|
||||
.filter_map(|(key, ev_id)| async move {
|
||||
if let Some(event) = auth_events.get(ev_id.borrow()) {
|
||||
Some((key, event.clone()))
|
||||
} else {
|
||||
Some((key, fetch_event(ev_id.clone()).await?.clone()))
|
||||
}
|
||||
})
|
||||
.for_each(|(key, event)| {
|
||||
//TODO: synapse checks "rejected_reason" is None here
|
||||
auth_state.insert(key.to_owned(), event);
|
||||
future::ready(())
|
||||
})
|
||||
.await;
|
||||
stream::iter(auth_types.iter().filter_map(|key| Some((key, resolved_state.get(key)?))))
|
||||
.filter_map(|(key, ev_id)| async move {
|
||||
if let Some(event) = auth_events.get(ev_id.borrow()) {
|
||||
Some((key, event.clone()))
|
||||
} else {
|
||||
Some((key, fetch_event(ev_id.clone()).await?.clone()))
|
||||
}
|
||||
})
|
||||
.for_each(|(key, event)| {
|
||||
//TODO: synapse checks "rejected_reason" is None here
|
||||
auth_state.insert(key.to_owned(), event);
|
||||
future::ready(())
|
||||
})
|
||||
.await;
|
||||
|
||||
debug!("event to check {:?}", event.event_id());
|
||||
|
||||
@@ -651,17 +648,17 @@ where
|
||||
.map(|(idx, eid)| ((*eid).clone(), idx))
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
let order_map = stream::iter(to_sort.into_iter())
|
||||
let order_map = stream::iter(to_sort.iter())
|
||||
.map(|ev_id| fetch_event(ev_id.clone()).map(move |event| event.map(|event| (event, ev_id))))
|
||||
.buffer_unordered(parallel_fetches)
|
||||
.filter_map(|result| future::ready(result))
|
||||
.filter_map(future::ready)
|
||||
.map(|(event, ev_id)| {
|
||||
get_mainline_depth(Some(event.clone()), &mainline_map, fetch_event)
|
||||
.map_ok(move |depth| (depth, event, ev_id))
|
||||
.map(Result::ok)
|
||||
})
|
||||
.buffer_unordered(parallel_fetches)
|
||||
.filter_map(|result| future::ready(result))
|
||||
.filter_map(future::ready)
|
||||
.fold(HashMap::new(), |mut order_map, (depth, event, ev_id)| {
|
||||
order_map.insert(ev_id, (depth, event.origin_server_ts(), ev_id));
|
||||
future::ready(order_map)
|
||||
|
||||
Reference in New Issue
Block a user