use std::{ sync::{Arc, Mutex}, time::Duration, }; use assign::assign; use async_stream::try_stream; use futures_core::stream::Stream; use ruma_client_api::{ account::register::{self, RegistrationKind}, session::login::{self, v3::LoginInfo}, sync::sync_events, uiaa::UserIdentifier, }; use ruma_common::{ api::{MatrixVersion, OutgoingRequest, SendAccessToken}, presence::PresenceState, DeviceId, UserId, }; use crate::{ add_user_id_to_query, send_customized_request, Error, HttpClient, ResponseError, ResponseResult, }; mod builder; pub use self::builder::ClientBuilder; /// A client for the Matrix client-server API. #[derive(Clone, Debug)] pub struct Client(Arc>); /// Data contained in Client's Rc #[derive(Debug)] struct ClientData { /// The URL of the homeserver to connect to. homeserver_url: String, /// The underlying HTTP client. http_client: C, /// The access token, if logged in. access_token: Mutex>, /// The (known) Matrix versions the homeserver supports. supported_matrix_versions: Vec, } impl Client<()> { /// Creates a new client builder. pub fn builder() -> ClientBuilder { ClientBuilder::new() } } impl Client { /// Get a copy of the current `access_token`, if any. /// /// Useful for serializing and persisting the session to be restored later. pub fn access_token(&self) -> Option { self.0.access_token.lock().expect("session mutex was poisoned").clone() } } impl Client { /// Makes a request to a Matrix API endpoint. pub async fn send_request(&self, request: R) -> ResponseResult { self.send_customized_request(request, |_| Ok(())).await } /// Makes a request to a Matrix API endpoint including additional URL parameters. pub async fn send_customized_request( &self, request: R, customize: F, ) -> ResponseResult where R: OutgoingRequest, F: FnOnce(&mut http::Request) -> Result<(), ResponseError>, { let access_token = self.access_token(); let send_access_token = match access_token.as_deref() { Some(at) => SendAccessToken::IfRequired(at), None => SendAccessToken::None, }; send_customized_request( &self.0.http_client, &self.0.homeserver_url, send_access_token, &self.0.supported_matrix_versions, request, customize, ) .await } /// Makes a request to a Matrix API endpoint as a virtual user. /// /// This method is meant to be used by application services when interacting with the /// client-server API. pub async fn send_request_as( &self, user_id: &UserId, request: R, ) -> ResponseResult { self.send_customized_request(request, add_user_id_to_query::(user_id)).await } /// Log in with a username and password. /// /// In contrast to [`send_request`][Self::send_request], this method stores the access token /// returned by the endpoint in this client, in addition to returning it. pub async fn log_in( &self, user: &str, password: &str, device_id: Option<&DeviceId>, initial_device_display_name: Option<&str>, ) -> Result> { let response = self .send_request(assign!(login::v3::Request::new( LoginInfo::Password(login::v3::Password::new(UserIdentifier::UserIdOrLocalpart(user), password))), { device_id, initial_device_display_name, } )) .await?; *self.0.access_token.lock().unwrap() = Some(response.access_token.clone()); Ok(response) } /// Register as a guest. /// /// In contrast to [`send_request`][Self::send_request], this method stores the access token /// returned by the endpoint in this client, in addition to returning it. pub async fn register_guest( &self, ) -> Result> { let response = self .send_request(assign!(register::v3::Request::new(), { kind: RegistrationKind::Guest })) .await?; *self.0.access_token.lock().unwrap() = response.access_token.clone(); Ok(response) } /// Register as a new user on this server. /// /// In contrast to [`send_request`][Self::send_request], this method stores the access token /// returned by the endpoint in this client, in addition to returning it. /// /// The username is the local part of the returned user_id. If it is omitted from this request, /// the server will generate one. pub async fn register_user( &self, username: Option<&str>, password: &str, ) -> Result> { let response = self .send_request(assign!(register::v3::Request::new(), { username, password: Some(password) })) .await?; *self.0.access_token.lock().unwrap() = response.access_token.clone(); Ok(response) } /// Convenience method that represents repeated calls to the sync_events endpoint as a stream. /// /// # Example: /// /// ```no_run /// use std::time::Duration; /// /// # use ruma_common::presence::PresenceState; /// # use tokio_stream::{StreamExt as _}; /// # let homeserver_url = "https://example.com".parse().unwrap(); /// # async { /// # let client = ruma_client::Client::builder() /// # .homeserver_url(homeserver_url) /// # .build::() /// # .await?; /// # let next_batch_token = String::new(); /// let mut sync_stream = Box::pin(client.sync( /// None, /// next_batch_token, /// &PresenceState::Online, /// Some(Duration::from_secs(30)), /// )); /// while let Some(response) = sync_stream.try_next().await? { /// // Do something with the data in the response... /// } /// # Result::<(), ruma_client::Error<_, _>>::Ok(()) /// # }; /// ``` pub fn sync<'a>( &'a self, filter: Option<&'a sync_events::v3::Filter<'a>>, mut since: String, set_presence: &'a PresenceState, timeout: Option, ) -> impl Stream>> + 'a { try_stream! { loop { let response = self .send_request(assign!(sync_events::v3::Request::new(), { filter, since: Some(&since), set_presence, timeout, })) .await?; since = response.next_batch.clone(); yield response; } } } }