Integrate ruma-client

This commit is contained in:
Jonas Platte 2020-08-11 01:20:02 +02:00
parent 3a6bdd63bd
commit b20768c1b1
No known key found for this signature in database
GPG Key ID: 7D261D771D915378
7 changed files with 149 additions and 148 deletions

View File

@ -29,3 +29,17 @@ ruma_api! {
error: crate::Error
}
impl<'a> Request<'a> {
/// Creates a new `Request` with the given room alias id.
pub fn new(room_alias: &'a RoomAliasId) -> Self {
Self { room_alias }
}
}
impl Response {
/// Creates a new `Response` with the given room id and servers
pub fn new(room_id: RoomId, servers: Vec<String>) -> Self {
Self { room_id, servers }
}
}

View File

@ -18,7 +18,7 @@ ruma_api! {
request: {
/// The room where the user should be invited.
#[ruma_api(path)]
pub room_id: RoomId,
pub room_id: &'a RoomId,
/// The signature of a `m.third_party_invite` token to prove that this user owns a third
/// party identity which has been invited to the room.
@ -33,3 +33,17 @@ ruma_api! {
error: crate::Error
}
impl<'a> Request<'a> {
/// Creates a new `Request` with the given room id.
pub fn new(room_id: &'a RoomId) -> Self {
Self { room_id, third_party_signed: None }
}
}
impl Response {
/// Creates a new `Response` with the given room id.
pub fn new(room_id: RoomId) -> Self {
Self { room_id }
}
}

View File

@ -16,24 +16,26 @@ repository = "https://github.com/ruma/ruma-client"
version = "0.4.0"
[dependencies]
assign = "1.1.0"
futures-core = "0.3.5"
futures-util = "0.3.5"
http = "0.2.1"
hyper = "0.13.5"
hyper-tls = { version = "0.4.1", optional = true }
ruma-api = "=0.17.0-alpha.1"
ruma-client-api = "=0.10.0-alpha.1"
ruma-common = "0.2.0"
ruma-events = "=0.22.0-alpha.1"
ruma-identifiers = "0.17.1"
serde = { version = "1.0.110", features = ["derive"] }
serde_json = "1.0.53"
hyper = "0.13.7"
hyper-tls = { version = "0.4.3", optional = true }
ruma-api = { version = "=0.17.0-alpha.1", path = "../ruma-api" }
ruma-client-api = { version = "0.10.0-alpha.1", path = "../ruma-client-api" }
ruma-common = { version = "0.2.0", path = "../ruma-common" }
ruma-events = { version = "=0.22.0-alpha.1", path = "../ruma-events" }
ruma-identifiers = { version = "0.17.4", path = "../ruma-identifiers" }
ruma-serde = { version = "0.2.3", path = "../ruma-serde" }
serde = { version = "1.0.115", features = ["derive"] }
serde_json = "1.0.57"
serde_urlencoded = "0.6.1"
url = "2.1.1"
[dev-dependencies]
anyhow = "1.0.31"
tokio = { version = "0.2.21", features = ["macros"] }
anyhow = "1.0.32"
ruma = { version = "0.0.1", path = "../ruma", features = ["client-api"] }
tokio = { version = "0.2.22", features = ["macros"] }
[features]
default = ["tls"]

View File

@ -1,42 +1,32 @@
use std::{convert::TryFrom, env, process::exit};
use ruma_client::{
self,
api::r0,
use http::Uri;
use ruma::{
api::client::r0::{alias::get_alias, membership::join_room_by_id, message::send_message_event},
events::{
room::message::{MessageEventContent, TextMessageEventContent},
EventType,
},
identifiers::RoomAliasId,
Client,
RoomAliasId,
};
use ruma_client::{self, Client};
use serde_json::value::to_raw_value as to_raw_json_value;
use url::Url;
async fn hello_world(homeserver_url: Url, room: String) -> anyhow::Result<()> {
async fn hello_world(homeserver_url: Uri, room_alias: &RoomAliasId) -> anyhow::Result<()> {
let client = Client::new(homeserver_url, None);
client.register_guest().await?;
let response = client
.request(r0::alias::get_alias::Request {
room_alias: RoomAliasId::try_from(&room[..]).unwrap(),
})
.await?;
let response = client.request(get_alias::Request { room_alias }).await?;
let room_id = response.room_id;
client
.request(r0::membership::join_room_by_id::Request {
room_id: room_id.clone(),
third_party_signed: None,
})
.await?;
client.request(join_room_by_id::Request::new(&room_id)).await?;
client
.request(r0::message::create_message_event::Request {
room_id,
.request(send_message_event::Request {
room_id: &room_id,
event_type: EventType::RoomMessage,
txn_id: "1".to_owned(),
txn_id: "1",
data: to_raw_json_value(&MessageEventContent::Text(TextMessageEventContent {
body: "Hello World!".to_owned(),
formatted: None,
@ -53,13 +43,10 @@ async fn main() -> anyhow::Result<()> {
let (homeserver_url, room) = match (env::args().nth(1), env::args().nth(2)) {
(Some(a), Some(b)) => (a, b),
_ => {
eprintln!(
"Usage: {} <homeserver_url> <room>",
env::args().next().unwrap()
);
eprintln!("Usage: {} <homeserver_url> <room>", env::args().next().unwrap());
exit(1)
}
};
hello_world(homeserver_url.parse().unwrap(), room).await
hello_world(homeserver_url.parse()?, &RoomAliasId::try_from(room.as_str())?).await
}

View File

@ -1,17 +1,18 @@
use std::{env, process::exit, time::Duration};
use futures_util::stream::{StreamExt as _, TryStreamExt as _};
use ruma_client::{
self,
events::room::message::{MessageEventContent, TextMessageEventContent},
HttpClient,
use http::Uri;
use ruma::{
events::{
room::message::{MessageEventContent, TextMessageEventContent},
AnySyncMessageEvent, AnySyncRoomEvent, SyncMessageEvent,
},
presence::PresenceState,
};
use ruma_common::presence::PresenceState;
use ruma_events::{AnySyncMessageEvent, AnySyncRoomEvent, SyncMessageEvent};
use url::Url;
use ruma_client::{self, HttpClient};
async fn log_messages(
homeserver_url: Url,
homeserver_url: Uri,
username: String,
password: String,
) -> anyhow::Result<()> {
@ -21,12 +22,7 @@ async fn log_messages(
let mut sync_stream = Box::pin(
client
.sync(
None,
None,
PresenceState::Online,
Some(Duration::from_secs(30)),
)
.sync(None, None, PresenceState::Online, Some(Duration::from_secs(30)))
// TODO: This is a horrible way to obtain an initial next_batch token that generates way
// too much server load and network traffic. Fix this!
.skip(1),
@ -35,12 +31,7 @@ async fn log_messages(
while let Some(res) = sync_stream.try_next().await? {
// Only look at rooms the user hasn't left yet
for (room_id, room) in res.rooms.join {
for event in room
.timeline
.events
.into_iter()
.flat_map(|r| r.deserialize())
{
for event in room.timeline.events.into_iter().flat_map(|r| r.deserialize()) {
// Filter out the text messages
if let AnySyncRoomEvent::Message(AnySyncMessageEvent::RoomMessage(
SyncMessageEvent {
@ -76,6 +67,6 @@ async fn main() -> anyhow::Result<()> {
}
};
let server = Url::parse(&homeserver_url).unwrap();
let server = homeserver_url.parse()?;
log_messages(server, username, password).await
}

View File

@ -43,7 +43,14 @@ impl<E> From<IntoHttpError> for Error<E> {
#[doc(hidden)]
impl<E> From<http::uri::InvalidUri> for Error<E> {
fn from(err: http::uri::InvalidUri) -> Self {
Error::Url(UrlError(err))
Error::Url(UrlError(err.into()))
}
}
#[doc(hidden)]
impl<E> From<http::uri::InvalidUriParts> for Error<E> {
fn from(err: http::uri::InvalidUriParts) -> Self {
Error::Url(UrlError(err.into()))
}
}
@ -63,7 +70,7 @@ impl<E> From<FromHttpResponseError<E>> for Error<E> {
impl<E: Debug + Display> std::error::Error for Error<E> {}
#[derive(Debug)]
pub struct UrlError(http::uri::InvalidUri);
pub struct UrlError(http::Error);
#[derive(Debug)]
pub struct ResponseError(hyper::Error);

View File

@ -46,7 +46,7 @@
//!
//! # use futures_util::stream::{StreamExt as _, TryStreamExt as _};
//! # use ruma_client::Client;
//! # use ruma_common::presence::PresenceState;
//! # use ruma::presence::PresenceState;
//! # let homeserver_url = "https://example.com".parse().unwrap();
//! # let client = Client::https(homeserver_url, None);
//! # let next_batch_token = String::new();
@ -81,58 +81,53 @@
//! # let client = Client::https(homeserver_url, None);
//! use std::convert::TryFrom;
//!
//! use ruma_client::api::r0::alias::get_alias;
//! use ruma_identifiers::{RoomAliasId, RoomId};
//! use ruma::{
//! api::client::r0::alias::get_alias,
//! room_alias_id, room_id,
//! };
//!
//! async {
//! let response = client
//! .request(get_alias::Request {
//! room_alias: RoomAliasId::try_from("#example_room:example.com").unwrap(),
//! })
//! .request(get_alias::Request::new(&room_alias_id!("#example_room:example.com")))
//! .await?;
//!
//! assert_eq!(response.room_id, RoomId::try_from("!n8f893n9:example.com").unwrap());
//! assert_eq!(response.room_id, room_id!("!n8f893n9:example.com"));
//! # Result::<(), ruma_client::Error<_>>::Ok(())
//! }
//! # ;
//! ```
#![warn(rust_2018_idioms)]
#![deny(
missing_copy_implementations,
missing_debug_implementations,
missing_docs
)]
#![deny(missing_copy_implementations, missing_debug_implementations, missing_docs)]
use std::{
convert::TryFrom,
str::FromStr,
sync::{Arc, Mutex},
time::Duration,
};
use futures_core::{
future::Future,
stream::{Stream, TryStream},
};
use assign::assign;
use futures_core::stream::{Stream, TryStream};
use futures_util::stream;
use http::Response as HttpResponse;
use hyper::{client::HttpConnector, Client as HyperClient, Uri};
use http::{uri::Uri, Response as HttpResponse};
use hyper::{client::HttpConnector, Client as HyperClient};
#[cfg(feature = "hyper-tls")]
use hyper_tls::HttpsConnector;
use ruma_api::Endpoint;
use ruma_client_api::r0::sync::sync_events::{
Filter as SyncFilter, Request as SyncRequest, Response as SyncResponse,
};
use ruma_identifiers::DeviceId;
use ruma_serde::urlencoded;
use std::collections::BTreeMap;
use url::Url;
pub use ruma_client_api as api;
pub use ruma_events as events;
pub use ruma_identifiers as identifiers;
mod error;
mod session;
pub use self::{error::Error, session::Identification, session::Session};
pub use self::{
error::Error,
session::{Identification, Session},
};
/// A client for the Matrix client-server API.
#[derive(Debug)]
@ -142,7 +137,7 @@ pub struct Client<C>(Arc<ClientData<C>>);
#[derive(Debug)]
struct ClientData<C> {
/// The URL of the homeserver to connect to.
homeserver_url: Url,
homeserver_url: Uri,
/// The underlying HTTP client.
hyper: HyperClient<C>,
/// User session data.
@ -154,7 +149,7 @@ pub type HttpClient = Client<HttpConnector>;
impl HttpClient {
/// Creates a new client for making HTTP requests to the given homeserver.
pub fn new(homeserver_url: Url, session: Option<Session>) -> Self {
pub fn new(homeserver_url: Uri, session: Option<Session>) -> Self {
Self(Arc::new(ClientData {
homeserver_url,
hyper: HyperClient::builder().build_http(),
@ -170,7 +165,7 @@ pub type HttpsClient = Client<HttpsConnector<HttpConnector>>;
#[cfg(feature = "tls")]
impl HttpsClient {
/// Creates a new client for making HTTPS requests to the given homeserver.
pub fn https(homeserver_url: Url, session: Option<Session>) -> Self {
pub fn https(homeserver_url: Uri, session: Option<Session>) -> Self {
let connector = HttpsConnector::new();
Self(Arc::new(ClientData {
@ -190,7 +185,7 @@ where
/// This allows the user to configure the details of HTTP as desired.
pub fn custom(
hyper_client: HyperClient<C>,
homeserver_url: Url,
homeserver_url: Uri,
session: Option<Session>,
) -> Self {
Self(Arc::new(ClientData {
@ -204,11 +199,7 @@ where
///
/// Useful for serializing and persisting the session to be restored later.
pub fn session(&self) -> Option<Session> {
self.0
.session
.lock()
.expect("session mutex was poisoned")
.clone()
self.0.session.lock().expect("session mutex was poisoned").clone()
}
/// Log in with a username and password.
@ -222,8 +213,8 @@ where
password: String,
device_id: Option<Box<DeviceId>>,
initial_device_display_name: Option<String>,
) -> Result<Session, Error<api::Error>> {
use api::r0::session::login;
) -> Result<Session, Error<ruma_client_api::Error>> {
use ruma_client_api::r0::session::login;
let response = self
.request(login::Request {
@ -249,8 +240,10 @@ where
/// Register as a guest. In contrast to `api::r0::account::register::call()`,
/// this method stores the session data returned by the endpoint in this
/// client, instead of returning it.
pub async fn register_guest(&self) -> Result<Session, Error<api::r0::uiaa::UiaaResponse>> {
use api::r0::account::register;
pub async fn register_guest(
&self,
) -> Result<Session, Error<ruma_client_api::r0::uiaa::UiaaResponse>> {
use ruma_client_api::r0::account::register;
let response = self
.request(register::Request {
@ -291,8 +284,8 @@ where
&self,
username: Option<String>,
password: String,
) -> Result<Session, Error<api::r0::uiaa::UiaaResponse>> {
use api::r0::account::register;
) -> Result<Session, Error<ruma_client_api::r0::uiaa::UiaaResponse>> {
use ruma_client_api::r0::account::register;
let response = self
.request(register::Request {
@ -328,14 +321,12 @@ where
/// the logged-in users account and are visible to them.
pub fn sync(
&self,
filter: Option<api::r0::sync::sync_events::Filter>,
filter: Option<SyncFilter>,
since: Option<String>,
set_presence: ruma_common::presence::PresenceState,
timeout: Option<Duration>,
) -> impl Stream<Item = Result<api::r0::sync::sync_events::Response, Error<api::Error>>>
+ TryStream<Ok = api::r0::sync::sync_events::Response, Error = Error<api::Error>> {
use api::r0::sync::sync_events;
) -> impl Stream<Item = Result<SyncResponse, Error<ruma_client_api::Error>>>
+ TryStream<Ok = SyncResponse, Error = Error<ruma_client_api::Error>> {
let client = self.clone();
stream::try_unfold(since, move |since| {
let client = client.clone();
@ -343,7 +334,7 @@ where
async move {
let response = client
.request(sync_events::Request {
.request(SyncRequest {
filter,
since,
full_state: false,
@ -359,51 +350,47 @@ where
}
/// Makes a request to a Matrix API endpoint.
pub fn request<Request: Endpoint>(
pub async fn request<Request: Endpoint>(
&self,
request: Request,
) -> impl Future<Output = Result<Request::Response, Error<Request::ResponseError>>> {
self.request_with_url_params(request, None)
) -> Result<Request::IncomingResponse, Error<Request::ResponseError>> {
self.request_with_url_params(request, None).await
}
/// Makes a request to a Matrix API endpoint including additional URL parameters.
pub fn request_with_url_params<Request: Endpoint>(
pub async fn request_with_url_params<Request: Endpoint>(
&self,
request: Request,
params: Option<BTreeMap<String, String>>,
) -> impl Future<Output = Result<Request::Response, Error<Request::ResponseError>>> {
extra_params: Option<BTreeMap<String, String>>,
) -> Result<Request::IncomingResponse, Error<Request::ResponseError>> {
let client = self.0.clone();
let mut url = client.homeserver_url.clone();
async move {
let mut hyper_request = request.try_into()?.map(hyper::Body::from);
{
let uri = hyper_request.uri();
url.set_path(uri.path());
url.set_query(uri.query());
if let Some(params) = params {
for (key, value) in params {
url.query_pairs_mut().append_pair(&key, &value);
}
}
if Request::METADATA.requires_authentication {
if let Some(ref session) = *client.session.lock().unwrap() {
url.query_pairs_mut()
.append_pair("access_token", &session.access_token);
let mut http_request = {
let session;
let access_token = if Request::METADATA.requires_authentication {
session = client.session.lock().unwrap();
if let Some(s) = &*session {
Some(s.access_token.as_str())
} else {
return Err(Error::AuthenticationRequired);
}
}
}
} else {
None
};
*hyper_request.uri_mut() = Uri::from_str(url.as_ref())?;
request.try_into_http_request(&client.homeserver_url.to_string(), access_token)?
};
let hyper_response = client.hyper.request(hyper_request).await?;
let extra_params = urlencoded::to_string(extra_params).unwrap();
let uri = http_request.uri_mut();
let new_path_and_query = match uri.query() {
Some(params) => format!("{}?{}&{}", uri.path(), params, extra_params),
None => format!("{}?{}", uri.path(), extra_params),
};
*uri = Uri::from_parts(assign!(uri.clone().into_parts(), {
path_and_query: Some(new_path_and_query.parse()?),
}))?;
let hyper_response = client.hyper.request(http_request.map(hyper::Body::from)).await?;
let (head, body) = hyper_response.into_parts();
// FIXME: We read the response into a contiguous buffer here (not actually required for
@ -411,8 +398,7 @@ where
let full_body = hyper::body::to_bytes(body).await?;
let full_response = HttpResponse::from_parts(head, full_body.as_ref().to_owned());
Ok(Request::Response::try_from(full_response)?)
}
Ok(Request::IncomingResponse::try_from(full_response)?)
}
}