Update to std::future::Futures, feature(async_await)

This commit is contained in:
Jonas Platte 2019-07-20 01:50:25 +02:00
parent b115a047ed
commit ef380003ff
4 changed files with 178 additions and 696 deletions

View File

@ -13,16 +13,17 @@ repository = "https://github.com/ruma/ruma-client"
version = "0.2.0"
[dependencies]
futures = "0.1.26"
futures-preview = "0.3.0-alpha.17"
http = "0.1.17"
hyper = "0.12.27"
ruma-api = "0.7.0"
ruma-client-api = "0.3.0"
ruma-identifiers = "0.12.0"
ruma-api = "0.9.0"
ruma-identifiers = "0.13.1"
serde_json = "1.0.39"
serde_urlencoded = "0.5.4"
url = "1.7.2"
[dependencies.hyper]
git = "https://github.com/hyperium/hyper"
[dependencies.hyper-tls]
optional = true
version = "0.3.2"
@ -31,14 +32,20 @@ version = "0.3.2"
optional = true
version = "0.2.2"
[dependencies.ruma-client-api]
git = "https://github.com/ruma/ruma-client-api"
branch = "update-deps"
[dependencies.serde]
version = "1.0.90"
features = ["derive"]
[dev-dependencies]
ruma-events = "0.12.0"
tokio = "0.1.18"
[dev-dependencies.ruma-events]
git = "https://github.com/ruma/ruma-events"
[dev-dependencies.tokio]
git = "https://github.com/tokio-rs/tokio"
[features]
default = ["tls"]
default = []
tls = ["hyper-tls", "native-tls"]

View File

@ -1,7 +1,9 @@
#![feature(async_await)]
use std::{convert::TryFrom, env, process::exit};
use futures::Future;
use ruma_client::{self, api::r0, Client};
use ruma_client::{self, Client};
use ruma_client_api::r0;
use ruma_events::{
room::message::{MessageEventContent, MessageType, TextMessageEventContent},
EventType,
@ -9,49 +11,47 @@ use ruma_events::{
use ruma_identifiers::RoomAliasId;
use url::Url;
// from https://stackoverflow.com/a/43992218/1592377
macro_rules! clone {
(@param _) => ( _ );
(@param $x:ident) => ( $x );
($($n:ident),+ => move |$($p:tt),+| $body:expr) => (
{
$( let $n = $n.clone(); )+
move |$(clone!(@param $p),)+| $body
}
);
}
async fn hello_world(homeserver_url: Url, room: String) -> Result<(), ruma_client::Error> {
let client = Client::new(homeserver_url, None);
fn hello_world(
homeserver_url: Url,
room: String,
) -> impl Future<Item = (), Error = ruma_client::Error> {
let client = Client::https(homeserver_url, None).unwrap();
client.register_guest().and_then(clone!(client => move |_| {
r0::alias::get_alias::call(client, r0::alias::get_alias::Request {
client.register_guest().await?;
let response = client
.request::<r0::alias::get_alias::Endpoint>(r0::alias::get_alias::Request {
room_alias: RoomAliasId::try_from(&room[..]).unwrap(),
})
})).and_then(clone!(client => move |response| {
let room_id = response.room_id;
.await?;
r0::membership::join_room_by_id::call(client.clone(), r0::membership::join_room_by_id::Request {
room_id: room_id.clone(),
third_party_signed: None,
}).and_then(move |_| {
r0::send::send_message_event::call(client, r0::send::send_message_event::Request {
room_id: room_id,
event_type: EventType::RoomMessage,
txn_id: "1".to_owned(),
data: MessageEventContent::Text(TextMessageEventContent {
body: "Hello World!".to_owned(),
msgtype: MessageType::Text,
}),
})
})
})).map(|_| ())
let room_id = response.room_id;
client
.request::<r0::membership::join_room_by_id::Endpoint>(
r0::membership::join_room_by_id::Request {
room_id: room_id.clone(),
third_party_signed: None,
},
)
.await?;
client.request::<r0::send::send_message_event::Endpoint>(
r0::send::send_message_event::Request {
room_id: room_id,
event_type: EventType::RoomMessage,
txn_id: "1".to_owned(),
data: MessageEventContent::Text(TextMessageEventContent {
msgtype: MessageType::Text,
body: "Hello World!".to_owned(),
format: None,
formatted_body: None,
relates_to: None,
}),
},
).await?;
Ok(())
}
fn main() {
#[tokio::main]
async fn main() -> Result<(), ruma_client::Error> {
let (homeserver_url, room) = match (env::args().nth(1), env::args().nth(2)) {
(Some(a), Some(b)) => (a, b),
_ => {
@ -63,9 +63,5 @@ fn main() {
}
};
tokio::run(
hello_world(homeserver_url.parse().unwrap(), room).map_err(|e| {
dbg!(e);
}),
);
hello_world(homeserver_url.parse().unwrap(), room).await
}

View File

@ -1,528 +0,0 @@
macro_rules! endpoint {
// No reexports besides `Request` and `Response`.
($(#[$attr:meta])+ [$($outer_mod:ident),*], $inner_mod:ident) => {
endpoint!($(#[$attr])+ [$($outer_mod),*], $inner_mod, []);
};
// No imports from super.
($(#[$attr:meta])+ [$($outer_mod:ident),*], $inner_mod:ident, [$($import:ident),*]) => {
endpoint!($(#[$attr])+ [$($outer_mod),*], $inner_mod, [$($import),*], []);
};
// Explicit case.
(
$(#[$attr:meta])+
[$($outer_mod:ident),*],
$inner_mod:ident,
[$($import:ident),*],
[$($super_import:ident),*]
) => {
#[$($attr)+]
pub mod $inner_mod {
use futures::Future;
use hyper::client::connect::Connect;
use ruma_client_api::$($outer_mod::)*$inner_mod::Endpoint;
$(use super::$super_import;)*
pub use ruma_client_api::$($outer_mod::)*$inner_mod::{
Request,
Response,
$($import),*
};
use crate::{Client, Error};
/// Make a request to this API endpoint.
pub fn call<C>(
client: Client<C>,
request: Request,
) -> impl Future<Item = Response, Error = Error>
where
C: Connect + 'static,
{
client.request::<Endpoint>(request)
}
}
};
}
/// Endpoints for the r0.x.x versions of the client API specification.
pub mod r0 {
/// Account registration and management.
pub mod account {
endpoint!(
/// Change the password for an account on this homeserver.
[r0, account],
change_password
);
endpoint!(
/// Deactivate the user's account, removing all ability for the user to log in again.
[r0, account],
deactivate
);
endpoint!(
/// Register for an account on this homeserver.
[r0, account],
register,
[AuthenticationData, RegistrationKind]
);
endpoint!(
/// Request a password change token by email.
[r0, account],
request_password_change_token
);
endpoint!(
/// Request an account registration token by email.
[r0, account],
request_register_token
);
}
/// Room aliases.
pub mod alias {
endpoint!(
/// Create a new mapping from a room alias to a room ID.
[r0, alias],
create_alias
);
endpoint!(
/// Remove a mapping from a room alias to a room ID.
[r0, alias],
delete_alias
);
endpoint!(
/// Resolve a room alias to the corresponding room ID.
[r0, alias],
get_alias
);
}
/// Client configuration.
pub mod config {
endpoint!(
/// Set account data for the user.
[r0, config],
set_global_account_data
);
endpoint!(
/// Set account data scoped to a room for the user.
[r0, config],
set_room_account_data
);
}
/// Account contact information.
pub mod contact {
endpoint!(
/// Add contact information to the user's account.
[r0, contact],
create_contact,
[ThreePidCredentials]
);
endpoint!(
/// Get a list of the third party identifiers that the homeserver has associated with the user's account.
[r0, contact],
get_contacts,
[Medium, ThirdPartyIdentifier]
);
endpoint!(
/// Request an email address verification token by email.
[r0, contact],
request_contact_verification_token
);
}
/// Event context.
pub mod context {
endpoint!(
/// Get a number of events that happened just before and after a given event.
[r0, context],
get_context
);
}
/// The public room directory.
pub mod directory {
endpoint!(
/// Get a number of events that happened just before and after a given event.
[r0, directory],
get_public_rooms,
[PublicRoomsChunk]
);
}
/// Event filters.
pub mod filter {
pub use ruma_client_api::r0::filter::{
EventFormat, Filter, FilterDefinition, RoomEventFilter, RoomFilter,
};
endpoint!(
/// Create a new filter.
[r0, filter],
create_filter
);
endpoint!(
/// Get a filter.
[r0, filter],
get_filter
);
}
/// Media repository.
pub mod media {
endpoint!(
/// Upload media to the media repository.
[r0, media],
create_content
);
endpoint!(
/// Download media from the media repository.
[r0, media],
get_content
);
endpoint!(
/// Download a thumbnail image for the media in the media repository.
[r0, media],
get_content_thumbnail,
[Method]
);
}
/// Room membership.
pub mod membership {
pub use ruma_client_api::r0::membership::ThirdPartySigned;
endpoint!(
/// Ban a user from a room.
[r0, membership],
ban_user
);
endpoint!(
/// Permanently forget a room.
[r0, membership],
forget_room
);
endpoint!(
/// Invite a user to a room.
[r0, membership],
invite_user
);
endpoint!(
/// Join a room using its ID.
[r0, membership],
join_room_by_id
);
endpoint!(
/// Join a room using its ID or an alias.
[r0, membership],
join_room_by_id_or_alias
);
endpoint!(
/// Get a list of the user's current rooms.
[r0, membership],
joined_rooms
);
endpoint!(
/// Kick a user from a room.
[r0, membership],
kick_user
);
endpoint!(
/// Leave a room.
[r0, membership],
leave_room
);
endpoint!(
/// Unban a user from a room.
[r0, membership],
unban_user
);
}
/// User presence.
pub mod presence {
endpoint!(
/// Get a user's presence state.
[r0, presence],
get_presence
);
endpoint!(
/// Get a list of presence events for users on the presence subscription list.
[r0, presence],
get_subscribed_presences
);
endpoint!(
/// Set a user's presence state.
[r0, presence],
set_presence
);
endpoint!(
/// Add or remove users from the presence subscription list.
[r0, presence],
update_presence_subscriptions
);
}
/// User profiles.
pub mod profile {
endpoint!(
/// Get the URL for a user's avatar.
[r0, profile],
get_avatar_url
);
endpoint!(
/// Get a user's display name.
[r0, profile],
get_display_name
);
endpoint!(
/// Get a user's full profile.
[r0, profile],
get_profile
);
endpoint!(
/// Set the URL to the user's avatar.
[r0, profile],
set_avatar_url
);
endpoint!(
/// Set the user's display name.
[r0, profile],
set_display_name
);
}
/// Push notifications.
pub mod push {}
/// Event receipts.
pub mod receipt {
endpoint!(
/// Update a receipt marker to point to a given event.
[r0, receipt],
create_receipt,
[ReceiptType]
);
}
/// Event redaction.
pub mod redact {
endpoint!(
/// Redact an event from a room.
[r0, redact],
redact_event
);
}
/// Room creation.
pub mod room {
endpoint!(
/// Create a room.
[r0, room],
create_room,
[CreationContent, RoomPreset, Visibility]
);
}
/// Event searches.
pub mod search {
endpoint!(
/// Search for events.
[r0, search],
search_events,
[
Categories,
Criteria,
EventContext,
EventContextResult,
Grouping,
Groupings,
ResultCategories,
ResultGroup,
RoomEventResults,
SearchResult,
UserProfile,
GroupingKey,
OrderBy,
SearchKeys
]
);
}
/// Sending events.
pub mod send {
endpoint!(
/// Send a message to a room.
[r0, send],
send_message_event
);
endpoint!(
/// Send a state event with an empty state key.
[r0, send],
send_state_event_for_empty_key
);
endpoint!(
/// Send a state event with a particular state key.
[r0, send],
send_state_event_for_key
);
}
/// Server administration.
pub mod server {
endpoint!(
/// Get administrative information about a user.
[r0, server],
get_user_info,
[ConnectionInfo, DeviceInfo, SessionInfo]
);
}
/// User session management.
pub mod session {
endpoint!(
/// Log in to an account, creating an access token.
[r0, session],
login,
[LoginType, Medium]
);
endpoint!(
/// Log out of an account by invalidating the access token.
[r0, session],
logout
);
}
/// Getting and synchronizing events.
pub mod sync {
endpoint!(
/// Get the list of members for a room.
[r0, sync],
get_member_events
);
endpoint!(
/// Get message and state events for a room.
[r0, sync],
get_message_events,
[Direction]
);
endpoint!(
/// Get the state events for the current state of a room.
[r0, sync],
get_state_events
);
endpoint!(
/// Get a particular state event with an empty state key for a room.
[r0, sync],
get_state_events_for_empty_key
);
endpoint!(
/// Get a particular state event with a particular state key for a room.
[r0, sync],
get_state_events_for_key
);
endpoint!(
/// Synchronize the client's state with the latest state on the homeserver.
[r0, sync],
sync_events,
[
AccountData,
Ephemeral,
Filter,
InviteState,
InvitedRoom,
JoinedRoom,
LeftRoom,
Presence,
Rooms,
SetPresence,
State,
Timeline,
UnreadNotificationsCount
]
);
}
/// Tagging rooms.
pub mod tag {
endpoint!(
/// Create a tag on a room.
[r0, tag],
create_tag
);
endpoint!(
/// Delete a tag on a room.
[r0, tag],
delete_tag
);
endpoint!(
/// Get the user's tags for a room.
[r0, tag],
get_tags
);
}
/// Typing notifications.
pub mod typing {
endpoint!(
/// Indicate that the user is currently typing.
[r0, typing],
create_typing_event
);
}
/// Voice over IP.
pub mod voip {
endpoint!(
/// Get credentials for initiating voice over IP calls via a TURN server.
[r0, voip],
get_turn_server_info
);
}
}
/// Endpoints that cannot change with new versions of the Matrix specification.
pub mod unversioned {
endpoint!(
/// Get the versions of the specification supported by this homeserver.
[unversioned],
get_supported_versions
);
}

View File

@ -75,11 +75,12 @@
//! // Start `work` on a futures runtime...
//! ```
#![feature(async_await, async_closure)]
#![deny(
missing_copy_implementations,
missing_debug_implementations,
missing_docs,
warnings
//warnings
)]
#![warn(
clippy::empty_line_after_outer_attr,
@ -101,15 +102,16 @@
)]
use std::{
convert::TryInto,
convert::{TryFrom, TryInto},
str::FromStr,
sync::{Arc, Mutex},
};
use futures::{
future::{Future, FutureFrom, IntoFuture},
stream::{self, Stream},
future::Future,
stream::{self, TryStream, TryStreamExt as _},
};
use http::Response as HttpResponse;
use hyper::{
client::{connect::Connect, HttpConnector},
Client as HyperClient, Uri,
@ -125,7 +127,7 @@ use crate::error::InnerError;
pub use crate::{error::Error, session::Session};
/// Matrix client-server API endpoints.
pub mod api;
//pub mod api;
mod error;
mod session;
@ -177,7 +179,7 @@ impl Client<HttpsConnector<HttpConnector>> {
Ok(Self(Arc::new(ClientData {
homeserver_url,
hyper: { HyperClient::builder().keep_alive(true).build(connector) },
hyper: HyperClient::builder().keep_alive(true).build(connector),
session: Mutex::new(session),
})))
}
@ -207,50 +209,43 @@ where
/// In contrast to api::r0::session::login::call(), this method stores the
/// session data returned by the endpoint in this client, instead of
/// returning it.
pub fn log_in(
pub async fn log_in(
&self,
user: String,
password: String,
device_id: Option<String>,
) -> impl Future<Item = Session, Error = Error> {
use crate::api::r0::session::login;
) -> Result<Session, Error> {
use ruma_client_api::r0::session::login;
let data = self.0.clone();
login::call(
self.clone(),
login::Request {
let response = self
.request::<login::Endpoint>(login::Request {
address: None,
login_type: login::LoginType::Password,
medium: None,
device_id,
password,
user,
},
)
.map(move |response| {
let session = Session {
access_token: response.access_token,
device_id: response.device_id,
user_id: response.user_id,
};
*data.session.lock().unwrap() = Some(session.clone());
})
.await?;
session
})
let session = Session {
access_token: response.access_token,
device_id: response.device_id,
user_id: response.user_id,
};
*self.0.session.lock().unwrap() = Some(session.clone());
Ok(session)
}
/// Register as a guest. In contrast to api::r0::account::register::call(),
/// this method stores the session data returned by the endpoint in this
/// client, instead of returning it.
pub fn register_guest(&self) -> impl Future<Item = Session, Error = Error> {
use crate::api::r0::account::register;
pub async fn register_guest(&self) -> Result<Session, Error> {
use ruma_client_api::r0::account::register;
let data = self.0.clone();
register::call(
self.clone(),
register::Request {
let response = self
.request::<register::Endpoint>(register::Request {
auth: None,
bind_email: None,
device_id: None,
@ -258,18 +253,17 @@ where
kind: Some(register::RegistrationKind::Guest),
password: None,
username: None,
},
)
.map(move |response| {
let session = Session {
access_token: response.access_token,
device_id: response.device_id,
user_id: response.user_id,
};
*data.session.lock().unwrap() = Some(session.clone());
})
.await?;
session
})
let session = Session {
access_token: response.access_token,
device_id: response.device_id,
user_id: response.user_id,
};
*self.0.session.lock().unwrap() = Some(session.clone());
Ok(session)
}
/// Register as a new user on this server.
@ -280,18 +274,15 @@ where
///
/// The username is the local part of the returned user_id. If it is
/// omitted from this request, the server will generate one.
pub fn register_user(
pub async fn register_user(
&self,
username: Option<String>,
password: String,
) -> impl Future<Item = Session, Error = Error> {
use crate::api::r0::account::register;
) -> Result<Session, Error> {
use ruma_client_api::r0::account::register;
let data = self.0.clone();
register::call(
self.clone(),
register::Request {
let response = self
.request::<register::Endpoint>(register::Request {
auth: None,
bind_email: None,
device_id: None,
@ -299,18 +290,17 @@ where
kind: Some(register::RegistrationKind::User),
password: Some(password),
username,
},
)
.map(move |response| {
let session = Session {
access_token: response.access_token,
device_id: response.device_id,
user_id: response.user_id,
};
*data.session.lock().unwrap() = Some(session.clone());
})
.await?;
session
})
let session = Session {
access_token: response.access_token,
device_id: response.device_id,
user_id: response.user_id,
};
*self.0.session.lock().unwrap() = Some(session.clone());
Ok(session)
}
/// Convenience method that represents repeated calls to the sync_events endpoint as a stream.
@ -320,11 +310,19 @@ where
/// the logged-in users account and are visible to them.
pub fn sync(
&self,
filter: Option<api::r0::sync::sync_events::Filter>,
filter: Option<ruma_client_api::r0::sync::sync_events::Filter>,
since: Option<String>,
set_presence: bool,
) -> impl Stream<Item = api::r0::sync::sync_events::Response, Error = Error> {
use crate::api::r0::sync::sync_events;
) -> impl TryStream<Ok = ruma_client_api::r0::sync::sync_events::Response, Error = Error> {
use ruma_client_api::r0::sync::sync_events;
// TODO: Is this really the way TryStreams are supposed to work?
#[derive(Debug, PartialEq, Eq)]
enum State {
InitialSync,
Since(String),
Errored,
}
let client = self.clone();
let set_presence = if set_presence {
@ -333,71 +331,80 @@ where
Some(sync_events::SetPresence::Offline)
};
stream::unfold(since, move |since| {
Some(
sync_events::call(
client.clone(),
sync_events::Request {
filter: filter.clone(),
let initial_state = match since {
Some(s) => State::Since(s),
None => State::InitialSync,
};
stream::unfold(initial_state, move |state| {
let client = client.clone();
let filter = filter.clone();
async move {
let since = match state {
State::Errored => return None,
State::Since(s) => Some(s),
State::InitialSync => None,
};
let res = client
.request::<sync_events::Endpoint>(sync_events::Request {
filter,
since,
full_state: None,
set_presence: set_presence.clone(),
timeout: None,
},
)
.map(|res| {
let next_batch_clone = res.next_batch.clone();
(res, Some(next_batch_clone))
}),
)
})
.await;
match res {
Ok(response) => {
let next_batch_clone = response.next_batch.clone();
Some((Ok(response), State::Since(next_batch_clone)))
}
Err(e) => Some((Err(e.into()), State::Errored)),
}
}
})
}
/// Makes a request to a Matrix API endpoint.
pub(crate) fn request<E>(
self,
request: <E as Endpoint>::Request,
) -> impl Future<Item = E::Response, Error = Error>
where
E: Endpoint,
{
let data1 = self.0.clone();
let data2 = self.0.clone();
let mut url = self.0.homeserver_url.clone();
pub fn request<E: Endpoint>(
&self,
request: E::Request,
) -> impl Future<Output = Result<E::Response, Error>> {
let client = self.0.clone();
request
.try_into()
.map_err(Error::from)
.into_future()
.and_then(move |hyper_request| {
{
let uri = hyper_request.uri();
async move {
let mut url = client.homeserver_url.clone();
url.set_path(uri.path());
url.set_query(uri.query());
let mut hyper_request = request.try_into()?.map(hyper::Body::from);
if E::METADATA.requires_authentication {
if let Some(ref session) = *data1.session.lock().unwrap() {
url.query_pairs_mut()
.append_pair("access_token", &session.access_token);
} else {
return Err(Error(InnerError::AuthenticationRequired));
}
{
let uri = hyper_request.uri();
url.set_path(uri.path());
url.set_query(uri.query());
if E::METADATA.requires_authentication {
if let Some(ref session) = *client.session.lock().unwrap() {
url.query_pairs_mut()
.append_pair("access_token", &session.access_token);
} else {
return Err(Error(InnerError::AuthenticationRequired));
}
}
}
Uri::from_str(url.as_ref())
.map(move |uri| (uri, hyper_request))
.map_err(Error::from)
})
.and_then(move |(uri, mut hyper_request)| {
*hyper_request.uri_mut() = uri;
*hyper_request.uri_mut() = Uri::from_str(url.as_ref())?;
data2.hyper.request(hyper_request).map_err(Error::from)
})
.and_then(|hyper_response| {
E::Response::future_from(hyper_response).map_err(Error::from)
})
let hyper_response = client.hyper.request(hyper_request).await?;
let (head, body) = hyper_response.into_parts();
let full_response =
HttpResponse::from_parts(head, body.try_concat().await?.as_ref().to_owned());
Ok(E::Response::try_from(full_response)?)
}
}
}