LCOV - code coverage report
Current view: top level - irohad/ordering/impl - on_demand_connection_manager.cpp (source / functions) Hit Total Coverage
Test: cleared_cor.info Lines: 30 30 100.0 %
Date: 2019-03-07 14:46:43 Functions: 13 15 86.7 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include "ordering/impl/on_demand_connection_manager.hpp"
       7             : 
       8             : #include <boost/range/combine.hpp>
       9             : #include "interfaces/iroha_internal/proposal.hpp"
      10             : #include "logger/logger.hpp"
      11             : #include "ordering/impl/on_demand_common.hpp"
      12             : 
      13             : using namespace iroha;
      14             : using namespace iroha::ordering;
      15             : 
      16             : OnDemandConnectionManager::OnDemandConnectionManager(
      17             :     std::shared_ptr<transport::OdOsNotificationFactory> factory,
      18             :     rxcpp::observable<CurrentPeers> peers,
      19             :     logger::LoggerPtr log)
      20         251 :     : log_(std::move(log)),
      21         251 :       factory_(std::move(factory)),
      22             :       subscription_(peers.subscribe([this](const auto &peers) {
      23             :         // exclusive lock
      24        3373 :         std::lock_guard<std::shared_timed_mutex> lock(mutex_);
      25             : 
      26        3373 :         this->initializeConnections(peers);
      27        3373 :       })) {}
      28             : 
      29             : OnDemandConnectionManager::OnDemandConnectionManager(
      30             :     std::shared_ptr<transport::OdOsNotificationFactory> factory,
      31             :     rxcpp::observable<CurrentPeers> peers,
      32             :     CurrentPeers initial_peers,
      33             :     logger::LoggerPtr log)
      34           4 :     : OnDemandConnectionManager(std::move(factory), peers, std::move(log)) {
      35             :   // using start_with(initial_peers) results in deadlock
      36           4 :   initializeConnections(initial_peers);
      37           4 : }
      38             : 
      39             : OnDemandConnectionManager::~OnDemandConnectionManager() {
      40         251 :   subscription_.unsubscribe();
      41         251 : }
      42             : 
      43             : void OnDemandConnectionManager::onBatches(consensus::Round round,
      44             :                                           CollectionType batches) {
      45        1117 :   std::shared_lock<std::shared_timed_mutex> lock(mutex_);
      46             : 
      47             :   /*
      48             :    * Transactions are always sent to the round after the next round (+2)
      49             :    * There are 3 possibilities - next reject in the current round, first reject
      50             :    * in the next round, and first commit in the round after the next round
      51             :    * This can be visualised as a diagram, where:
      52             :    * o - current round, x - next round, v - target round
      53             :    *
      54             :    *   0 1 2
      55             :    * 0 o x v
      56             :    * 1 x v .
      57             :    * 2 v . .
      58             :    */
      59             : 
      60             :   auto propagate = [this, batches](PeerType type, consensus::Round round) {
      61        3351 :     log_->debug("onBatches, {}", round);
      62             : 
      63        3351 :     connections_.peers[type]->onBatches(round, batches);
      64        3351 :   };
      65             : 
      66        1117 :   propagate(
      67             :       kCurrentRoundRejectConsumer,
      68        1117 :       {round.block_round, currentRejectRoundConsumer(round.reject_round)});
      69        1117 :   propagate(kNextRoundRejectConsumer,
      70        1117 :             {round.block_round + 1, kNextRejectRoundConsumer});
      71        1117 :   propagate(kNextRoundCommitConsumer,
      72        1117 :             {round.block_round + 2, kNextCommitRoundConsumer});
      73        1117 : }
      74             : 
      75             : boost::optional<std::shared_ptr<const OnDemandConnectionManager::ProposalType>>
      76             : OnDemandConnectionManager::onRequestProposal(consensus::Round round) {
      77        3375 :   std::shared_lock<std::shared_timed_mutex> lock(mutex_);
      78             : 
      79        3375 :   log_->debug("onRequestProposal, {}", round);
      80             : 
      81        3375 :   return connections_.peers[kIssuer]->onRequestProposal(round);
      82        3375 : }
      83             : 
      84             : void OnDemandConnectionManager::initializeConnections(
      85             :     const CurrentPeers &peers) {
      86             :   auto create_assign = [this](auto &ptr, auto &peer) {
      87       13508 :     ptr = factory_->create(*peer);
      88       13508 :   };
      89             : 
      90       16885 :   for (auto &&pair : boost::combine(connections_.peers, peers.peers)) {
      91       13508 :     create_assign(boost::get<0>(pair), boost::get<1>(pair));
      92             :   }
      93        3377 : }

Generated by: LCOV version 1.13