LCOV - code coverage report
Current view: top level - irohad/network/impl - block_loader_impl.cpp (source / functions) Hit Total Coverage
Test: cleared_cor.info Lines: 51 68 75.0 %
Date: 2019-03-07 14:46:43 Functions: 11 13 84.6 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include "network/impl/block_loader_impl.hpp"
       7             : 
       8             : #include <grpc++/create_channel.h>
       9             : 
      10             : #include "backend/protobuf/block.hpp"
      11             : #include "builders/protobuf/transport_builder.hpp"
      12             : #include "common/bind.hpp"
      13             : #include "interfaces/common_objects/peer.hpp"
      14             : #include "logger/logger.hpp"
      15             : #include "network/impl/grpc_channel_builder.hpp"
      16             : 
      17             : using namespace iroha::ametsuchi;
      18             : using namespace iroha::network;
      19             : using namespace shared_model::crypto;
      20             : using namespace shared_model::interface;
      21             : 
      22             : namespace {
      23             :   const char *kPeerNotFound = "Cannot find peer";
      24             :   const char *kPeerRetrieveFail = "Failed to retrieve peers";
      25             :   const char *kPeerFindFail = "Failed to find requested peer";
      26             : }  // namespace
      27             : 
      28             : BlockLoaderImpl::BlockLoaderImpl(
      29             :     std::shared_ptr<PeerQueryFactory> peer_query_factory,
      30             :     shared_model::proto::ProtoBlockFactory factory,
      31             :     logger::LoggerPtr log)
      32         254 :     : peer_query_factory_(std::move(peer_query_factory)),
      33         254 :       block_factory_(std::move(factory)),
      34         254 :       log_(std::move(log)) {}
      35             : 
      36             : rxcpp::observable<std::shared_ptr<Block>> BlockLoaderImpl::retrieveBlocks(
      37             :     const shared_model::interface::types::HeightType height,
      38             :     const PublicKey &peer_pubkey) {
      39           3 :   return rxcpp::observable<>::create<std::shared_ptr<Block>>(
      40             :       [this, height, &peer_pubkey](auto subscriber) {
      41           3 :         auto peer = this->findPeer(peer_pubkey);
      42           3 :         if (not peer) {
      43           0 :           log_->error(kPeerNotFound);
      44           0 :           subscriber.on_completed();
      45           0 :           return;
      46             :         }
      47             : 
      48           3 :         proto::BlocksRequest request;
      49           3 :         grpc::ClientContext context;
      50           3 :         protocol::Block block;
      51             : 
      52             :         // request next block to our top
      53           3 :         request.set_height(height + 1);
      54             : 
      55             :         auto reader =
      56           3 :             this->getPeerStub(**peer).retrieveBlocks(&context, request);
      57           6 :         while (reader->Read(&block)) {
      58           3 :           auto proto_block = block_factory_.createBlock(std::move(block));
      59           3 :           proto_block.match(
      60             :               [&subscriber](
      61             :                   iroha::expected::Value<std::unique_ptr<Block>> &result) {
      62           3 :                 subscriber.on_next(std::move(result.value));
      63           3 :               },
      64             :               [this,
      65             :                &context](const iroha::expected::Error<std::string> &error) {
      66           0 :                 log_->error(error.error);
      67           0 :                 context.TryCancel();
      68           0 :               });
      69           3 :         }
      70           3 :         reader->Finish();
      71           3 :         subscriber.on_completed();
      72           3 :       });
      73           0 : }
      74             : 
      75             : boost::optional<std::shared_ptr<Block>> BlockLoaderImpl::retrieveBlock(
      76             :     const PublicKey &peer_pubkey, const types::HashType &block_hash) {
      77           4 :   auto peer = findPeer(peer_pubkey);
      78           4 :   if (not peer) {
      79           0 :     log_->error(kPeerNotFound);
      80           0 :     return boost::none;
      81             :   }
      82             : 
      83           4 :   proto::BlockRequest request;
      84           4 :   grpc::ClientContext context;
      85           4 :   protocol::Block block;
      86             : 
      87             :   // request block with specified hash
      88           4 :   request.set_hash(toBinaryString(block_hash));
      89             : 
      90           4 :   auto status = getPeerStub(**peer).retrieveBlock(&context, request, &block);
      91           4 :   if (not status.ok()) {
      92           1 :     log_->warn(status.error_message());
      93           1 :     return boost::none;
      94             :   }
      95             : 
      96           3 :   auto result = block_factory_.createBlock(std::move(block));
      97             : 
      98           3 :   return result.match(
      99             :       [](iroha::expected::Value<std::unique_ptr<Block>> &v) {
     100           3 :         return boost::make_optional(std::shared_ptr<Block>(std::move(v.value)));
     101           0 :       },
     102             :       [this](const iroha::expected::Error<std::string> &e)
     103             :           -> boost::optional<std::shared_ptr<Block>> {
     104           0 :         log_->error(e.error);
     105           0 :         return boost::none;
     106             :       });
     107           4 : }
     108             : 
     109             : boost::optional<std::shared_ptr<shared_model::interface::Peer>>
     110             : BlockLoaderImpl::findPeer(const shared_model::crypto::PublicKey &pubkey) {
     111           7 :   auto peers = peer_query_factory_->createPeerQuery() |
     112             :       [](const auto &query) { return query->getLedgerPeers(); };
     113           7 :   if (not peers) {
     114           0 :     log_->error(kPeerRetrieveFail);
     115           0 :     return boost::none;
     116             :   }
     117             : 
     118           7 :   auto &blob = pubkey.blob();
     119           7 :   auto it = std::find_if(
     120             :       peers.value().begin(), peers.value().end(), [&blob](const auto &peer) {
     121           7 :         return peer->pubkey().blob() == blob;
     122             :       });
     123           7 :   if (it == peers.value().end()) {
     124           0 :     log_->error(kPeerFindFail);
     125           0 :     return boost::none;
     126             :   }
     127           7 :   return *it;
     128           7 : }
     129             : 
     130             : proto::Loader::Stub &BlockLoaderImpl::getPeerStub(
     131             :     const shared_model::interface::Peer &peer) {
     132           7 :   auto it = peer_connections_.find(peer.address());
     133           7 :   if (it == peer_connections_.end()) {
     134           7 :     it = peer_connections_
     135           7 :              .insert(std::make_pair(
     136           7 :                  peer.address(),
     137           7 :                  network::createClient<proto::Loader>(peer.address())))
     138           7 :              .first;
     139           7 :   }
     140           7 :   return *it->second;
     141           0 : }

Generated by: LCOV version 1.13