client: Add an HttpClient abstraction and implement it for hyper::Client

This commit is contained in:
Jonas Platte 2021-04-27 00:19:02 +02:00
parent 89b191c143
commit c692d18797
No known key found for this signature in database
GPG Key ID: CC154DE0E30B7C67
11 changed files with 173 additions and 147 deletions

View File

@ -24,7 +24,7 @@ jobs:
RUSTDOCFLAGS: "--enable-index-page -Zunstable-options --cfg docsrs"
with:
command: doc
args: --no-deps --workspace --exclude xtask --features full,compat,unstable-pre-spec -Zrustdoc-map
args: --no-deps --workspace --exclude xtask --all-features -Zrustdoc-map
- name: Deploy to docs branch
uses: JamesIves/github-pages-deploy-action@4.1.0
with:

View File

@ -1,6 +1,3 @@
{
"rust-analyzer.cargo.features": [
"full",
"unstable-pre-spec"
]
"rust-analyzer.cargo.allFeatures": true
}

View File

@ -15,36 +15,22 @@ repository = "https://github.com/ruma/ruma"
version = "0.5.0-alpha.2"
[features]
default = ["http1", "http2", "tls-native"]
client-api = ["ruma-client-api"]
http1 = ["hyper/http1"]
http2 = ["hyper/http2"]
tls-native = ["hyper-tls", "_tls"]
tls-rustls-native-roots = [
"hyper-rustls",
"hyper-rustls/native-tokio",
"_tls-rustls",
]
tls-rustls-webpki-roots = [
"hyper-rustls",
"hyper-rustls/webpki-tokio",
"_tls-rustls",
]
# Internal, not meant to be used directly
_tls = []
_tls-rustls = ["_tls"]
# HTTP clients
hyper-native-tls = ["hyper", "hyper-tls"]
hyper-rustls = ["hyper", "hyper-rustls-crate"]
[dependencies]
assign = "1.1.1"
async-stream = "0.3.0"
async-trait = "0.1.50"
bytes = "1.0.1"
futures-core = "0.3.8"
http = "0.2.2"
hyper = { version = "0.14.2", features = ["client", "tcp"] }
hyper = { version = "0.14.2", optional = true, features = ["client", "http1", "http2", "tcp"] }
hyper-tls = { version = "0.5.0", optional = true }
hyper-rustls = { version = "0.22.1", optional = true, default-features = false }
hyper-rustls-crate = { package = "hyper-rustls", version = "0.22.1", optional = true, default-features = false }
ruma-api = { version = "=0.17.0-alpha.4", path = "../ruma-api" }
ruma-client-api = { version = "=0.10.0-alpha.3", path = "../ruma-client-api", optional = true, features = ["client"] }
ruma-common = { version = "0.5.0", path = "../ruma-common" }
@ -62,8 +48,8 @@ tokio-stream = { version = "0.1.1", default-features = false }
[[example]]
name = "hello_world"
required-features = ["client-api"]
required-features = ["client-api", "hyper-native-tls"]
[[example]]
name = "message_log"
required-features = ["client-api"]
required-features = ["client-api", "hyper-native-tls"]

View File

@ -6,7 +6,8 @@ use ruma::{
events::{room::message::MessageEventContent, AnyMessageEventContent},
RoomAliasId,
};
use ruma_client::Client;
type MatrixClient = ruma_client::Client<ruma_client::http_client::HyperNativeTls>;
async fn hello_world(
homeserver_url: Uri,
@ -14,7 +15,7 @@ async fn hello_world(
password: &str,
room_alias: &RoomAliasId,
) -> anyhow::Result<()> {
let client = Client::new(homeserver_url, None);
let client = MatrixClient::new(homeserver_url, None);
client.log_in(username, password, None, Some("ruma-example-client")).await?;
let room_id = client.request(get_alias::Request::new(room_alias)).await?.room_id;

View File

@ -10,11 +10,12 @@ use ruma::{
},
presence::PresenceState,
};
use ruma_client::Client;
use tokio_stream::StreamExt as _;
type MatrixClient = ruma_client::Client<ruma_client::http_client::HyperNativeTls>;
async fn log_messages(homeserver_url: Uri, username: &str, password: &str) -> anyhow::Result<()> {
let client = Client::new(homeserver_url, None);
let client = MatrixClient::new(homeserver_url, None);
client.log_in(username, password, None, None).await?;

View File

@ -11,10 +11,10 @@ use ruma_client_api::r0::{
use ruma_common::presence::PresenceState;
use ruma_identifiers::DeviceId;
use super::{Client, Error};
use super::{Client, Error, HttpClient};
/// Client-API specific functionality of `Client`.
impl Client<super::HyperClient<super::Connector>> {
impl<C: HttpClient> Client<C> {
/// Log in with a username and password.
///
/// In contrast to [`request`], this method stores the access token returned by the endpoint in
@ -25,7 +25,7 @@ impl Client<super::HyperClient<super::Connector>> {
password: &str,
device_id: Option<&DeviceId>,
initial_device_display_name: Option<&str>,
) -> Result<login::Response, Error<ruma_client_api::Error>> {
) -> Result<login::Response, Error<C::Error, ruma_client_api::Error>> {
let response = self
.request(assign!(
login::Request::new(
@ -48,7 +48,7 @@ impl Client<super::HyperClient<super::Connector>> {
/// this client, in addition to returning it.
pub async fn register_guest(
&self,
) -> Result<register::Response, Error<ruma_client_api::r0::uiaa::UiaaResponse>> {
) -> Result<register::Response, Error<C::Error, ruma_client_api::r0::uiaa::UiaaResponse>> {
let response = self
.request(assign!(register::Request::new(), { kind: RegistrationKind::Guest }))
.await?;
@ -69,7 +69,7 @@ impl Client<super::HyperClient<super::Connector>> {
&self,
username: Option<&str>,
password: &str,
) -> Result<register::Response, Error<ruma_client_api::r0::uiaa::UiaaResponse>> {
) -> Result<register::Response, Error<C::Error, ruma_client_api::r0::uiaa::UiaaResponse>> {
let response = self
.request(assign!(register::Request::new(), { username, password: Some(password) }))
.await?;
@ -81,16 +81,16 @@ impl Client<super::HyperClient<super::Connector>> {
/// Convenience method that represents repeated calls to the sync_events endpoint as a stream.
pub fn sync<'a>(
&self,
&'a self,
filter: Option<&'a sync_events::Filter<'a>>,
mut since: String,
set_presence: &'a PresenceState,
timeout: Option<Duration>,
) -> impl Stream<Item = Result<sync_events::Response, Error<ruma_client_api::Error>>> + 'a {
let client = self.clone();
) -> impl Stream<Item = Result<sync_events::Response, Error<C::Error, ruma_client_api::Error>>> + 'a
{
try_stream! {
loop {
let response = client
let response = self
.request(assign!(sync_events::Request::new(), {
filter,
since: Some(&since),

View File

@ -7,7 +7,7 @@ use ruma_api::error::{FromHttpResponseError, IntoHttpError};
/// An error that can occur during client operations.
#[derive(Debug)]
#[cfg_attr(not(feature = "unstable-exhaustive-types"), non_exhaustive)]
pub enum Error<E> {
pub enum Error<E, F> {
/// Queried endpoint requires authentication but was called on an anonymous client.
AuthenticationRequired,
@ -15,66 +15,53 @@ pub enum Error<E> {
IntoHttp(IntoHttpError),
/// The request's URL is invalid (this should never happen).
Url(UrlError),
Url(http::Error),
/// Couldn't obtain an HTTP response (e.g. due to network or DNS issues).
Response(ResponseError),
Response(E),
/// Converting the HTTP response to one of ruma's types failed.
FromHttpResponse(FromHttpResponseError<E>),
FromHttpResponse(FromHttpResponseError<F>),
}
impl<E: Display> Display for Error<E> {
impl<E: Display, F: Display> Display for Error<E, F> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
Self::AuthenticationRequired => {
write!(f, "The queried endpoint requires authentication but was called with an anonymous client.")
}
Self::IntoHttp(err) => write!(f, "HTTP request construction failed: {}", err),
Self::Url(UrlError(err)) => write!(f, "Invalid URL: {}", err),
Self::Response(ResponseError(err)) => write!(f, "Couldn't obtain a response: {}", err),
Self::Url(err) => write!(f, "Invalid URL: {}", err),
Self::Response(err) => write!(f, "Couldn't obtain a response: {}", err),
Self::FromHttpResponse(err) => write!(f, "HTTP response conversion failed: {}", err),
}
}
}
impl<E> From<IntoHttpError> for Error<E> {
impl<E, F> From<IntoHttpError> for Error<E, F> {
fn from(err: IntoHttpError) -> Self {
Error::IntoHttp(err)
}
}
#[doc(hidden)]
impl<E> From<http::uri::InvalidUri> for Error<E> {
impl<E, F> From<http::uri::InvalidUri> for Error<E, F> {
fn from(err: http::uri::InvalidUri) -> Self {
Error::Url(UrlError(err.into()))
Error::Url(err.into())
}
}
#[doc(hidden)]
impl<E> From<http::uri::InvalidUriParts> for Error<E> {
impl<E, F> From<http::uri::InvalidUriParts> for Error<E, F> {
fn from(err: http::uri::InvalidUriParts) -> Self {
Error::Url(UrlError(err.into()))
Error::Url(err.into())
}
}
#[doc(hidden)]
impl<E> From<hyper::Error> for Error<E> {
fn from(err: hyper::Error) -> Self {
Error::Response(ResponseError(err))
}
}
impl<E> From<FromHttpResponseError<E>> for Error<E> {
fn from(err: FromHttpResponseError<E>) -> Self {
impl<E, F> From<FromHttpResponseError<F>> for Error<E, F> {
fn from(err: FromHttpResponseError<F>) -> Self {
Error::FromHttpResponse(err)
}
}
impl<E: Debug + Display> std::error::Error for Error<E> {}
#[derive(Debug)]
pub struct UrlError(http::Error);
#[derive(Debug)]
pub struct ResponseError(hyper::Error);
impl<E: Debug + Display, F: Debug + Display> std::error::Error for Error<E, F> {}

View File

@ -0,0 +1,40 @@
//! This module contains an abstraction for HTTP clients as well as friendly-named re-exports of
//! client types that implement this trait.
use async_trait::async_trait;
use bytes::BufMut;
#[cfg(feature = "hyper")]
mod hyper;
#[cfg(feature = "hyper")]
pub use self::hyper::Hyper;
#[cfg(feature = "hyper-native-tls")]
pub use self::hyper::HyperNativeTls;
#[cfg(feature = "hyper-rustls")]
pub use self::hyper::HyperRustls;
/// An HTTP client that can be used to send requests to a Matrix homeserver.
#[async_trait]
pub trait HttpClient {
/// The type to use for `try_into_http_request`.
type RequestBody: Default + BufMut;
/// The type to use for `try_from_http_response`.
type ResponseBody: AsRef<[u8]>;
/// The error type for the `send_request` function.
type Error: Unpin;
/// Send an `http::Request` to get back an `http::Response`.
async fn send_http_request(
&self,
req: http::Request<Self::RequestBody>,
) -> Result<http::Response<Self::ResponseBody>, Self::Error>;
}
/// An HTTP client that has a default configuration.
pub trait DefaultConstructibleHttpClient: HttpClient {
/// Creates a new HTTP client with default configuration.
fn default() -> Self;
}

View File

@ -0,0 +1,60 @@
use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use hyper::client::{connect::Connect, HttpConnector};
use super::{DefaultConstructibleHttpClient, HttpClient};
/// A basic hyper HTTP client.
///
/// You basically never want this, since it doesn't support `https`.
pub type Hyper = hyper::Client<HttpConnector>;
/// A hyper HTTP client using native-tls for TLS support.
#[cfg(feature = "hyper-native-tls")]
pub type HyperNativeTls = hyper::Client<hyper_tls::HttpsConnector<HttpConnector>>;
/// A hyper HTTP client using rustls for TLS support.
///
/// This client does not implement `DefaultConstructibleHttpClient`. To use it, you need to manually
/// construct
#[cfg(feature = "hyper-rustls")]
pub type HyperRustls = hyper::Client<hyper_rustls::HttpsConnector<HttpConnector>>;
#[async_trait]
impl<C> HttpClient for hyper::Client<C>
where
C: Connect + Clone + Send + Sync + 'static,
{
type RequestBody = BytesMut;
type ResponseBody = Bytes;
type Error = hyper::Error;
async fn send_http_request(
&self,
req: http::Request<BytesMut>,
) -> Result<http::Response<Bytes>, hyper::Error> {
let (head, body) = self
.request(req.map(|body| hyper::body::Body::from(body.freeze())))
.await?
.into_parts();
// FIXME: Use aggregate instead of to_bytes once serde_json can parse from a reader at a
// comparable speed as reading from a slice: https://github.com/serde-rs/json/issues/160
let body = hyper::body::to_bytes(body).await?;
Ok(http::Response::from_parts(head, body))
}
}
#[cfg(feature = "hyper")]
impl DefaultConstructibleHttpClient for Hyper {
fn default() -> Self {
hyper::Client::new()
}
}
#[cfg(feature = "hyper-native-tls")]
impl DefaultConstructibleHttpClient for HyperNativeTls {
fn default() -> Self {
hyper::Client::builder().build(hyper_tls::HttpsConnector::new())
}
}

View File

@ -105,41 +105,24 @@ use std::{
};
use assign::assign;
use http::{uri::Uri, Response as HttpResponse};
use hyper::client::{Client as HyperClient, HttpConnector};
use http::uri::Uri;
use ruma_api::{AuthScheme, OutgoingRequest, SendAccessToken};
use ruma_serde::urlencoded;
// "Undo" rename from `Cargo.toml` that only serves to make `hyper-rustls` available as a Cargo
// feature name.
#[cfg(feature = "hyper-rustls")]
extern crate hyper_rustls_crate as hyper_rustls;
#[cfg(feature = "client-api")]
mod client_api;
mod error;
pub mod http_client;
pub use self::error::Error;
#[cfg(not(feature = "_tls"))]
type Connector = HttpConnector;
#[cfg(feature = "tls-native")]
type Connector = hyper_tls::HttpsConnector<HttpConnector>;
#[cfg(feature = "_tls-rustls")]
type Connector = hyper_rustls::HttpsConnector<HttpConnector>;
fn create_connector() -> Connector {
#[cfg(not(feature = "_tls"))]
let connector = HttpConnector::new();
#[cfg(feature = "tls-native")]
let connector = hyper_tls::HttpsConnector::new();
#[cfg(feature = "tls-rustls-native-roots")]
let connector = hyper_rustls::HttpsConnector::with_native_roots();
#[cfg(feature = "tls-rustls-webpki-roots")]
let connector = hyper_rustls::HttpsConnector::with_webpki_roots();
connector
}
pub use self::{
error::Error,
http_client::{DefaultConstructibleHttpClient, HttpClient},
};
/// A client for the Matrix client-server API.
#[derive(Clone, Debug)]
@ -173,17 +156,6 @@ impl<C> Client<C> {
access_token: Mutex::new(access_token),
}))
}
}
impl Client<HyperClient<Connector>> {
/// Creates a new client based on a default-constructed hyper HTTP client.
pub fn new(homeserver_url: Uri, access_token: Option<String>) -> Self {
Self(Arc::new(ClientData {
homeserver_url,
http_client: HyperClient::builder().build(create_connector()),
access_token: Mutex::new(access_token),
}))
}
/// Get a copy of the current `access_token`, if any.
///
@ -191,12 +163,25 @@ impl Client<HyperClient<Connector>> {
pub fn access_token(&self) -> Option<String> {
self.0.access_token.lock().expect("session mutex was poisoned").clone()
}
}
impl<C: DefaultConstructibleHttpClient> Client<C> {
/// Creates a new client based on a default-constructed hyper HTTP client.
pub fn new(homeserver_url: Uri, access_token: Option<String>) -> Self {
Self(Arc::new(ClientData {
homeserver_url,
http_client: DefaultConstructibleHttpClient::default(),
access_token: Mutex::new(access_token),
}))
}
}
impl<C: HttpClient> Client<C> {
/// Makes a request to a Matrix API endpoint.
pub async fn request<Request: OutgoingRequest>(
&self,
request: Request,
) -> Result<Request::IncomingResponse, Error<Request::EndpointError>> {
) -> Result<Request::IncomingResponse, Error<C::Error, Request::EndpointError>> {
self.request_with_url_params(request, None).await
}
@ -205,7 +190,7 @@ impl Client<HyperClient<Connector>> {
&self,
request: Request,
extra_params: Option<BTreeMap<String, String>>,
) -> Result<Request::IncomingResponse, Error<Request::EndpointError>> {
) -> Result<Request::IncomingResponse, Error<C::Error, Request::EndpointError>> {
let client = self.0.clone();
let mut http_request = {
let lock;
@ -220,10 +205,7 @@ impl Client<HyperClient<Connector>> {
SendAccessToken::None
};
request.try_into_http_request::<Vec<u8>>(
&client.homeserver_url.to_string(),
access_token,
)?
request.try_into_http_request(&client.homeserver_url.to_string(), access_token)?
};
let extra_params = urlencoded::to_string(extra_params).unwrap();
@ -236,15 +218,8 @@ impl Client<HyperClient<Connector>> {
path_and_query: Some(new_path_and_query.parse()?),
}))?;
let hyper_response =
client.http_client.request(http_request.map(hyper::Body::from)).await?;
let (head, body) = hyper_response.into_parts();
// FIXME: Use aggregate instead of to_bytes once serde_json can parse from a reader at a
// comparable speed as reading from a slice: https://github.com/serde-rs/json/issues/160
let full_body = hyper::body::to_bytes(body).await?;
let full_response = HttpResponse::from_parts(head, full_body);
Ok(ruma_api::IncomingResponse::try_from_http_response(full_response)?)
let http_response =
client.http_client.send_http_request(http_request).await.map_err(Error::Response)?;
Ok(ruma_api::IncomingResponse::try_from_http_response(http_response)?)
}
}

View File

@ -61,29 +61,8 @@ impl CiTask {
{
let _p = pushd("ruma-client")?;
r.push(
cmd!(
"rustup run stable cargo check
--no-default-features --features http1,http2 --quiet"
)
.run(),
);
r.push(
cmd!(
"rustup run stable cargo check
--no-default-features --features http1,http2,tls-rustls-native-roots
--quiet"
)
.run(),
);
r.push(
cmd!(
"rustup run stable cargo check
--no-default-features --features http1,http2,tls-rustls-webpki-roots
--quiet"
)
.run(),
);
r.push(cmd!("rustup run stable cargo check --no-default-features --quiet").run());
r.push(cmd!("rustup run stable cargo check --all-features --quiet").run());
}
r.into_iter().collect()