LCOV - code coverage report
Current view: top level - irohad/multi_sig_transactions/impl - gossip_propagation_strategy.cpp (source / functions) Hit Total Coverage
Test: cleared_cor.info Lines: 37 39 94.9 %
Date: 2019-03-07 14:46:43 Functions: 13 14 92.9 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include "multi_sig_transactions/gossip_propagation_strategy.hpp"
       7             : 
       8             : #include <numeric>
       9             : #include <random>
      10             : 
      11             : #include <boost/assert.hpp>
      12             : #include <boost/range/irange.hpp>
      13             : #include "common/bind.hpp"
      14             : 
      15             : namespace iroha {
      16             : 
      17             :   using PropagationData = PropagationStrategy::PropagationData;
      18             :   using OptPeer = GossipPropagationStrategy::OptPeer;
      19             :   using PeerProviderFactory = GossipPropagationStrategy::PeerProviderFactory;
      20             :   using std::chrono::steady_clock;
      21             : 
      22             :   GossipPropagationStrategy::GossipPropagationStrategy(
      23             :       PeerProviderFactory peer_factory,
      24             :       rxcpp::observe_on_one_worker emit_worker,
      25             :       const GossipPropagationStrategyParams &params)
      26          12 :       : peer_factory(peer_factory),
      27          12 :         non_visited({}),
      28          12 :         emit_worker(emit_worker),
      29          12 :         emitent(rxcpp::observable<>::interval(steady_clock::now(),
      30          12 :                                               params.emission_period,
      31          12 :                                               emit_worker)
      32             :                     .map([this, params](int) {
      33         141 :                       PropagationData vec;
      34         148 :                       auto range = boost::irange(0u, params.amount_per_once);
      35             :                       // push until find empty element
      36         151 :                       std::find_if_not(
      37             :                           range.begin(), range.end(), [this, &vec](int) {
      38             :                             return this->visit() | [&vec](auto e) -> bool {
      39         250 :                               vec.push_back(e);
      40         250 :                               return true;  // proceed
      41             :                             };
      42           0 :                           });
      43         151 :                       return vec;
      44         151 :                     })) {}
      45             : 
      46             :   rxcpp::observable<PropagationData> GossipPropagationStrategy::emitter() {
      47          21 :     return emitent;
      48             :   }
      49             : 
      50             :   GossipPropagationStrategy::~GossipPropagationStrategy() {
      51             :     // Make sure that emitent callback have finish and haven't started yet
      52          12 :     std::lock_guard<std::mutex> lock(m);
      53          12 :     peer_factory.reset();
      54          12 :   }
      55             : 
      56             :   bool GossipPropagationStrategy::initQueue() {
      57             :     return peer_factory->createPeerQuery() | [](const auto &query) {
      58          63 :       return query->getLedgerPeers();
      59             :     } | [](auto &&data) -> boost::optional<PropagationData> {
      60          50 :       if (data.size() == 0) {
      61          13 :         return {};
      62             :       }
      63          37 :       return std::move(data);
      64             :     } | [this](auto &&data) -> bool {  // nullopt implicitly casts to false
      65          37 :       this->last_data = std::move(data);
      66          37 :       this->non_visited.resize(this->last_data.size());
      67          37 :       std::iota(this->non_visited.begin(), this->non_visited.end(), 0);
      68          37 :       std::shuffle(this->non_visited.begin(),
      69          37 :                    this->non_visited.end(),
      70          37 :                    std::default_random_engine{});
      71          37 :       return true;
      72             :     };
      73           0 :   }
      74             : 
      75             :   OptPeer GossipPropagationStrategy::visit() {
      76         276 :     std::lock_guard<std::mutex> lock(m);
      77         276 :     if (not peer_factory or (non_visited.empty() and not initQueue())) {
      78             :       // either PeerProvider doesn't gives peers / dtor have been called
      79          26 :       return {};
      80             :     }
      81             :     // or non_visited non-empty
      82         250 :     BOOST_ASSERT(not non_visited.empty());
      83         250 :     BOOST_ASSERT(non_visited.back() < last_data.size());
      84             : 
      85         250 :     auto el = last_data[non_visited.back()];
      86         250 :     non_visited.pop_back();
      87         250 :     return el;
      88         276 :   }
      89             : 
      90             : }  // namespace iroha

Generated by: LCOV version 1.13