diff --git a/Cargo.toml b/Cargo.toml index 698483bc..58bb3266 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,16 +13,17 @@ repository = "https://github.com/ruma/ruma-client" version = "0.2.0" [dependencies] -futures = "0.1.26" +futures-preview = "0.3.0-alpha.17" http = "0.1.17" -hyper = "0.12.27" -ruma-api = "0.7.0" -ruma-client-api = "0.3.0" -ruma-identifiers = "0.12.0" +ruma-api = "0.9.0" +ruma-identifiers = "0.13.1" serde_json = "1.0.39" serde_urlencoded = "0.5.4" url = "1.7.2" +[dependencies.hyper] +git = "https://github.com/hyperium/hyper" + [dependencies.hyper-tls] optional = true version = "0.3.2" @@ -31,14 +32,20 @@ version = "0.3.2" optional = true version = "0.2.2" +[dependencies.ruma-client-api] +git = "https://github.com/ruma/ruma-client-api" +branch = "update-deps" + [dependencies.serde] version = "1.0.90" features = ["derive"] -[dev-dependencies] -ruma-events = "0.12.0" -tokio = "0.1.18" +[dev-dependencies.ruma-events] +git = "https://github.com/ruma/ruma-events" + +[dev-dependencies.tokio] +git = "https://github.com/tokio-rs/tokio" [features] -default = ["tls"] +default = [] tls = ["hyper-tls", "native-tls"] diff --git a/examples/hello_world.rs b/examples/hello_world.rs index 227af951..e2e9ec7a 100644 --- a/examples/hello_world.rs +++ b/examples/hello_world.rs @@ -1,7 +1,9 @@ +#![feature(async_await)] + use std::{convert::TryFrom, env, process::exit}; -use futures::Future; -use ruma_client::{self, api::r0, Client}; +use ruma_client::{self, Client}; +use ruma_client_api::r0; use ruma_events::{ room::message::{MessageEventContent, MessageType, TextMessageEventContent}, EventType, @@ -9,49 +11,47 @@ use ruma_events::{ use ruma_identifiers::RoomAliasId; use url::Url; -// from https://stackoverflow.com/a/43992218/1592377 -macro_rules! clone { - (@param _) => ( _ ); - (@param $x:ident) => ( $x ); - ($($n:ident),+ => move |$($p:tt),+| $body:expr) => ( - { - $( let $n = $n.clone(); )+ - move |$(clone!(@param $p),)+| $body - } - ); -} +async fn hello_world(homeserver_url: Url, room: String) -> Result<(), ruma_client::Error> { + let client = Client::new(homeserver_url, None); -fn hello_world( - homeserver_url: Url, - room: String, -) -> impl Future { - let client = Client::https(homeserver_url, None).unwrap(); - - client.register_guest().and_then(clone!(client => move |_| { - r0::alias::get_alias::call(client, r0::alias::get_alias::Request { + client.register_guest().await?; + let response = client + .request::(r0::alias::get_alias::Request { room_alias: RoomAliasId::try_from(&room[..]).unwrap(), }) - })).and_then(clone!(client => move |response| { - let room_id = response.room_id; + .await?; - r0::membership::join_room_by_id::call(client.clone(), r0::membership::join_room_by_id::Request { - room_id: room_id.clone(), - third_party_signed: None, - }).and_then(move |_| { - r0::send::send_message_event::call(client, r0::send::send_message_event::Request { - room_id: room_id, - event_type: EventType::RoomMessage, - txn_id: "1".to_owned(), - data: MessageEventContent::Text(TextMessageEventContent { - body: "Hello World!".to_owned(), - msgtype: MessageType::Text, - }), - }) - }) - })).map(|_| ()) + let room_id = response.room_id; + + client + .request::( + r0::membership::join_room_by_id::Request { + room_id: room_id.clone(), + third_party_signed: None, + }, + ) + .await?; + + client.request::( + r0::send::send_message_event::Request { + room_id: room_id, + event_type: EventType::RoomMessage, + txn_id: "1".to_owned(), + data: MessageEventContent::Text(TextMessageEventContent { + msgtype: MessageType::Text, + body: "Hello World!".to_owned(), + format: None, + formatted_body: None, + relates_to: None, + }), + }, + ).await?; + + Ok(()) } -fn main() { +#[tokio::main] +async fn main() -> Result<(), ruma_client::Error> { let (homeserver_url, room) = match (env::args().nth(1), env::args().nth(2)) { (Some(a), Some(b)) => (a, b), _ => { @@ -63,9 +63,5 @@ fn main() { } }; - tokio::run( - hello_world(homeserver_url.parse().unwrap(), room).map_err(|e| { - dbg!(e); - }), - ); + hello_world(homeserver_url.parse().unwrap(), room).await } diff --git a/src/api.rs b/src/api.rs deleted file mode 100644 index 13bf95fe..00000000 --- a/src/api.rs +++ /dev/null @@ -1,528 +0,0 @@ -macro_rules! endpoint { - // No reexports besides `Request` and `Response`. - ($(#[$attr:meta])+ [$($outer_mod:ident),*], $inner_mod:ident) => { - endpoint!($(#[$attr])+ [$($outer_mod),*], $inner_mod, []); - }; - - // No imports from super. - ($(#[$attr:meta])+ [$($outer_mod:ident),*], $inner_mod:ident, [$($import:ident),*]) => { - endpoint!($(#[$attr])+ [$($outer_mod),*], $inner_mod, [$($import),*], []); - }; - - // Explicit case. - ( - $(#[$attr:meta])+ - [$($outer_mod:ident),*], - $inner_mod:ident, - [$($import:ident),*], - [$($super_import:ident),*] - ) => { - #[$($attr)+] - pub mod $inner_mod { - use futures::Future; - use hyper::client::connect::Connect; - use ruma_client_api::$($outer_mod::)*$inner_mod::Endpoint; - $(use super::$super_import;)* - pub use ruma_client_api::$($outer_mod::)*$inner_mod::{ - Request, - Response, - $($import),* - }; - - use crate::{Client, Error}; - - /// Make a request to this API endpoint. - pub fn call( - client: Client, - request: Request, - ) -> impl Future - where - C: Connect + 'static, - { - client.request::(request) - } - } - }; -} - -/// Endpoints for the r0.x.x versions of the client API specification. -pub mod r0 { - /// Account registration and management. - pub mod account { - endpoint!( - /// Change the password for an account on this homeserver. - [r0, account], - change_password - ); - - endpoint!( - /// Deactivate the user's account, removing all ability for the user to log in again. - [r0, account], - deactivate - ); - - endpoint!( - /// Register for an account on this homeserver. - [r0, account], - register, - [AuthenticationData, RegistrationKind] - ); - - endpoint!( - /// Request a password change token by email. - [r0, account], - request_password_change_token - ); - - endpoint!( - /// Request an account registration token by email. - [r0, account], - request_register_token - ); - } - - /// Room aliases. - pub mod alias { - endpoint!( - /// Create a new mapping from a room alias to a room ID. - [r0, alias], - create_alias - ); - - endpoint!( - /// Remove a mapping from a room alias to a room ID. - [r0, alias], - delete_alias - ); - - endpoint!( - /// Resolve a room alias to the corresponding room ID. - [r0, alias], - get_alias - ); - } - - /// Client configuration. - pub mod config { - endpoint!( - /// Set account data for the user. - [r0, config], - set_global_account_data - ); - - endpoint!( - /// Set account data scoped to a room for the user. - [r0, config], - set_room_account_data - ); - } - - /// Account contact information. - pub mod contact { - endpoint!( - /// Add contact information to the user's account. - [r0, contact], - create_contact, - [ThreePidCredentials] - ); - - endpoint!( - /// Get a list of the third party identifiers that the homeserver has associated with the user's account. - [r0, contact], - get_contacts, - [Medium, ThirdPartyIdentifier] - ); - - endpoint!( - /// Request an email address verification token by email. - [r0, contact], - request_contact_verification_token - ); - } - - /// Event context. - pub mod context { - endpoint!( - /// Get a number of events that happened just before and after a given event. - [r0, context], - get_context - ); - } - - /// The public room directory. - pub mod directory { - endpoint!( - /// Get a number of events that happened just before and after a given event. - [r0, directory], - get_public_rooms, - [PublicRoomsChunk] - ); - } - - /// Event filters. - pub mod filter { - pub use ruma_client_api::r0::filter::{ - EventFormat, Filter, FilterDefinition, RoomEventFilter, RoomFilter, - }; - - endpoint!( - /// Create a new filter. - [r0, filter], - create_filter - ); - - endpoint!( - /// Get a filter. - [r0, filter], - get_filter - ); - } - - /// Media repository. - pub mod media { - endpoint!( - /// Upload media to the media repository. - [r0, media], - create_content - ); - - endpoint!( - /// Download media from the media repository. - [r0, media], - get_content - ); - - endpoint!( - /// Download a thumbnail image for the media in the media repository. - [r0, media], - get_content_thumbnail, - [Method] - ); - } - - /// Room membership. - pub mod membership { - pub use ruma_client_api::r0::membership::ThirdPartySigned; - - endpoint!( - /// Ban a user from a room. - [r0, membership], - ban_user - ); - - endpoint!( - /// Permanently forget a room. - [r0, membership], - forget_room - ); - - endpoint!( - /// Invite a user to a room. - [r0, membership], - invite_user - ); - - endpoint!( - /// Join a room using its ID. - [r0, membership], - join_room_by_id - ); - - endpoint!( - /// Join a room using its ID or an alias. - [r0, membership], - join_room_by_id_or_alias - ); - - endpoint!( - /// Get a list of the user's current rooms. - [r0, membership], - joined_rooms - ); - - endpoint!( - /// Kick a user from a room. - [r0, membership], - kick_user - ); - - endpoint!( - /// Leave a room. - [r0, membership], - leave_room - ); - - endpoint!( - /// Unban a user from a room. - [r0, membership], - unban_user - ); - } - - /// User presence. - pub mod presence { - endpoint!( - /// Get a user's presence state. - [r0, presence], - get_presence - ); - - endpoint!( - /// Get a list of presence events for users on the presence subscription list. - [r0, presence], - get_subscribed_presences - ); - - endpoint!( - /// Set a user's presence state. - [r0, presence], - set_presence - ); - - endpoint!( - /// Add or remove users from the presence subscription list. - [r0, presence], - update_presence_subscriptions - ); - } - - /// User profiles. - pub mod profile { - endpoint!( - /// Get the URL for a user's avatar. - [r0, profile], - get_avatar_url - ); - - endpoint!( - /// Get a user's display name. - [r0, profile], - get_display_name - ); - - endpoint!( - /// Get a user's full profile. - [r0, profile], - get_profile - ); - - endpoint!( - /// Set the URL to the user's avatar. - [r0, profile], - set_avatar_url - ); - - endpoint!( - /// Set the user's display name. - [r0, profile], - set_display_name - ); - } - - /// Push notifications. - pub mod push {} - - /// Event receipts. - pub mod receipt { - endpoint!( - /// Update a receipt marker to point to a given event. - [r0, receipt], - create_receipt, - [ReceiptType] - ); - } - - /// Event redaction. - pub mod redact { - endpoint!( - /// Redact an event from a room. - [r0, redact], - redact_event - ); - } - - /// Room creation. - pub mod room { - endpoint!( - /// Create a room. - [r0, room], - create_room, - [CreationContent, RoomPreset, Visibility] - ); - } - - /// Event searches. - pub mod search { - endpoint!( - /// Search for events. - [r0, search], - search_events, - [ - Categories, - Criteria, - EventContext, - EventContextResult, - Grouping, - Groupings, - ResultCategories, - ResultGroup, - RoomEventResults, - SearchResult, - UserProfile, - GroupingKey, - OrderBy, - SearchKeys - ] - ); - } - - /// Sending events. - pub mod send { - endpoint!( - /// Send a message to a room. - [r0, send], - send_message_event - ); - - endpoint!( - /// Send a state event with an empty state key. - [r0, send], - send_state_event_for_empty_key - ); - - endpoint!( - /// Send a state event with a particular state key. - [r0, send], - send_state_event_for_key - ); - } - - /// Server administration. - pub mod server { - endpoint!( - /// Get administrative information about a user. - [r0, server], - get_user_info, - [ConnectionInfo, DeviceInfo, SessionInfo] - ); - } - - /// User session management. - pub mod session { - endpoint!( - /// Log in to an account, creating an access token. - [r0, session], - login, - [LoginType, Medium] - ); - - endpoint!( - /// Log out of an account by invalidating the access token. - [r0, session], - logout - ); - } - - /// Getting and synchronizing events. - pub mod sync { - endpoint!( - /// Get the list of members for a room. - [r0, sync], - get_member_events - ); - - endpoint!( - /// Get message and state events for a room. - [r0, sync], - get_message_events, - [Direction] - ); - - endpoint!( - /// Get the state events for the current state of a room. - [r0, sync], - get_state_events - ); - - endpoint!( - /// Get a particular state event with an empty state key for a room. - [r0, sync], - get_state_events_for_empty_key - ); - - endpoint!( - /// Get a particular state event with a particular state key for a room. - [r0, sync], - get_state_events_for_key - ); - - endpoint!( - /// Synchronize the client's state with the latest state on the homeserver. - [r0, sync], - sync_events, - [ - AccountData, - Ephemeral, - Filter, - InviteState, - InvitedRoom, - JoinedRoom, - LeftRoom, - Presence, - Rooms, - SetPresence, - State, - Timeline, - UnreadNotificationsCount - ] - ); - } - - /// Tagging rooms. - pub mod tag { - endpoint!( - /// Create a tag on a room. - [r0, tag], - create_tag - ); - - endpoint!( - /// Delete a tag on a room. - [r0, tag], - delete_tag - ); - - endpoint!( - /// Get the user's tags for a room. - [r0, tag], - get_tags - ); - } - - /// Typing notifications. - pub mod typing { - endpoint!( - /// Indicate that the user is currently typing. - [r0, typing], - create_typing_event - ); - } - - /// Voice over IP. - pub mod voip { - endpoint!( - /// Get credentials for initiating voice over IP calls via a TURN server. - [r0, voip], - get_turn_server_info - ); - } -} - -/// Endpoints that cannot change with new versions of the Matrix specification. -pub mod unversioned { - endpoint!( - /// Get the versions of the specification supported by this homeserver. - [unversioned], - get_supported_versions - ); -} diff --git a/src/lib.rs b/src/lib.rs index 0a8e5896..bd148382 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -75,11 +75,12 @@ //! // Start `work` on a futures runtime... //! ``` +#![feature(async_await, async_closure)] #![deny( missing_copy_implementations, missing_debug_implementations, missing_docs, - warnings + //warnings )] #![warn( clippy::empty_line_after_outer_attr, @@ -101,15 +102,16 @@ )] use std::{ - convert::TryInto, + convert::{TryFrom, TryInto}, str::FromStr, sync::{Arc, Mutex}, }; use futures::{ - future::{Future, FutureFrom, IntoFuture}, - stream::{self, Stream}, + future::Future, + stream::{self, TryStream, TryStreamExt as _}, }; +use http::Response as HttpResponse; use hyper::{ client::{connect::Connect, HttpConnector}, Client as HyperClient, Uri, @@ -125,7 +127,7 @@ use crate::error::InnerError; pub use crate::{error::Error, session::Session}; /// Matrix client-server API endpoints. -pub mod api; +//pub mod api; mod error; mod session; @@ -177,7 +179,7 @@ impl Client> { Ok(Self(Arc::new(ClientData { homeserver_url, - hyper: { HyperClient::builder().keep_alive(true).build(connector) }, + hyper: HyperClient::builder().keep_alive(true).build(connector), session: Mutex::new(session), }))) } @@ -207,50 +209,43 @@ where /// In contrast to api::r0::session::login::call(), this method stores the /// session data returned by the endpoint in this client, instead of /// returning it. - pub fn log_in( + pub async fn log_in( &self, user: String, password: String, device_id: Option, - ) -> impl Future { - use crate::api::r0::session::login; + ) -> Result { + use ruma_client_api::r0::session::login; - let data = self.0.clone(); - - login::call( - self.clone(), - login::Request { + let response = self + .request::(login::Request { address: None, login_type: login::LoginType::Password, medium: None, device_id, password, user, - }, - ) - .map(move |response| { - let session = Session { - access_token: response.access_token, - device_id: response.device_id, - user_id: response.user_id, - }; - *data.session.lock().unwrap() = Some(session.clone()); + }) + .await?; - session - }) + let session = Session { + access_token: response.access_token, + device_id: response.device_id, + user_id: response.user_id, + }; + *self.0.session.lock().unwrap() = Some(session.clone()); + + Ok(session) } /// Register as a guest. In contrast to api::r0::account::register::call(), /// this method stores the session data returned by the endpoint in this /// client, instead of returning it. - pub fn register_guest(&self) -> impl Future { - use crate::api::r0::account::register; + pub async fn register_guest(&self) -> Result { + use ruma_client_api::r0::account::register; - let data = self.0.clone(); - - register::call( - self.clone(), - register::Request { + let response = self + .request::(register::Request { auth: None, bind_email: None, device_id: None, @@ -258,18 +253,17 @@ where kind: Some(register::RegistrationKind::Guest), password: None, username: None, - }, - ) - .map(move |response| { - let session = Session { - access_token: response.access_token, - device_id: response.device_id, - user_id: response.user_id, - }; - *data.session.lock().unwrap() = Some(session.clone()); + }) + .await?; - session - }) + let session = Session { + access_token: response.access_token, + device_id: response.device_id, + user_id: response.user_id, + }; + *self.0.session.lock().unwrap() = Some(session.clone()); + + Ok(session) } /// Register as a new user on this server. @@ -280,18 +274,15 @@ where /// /// The username is the local part of the returned user_id. If it is /// omitted from this request, the server will generate one. - pub fn register_user( + pub async fn register_user( &self, username: Option, password: String, - ) -> impl Future { - use crate::api::r0::account::register; + ) -> Result { + use ruma_client_api::r0::account::register; - let data = self.0.clone(); - - register::call( - self.clone(), - register::Request { + let response = self + .request::(register::Request { auth: None, bind_email: None, device_id: None, @@ -299,18 +290,17 @@ where kind: Some(register::RegistrationKind::User), password: Some(password), username, - }, - ) - .map(move |response| { - let session = Session { - access_token: response.access_token, - device_id: response.device_id, - user_id: response.user_id, - }; - *data.session.lock().unwrap() = Some(session.clone()); + }) + .await?; - session - }) + let session = Session { + access_token: response.access_token, + device_id: response.device_id, + user_id: response.user_id, + }; + *self.0.session.lock().unwrap() = Some(session.clone()); + + Ok(session) } /// Convenience method that represents repeated calls to the sync_events endpoint as a stream. @@ -320,11 +310,19 @@ where /// the logged-in users account and are visible to them. pub fn sync( &self, - filter: Option, + filter: Option, since: Option, set_presence: bool, - ) -> impl Stream { - use crate::api::r0::sync::sync_events; + ) -> impl TryStream { + use ruma_client_api::r0::sync::sync_events; + + // TODO: Is this really the way TryStreams are supposed to work? + #[derive(Debug, PartialEq, Eq)] + enum State { + InitialSync, + Since(String), + Errored, + } let client = self.clone(); let set_presence = if set_presence { @@ -333,71 +331,80 @@ where Some(sync_events::SetPresence::Offline) }; - stream::unfold(since, move |since| { - Some( - sync_events::call( - client.clone(), - sync_events::Request { - filter: filter.clone(), + let initial_state = match since { + Some(s) => State::Since(s), + None => State::InitialSync, + }; + + stream::unfold(initial_state, move |state| { + let client = client.clone(); + let filter = filter.clone(); + + async move { + let since = match state { + State::Errored => return None, + State::Since(s) => Some(s), + State::InitialSync => None, + }; + + let res = client + .request::(sync_events::Request { + filter, since, full_state: None, set_presence: set_presence.clone(), timeout: None, - }, - ) - .map(|res| { - let next_batch_clone = res.next_batch.clone(); - (res, Some(next_batch_clone)) - }), - ) + }) + .await; + + match res { + Ok(response) => { + let next_batch_clone = response.next_batch.clone(); + Some((Ok(response), State::Since(next_batch_clone))) + } + Err(e) => Some((Err(e.into()), State::Errored)), + } + } }) } /// Makes a request to a Matrix API endpoint. - pub(crate) fn request( - self, - request: ::Request, - ) -> impl Future - where - E: Endpoint, - { - let data1 = self.0.clone(); - let data2 = self.0.clone(); - let mut url = self.0.homeserver_url.clone(); + pub fn request( + &self, + request: E::Request, + ) -> impl Future> { + let client = self.0.clone(); - request - .try_into() - .map_err(Error::from) - .into_future() - .and_then(move |hyper_request| { - { - let uri = hyper_request.uri(); + async move { + let mut url = client.homeserver_url.clone(); - url.set_path(uri.path()); - url.set_query(uri.query()); + let mut hyper_request = request.try_into()?.map(hyper::Body::from); - if E::METADATA.requires_authentication { - if let Some(ref session) = *data1.session.lock().unwrap() { - url.query_pairs_mut() - .append_pair("access_token", &session.access_token); - } else { - return Err(Error(InnerError::AuthenticationRequired)); - } + { + let uri = hyper_request.uri(); + + url.set_path(uri.path()); + url.set_query(uri.query()); + + if E::METADATA.requires_authentication { + if let Some(ref session) = *client.session.lock().unwrap() { + url.query_pairs_mut() + .append_pair("access_token", &session.access_token); + } else { + return Err(Error(InnerError::AuthenticationRequired)); } } + } - Uri::from_str(url.as_ref()) - .map(move |uri| (uri, hyper_request)) - .map_err(Error::from) - }) - .and_then(move |(uri, mut hyper_request)| { - *hyper_request.uri_mut() = uri; + *hyper_request.uri_mut() = Uri::from_str(url.as_ref())?; - data2.hyper.request(hyper_request).map_err(Error::from) - }) - .and_then(|hyper_response| { - E::Response::future_from(hyper_response).map_err(Error::from) - }) + let hyper_response = client.hyper.request(hyper_request).await?; + let (head, body) = hyper_response.into_parts(); + let full_response = + HttpResponse::from_parts(head, body.try_concat().await?.as_ref().to_owned()); + + Ok(E::Response::try_from(full_response)?) + } } }