Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "consensus/yac/impl/yac_gate_impl.hpp"
7 :
8 : #include <boost/range/adaptor/transformed.hpp>
9 : #include "common/visitor.hpp"
10 : #include "consensus/yac/cluster_order.hpp"
11 : #include "consensus/yac/outcome_messages.hpp"
12 : #include "consensus/yac/storage/yac_common.hpp"
13 : #include "consensus/yac/yac_hash_provider.hpp"
14 : #include "consensus/yac/yac_peer_orderer.hpp"
15 : #include "cryptography/public_key.hpp"
16 : #include "interfaces/common_objects/signature.hpp"
17 : #include "interfaces/iroha_internal/block.hpp"
18 : #include "logger/logger.hpp"
19 : #include "simulator/block_creator.hpp"
20 :
21 : namespace iroha {
22 : namespace consensus {
23 : namespace yac {
24 :
25 : YacGateImpl::YacGateImpl(
26 : std::shared_ptr<HashGate> hash_gate,
27 : std::shared_ptr<YacPeerOrderer> orderer,
28 : std::shared_ptr<YacHashProvider> hash_provider,
29 : std::shared_ptr<simulator::BlockCreator> block_creator,
30 : std::shared_ptr<consensus::ConsensusResultCache>
31 : consensus_result_cache,
32 : logger::LoggerPtr log)
33 254 : : hash_gate_(std::move(hash_gate)),
34 254 : orderer_(std::move(orderer)),
35 254 : hash_provider_(std::move(hash_provider)),
36 254 : block_creator_(std::move(block_creator)),
37 254 : consensus_result_cache_(std::move(consensus_result_cache)),
38 254 : log_(std::move(log)),
39 254 : current_hash_() {
40 254 : block_creator_->onBlock().subscribe(
41 : [this](const auto &event) { this->vote(event); });
42 254 : }
43 :
44 : void YacGateImpl::vote(const simulator::BlockCreatorEvent &event) {
45 3381 : if (current_hash_.vote_round >= event.round) {
46 1 : log_->info(
47 1 : "Current round {} is greater than or equal to vote round {}, "
48 : "skipped",
49 1 : current_hash_.vote_round,
50 1 : event.round);
51 1 : return;
52 : }
53 :
54 3380 : current_hash_ = hash_provider_->makeHash(event);
55 :
56 3380 : if (not event.round_data) {
57 2653 : current_block_ = boost::none;
58 2653 : log_->debug("Agreed on nothing to commit");
59 2653 : } else {
60 727 : current_block_ = event.round_data->block;
61 : // insert the block we voted for to the consensus cache
62 727 : consensus_result_cache_->insert(event.round_data->block);
63 727 : log_->info("vote for (proposal: {}, block: {})",
64 727 : current_hash_.vote_hashes.proposal_hash,
65 727 : current_hash_.vote_hashes.block_hash);
66 : }
67 :
68 3380 : auto order = orderer_->getOrdering(current_hash_);
69 3380 : if (not order) {
70 242 : log_->error("ordering doesn't provide peers => pass round");
71 242 : return;
72 : }
73 :
74 3138 : hash_gate_->vote(current_hash_, *order);
75 3381 : }
76 :
77 : rxcpp::observable<YacGateImpl::GateObject> YacGateImpl::onOutcome() {
78 : return hash_gate_->onOutcome().flat_map([this](auto message) {
79 6264 : return visit_in_place(message,
80 : [this](const CommitMessage &msg) {
81 6263 : return this->handleCommit(msg);
82 0 : },
83 : [this](const RejectMessage &msg) {
84 1 : return this->handleReject(msg);
85 0 : });
86 0 : });
87 0 : }
88 :
89 : void YacGateImpl::copySignatures(const CommitMessage &commit) {
90 2890 : for (const auto &vote : commit.votes) {
91 1447 : auto sig = vote.hash.block_signature;
92 1447 : current_block_.value()->addSignature(sig->signedData(),
93 1447 : sig->publicKey());
94 1447 : }
95 1443 : }
96 :
97 : rxcpp::observable<YacGateImpl::GateObject> YacGateImpl::handleCommit(
98 : const CommitMessage &msg) {
99 6263 : const auto hash = getHash(msg.votes).value();
100 6263 : if (hash.vote_round < current_hash_.vote_round) {
101 1 : log_->info(
102 1 : "Current round {} is greater than commit round {}, skipped",
103 1 : current_hash_.vote_round,
104 1 : hash.vote_round);
105 1 : return rxcpp::observable<>::empty<GateObject>();
106 : }
107 :
108 6262 : if (hash == current_hash_ and current_block_) {
109 : // if node has voted for the committed block
110 : // append signatures of other nodes
111 1443 : this->copySignatures(msg);
112 1443 : auto &block = current_block_.value();
113 1443 : log_->info("consensus: commit top block: height {}, hash {}",
114 1443 : block->height(),
115 1443 : block->hash().hex());
116 1443 : return rxcpp::observable<>::just<GateObject>(
117 1443 : PairValid{block, current_hash_.vote_round});
118 : }
119 :
120 4819 : current_hash_ = hash;
121 :
122 4819 : if (hash.vote_hashes.proposal_hash.empty()) {
123 : // if consensus agreed on nothing for commit
124 4818 : log_->info("Consensus skipped round, voted for nothing");
125 4818 : current_block_ = boost::none;
126 4818 : return rxcpp::observable<>::just<GateObject>(
127 4818 : AgreementOnNone{current_hash_.vote_round});
128 : }
129 :
130 1 : log_->info("Voted for another block, waiting for sync");
131 1 : current_block_ = boost::none;
132 1 : auto public_keys = boost::copy_range<
133 : shared_model::interface::types::PublicKeyCollectionType>(
134 : msg.votes | boost::adaptors::transformed([](auto &vote) {
135 1 : return vote.signature->publicKey();
136 : }));
137 1 : auto model_hash = hash_provider_->toModelHash(hash);
138 1 : return rxcpp::observable<>::just<GateObject>(
139 1 : VoteOther{std::move(public_keys),
140 1 : std::move(model_hash),
141 1 : current_hash_.vote_round});
142 6263 : }
143 :
144 : rxcpp::observable<YacGateImpl::GateObject> YacGateImpl::handleReject(
145 : const RejectMessage &msg) {
146 1 : const auto hash = getHash(msg.votes).value();
147 1 : if (hash.vote_round < current_hash_.vote_round) {
148 1 : log_->info(
149 1 : "Current round {} is greater than reject round {}, skipped",
150 1 : current_hash_.vote_round,
151 1 : hash.vote_round);
152 1 : return rxcpp::observable<>::empty<GateObject>();
153 : }
154 :
155 0 : auto has_same_proposals =
156 0 : std::all_of(std::next(msg.votes.begin()),
157 0 : msg.votes.end(),
158 : [first = msg.votes.begin()](const auto ¤t) {
159 0 : return first->hash.vote_hashes.proposal_hash
160 0 : == current.hash.vote_hashes.proposal_hash;
161 : });
162 0 : if (not has_same_proposals) {
163 0 : log_->info("Proposal reject since all hashes are different");
164 0 : return rxcpp::observable<>::just<GateObject>(
165 0 : ProposalReject{current_hash_.vote_round});
166 : }
167 0 : log_->info("Block reject since proposal hashes match");
168 0 : return rxcpp::observable<>::just<GateObject>(
169 0 : BlockReject{current_hash_.vote_round});
170 1 : }
171 : } // namespace yac
172 : } // namespace consensus
173 : } // namespace iroha
|