LCOV - code coverage report
Current view: top level - irohad/multi_sig_transactions/transport/impl - mst_transport_grpc.cpp (source / functions) Hit Total Coverage
Test: cleared_cor.info Lines: 76 84 90.5 %
Date: 2019-03-07 14:46:43 Functions: 17 20 85.0 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include "common/default_constructible_unary_fn.hpp"  // non-copyable value workaround
       7             : 
       8             : #include "multi_sig_transactions/transport/mst_transport_grpc.hpp"
       9             : 
      10             : #include <boost/range/adaptor/filtered.hpp>
      11             : #include <boost/range/adaptor/transformed.hpp>
      12             : #include "ametsuchi/tx_presence_cache.hpp"
      13             : #include "backend/protobuf/transaction.hpp"
      14             : #include "interfaces/iroha_internal/transaction_batch.hpp"
      15             : #include "interfaces/transaction.hpp"
      16             : #include "logger/logger.hpp"
      17             : #include "validators/field_validator.hpp"
      18             : 
      19             : using namespace iroha;
      20             : using namespace iroha::network;
      21             : 
      22             : using iroha::ConstRefState;
      23             : 
       //             : Forward declaration; the definition is at the end of this listing.
       //             : Declared up front so that MstTransportGrpc::sendState and
       //             : iroha::network::sendStateAsync can call it before it is defined.
       //             : Unlike sendStateAsync, sender_key is taken as a std::string rather than
       //             : as a PublicKey.
       24             : void sendStateAsyncImpl(const shared_model::interface::Peer &to,
       25             :                         ConstRefState state,
       26             :                         const std::string &sender_key,
       27             :                         AsyncGrpcClient<google::protobuf::Empty> &async_call);
      28             : 
       //             : Builds the transport from its injected collaborators.
       //             :
       //             : Every argument is taken by value and moved into the member of the
       //             : matching name, with one exception: my_key is not stored as a PublicKey
       //             : but as the result of shared_model::crypto::toBinaryString(my_key).
       //             : The constructor body is empty; no other work is done here.
       29             : MstTransportGrpc::MstTransportGrpc(
       30             :     std::shared_ptr<AsyncGrpcClient<google::protobuf::Empty>> async_call,
       31             :     std::shared_ptr<TransportFactoryType> transaction_factory,
       32             :     std::shared_ptr<shared_model::interface::TransactionBatchParser>
       33             :         batch_parser,
       34             :     std::shared_ptr<shared_model::interface::TransactionBatchFactory>
       35             :         transaction_batch_factory,
       36             :     std::shared_ptr<iroha::ametsuchi::TxPresenceCache> tx_presence_cache,
       37             :     std::shared_ptr<Completer> mst_completer,
       38             :     shared_model::crypto::PublicKey my_key,
       39             :     logger::LoggerPtr mst_state_logger,
       40             :     logger::LoggerPtr log)
       41          10 :     : async_call_(std::move(async_call)),
       42          10 :       transaction_factory_(std::move(transaction_factory)),
       43          10 :       batch_parser_(std::move(batch_parser)),
       44          10 :       batch_factory_(std::move(transaction_batch_factory)),
       45          10 :       tx_presence_cache_(std::move(tx_presence_cache)),
       46          10 :       mst_completer_(std::move(mst_completer)),
       //             :       // Converted once here; sendState later passes my_key_ straight to
       //             :       // sendStateAsyncImpl, which expects a std::string.
       47          10 :       my_key_(shared_model::crypto::toBinaryString(my_key)),
       48          10 :       mst_state_logger_(std::move(mst_state_logger)),
       49          10 :       log_(std::move(log)) {}
      50             : 
       //             : Converts the proto transactions carried by request into shared-model
       //             : transactions.
       //             :
       //             : Every entry of request->transactions() is passed to
       //             : transaction_factory_->build(). Results that hold an error are logged at
       //             : info level (hash and error text) and dropped; the values of the
       //             : remaining results are moved into the returned collection.
       //             : request is dereferenced without a null check.
       51             : shared_model::interface::types::SharedTxsCollectionType
       52             : MstTransportGrpc::deserializeTransactions(const transport::MstState *request) {
       53           6 :   return boost::copy_range<
       54             :       shared_model::interface::types::SharedTxsCollectionType>(
       55           6 :       request->transactions()
       //             :       // Stage 1: proto transaction -> build result (value or error).
       56           6 :       | boost::adaptors::transformed(
       57             :             [&](const auto &tx) { return transaction_factory_->build(tx); })
       //             :       // Stage 2: keep results holding a value; log and skip the others.
       58             :       | boost::adaptors::filtered([&](const auto &result) {
       59          10 :           return result.match(
       60             :               [](const iroha::expected::Value<
       61             :                   std::unique_ptr<shared_model::interface::Transaction>> &) {
       62           9 :                 return true;
       63             :               },
       64             :               [&](const iroha::expected::Error<TransportFactoryType::Error>
       65             :                       &error) {
       66           1 :                 log_->info("Transaction deserialization failed: hash {}, {}",
       67           1 :                            error.error.hash,
       68           1 :                            error.error.error);
       69           1 :                 return false;
       70           0 :               });
       71             :         })
       //             :       // Stage 3: unwrap the value alternative. Stage 2 let through only
       //             :       // results that matched Value, which is what boost::get relies on.
       //             :       // result is taken by value so that its payload can be moved out.
       72             :       | boost::adaptors::transformed([&](auto result) {
       73           9 :           return std::move(
       74           9 :                      boost::get<iroha::expected::ValueOf<decltype(result)>>(
       75           9 :                          result))
       76           9 :               .value;
       77             :         }));
       78           0 : }
      79             : 
       //             : Handles an incoming MstState message.
       //             :
       //             : Steps, in the order the code performs them:
       //             :  1. Deserialize the transactions of request and group them into batches
       //             :     with batch_parser_.
       //             :  2. For each group, create a TransactionBatch and look it up in
       //             :     tx_presence_cache_. A batch is added to the new state only when the
       //             :     lookup yields a result and every reported status is Missing.
       //             :  3. Validate the sender public key taken from request.
       //             :  4. If the key is valid and the new state is not empty, hand both to the
       //             :     subscriber, provided subscriber_.lock() yields one.
       //             :
       //             : Every exit path returns grpc::Status::OK, including the ones that drop
       //             : the message. The context and response parameters are not used in the
       //             : body.
       80             : grpc::Status MstTransportGrpc::SendState(
       81             :     ::grpc::ServerContext *context,
       82             :     const ::iroha::network::transport::MstState *request,
       83             :     ::google::protobuf::Empty *response) {
       84           5 :   log_->info("MstState Received");
       85             : 
       86           5 :   auto transactions = deserializeTransactions(request);
       87             : 
       88           5 :   auto batches = batch_parser_->parseBatches(transactions);
       89             : 
       //             :   // Accumulator for the batches that survive the checks below.
       90           6 :   MstState new_state = MstState::empty(mst_state_logger_, mst_completer_);
       91             : 
       92          13 :   for (auto &batch : batches) {
       93           7 :     batch_factory_->createTransactionBatch(batch).match(
       94             :         [&](iroha::expected::Value<std::unique_ptr<
       95             :                 shared_model::interface::TransactionBatch>> &value) {
       96           7 :           auto cache_presence = tx_presence_cache_->check(*value.value);
       //             :           // A falsy result means the lookup itself failed; the batch is
       //             :           // skipped after a warning.
       97           7 :           if (not cache_presence) {
       98             :             // TODO andrei 30.11.18 IR-51 Handle database error
       99           1 :             log_->warn("Check tx presence database error. Batch: {}",
      100           1 :                        *value.value);
      101           1 :             return;
      102             :           }
       //             :           // The batch counts as a replay as soon as one status is anything
       //             :           // other than Missing.
      103           6 :           auto is_replay = std::any_of(
      104           6 :               cache_presence->begin(),
      105           6 :               cache_presence->end(),
      106             :               [](const auto &tx_status) {
      107           7 :                 return iroha::visit_in_place(
      108           7 :                     tx_status,
      109             :                     [](const iroha::ametsuchi::tx_cache_status_responses::
      110           6 :                            Missing &) { return false; },
      111             :                     [](const auto &) { return true; });
      112             :               });
      113             : 
      114           6 :           if (not is_replay) {
      115           5 :             new_state += std::move(value).value;
      116           5 :           }
      117           7 :         },
       //             :         // Batch creation failed: warn and continue with the next group.
      118             :         [&](iroha::expected::Error<std::string> &error) {
      119           0 :           log_->warn("Batch deserialization failed: {}", error.error);
      120           0 :         });
      121             :   }
      122             : 
      123           6 :   log_->info("batches in MstState: {}", new_state.getBatches().size());
      124             : 
       //             :   // NOTE(review): the sender key is validated only after all batches have
       //             :   // been processed, so that work is also done for messages that are then
       //             :   // dropped here - confirm whether this ordering is intentional.
      125           6 :   shared_model::crypto::PublicKey source_key(request->source_peer_key());
      126             :   auto key_invalid_reason =
      127           6 :       shared_model::validation::validatePubkey(source_key);
      128           6 :   if (key_invalid_reason) {
      129           1 :     log_->info("Dropping received MST State due to invalid public key: {}",
      130           1 :                *key_invalid_reason);
      131           1 :     return grpc::Status::OK;
      132             :   }
      133             : 
      134           5 :   if (new_state.isEmpty()) {
      135           2 :     log_->info(
      136           2 :         "All transactions from received MST state have been processed already, "
      137             :         "nothing to propagate to MST processor");
      138           2 :     return grpc::Status::OK;
      139             :   }
      140             : 
       //             :   // The state is delivered only if lock() yields a live subscriber;
       //             :   // otherwise it is discarded with a warning.
      141           3 :   if (auto subscriber = subscriber_.lock()) {
      142           3 :     subscriber->onNewState(source_key, std::move(new_state));
      143           3 :   } else {
      144           0 :     log_->warn("No subscriber for MST SendState event is set");
      145             :   }
      146             : 
      147           3 :   return grpc::Status::OK;
      148           6 : }
     149             : 
      //              : Registers the receiver of incoming MST states by assigning notification
      //              : to subscriber_. A later call replaces the previous value.
      //              : NOTE(review): SendState reads the member through subscriber_.lock(), so
      //              : it is presumably a std::weak_ptr that does not keep the subscriber
      //              : alive - confirm against the header, which is not part of this listing.
      150             : void MstTransportGrpc::subscribe(
      151             :     std::shared_ptr<MstTransportNotification> notification) {
      152          10 :   subscriber_ = notification;
      153          10 : }
     154             : 
      //              : Sends providing_state to the peer to.
      //              : Logs the peer address at info level and then delegates to
      //              : sendStateAsyncImpl, passing the stored key string my_key_ as the sender
      //              : key and the client behind async_call_ as the call dispatcher.
      155             : void MstTransportGrpc::sendState(const shared_model::interface::Peer &to,
      156             :                                  ConstRefState providing_state) {
      157           3 :   log_->info("Propagate MstState to peer {}", to.address());
      158           3 :   sendStateAsyncImpl(to, providing_state, my_key_, *async_call_);
      159           3 : }
     160             : 
      //              : Free-function entry point for sending an MST state.
      //              : Takes the sender key as a PublicKey, converts it with
      //              : shared_model::crypto::toBinaryString and forwards all arguments to
      //              : sendStateAsyncImpl. It does no logging of its own.
      161             : void iroha::network::sendStateAsync(
      162             :     const shared_model::interface::Peer &to,
      163             :     ConstRefState state,
      164             :     const shared_model::crypto::PublicKey &sender_key,
      165             :     AsyncGrpcClient<google::protobuf::Empty> &async_call) {
      166           0 :   sendStateAsyncImpl(
      167           0 :       to, state, shared_model::crypto::toBinaryString(sender_key), async_call);
      168           0 : }
     169             : 
      //              : Serializes state into a proto MstState and starts an AsyncSendState
      //              : call to the peer to.
      //              :
      //              : A new stub over an insecure channel to to.address() is created on every
      //              : call. The message carries sender_key as source_peer_key and the
      //              : transport form of every transaction of every batch in state. The call
      //              : itself is started through async_call.Call().
      170             : void sendStateAsyncImpl(const shared_model::interface::Peer &to,
      171             :                         ConstRefState state,
      172             :                         const std::string &sender_key,
      173             :                         AsyncGrpcClient<google::protobuf::Empty> &async_call) {
      174           3 :   std::unique_ptr<transport::MstTransportGrpc::StubInterface> client =
      175           3 :       transport::MstTransportGrpc::NewStub(grpc::CreateChannel(
      176           3 :           to.address(), grpc::InsecureChannelCredentials()));
      177             : 
      178           3 :   transport::MstState protoState;
      179           3 :   protoState.set_source_peer_key(sender_key);
      //              :   // Batch boundaries are not encoded; transactions are appended as one
      //              :   // flat list in iteration order.
      180           8 :   for (auto &batch : state.getBatches()) {
      181          10 :     for (auto &tx : batch->transactions()) {
      182             :       // TODO (@l4l) 04/03/18 simplify with IR-1040
      //              :       // static_pointer_cast performs no runtime check: this assumes every
      //              :       // transaction is a shared_model::proto::Transaction - TODO confirm.
      183           5 :       *protoState.add_transactions() =
      184           5 :           std::static_pointer_cast<shared_model::proto::Transaction>(tx)
      185           5 :               ->getTransport();
      186             :     }
      187             :   }
      188             : 
      //              :   // NOTE(review): the lambda captures the locals client and protoState by
      //              :   // reference, which is only safe if Call() invokes it before this
      //              :   // function returns - confirm against AsyncGrpcClient::Call.
      189             :   async_call.Call([&](auto context, auto cq) {
      190           3 :     return client->AsyncSendState(context, protoState, cq);
      191             :   });
      192           3 : }

Generated by: LCOV version 1.13