LCOV - code coverage report
Current view: top level - irohad/main/impl - on_demand_ordering_init.cpp (source / functions) Hit Total Coverage
Test: cleared_cor.info Lines: 127 133 95.5 %
Date: 2019-03-07 14:46:43 Functions: 36 36 100.0 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include "main/impl/on_demand_ordering_init.hpp"
       7             : 
       8             : #include "common/bind.hpp"
       9             : #include "common/delay.hpp"
      10             : #include "cryptography/crypto_provider/crypto_defaults.hpp"
      11             : #include "datetime/time.hpp"
      12             : #include "interfaces/common_objects/peer.hpp"
      13             : #include "interfaces/common_objects/types.hpp"
      14             : #include "logger/logger.hpp"
      15             : #include "logger/logger_manager.hpp"
      16             : #include "ordering/impl/on_demand_common.hpp"
      17             : #include "ordering/impl/on_demand_connection_manager.hpp"
      18             : #include "ordering/impl/on_demand_ordering_gate.hpp"
      19             : #include "ordering/impl/on_demand_ordering_service_impl.hpp"
      20             : #include "ordering/impl/on_demand_os_client_grpc.hpp"
      21             : #include "ordering/impl/on_demand_os_server_grpc.hpp"
      22             : #include "ordering/impl/ordering_gate_cache/on_demand_cache.hpp"
      23             : 
      24             : namespace {
      25             :   /// match event and call corresponding lambda depending on sync_outcome
      26             :   template <typename OnBlocks, typename OnNothing>
      27             :   auto matchEvent(const iroha::synchronizer::SynchronizationEvent &event,
      28             :                   OnBlocks &&on_blocks,
      29             :                   OnNothing &&on_nothing) {
      30             :     using iroha::synchronizer::SynchronizationOutcomeType;
      31        3373 :     switch (event.sync_outcome) {
      32             :       case SynchronizationOutcomeType::kCommit:
      33         964 :         return std::forward<OnBlocks>(on_blocks)(event);
      34             :       case SynchronizationOutcomeType::kReject:
      35             :       case SynchronizationOutcomeType::kNothing:
      36        2409 :         return std::forward<OnNothing>(on_nothing)(event);
      37             :       default:
      38           0 :         BOOST_ASSERT_MSG(false, "Unknown value");
      39             :     }
      40        3373 :   }
      41             : }  // namespace
      42             : 
      43             : namespace iroha {
      44             :   namespace network {
      45             : 
      46             :     OnDemandOrderingInit::OnDemandOrderingInit(logger::LoggerPtr log)
      47         247 :         : log_(std::move(log)) {}
      48             : 
      49             :     auto OnDemandOrderingInit::createNotificationFactory(
      50             :         std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
      51             :             async_call,
      52             :         std::shared_ptr<TransportFactoryType> proposal_transport_factory,
      53             :         std::chrono::milliseconds delay,
      54             :         const logger::LoggerManagerTreePtr &ordering_log_manager) {
      55         247 :       return std::make_shared<ordering::transport::OnDemandOsClientGrpcFactory>(
      56         247 :           std::move(async_call),
      57         247 :           std::move(proposal_transport_factory),
      58             :           [] { return std::chrono::system_clock::now(); },
      59             :           delay,
      60         247 :           ordering_log_manager->getChild("NetworkClient")->getLogger());
      61           0 :     }
      62             : 
      63             :     auto OnDemandOrderingInit::createConnectionManager(
      64             :         std::shared_ptr<ametsuchi::PeerQueryFactory> peer_query_factory,
      65             :         std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
      66             :             async_call,
      67             :         std::shared_ptr<TransportFactoryType> proposal_transport_factory,
      68             :         std::chrono::milliseconds delay,
      69             :         std::vector<shared_model::interface::types::HashType> initial_hashes,
      70             :         const logger::LoggerManagerTreePtr &ordering_log_manager) {
      71             :       // since top block will be the first in notifier observable, hashes of
      72             :       // two previous blocks are prepended
      73         247 :       const size_t kBeforePreviousTop = 0, kPreviousTop = 1;
      74             : 
      75             :       // flat map hashes from committed blocks
      76         247 :       auto all_hashes = notifier.get_observable()
      77             :                             .flat_map([](auto commit) {
      78       10119 :                               return commit.synced_blocks.map(
      79             :                                   [](auto block) { return block->hash(); });
      80           0 :                             })
      81             :                             // prepend hashes for the first two rounds
      82         247 :                             .start_with(initial_hashes.at(kBeforePreviousTop),
      83         247 :                                         initial_hashes.at(kPreviousTop));
      84             : 
      85             :       // emit last k + 1 hashes, where k is the delay parameter
      86             :       // current implementation assumes k = 2
      87             :       // first hash is used for kCurrentRound
      88             :       // second hash is used for kNextRound
      89             :       // third hash is used for kRoundAfterNext
      90             :       auto latest_hashes =
      91         247 :           all_hashes.zip(all_hashes.skip(1), all_hashes.skip(2));
      92             : 
      93             :       auto map_peers = [this, peer_query_factory](auto &&latest_data)
      94             :           -> ordering::OnDemandConnectionManager::CurrentPeers {
      95        3373 :         auto &latest_commit = std::get<0>(latest_data);
      96        3373 :         auto &current_hashes = std::get<1>(latest_data);
      97             : 
      98        3373 :         consensus::Round current_round = latest_commit.round;
      99             : 
     100             :         auto on_blocks = [this,
     101        3373 :                           peer_query_factory,
     102        3373 :                           current_hashes,
     103             :                           &current_round](const auto &commit) {
     104         964 :           current_round = ordering::nextCommitRound(current_round);
     105             : 
     106             :           // retrieve peer list from database
     107             :           // TODO lebdron 08.11.2018 IR-1853 Refactor PeerQuery without
     108             :           // database access and optional
     109             :           peer_query_factory->createPeerQuery() | [](auto &&query) {
     110         888 :             return query->getLedgerPeers();
     111             :           } | [this](auto &&peers) { current_peers_ = std::move(peers); };
     112             : 
     113             :           // generate permutation of peers list from corresponding round
     114             :           // hash
     115             :           auto generate_permutation = [&](auto round) {
     116         964 :             auto &hash = std::get<round()>(current_hashes);
     117         964 :             log_->debug("Using hash: {}", hash.toString());
     118         964 :             auto &permutation = permutations_[round()];
     119             : 
     120         964 :             std::seed_seq seed(hash.blob().begin(), hash.blob().end());
     121         964 :             gen_.seed(seed);
     122             : 
     123         964 :             permutation.resize(current_peers_.size());
     124         964 :             std::iota(permutation.begin(), permutation.end(), 0);
     125             : 
     126         964 :             std::shuffle(permutation.begin(), permutation.end(), gen_);
     127         964 :           };
     128             : 
     129         964 :           generate_permutation(RoundTypeConstant<kCurrentRound>{});
     130         964 :           generate_permutation(RoundTypeConstant<kNextRound>{});
     131         964 :           generate_permutation(RoundTypeConstant<kRoundAfterNext>{});
     132         964 :         };
     133             :         auto on_nothing = [&current_round](const auto &) {
     134        2409 :           current_round = ordering::nextRejectRound(current_round);
     135        2409 :         };
     136             : 
     137        3373 :         matchEvent(latest_commit, on_blocks, on_nothing);
     138             : 
     139             :         auto getOsPeer = [this, &current_round](auto block_round_advance,
     140             :                                                 auto reject_round) {
     141       13492 :           auto &permutation = permutations_[block_round_advance];
     142             :           // since reject round can be greater than number of peers, wrap it
     143             :           // with number of peers
     144       13492 :           auto &peer =
     145       13492 :               current_peers_[permutation[reject_round % permutation.size()]];
     146       13492 :           log_->debug(
     147       13492 :               "For {}, using OS on peer: {}",
     148       13492 :               consensus::Round{current_round.block_round + block_round_advance,
     149       13492 :                                reject_round},
     150       13492 :               *peer);
     151       13492 :           return peer;
     152           0 :         };
     153             : 
     154             :         using ordering::OnDemandConnectionManager;
     155        3373 :         OnDemandConnectionManager::CurrentPeers peers;
     156             :         /*
     157             :          * See detailed description in
     158             :          * irohad/ordering/impl/on_demand_connection_manager.cpp
     159             :          *
     160             :          *   0 1 2
     161             :          * 0 o x v
     162             :          * 1 x v .
     163             :          * 2 v . .
     164             :          *
     165             :          * v, round 0 - kCurrentRoundRejectConsumer
     166             :          * v, round 1 - kNextRoundRejectConsumer
     167             :          * v, round 2 - kNextRoundCommitConsumer
     168             :          * o, round 0 - kIssuer
     169             :          */
     170        3373 :         peers.peers.at(OnDemandConnectionManager::kCurrentRoundRejectConsumer) =
     171        3373 :             getOsPeer(kCurrentRound,
     172        3373 :                       ordering::currentRejectRoundConsumer(
     173        3373 :                           current_round.reject_round));
     174        3373 :         peers.peers.at(OnDemandConnectionManager::kNextRoundRejectConsumer) =
     175        3373 :             getOsPeer(kNextRound, ordering::kNextRejectRoundConsumer);
     176        3373 :         peers.peers.at(OnDemandConnectionManager::kNextRoundCommitConsumer) =
     177        3373 :             getOsPeer(kRoundAfterNext, ordering::kNextCommitRoundConsumer);
     178        3373 :         peers.peers.at(OnDemandConnectionManager::kIssuer) =
     179        3373 :             getOsPeer(kCurrentRound, current_round.reject_round);
     180        3373 :         return peers;
     181        3373 :       };
     182             : 
     183         247 :       auto peers = notifier.get_observable()
     184         247 :                        .with_latest_from(latest_hashes)
     185         247 :                        .map(map_peers);
     186             : 
     187         247 :       return std::make_shared<ordering::OnDemandConnectionManager>(
     188         247 :           createNotificationFactory(std::move(async_call),
     189         247 :                                     std::move(proposal_transport_factory),
     190         247 :                                     delay,
     191         247 :                                     ordering_log_manager),
     192             :           peers,
     193         247 :           ordering_log_manager->getChild("ConnectionManager")->getLogger());
     194         247 :     }
     195             : 
     196             :     auto OnDemandOrderingInit::createGate(
     197             :         std::shared_ptr<ordering::OnDemandOrderingService> ordering_service,
     198             :         std::shared_ptr<ordering::transport::OdOsNotification> network_client,
     199             :         std::shared_ptr<ordering::cache::OrderingGateCache> cache,
     200             :         std::shared_ptr<shared_model::interface::UnsafeProposalFactory>
     201             :             proposal_factory,
     202             :         std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
     203             :         consensus::Round initial_round,
     204             :         std::function<std::chrono::milliseconds(
     205             :             const synchronizer::SynchronizationEvent &)> delay_func,
     206             :         const logger::LoggerManagerTreePtr &ordering_log_manager) {
     207             :       auto map = [](auto commit) {
     208        3373 :         return matchEvent(
     209             :             commit,
     210             :             [](const auto &commit)
     211             :                 -> ordering::OnDemandOrderingGate::BlockRoundEventType {
     212         964 :               ordering::cache::OrderingGateCache::HashesSetType hashes;
     213         964 :               commit.synced_blocks.as_blocking().subscribe(
     214             :                   [&hashes](const auto &block) {
     215         964 :                     const auto &committed = block->transactions();
     216         964 :                     std::transform(committed.begin(),
     217         964 :                                    committed.end(),
     218         964 :                                    std::inserter(hashes, hashes.end()),
     219             :                                    [](const auto &transaction) {
     220         914 :                                      return transaction.hash();
     221             :                                    });
     222         964 :                     const auto &rejected =
     223         964 :                         block->rejected_transactions_hashes();
     224         964 :                     std::copy(rejected.begin(),
     225         964 :                               rejected.end(),
     226         964 :                               std::inserter(hashes, hashes.end()));
     227         964 :                   });
     228         964 :               return ordering::OnDemandOrderingGate::BlockEvent{
     229         964 :                   ordering::nextCommitRound(commit.round), hashes};
     230         964 :             },
     231             :             [](const auto &nothing)
     232             :                 -> ordering::OnDemandOrderingGate::BlockRoundEventType {
     233        2409 :               return ordering::OnDemandOrderingGate::EmptyEvent{
     234        2409 :                   ordering::nextRejectRound(nothing.round)};
     235             :             });
     236             :       };
     237             : 
     238         247 :       return std::make_shared<ordering::OnDemandOrderingGate>(
     239         247 :           std::move(ordering_service),
     240         247 :           std::move(network_client),
     241         247 :           notifier.get_observable()
     242         247 :               .lift<iroha::synchronizer::SynchronizationEvent>(
     243         247 :                   iroha::makeDelay<iroha::synchronizer::SynchronizationEvent>(
     244         247 :                       delay_func, rxcpp::identity_current_thread()))
     245         247 :               .map(map),
     246         247 :           std::move(cache),
     247         247 :           std::move(proposal_factory),
     248         247 :           std::move(tx_cache),
     249             :           initial_round,
     250         247 :           ordering_log_manager->getChild("Gate")->getLogger());
     251           0 :     }
     252             : 
     253             :     auto OnDemandOrderingInit::createService(
     254             :         size_t max_size,
     255             :         std::shared_ptr<shared_model::interface::UnsafeProposalFactory>
     256             :             proposal_factory,
     257             :         std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
     258             :         const logger::LoggerManagerTreePtr &ordering_log_manager) {
     259         247 :       return std::make_shared<ordering::OnDemandOrderingServiceImpl>(
     260             :           max_size,
     261         247 :           std::move(proposal_factory),
     262         247 :           std::move(tx_cache),
     263         247 :           ordering_log_manager->getChild("Service")->getLogger());
     264           0 :     }
     265             : 
     266             :     OnDemandOrderingInit::~OnDemandOrderingInit() {
     267         247 :       notifier.get_subscriber().unsubscribe();
     268         247 :     }
     269             : 
     270             :     std::shared_ptr<iroha::network::OrderingGate>
     271             :     OnDemandOrderingInit::initOrderingGate(
     272             :         size_t max_size,
     273             :         std::chrono::milliseconds delay,
     274             :         std::vector<shared_model::interface::types::HashType> initial_hashes,
     275             :         std::shared_ptr<ametsuchi::PeerQueryFactory> peer_query_factory,
     276             :         std::shared_ptr<
     277             :             ordering::transport::OnDemandOsServerGrpc::TransportFactoryType>
     278             :             transaction_factory,
     279             :         std::shared_ptr<shared_model::interface::TransactionBatchParser>
     280             :             batch_parser,
     281             :         std::shared_ptr<shared_model::interface::TransactionBatchFactory>
     282             :             transaction_batch_factory,
     283             :         std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
     284             :             async_call,
     285             :         std::shared_ptr<shared_model::interface::UnsafeProposalFactory>
     286             :             proposal_factory,
     287             :         std::shared_ptr<TransportFactoryType> proposal_transport_factory,
     288             :         std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
     289             :         consensus::Round initial_round,
     290             :         std::function<std::chrono::milliseconds(
     291             :             const synchronizer::SynchronizationEvent &)> delay_func,
     292             :         logger::LoggerManagerTreePtr ordering_log_manager) {
     293         247 :       auto ordering_service = createService(
     294         247 :           max_size, proposal_factory, tx_cache, ordering_log_manager);
     295         247 :       service = std::make_shared<ordering::transport::OnDemandOsServerGrpc>(
     296             :           ordering_service,
     297         247 :           std::move(transaction_factory),
     298         247 :           std::move(batch_parser),
     299         247 :           std::move(transaction_batch_factory),
     300         247 :           ordering_log_manager->getChild("Server")->getLogger());
     301         247 :       return createGate(
     302         247 :           ordering_service,
     303         247 :           createConnectionManager(std::move(peer_query_factory),
     304         247 :                                   std::move(async_call),
     305         247 :                                   std::move(proposal_transport_factory),
     306         247 :                                   delay,
     307         247 :                                   std::move(initial_hashes),
     308             :                                   ordering_log_manager),
     309         247 :           std::make_shared<ordering::cache::OnDemandCache>(),
     310         247 :           std::move(proposal_factory),
     311         247 :           std::move(tx_cache),
     312         247 :           initial_round,
     313         247 :           std::move(delay_func),
     314             :           ordering_log_manager);
     315         247 :     }
     316             : 
     317             :   }  // namespace network
     318             : }  // namespace iroha

Generated by: LCOV version 1.13