LCOV - code coverage report
Current view: top level - irohad/torii/impl - command_service_transport_grpc.cpp (source / functions) Hit Total Coverage
Test: cleared_cor.info Lines: 107 124 86.3 %
Date: 2019-03-07 14:46:43 Functions: 26 29 89.7 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include "common/default_constructible_unary_fn.hpp"  // non-copyable value workaround
       7             : 
       8             : #include "torii/impl/command_service_transport_grpc.hpp"
       9             : 
      10             : #include <atomic>
      11             : #include <condition_variable>
      12             : #include <iterator>
      13             : 
      14             : #include <boost/format.hpp>
      15             : #include <boost/range/adaptor/filtered.hpp>
      16             : #include <boost/range/adaptor/transformed.hpp>
      17             : #include "backend/protobuf/transaction_responses/proto_tx_response.hpp"
      18             : #include "common/combine_latest_until_first_completed.hpp"
      19             : #include "interfaces/iroha_internal/transaction_batch.hpp"
      20             : #include "interfaces/iroha_internal/transaction_batch_factory.hpp"
      21             : #include "interfaces/iroha_internal/transaction_batch_parser.hpp"
      22             : #include "interfaces/iroha_internal/tx_status_factory.hpp"
      23             : #include "interfaces/transaction.hpp"
      24             : #include "logger/logger.hpp"
      25             : #include "torii/status_bus.hpp"
      26             : 
      27             : namespace iroha {
      28             :   namespace torii {
      29             : 
      30             :     CommandServiceTransportGrpc::CommandServiceTransportGrpc(  // constructor: only stores its arguments in the matching members
      31             :         std::shared_ptr<CommandService> command_service,
      32             :         std::shared_ptr<iroha::torii::StatusBus> status_bus,
      33             :         std::shared_ptr<shared_model::interface::TxStatusFactory>
      34             :             status_factory,
      35             :         std::shared_ptr<TransportFactoryType> transaction_factory,
      36             :         std::shared_ptr<shared_model::interface::TransactionBatchParser>
      37             :             batch_parser,
      38             :         std::shared_ptr<shared_model::interface::TransactionBatchFactory>
      39             :             transaction_batch_factory,
      40             :         rxcpp::observable<ConsensusGateEvent> consensus_gate_objects,  // combined with the per-tx status stream in StatusStream
      41             :         int maximum_rounds_without_update,  // limit checked against the repeated-status counter in StatusStream
      42             :         logger::LoggerPtr log)
      43         253 :         : command_service_(std::move(command_service)),  // by-value arguments are moved into the members
      44         253 :           status_bus_(std::move(status_bus)),
      45         253 :           status_factory_(std::move(status_factory)),
      46         253 :           transaction_factory_(std::move(transaction_factory)),
      47         253 :           batch_parser_(std::move(batch_parser)),
      48         253 :           batch_factory_(std::move(transaction_batch_factory)),
      49         253 :           log_(std::move(log)),
      50         253 :           consensus_gate_objects_(std::move(consensus_gate_objects)),
      51         253 :           maximum_rounds_without_update_(maximum_rounds_without_update) {}  // body is intentionally empty
      52             : 
      53             :     grpc::Status CommandServiceTransportGrpc::Torii(  // single-transaction entry point; all work is delegated to ListTorii
      54             :         grpc::ServerContext *context,
      55             :         const iroha::protocol::Transaction *request,
      56             :         google::protobuf::Empty *response) {
      57         783 :       iroha::protocol::TxList single_tx_list;  // one-element list that wraps the lone transaction
      58         783 :       *single_tx_list.add_transactions() = *request;  // copies *request into the new list slot
      59         783 :       return ListTorii(context, &single_tx_list, response);  // the returned status is whatever ListTorii returns
      60         783 :     }
      61             : 
      62             :     namespace {
      63             :       /**
      64             :        * Build the text of a stateless-validation error. A single hash gives
      65             :        * a message naming that transaction; several hashes give one message,
      66             :        * shared by all of them, that lists every hash separated by ", "
      67             :        * @param tx_hashes is non empty hash list (element 0 is read unguarded)
      68             :        * @param error of those tx(s), inserted into the message as is
      69             :        * @return message
      70             :        */
      71             :       std::string formErrorMessage(
      72             :           const std::vector<shared_model::crypto::Hash> &tx_hashes,
      73             :           const std::string &error) {
      74           1 :         if (tx_hashes.size() == 1) {  // exactly one tx: individual message with its own hash
      75           1 :           return (boost::format("Stateless invalid tx, error: %s, hash: %s")
      76           1 :                   % error % tx_hashes[0].hex())
      77           1 :               .str();
      78             :         }
      79             : 
      80             :         std::string folded_hashes =
      81           0 :             std::accumulate(std::next(tx_hashes.begin()),  // NOTE(review): std::accumulate is declared in <numeric>, which is not among this file's direct includes
      82           0 :                             tx_hashes.end(),
      83           0 :                             tx_hashes[0].hex(),  // the fold is seeded with the first hash, so the range starts at the second
      84             :                             [](auto &&acc, const auto &h) -> std::string {
      85           0 :                               return acc + ", " + h.hex();  // append each further hash after a ", " separator
      86           0 :                             });
      87             : 
      88           0 :         return (boost::format(
      89             :                     "Stateless invalid tx in transaction sequence, error: %s\n"
      90             :                     "Hash list: [%s]")
      91           0 :                 % error % folded_hashes)
      92           0 :             .str();
      93           1 :       }
      94             :     }  // namespace
      95             : 
      96             :     shared_model::interface::types::SharedTxsCollectionType
      97             :     CommandServiceTransportGrpc::deserializeTransactions(  // builds a model transaction from each proto tx; returns only those that built successfully
      98             :         const iroha::protocol::TxList *request) {
      99         790 :       shared_model::interface::types::SharedTxsCollectionType tx_collection;
     100        1597 :       for (const auto &tx : request->transactions()) {
     101         807 :         transaction_factory_->build(tx).match(  // first handler receives the built value, second receives the build error
     102             :             [&tx_collection](
     103             :                 iroha::expected::Value<
     104             :                     std::unique_ptr<shared_model::interface::Transaction>> &v) {
     105         748 :               tx_collection.emplace_back(std::move(v).value);  // ownership of the built transaction moves into the collection
     106         748 :             },
     107             :             [this](iroha::expected::Error<TransportFactoryType::Error> &error) {
     108          59 :               status_bus_->publish(status_factory_->makeStatelessFail(  // a tx that failed to build is dropped and a stateless-fail status is published instead
     109          59 :                   error.error.hash,
     110          59 :                   shared_model::interface::TxStatusFactory::TransactionError{
     111          59 :                       error.error.error, 0, 0}));  // NOTE(review): the meaning of the two zero fields is not visible in this file
     112          59 :             });
     113             :       }
     114         790 :       return tx_collection;
     115         790 :     }
     116             : 
     117             :     grpc::Status CommandServiceTransportGrpc::ListTorii(  // deserializes the request, groups the txs into batch candidates and forwards every valid batch
     118             :         grpc::ServerContext *context,
     119             :         const iroha::protocol::TxList *request,
     120             :         google::protobuf::Empty *response) {
     121         790 :       auto transactions = deserializeTransactions(request);  // txs that failed to build are already reported and absent here
     122             : 
     123         790 :       auto batches = batch_parser_->parseBatches(transactions);
     124             : 
     125        1533 :       for (auto &batch : batches) {
     126         743 :         batch_factory_->createTransactionBatch(batch).match(  // first handler: batch created; second handler: creation failed
     127             :             [&](iroha::expected::Value<std::unique_ptr<
     128             :                     shared_model::interface::TransactionBatch>> &value) {
     129         742 :               this->command_service_->handleTransactionBatch(
     130         742 :                   std::move(value).value);  // ownership of the batch is handed to the command service
     131         742 :             },
     132             :             [&](iroha::expected::Error<std::string> &error) {
     133           1 :               std::vector<shared_model::crypto::Hash> hashes;
     134             : 
     135         743 :               std::transform(batch.begin(),  // NOTE(review): std::transform / std::for_each are declared in <algorithm>, which is not among this file's direct includes
     136           1 :                              batch.end(),
     137           1 :                              std::back_inserter(hashes),
     138             :                              [](const auto &tx) { return tx->hash(); });  // collect the hash of every tx in the failed candidate
     139             : 
     140           1 :               auto error_msg = formErrorMessage(hashes, error.error);  // one message shared by all txs of the candidate
     141             :               // set error response for each transaction in a batch candidate
     142           1 :               std::for_each(
     143             :                   hashes.begin(), hashes.end(), [this, &error_msg](auto &hash) {
     144           1 :                     status_bus_->publish(status_factory_->makeStatelessFail(
     145           1 :                         hash,
     146           1 :                         shared_model::interface::TxStatusFactory::
     147           1 :                             TransactionError{error_msg, 0, 0}));
     148           1 :                   });
     149           1 :             });
     150             :       }
     151             : 
     152         790 :       return grpc::Status::OK;  // always OK: failures are published on status_bus_, not signalled through the gRPC status
     153         790 :     }
     154             : 
     155             :     grpc::Status CommandServiceTransportGrpc::Status(  // unary query: writes the current status of one transaction into *response
     156             :         grpc::ServerContext *context,
     157             :         const iroha::protocol::TxStatusRequest *request,
     158             :         iroha::protocol::ToriiResponse *response) {
     159         783 :       *response =  // the proto message behind the status object is copied into the caller's response
     160         783 :           std::static_pointer_cast<shared_model::proto::TransactionResponse>(  // NOTE(review): unchecked cast; assumes getStatus() yields the proto-backed response type - confirm
     161         783 :               command_service_->getStatus(
     162         783 :                   shared_model::crypto::Hash::fromHexString(
     163         783 :                       request->tx_hash())))  // tx_hash from the request is passed to Hash::fromHexString
     164         783 :               ->getTransport();
     165         783 :       return grpc::Status::OK;  // the only status this method returns
     166           0 :     }
     167             : 
     168             :     namespace {
     169             :       void handleEvents(rxcpp::composite_subscription &subscription,  // pumps run_loop on the calling thread until the subscription has ended and the queue is empty
     170             :                         rxcpp::schedulers::run_loop &run_loop) {
     171           2 :         std::condition_variable wait_cv;
     172             : 
     173           2 :         run_loop.set_notify_earlier_wakeup(
     174             :             [&wait_cv](const auto &) { wait_cv.notify_one(); });  // wakes the waits below; the callback argument is ignored
     175             : 
     176           2 :         std::mutex wait_mutex;  // local mutex, locked only by this function
     177           2 :         std::unique_lock<std::mutex> lock(wait_mutex);  // NOTE(review): the notifier never locks wait_mutex, so a notification between the predicate check and the wait might be missed - confirm
     178           4 :         while (subscription.is_subscribed() or not run_loop.empty()) {  // continue while subscribed or while queued items remain
     179          13 :           while (not run_loop.empty()
     180          13 :                  and run_loop.peek().when <= run_loop.now()) {  // dispatch every item whose scheduled time has arrived
     181          11 :             run_loop.dispatch();
     182             :           }
     183             : 
     184           2 :           if (run_loop.empty()) {
     185             :             wait_cv.wait(lock, [&run_loop, &subscription]() {  // nothing queued: block until unsubscribed or new work appears
     186           2 :               return not subscription.is_subscribed() or not run_loop.empty();
     187             :             });
     188           2 :           } else {
     189           0 :             wait_cv.wait_until(lock, run_loop.peek().when);  // next item is in the future: block until it is due or a notification arrives
     190             :           }
     191             :         }
     192           2 :       }
     193             :     }  // namespace
     194             : 
     195             :     grpc::Status CommandServiceTransportGrpc::StatusStream(  // streams status updates for one tx hash; the take_while predicate below decides when the stream ends
     196             :         grpc::ServerContext *context,
     197             :         const iroha::protocol::TxStatusRequest *request,
     198             :         grpc::ServerWriter<iroha::protocol::ToriiResponse> *response_writer) {
     199           2 :       rxcpp::schedulers::run_loop rl;  // pumped by handleEvents at the end of this function
     200             : 
     201           2 :       auto current_thread = rxcpp::synchronize_in_one_worker(
     202           2 :           rxcpp::schedulers::make_run_loop(rl));  // coordination built on a scheduler backed by rl
     203             : 
     204           2 :       rxcpp::composite_subscription subscription;
     205             : 
     206           2 :       auto hash = shared_model::crypto::Hash::fromHexString(request->tx_hash());
     207             : 
     208           2 :       auto client_id_format = boost::format("Peer: '%s', %s");
     209             :       std::string client_id =
     210           2 :           (client_id_format % context->peer() % hash.toString()).str();  // used only in the log messages below
     211           2 :       auto status_bus = command_service_->getStatusStream(hash);  // local observable for this hash; distinct from the member status_bus_
     212             :       auto consensus_gate_observable =
     213           2 :           consensus_gate_objects_
     214             :               // a dummy start_with value spares us from waiting for the first
     215             :               // consensus event in the combine_latest below
     216           2 :               .start_with(ConsensusGateEvent{});
     217             : 
     218           2 :       boost::optional<iroha::protocol::TxStatus> last_tx_status;  // last status written to the client; empty until the first write
     219           2 :       auto rounds_counter{0};  // consecutive arrivals of a status equal to last_tx_status
     220           2 :       makeCombineLatestUntilFirstCompleted(
     221             :           status_bus,
     222             :           current_thread,
     223             :           [](auto status, auto) { return status; },  // keep the status, discard the consensus event
     224             :           consensus_gate_observable)
     225             :           // complete the observable if client is disconnected, a write fails,
     226             :           // or too many rounds have passed without tx status change
     227             :           .take_while([=, &rounds_counter, &last_tx_status](  // the two counters are captured by reference so they persist between calls; everything else by copy
     228             :                           const auto &response) {
     229           2 :             const auto &proto_response =
     230           2 :                 std::static_pointer_cast<
     231           2 :                     shared_model::proto::TransactionResponse>(response)
     232           2 :                     ->getTransport();
     233             : 
     234           2 :             if (context->IsCancelled()) {
     235           2 :               log_->debug("client unsubscribed, {}", client_id);
     236           0 :               return false;
     237             :             }
     238             : 
     239             :             // increment round counter when the same status arrived again.
     240           2 :             auto status = proto_response.tx_status();
     241           2 :             auto status_is_same =
     242           2 :                 last_tx_status and (status == *last_tx_status);
     243           2 :             if (status_is_same) {
     244           1 :               ++rounds_counter;
     245           1 :               if (rounds_counter >= maximum_rounds_without_update_) {
     246             :                 // we stop the stream once the round counter reaches the
     247             :                 // allowed maximum.
     248           0 :                 return false;
     249             :               }
     250             :               // omit the received status, but do not stop the stream
     251           1 :               return true;
     252             :             }
     253           1 :             rounds_counter = 0;  // a different status arrived: restart the repeat count
     254           1 :             last_tx_status = status;
     255             : 
     256             :             // write a new status to the stream
     257           2 :             if (not response_writer->Write(proto_response)) {
     258           0 :               log_->error("write to stream has failed to client {}", client_id);
     259           0 :               return false;
     260             :             }
     261           1 :             log_->debug("status written, {}", client_id);
     262           1 :             return true;
     263           2 :           })
     264           2 :           .subscribe(subscription,
     265             :                      [](const auto &) {},  // on_next does nothing: writing already happened inside take_while
     266             :                      [&](std::exception_ptr ep) {  // NOTE(review): ep is never inspected, so the log carries no error detail
     267           0 :                        log_->error("something bad happened, client_id {}",
     268           0 :                                    client_id);
     269           0 :                      },
     270             :                      [&] { log_->debug("stream done, {}", client_id); });
     271             : 
     272             :       // run loop while subscription is active or there are pending events in
     273             :       // the queue
     274           2 :       handleEvents(subscription, rl);
     275             : 
     276           2 :       log_->debug("status stream done, {}", client_id);
     277           2 :       return grpc::Status::OK;  // OK is returned no matter which condition ended the stream
     278           2 :     }
     279             :   }  // namespace torii
     280             : }  // namespace iroha

Generated by: LCOV version 1.13