From 43d21222dd8006d40bc4a24c14a78c94acde72fd Mon Sep 17 00:00:00 2001 From: Jimmy Cuadra Date: Sat, 7 Jan 2017 00:11:19 -0800 Subject: [PATCH] Use Hyper's Tokio branch. --- Cargo.toml | 7 +++- src/lib.rs | 61 ++++++++++++++++++++++--------- src/response.rs | 97 +++++++++++++++++++++++++++++++++++++------------ 3 files changed, 123 insertions(+), 42 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d87f7038..ad20a3a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,11 +11,16 @@ repository = "https://github.com/ruma/ruma-client" version = "0.1.0" [dependencies] -hyper = "0.9.14" +futures = "0.1.7" ruma-identifiers = "0.6.0" serde = "0.8.21" serde_json = "0.8.4" +tokio-core = "0.1.3" url = "1.2.4" +[dependencies.hyper] +branch = "tokio" +git = "https://github.com/hyperium/hyper" + [dependencies.ruma-client-api] git = "https://github.com/ruma/ruma-client-api" diff --git a/src/lib.rs b/src/lib.rs index aef5a4c2..2ebd4038 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,27 +1,27 @@ //! Crate ruma_client is a [Matrix](https://matrix.org/) client library. -#![feature(try_from)] #![deny(missing_debug_implementations)] #![deny(missing_docs)] +extern crate futures; extern crate hyper; extern crate ruma_client_api; extern crate ruma_identifiers; extern crate serde; extern crate serde_json; +extern crate tokio_core; extern crate url; -use std::convert::TryInto; - -use hyper::client::{Client as Hyper, IntoUrl}; -use hyper::method::Method as HyperMethod; +use hyper::client::{Client as HyperClient, DefaultConnector, Request as HyperRequest}; +use hyper::Method as HyperMethod; use ruma_client_api::{Endpoint, Method}; use ruma_client_api::unversioned::get_supported_versions; +use tokio_core::reactor::Handle; use url::Url; pub use error::Error; pub use session::Session; -pub use response::Response; +pub use response::{FutureResponse, Response}; mod error; mod response; @@ -31,7 +31,7 @@ mod session; #[derive(Debug)] pub struct Client { homeserver_url: Url, - hyper: Hyper, + hyper: HyperClient, session: Option, } @@ -55,23 +55,50 @@ impl Client { /// /// # Errors /// - /// Returns an error if the given homserver URL cannot be parsed as a URL. - pub fn new(homeserver_url: U) -> Result where U: IntoUrl { + /// Returns an error if the given homeserver URL cannot be parsed as a URL. + pub fn new(handle: &Handle, homeserver_url: U) -> Result where U: TryIntoUrl { Ok(Client { - homeserver_url: homeserver_url.into_url()?, - hyper: Hyper::new(), + homeserver_url: homeserver_url.try_into()?, + hyper: HyperClient::configure().keep_alive(true).build(handle), session: None, }) } /// Get the versions of the Matrix client-server specification supported by the homeserver. - pub fn get_supported_versions(&self) - -> Result::Response>, Error> { - let response = self.hyper.request( + pub fn get_supported_versions(&mut self) + -> FutureResponse<::Response> { + let request = HyperRequest::new( get_supported_versions::Endpoint::method().into_hyper(), - self.homeserver_url.join(&get_supported_versions::Endpoint::request_path(()))?, - ).send()?; + self.homeserver_url.join( + &get_supported_versions::Endpoint::request_path(()) + ).expect("request path should be joinable").try_into().expect("url should be parsable"), + ); - Ok(response.try_into()?) + FutureResponse::from(self.hyper.request(request)) + } +} + +/// Functionally equivalent to `TryInto`, and should be replaced by that as soon as it's +/// stable and available. +pub trait TryIntoUrl { + /// Performs the conversion. + fn try_into(self) -> Result; +} + +impl TryIntoUrl for String { + fn try_into(self) -> Result { + Url::parse(&self).map_err(Error::from) + } +} + +impl<'a> TryIntoUrl for &'a str { + fn try_into(self) -> Result { + Url::parse(self).map_err(Error::from) + } +} + +impl TryIntoUrl for Url { + fn try_into(self) -> Result { + Ok(self) } } diff --git a/src/response.rs b/src/response.rs index ad50fd08..9ab4f40d 100644 --- a/src/response.rs +++ b/src/response.rs @@ -1,41 +1,90 @@ -use std::convert::TryFrom; use std::fmt::Debug; +use std::io::Write; +use std::marker::PhantomData; -use hyper::client::Response as HyperResponse; +use futures::{Async, Future, Poll, Stream}; +use hyper::client::{FutureResponse as HyperFutureResponse, Response as HyperResponse}; +use hyper::{Error as HyperError}; use hyper::header::Headers; use hyper::status::StatusCode; -use hyper::version::HttpVersion; +use hyper::HttpVersion; use serde::Deserialize; -use serde_json::from_reader; -use url::Url; +use serde_json::from_slice; use Error; +/// A `Future` that will resolve into a `Response`. +#[derive(Debug)] +pub struct FutureResponse where T: Debug + Deserialize + Send + 'static { + hyper_future_response: HyperFutureResponse, + phantom: PhantomData, +} + /// A response from a Matrix homeserver. #[derive(Debug)] pub struct Response where T: Debug + Deserialize { + /// The Hyper response. + hyper_response: HyperResponse, /// The response from the Matrix API. - pub data: T, - /// The HTTP response code. - pub status: StatusCode, - /// The HTTP response headers. - pub headers: Headers, - /// The HTTP version. - pub http_version: HttpVersion, - /// The URL that was requested. - pub url: Url, + phantom: PhantomData, } -impl TryFrom for Response where T: Debug + Deserialize { - type Err = Error; +impl Future for FutureResponse where T: Debug + Deserialize + Send + 'static { + type Item = Response; + type Error = Error; - fn try_from(hyper_response: HyperResponse) -> Result { - Ok(Response { - status: hyper_response.status, - headers: hyper_response.headers.clone(), - http_version: hyper_response.version, - url: hyper_response.url.clone(), - data: from_reader(hyper_response)?, - }) + fn poll(&mut self) -> Poll { + match self.hyper_future_response.poll() { + Ok(Async::Ready(hyper_response)) => Ok(Async::Ready(Response { + hyper_response: hyper_response, + phantom: PhantomData, + })), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(error) => Err(Error::from(error)), + } + } +} + +impl From for FutureResponse +where T: Debug + Deserialize + Send + 'static { + fn from(hyper_future_response: HyperFutureResponse) -> FutureResponse { + FutureResponse { + hyper_future_response: hyper_future_response, + phantom: PhantomData, + } + } +} + +impl Response where T: Debug + Deserialize + Send + 'static { + /// The response from the Matrix API. + pub fn data(self) -> Box> { + let bytes = self.hyper_response.body().fold(Vec::new(), |mut bytes, chunk| { + if let Err(error) = bytes.write_all(&chunk) { + return Err(HyperError::from(error)); + } + + Ok(bytes) + }).map_err(Error::from); + + let deserialized_data = bytes.and_then(|bytes| { + from_slice(bytes.as_slice()).map_err(Error::from) + }); + + deserialized_data.boxed() + } + + /// The HTTP response code. + pub fn status(&self) -> &StatusCode { + self.hyper_response.status() + } + + /// The HTTP response headers. + pub fn headers(&self) -> &Headers { + self.hyper_response.headers() + } + + /// The HTTP version. + pub fn http_version(&self) -> &HttpVersion { + self.hyper_response.version() } }