Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "multi_sig_transactions/gossip_propagation_strategy.hpp"
7 :
8 : #include <numeric>
9 : #include <random>
10 :
11 : #include <boost/assert.hpp>
12 : #include <boost/range/irange.hpp>
13 : #include "common/bind.hpp"
14 :
15 : namespace iroha {
16 :
17 : using PropagationData = PropagationStrategy::PropagationData;
18 : using OptPeer = GossipPropagationStrategy::OptPeer;
19 : using PeerProviderFactory = GossipPropagationStrategy::PeerProviderFactory;
20 : using std::chrono::steady_clock;
21 :
22 : GossipPropagationStrategy::GossipPropagationStrategy(
23 : PeerProviderFactory peer_factory,
24 : rxcpp::observe_on_one_worker emit_worker,
25 : const GossipPropagationStrategyParams ¶ms)
26 12 : : peer_factory(peer_factory),
27 12 : non_visited({}),
28 12 : emit_worker(emit_worker),
29 12 : emitent(rxcpp::observable<>::interval(steady_clock::now(),
30 12 : params.emission_period,
31 12 : emit_worker)
32 : .map([this, params](int) {
33 141 : PropagationData vec;
34 148 : auto range = boost::irange(0u, params.amount_per_once);
35 : // push until find empty element
36 151 : std::find_if_not(
37 : range.begin(), range.end(), [this, &vec](int) {
38 : return this->visit() | [&vec](auto e) -> bool {
39 250 : vec.push_back(e);
40 250 : return true; // proceed
41 : };
42 0 : });
43 151 : return vec;
44 151 : })) {}
45 :
46 : rxcpp::observable<PropagationData> GossipPropagationStrategy::emitter() {
47 21 : return emitent;
48 : }
49 :
50 : GossipPropagationStrategy::~GossipPropagationStrategy() {
51 : // Make sure that emitent callback have finish and haven't started yet
52 12 : std::lock_guard<std::mutex> lock(m);
53 12 : peer_factory.reset();
54 12 : }
55 :
56 : bool GossipPropagationStrategy::initQueue() {
57 : return peer_factory->createPeerQuery() | [](const auto &query) {
58 63 : return query->getLedgerPeers();
59 : } | [](auto &&data) -> boost::optional<PropagationData> {
60 50 : if (data.size() == 0) {
61 13 : return {};
62 : }
63 37 : return std::move(data);
64 : } | [this](auto &&data) -> bool { // nullopt implicitly casts to false
65 37 : this->last_data = std::move(data);
66 37 : this->non_visited.resize(this->last_data.size());
67 37 : std::iota(this->non_visited.begin(), this->non_visited.end(), 0);
68 37 : std::shuffle(this->non_visited.begin(),
69 37 : this->non_visited.end(),
70 37 : std::default_random_engine{});
71 37 : return true;
72 : };
73 0 : }
74 :
75 : OptPeer GossipPropagationStrategy::visit() {
76 276 : std::lock_guard<std::mutex> lock(m);
77 276 : if (not peer_factory or (non_visited.empty() and not initQueue())) {
78 : // either PeerProvider doesn't gives peers / dtor have been called
79 26 : return {};
80 : }
81 : // or non_visited non-empty
82 250 : BOOST_ASSERT(not non_visited.empty());
83 250 : BOOST_ASSERT(non_visited.back() < last_data.size());
84 :
85 250 : auto el = last_data[non_visited.back()];
86 250 : non_visited.pop_back();
87 250 : return el;
88 276 : }
89 :
90 : } // namespace iroha
|