Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "ordering/impl/on_demand_connection_manager.hpp"
7 :
8 : #include <boost/range/combine.hpp>
9 : #include "interfaces/iroha_internal/proposal.hpp"
10 : #include "logger/logger.hpp"
11 : #include "ordering/impl/on_demand_common.hpp"
12 :
13 : using namespace iroha;
14 : using namespace iroha::ordering;
15 :
16 : OnDemandConnectionManager::OnDemandConnectionManager(
17 : std::shared_ptr<transport::OdOsNotificationFactory> factory,
18 : rxcpp::observable<CurrentPeers> peers,
19 : logger::LoggerPtr log)
20 251 : : log_(std::move(log)),
21 251 : factory_(std::move(factory)),
22 : subscription_(peers.subscribe([this](const auto &peers) {
23 : // exclusive lock
24 3373 : std::lock_guard<std::shared_timed_mutex> lock(mutex_);
25 :
26 3373 : this->initializeConnections(peers);
27 3373 : })) {}
28 :
29 : OnDemandConnectionManager::OnDemandConnectionManager(
30 : std::shared_ptr<transport::OdOsNotificationFactory> factory,
31 : rxcpp::observable<CurrentPeers> peers,
32 : CurrentPeers initial_peers,
33 : logger::LoggerPtr log)
34 4 : : OnDemandConnectionManager(std::move(factory), peers, std::move(log)) {
35 : // using start_with(initial_peers) results in deadlock
36 4 : initializeConnections(initial_peers);
37 4 : }
38 :
39 : OnDemandConnectionManager::~OnDemandConnectionManager() {
40 251 : subscription_.unsubscribe();
41 251 : }
42 :
43 : void OnDemandConnectionManager::onBatches(consensus::Round round,
44 : CollectionType batches) {
45 1117 : std::shared_lock<std::shared_timed_mutex> lock(mutex_);
46 :
47 : /*
48 : * Transactions are always sent to the round after the next round (+2)
49 : * There are 3 possibilities - next reject in the current round, first reject
50 : * in the next round, and first commit in the round after the next round
51 : * This can be visualised as a diagram, where:
52 : * o - current round, x - next round, v - target round
53 : *
54 : * 0 1 2
55 : * 0 o x v
56 : * 1 x v .
57 : * 2 v . .
58 : */
59 :
60 : auto propagate = [this, batches](PeerType type, consensus::Round round) {
61 3351 : log_->debug("onBatches, {}", round);
62 :
63 3351 : connections_.peers[type]->onBatches(round, batches);
64 3351 : };
65 :
66 1117 : propagate(
67 : kCurrentRoundRejectConsumer,
68 1117 : {round.block_round, currentRejectRoundConsumer(round.reject_round)});
69 1117 : propagate(kNextRoundRejectConsumer,
70 1117 : {round.block_round + 1, kNextRejectRoundConsumer});
71 1117 : propagate(kNextRoundCommitConsumer,
72 1117 : {round.block_round + 2, kNextCommitRoundConsumer});
73 1117 : }
74 :
75 : boost::optional<std::shared_ptr<const OnDemandConnectionManager::ProposalType>>
76 : OnDemandConnectionManager::onRequestProposal(consensus::Round round) {
77 3375 : std::shared_lock<std::shared_timed_mutex> lock(mutex_);
78 :
79 3375 : log_->debug("onRequestProposal, {}", round);
80 :
81 3375 : return connections_.peers[kIssuer]->onRequestProposal(round);
82 3375 : }
83 :
84 : void OnDemandConnectionManager::initializeConnections(
85 : const CurrentPeers &peers) {
86 : auto create_assign = [this](auto &ptr, auto &peer) {
87 13508 : ptr = factory_->create(*peer);
88 13508 : };
89 :
90 16885 : for (auto &&pair : boost::combine(connections_.peers, peers.peers)) {
91 13508 : create_assign(boost::get<0>(pair), boost::get<1>(pair));
92 : }
93 3377 : }
|