Use Hyper's Tokio branch.

This commit is contained in:
Jimmy Cuadra 2017-01-07 00:11:19 -08:00
parent ba1a73e363
commit 43d21222dd
3 changed files with 123 additions and 42 deletions

View File

@ -11,11 +11,16 @@ repository = "https://github.com/ruma/ruma-client"
version = "0.1.0" version = "0.1.0"
[dependencies] [dependencies]
hyper = "0.9.14" futures = "0.1.7"
ruma-identifiers = "0.6.0" ruma-identifiers = "0.6.0"
serde = "0.8.21" serde = "0.8.21"
serde_json = "0.8.4" serde_json = "0.8.4"
tokio-core = "0.1.3"
url = "1.2.4" url = "1.2.4"
[dependencies.hyper]
branch = "tokio"
git = "https://github.com/hyperium/hyper"
[dependencies.ruma-client-api] [dependencies.ruma-client-api]
git = "https://github.com/ruma/ruma-client-api" git = "https://github.com/ruma/ruma-client-api"

View File

@ -1,27 +1,27 @@
//! Crate ruma_client is a [Matrix](https://matrix.org/) client library. //! Crate ruma_client is a [Matrix](https://matrix.org/) client library.
#![feature(try_from)]
#![deny(missing_debug_implementations)] #![deny(missing_debug_implementations)]
#![deny(missing_docs)] #![deny(missing_docs)]
extern crate futures;
extern crate hyper; extern crate hyper;
extern crate ruma_client_api; extern crate ruma_client_api;
extern crate ruma_identifiers; extern crate ruma_identifiers;
extern crate serde; extern crate serde;
extern crate serde_json; extern crate serde_json;
extern crate tokio_core;
extern crate url; extern crate url;
use std::convert::TryInto; use hyper::client::{Client as HyperClient, DefaultConnector, Request as HyperRequest};
use hyper::Method as HyperMethod;
use hyper::client::{Client as Hyper, IntoUrl};
use hyper::method::Method as HyperMethod;
use ruma_client_api::{Endpoint, Method}; use ruma_client_api::{Endpoint, Method};
use ruma_client_api::unversioned::get_supported_versions; use ruma_client_api::unversioned::get_supported_versions;
use tokio_core::reactor::Handle;
use url::Url; use url::Url;
pub use error::Error; pub use error::Error;
pub use session::Session; pub use session::Session;
pub use response::Response; pub use response::{FutureResponse, Response};
mod error; mod error;
mod response; mod response;
@ -31,7 +31,7 @@ mod session;
#[derive(Debug)] #[derive(Debug)]
pub struct Client { pub struct Client {
homeserver_url: Url, homeserver_url: Url,
hyper: Hyper, hyper: HyperClient<DefaultConnector>,
session: Option<Session>, session: Option<Session>,
} }
@ -55,23 +55,50 @@ impl Client {
/// ///
/// # Errors /// # Errors
/// ///
/// Returns an error if the given homserver URL cannot be parsed as a URL. /// Returns an error if the given homeserver URL cannot be parsed as a URL.
pub fn new<U>(homeserver_url: U) -> Result<Self, Error> where U: IntoUrl { pub fn new<U>(handle: &Handle, homeserver_url: U) -> Result<Self, Error> where U: TryIntoUrl {
Ok(Client { Ok(Client {
homeserver_url: homeserver_url.into_url()?, homeserver_url: homeserver_url.try_into()?,
hyper: Hyper::new(), hyper: HyperClient::configure().keep_alive(true).build(handle),
session: None, session: None,
}) })
} }
/// Get the versions of the Matrix client-server specification supported by the homeserver. /// Get the versions of the Matrix client-server specification supported by the homeserver.
pub fn get_supported_versions(&self) pub fn get_supported_versions(&mut self)
-> Result<Response<<get_supported_versions::Endpoint as Endpoint>::Response>, Error> { -> FutureResponse<<get_supported_versions::Endpoint as Endpoint>::Response> {
let response = self.hyper.request( let request = HyperRequest::new(
get_supported_versions::Endpoint::method().into_hyper(), get_supported_versions::Endpoint::method().into_hyper(),
self.homeserver_url.join(&get_supported_versions::Endpoint::request_path(()))?, self.homeserver_url.join(
).send()?; &get_supported_versions::Endpoint::request_path(())
).expect("request path should be joinable").try_into().expect("url should be parsable"),
);
Ok(response.try_into()?) FutureResponse::from(self.hyper.request(request))
}
}
/// Functionally equivalent to `TryInto<Url>`, and should be replaced by that as soon as it's
/// stable and available.
pub trait TryIntoUrl {
/// Performs the conversion.
fn try_into(self) -> Result<Url, Error>;
}
impl TryIntoUrl for String {
fn try_into(self) -> Result<Url, Error> {
Url::parse(&self).map_err(Error::from)
}
}
impl<'a> TryIntoUrl for &'a str {
fn try_into(self) -> Result<Url, Error> {
Url::parse(self).map_err(Error::from)
}
}
impl TryIntoUrl for Url {
fn try_into(self) -> Result<Url, Error> {
Ok(self)
} }
} }

View File

@ -1,41 +1,90 @@
use std::convert::TryFrom;
use std::fmt::Debug; use std::fmt::Debug;
use std::io::Write;
use std::marker::PhantomData;
use hyper::client::Response as HyperResponse; use futures::{Async, Future, Poll, Stream};
use hyper::client::{FutureResponse as HyperFutureResponse, Response as HyperResponse};
use hyper::{Error as HyperError};
use hyper::header::Headers; use hyper::header::Headers;
use hyper::status::StatusCode; use hyper::status::StatusCode;
use hyper::version::HttpVersion; use hyper::HttpVersion;
use serde::Deserialize; use serde::Deserialize;
use serde_json::from_reader; use serde_json::from_slice;
use url::Url;
use Error; use Error;
/// A `Future` that will resolve into a `Response`.
#[derive(Debug)]
pub struct FutureResponse<T> where T: Debug + Deserialize + Send + 'static {
hyper_future_response: HyperFutureResponse,
phantom: PhantomData<T>,
}
/// A response from a Matrix homeserver. /// A response from a Matrix homeserver.
#[derive(Debug)] #[derive(Debug)]
pub struct Response<T> where T: Debug + Deserialize { pub struct Response<T> where T: Debug + Deserialize {
/// The Hyper response.
hyper_response: HyperResponse,
/// The response from the Matrix API. /// The response from the Matrix API.
pub data: T, phantom: PhantomData<T>,
}
impl<T> Future for FutureResponse<T> where T: Debug + Deserialize + Send + 'static {
type Item = Response<T>;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.hyper_future_response.poll() {
Ok(Async::Ready(hyper_response)) => Ok(Async::Ready(Response {
hyper_response: hyper_response,
phantom: PhantomData,
})),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(error) => Err(Error::from(error)),
}
}
}
impl<T> From<HyperFutureResponse> for FutureResponse<T>
where T: Debug + Deserialize + Send + 'static {
fn from(hyper_future_response: HyperFutureResponse) -> FutureResponse<T> {
FutureResponse {
hyper_future_response: hyper_future_response,
phantom: PhantomData,
}
}
}
impl<T> Response<T> where T: Debug + Deserialize + Send + 'static {
/// The response from the Matrix API.
pub fn data(self) -> Box<Future<Item=T, Error=Error>> {
let bytes = self.hyper_response.body().fold(Vec::new(), |mut bytes, chunk| {
if let Err(error) = bytes.write_all(&chunk) {
return Err(HyperError::from(error));
}
Ok(bytes)
}).map_err(Error::from);
let deserialized_data = bytes.and_then(|bytes| {
from_slice(bytes.as_slice()).map_err(Error::from)
});
deserialized_data.boxed()
}
/// The HTTP response code. /// The HTTP response code.
pub status: StatusCode, pub fn status(&self) -> &StatusCode {
self.hyper_response.status()
}
/// The HTTP response headers. /// The HTTP response headers.
pub headers: Headers, pub fn headers(&self) -> &Headers {
self.hyper_response.headers()
}
/// The HTTP version. /// The HTTP version.
pub http_version: HttpVersion, pub fn http_version(&self) -> &HttpVersion {
/// The URL that was requested. self.hyper_response.version()
pub url: Url,
}
impl<T> TryFrom<HyperResponse> for Response<T> where T: Debug + Deserialize {
type Err = Error;
fn try_from(hyper_response: HyperResponse) -> Result<Self, Self::Err> {
Ok(Response {
status: hyper_response.status,
headers: hyper_response.headers.clone(),
http_version: hyper_response.version,
url: hyper_response.url.clone(),
data: from_reader(hyper_response)?,
})
} }
} }