From d0c20c198d4416c409918f694679a100d89a686b Mon Sep 17 00:00:00 2001 From: Konstantinos Sideris <sideris.konstantin@gmail.com> Date: Mon, 14 May 2018 13:33:15 +0300 Subject: [PATCH] Move connection handling to the Session class --- CMakeLists.txt | 1 + examples/room_feed.cpp | 4 +- src/client.cpp | 131 ++------------------------------------- src/client.hpp | 14 ----- src/session.cpp | 136 +++++++++++++++++++++++++++++++++++++++++ src/session.hpp | 25 ++++---- 6 files changed, 158 insertions(+), 153 deletions(-) create mode 100644 src/session.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index c46e918eb..006ca6ff1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -125,6 +125,7 @@ set(MTXCLIENT_LIBS ${MTXCLIENT_LIBS} olm) include_directories(src) set(SRC src/client.cpp + src/session.cpp src/utils.cpp src/crypto.cpp) diff --git a/examples/room_feed.cpp b/examples/room_feed.cpp index 924da48a2..25f391ad9 100644 --- a/examples/room_feed.cpp +++ b/examples/room_feed.cpp @@ -105,7 +105,7 @@ sync_handler(const mtx::responses::Sync &res, RequestErr err) // Callback to executed after the first (initial) /sync request completes. void -initial_sync_handler(const nlohmann::json &res, RequestErr err) +initial_sync_handler(const mtx::responses::Sync &res, RequestErr err) { if (err) { cout << "error during initial sync:\n"; @@ -119,7 +119,7 @@ initial_sync_handler(const nlohmann::json &res, RequestErr err) return; } - client->set_next_batch_token(res.at("next_batch")); + client->set_next_batch_token(res.next_batch); client->sync("", client->next_batch_token(), false, 30000, &sync_handler); } diff --git a/src/client.cpp b/src/client.cpp index ddaa0d7ee..9d0c1bcaa 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -37,135 +37,14 @@ Client::close() thread_group_.join_all(); } -void -Client::on_resolve(std::shared_ptr<Session> s, - boost::system::error_code ec, - boost::asio::ip::tcp::resolver::results_type results) -{ - if (ec) - return s->on_failure(s->id, ec); - - boost::asio::async_connect( - s->socket.next_layer(), - results.begin(), - results.end(), - std::bind(&Client::on_connect, shared_from_this(), s, std::placeholders::_1)); -} - -void -Client::on_connect(std::shared_ptr<Session> s, boost::system::error_code ec) -{ - if (ec) { - s->on_failure(s->id, ec); - return remove_session(std::move(s)); - } - - // Perform the SSL handshake - s->socket.async_handshake( - boost::asio::ssl::stream_base::client, - std::bind(&Client::on_handshake, shared_from_this(), s, std::placeholders::_1)); -} - -void -Client::on_handshake(std::shared_ptr<Session> s, boost::system::error_code ec) -{ - if (ec) { - s->on_failure(s->id, ec); - return remove_session(std::move(s)); - } - - boost::beast::http::async_write(s->socket, - s->request, - std::bind(&Client::on_write, - shared_from_this(), - s, - std::placeholders::_1, - std::placeholders::_2)); -} - -void -Client::on_write(std::shared_ptr<Session> s, - boost::system::error_code ec, - std::size_t bytes_transferred) -{ - boost::ignore_unused(bytes_transferred); - - if (ec) { - s->on_failure(s->id, ec); - return remove_session(std::move(s)); - } - - // Receive the HTTP response - http::async_read( - s->socket, - s->output_buf, - s->parser, - std::bind( - &Client::on_read, shared_from_this(), s, std::placeholders::_1, std::placeholders::_2)); -} - -void -Client::on_read(std::shared_ptr<Session> s, - boost::system::error_code ec, - std::size_t bytes_transferred) -{ - boost::ignore_unused(bytes_transferred); - - if (ec) - s->error_code = ec; - - on_request_complete(std::move(s)); -} - void Client::do_request(std::shared_ptr<Session> s) { - resolver_.async_resolve(server_, - std::to_string(port_), - std::bind(&Client::on_resolve, - shared_from_this(), - std::move(s), - std::placeholders::_1, - std::placeholders::_2)); -} - -void -Client::remove_session(std::shared_ptr<Session> s) -{ - // Shutting down the connection. This method may - // fail in case the socket is not connected. We don't - // care about the error code if this function fails. - boost::system::error_code ignored_ec; - - s->socket.async_shutdown([s](boost::system::error_code ec) { - if (ec == boost::asio::error::eof) { - // Rationale: - // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error - ec.assign(0, ec.category()); - } - -// SSL_R_SHORT_READ is removed in openssl-1.1 -#if defined SSL_R_SHORT_READ - if (ERR_GET_REASON(ec.value()) == SSL_R_SHORT_READ) - return; -#else - if (ERR_GET_REASON(ec.value()) == boost::asio::ssl::error::stream_truncated) - return; -#endif - - if (ec) - // TODO: propagate the error. - std::cout << "shutdown: " << ec.message() << std::endl; - }); -} - -void -Client::on_request_complete(std::shared_ptr<Session> s) -{ - boost::system::error_code ec(s->error_code); - s->on_success(s->id, s->parser.get(), ec); - - remove_session(std::move(s)); + resolver_.async_resolve( + server_, + std::to_string(port_), + std::bind( + &Session::on_resolve, std::move(s), std::placeholders::_1, std::placeholders::_2)); } void diff --git a/src/client.hpp b/src/client.hpp index 4e46cc856..5b75d2ee1 100644 --- a/src/client.hpp +++ b/src/client.hpp @@ -259,20 +259,6 @@ private: //! Setup http header with the access token if needed. void setup_auth(Session *session, bool auth); - void remove_session(std::shared_ptr<Session> s); - void on_request_complete(std::shared_ptr<Session> s); - void on_resolve(std::shared_ptr<Session> s, - boost::system::error_code ec, - boost::asio::ip::tcp::resolver::results_type results); - void on_connect(std::shared_ptr<Session> s, boost::system::error_code ec); - void on_handshake(std::shared_ptr<Session> s, boost::system::error_code ec); - void on_write(std::shared_ptr<Session> s, - boost::system::error_code ec, - std::size_t bytes_transferred); - void on_read(std::shared_ptr<Session> s, - boost::system::error_code ec, - std::size_t bytes_transferred); - boost::asio::io_service ios_; //! Used to prevent the event loop from shutting down. diff --git a/src/session.cpp b/src/session.cpp new file mode 100644 index 000000000..0c1ce43a4 --- /dev/null +++ b/src/session.cpp @@ -0,0 +1,136 @@ +#include "session.hpp" + +using namespace mtx::client; + +Session::Session(boost::asio::io_service &ios, + boost::asio::ssl::context &ssl_ctx, + const std::string &host, + RequestID id, + SuccessCallback on_success, + FailureCallback on_failure) + : socket(ios, ssl_ctx) + , host(std::move(host)) + , id(std::move(id)) + , on_success(std::move(on_success)) + , on_failure(std::move(on_failure)) +{ + parser.header_limit(8192); + parser.body_limit(1 * 1024 * 1024 * 1024); // 1 GiB +} + +void +Session::on_resolve(boost::system::error_code ec, + boost::asio::ip::tcp::resolver::results_type results) +{ + if (ec) { + on_failure(id, ec); + shutdown(); + return; + } + + boost::asio::async_connect( + socket.next_layer(), + results, + std::bind(&Session::on_connect, shared_from_this(), std::placeholders::_1)); +} + +void +Session::on_close(boost::system::error_code ec) +{ + if (ec == boost::asio::error::eof) { + // Rationale: + // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error + ec.assign(0, ec.category()); + } + +// SSL_R_SHORT_READ is removed in openssl-1.1 +#if defined SSL_R_SHORT_READ + if (ERR_GET_REASON(ec.value()) == SSL_R_SHORT_READ) + return; +#else + if (ERR_GET_REASON(ec.value()) == boost::asio::ssl::error::stream_truncated) + return; +#endif + + if (ec) + // TODO: propagate the error. + std::cout << "shutdown: " << ec.message() << std::endl; +} + +void +Session::on_connect(const boost::system::error_code &ec) +{ + if (ec) { + on_failure(id, ec); + shutdown(); + return; + } + + // Perform the SSL handshake + socket.async_handshake( + boost::asio::ssl::stream_base::client, + std::bind(&Session::on_handshake, shared_from_this(), std::placeholders::_1)); +} + +void +Session::shutdown() +{ + socket.async_shutdown( + std::bind(&Session::on_close, shared_from_this(), std::placeholders::_1)); +} + +void +Session::on_request_complete() +{ + boost::system::error_code ec(error_code); + on_success(id, parser.get(), ec); + + shutdown(); +} + +void +Session::on_handshake(const boost::system::error_code &ec) +{ + if (ec) { + on_failure(id, ec); + shutdown(); + return; + } + + boost::beast::http::async_write( + socket, + request, + std::bind( + &Session::on_write, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); +} + +void +Session::on_write(const boost::system::error_code &ec, std::size_t bytes_transferred) +{ + boost::ignore_unused(bytes_transferred); + + if (ec) { + on_failure(id, ec); + shutdown(); + return; + } + + // Receive the HTTP response + boost::beast::http::async_read( + socket, + output_buf, + parser, + std::bind( + &Session::on_read, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); +} + +void +Session::on_read(const boost::system::error_code &ec, std::size_t bytes_transferred) +{ + boost::ignore_unused(bytes_transferred); + + if (ec) + error_code = ec; + + on_request_complete(); +} diff --git a/src/session.hpp b/src/session.hpp index ee03e1f81..39b908a57 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -23,23 +23,14 @@ using FailureCallback = std::function<void(RequestID request_id, const boost::system::error_code ec)>; //! Represents a context of a single request. -struct Session +struct Session : public std::enable_shared_from_this<Session> { Session(boost::asio::io_service &ios, boost::asio::ssl::context &ssl_ctx, const std::string &host, RequestID id, SuccessCallback on_success, - FailureCallback on_failure) - : socket{ios, ssl_ctx} - , host{host} - , id{id} - , on_success{on_success} - , on_failure{on_failure} - { - parser.header_limit(8192); - parser.body_limit(1 * 1024 * 1024 * 1024); // 1 GiB - } + FailureCallback on_failure); //! Socket used for communication. boost::asio::ssl::stream<boost::asio::ip::tcp::socket> socket; @@ -60,6 +51,18 @@ struct Session SuccessCallback on_success; //! Function to be called when the request fails. FailureCallback on_failure; + + void on_resolve(boost::system::error_code ec, + boost::asio::ip::tcp::resolver::results_type results); + +private: + void on_close(boost::system::error_code ec); + void on_connect(const boost::system::error_code &ec); + void on_handshake(const boost::system::error_code &ec); + void on_read(const boost::system::error_code &ec, std::size_t bytes_transferred); + void on_request_complete(); + void on_write(const boost::system::error_code &ec, std::size_t bytes_transferred); + void shutdown(); }; template<class Request, boost::beast::http::verb HttpVerb> -- GitLab