Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "consensus/yac/yac.hpp"
7 :
8 : #include <utility>
9 :
10 : #include "common/bind.hpp"
11 : #include "common/visitor.hpp"
12 : #include "consensus/yac/cluster_order.hpp"
13 : #include "consensus/yac/storage/yac_proposal_storage.hpp"
14 : #include "consensus/yac/timer.hpp"
15 : #include "consensus/yac/yac_crypto_provider.hpp"
16 : #include "cryptography/public_key.hpp"
17 : #include "cryptography/signed.hpp"
18 : #include "interfaces/common_objects/peer.hpp"
19 : #include "logger/logger.hpp"
20 :
21 : namespace iroha {
22 : namespace consensus {
23 : namespace yac {
24 :
25 : template <typename T>
26 : static std::string cryptoError(const T &votes) {
27 4 : std::string result =
28 4 : "Crypto verification failed for message.\n Votes: ";
29 : result += logger::to_string(votes, [](const auto &vote) {
30 4 : std::string result = "(Public key: ";
31 4 : result += vote.signature->publicKey().hex();
32 4 : result += ", Signature: ";
33 4 : result += vote.signature->signedData().hex();
34 4 : result += ")\n";
35 4 : return result;
36 4 : });
37 4 : return result;
38 4 : }
39 :
40 : std::shared_ptr<Yac> Yac::create(
41 : YacVoteStorage vote_storage,
42 : std::shared_ptr<YacNetwork> network,
43 : std::shared_ptr<YacCryptoProvider> crypto,
44 : std::shared_ptr<Timer> timer,
45 : ClusterOrdering order,
46 : logger::LoggerPtr log) {
47 273 : return std::make_shared<Yac>(
48 273 : vote_storage, network, crypto, timer, order, std::move(log));
49 : }
50 :
51 : Yac::Yac(YacVoteStorage vote_storage,
52 : std::shared_ptr<YacNetwork> network,
53 : std::shared_ptr<YacCryptoProvider> crypto,
54 : std::shared_ptr<Timer> timer,
55 : ClusterOrdering order,
56 : logger::LoggerPtr log)
57 273 : : vote_storage_(std::move(vote_storage)),
58 273 : network_(std::move(network)),
59 273 : crypto_(std::move(crypto)),
60 273 : timer_(std::move(timer)),
61 273 : cluster_order_(order),
62 273 : log_(std::move(log)) {}
63 :
64 : // ------|Hash gate|------
65 :
66 : void Yac::vote(YacHash hash, ClusterOrdering order) {
67 3140 : log_->info("Order for voting: {}",
68 3140 : logger::to_string(order.getPeers(),
69 : [](auto val) { return val->address(); }));
70 :
71 3140 : cluster_order_ = order;
72 3140 : auto vote = crypto_->getVote(hash);
73 : // TODO 10.06.2018 andrei: IR-1407 move YAC propagation strategy to a
74 : // separate entity
75 3140 : votingStep(vote);
76 3140 : }
77 :
78 : rxcpp::observable<Answer> Yac::onOutcome() {
79 504 : return notifier_.get_observable();
80 0 : }
81 :
82 : // ------|Network notifications|------
83 :
84 : void Yac::onState(std::vector<VoteMessage> state) {
85 6738 : std::lock_guard<std::mutex> guard(mutex_);
86 6739 : if (crypto_->verify(state)) {
87 6108 : applyState(state);
88 6735 : } else {
89 4 : log_->warn("{}", cryptoError(state));
90 : }
91 6739 : }
92 :
93 : // ------|Private interface|------
94 :
95 : void Yac::votingStep(VoteMessage vote) {
96 3588 : auto committed = vote_storage_.isCommitted(vote.hash.vote_round);
97 3588 : if (committed) {
98 1 : return;
99 : }
100 :
101 3587 : const auto ¤t_leader = cluster_order_.currentLeader();
102 :
103 3587 : log_->info("Vote for round {}, hash ({}, {}) to peer {}",
104 3587 : vote.hash.vote_round,
105 3587 : vote.hash.vote_hashes.proposal_hash,
106 3587 : vote.hash.vote_hashes.block_hash,
107 3587 : current_leader);
108 :
109 3587 : network_->sendState(current_leader, {vote});
110 3587 : cluster_order_.switchToNext();
111 3587 : if (cluster_order_.hasNext()) {
112 : timer_->invokeAfterDelay([this, vote] { this->votingStep(vote); });
113 448 : }
114 3588 : }
115 :
116 : void Yac::closeRound() {
117 3140 : timer_->deny();
118 3140 : }
119 :
120 : boost::optional<std::shared_ptr<shared_model::interface::Peer>>
121 : Yac::findPeer(const VoteMessage &vote) {
122 3 : auto peers = cluster_order_.getPeers();
123 : auto it =
124 : std::find_if(peers.begin(), peers.end(), [&](const auto &peer) {
125 17 : return peer->pubkey() == vote.signature->publicKey();
126 : });
127 3 : return it != peers.end() ? boost::make_optional(std::move(*it))
128 2 : : boost::none;
129 3 : }
130 :
131 : // ------|Apply data|------
132 :
133 : void Yac::applyState(const std::vector<VoteMessage> &state) {
134 : auto answer =
135 6735 : vote_storage_.store(state, cluster_order_.getNumberOfPeers());
136 :
137 : // TODO 10.06.2018 andrei: IR-1407 move YAC propagation strategy to a
138 : // separate entity
139 :
140 : answer | [&](const auto &answer) {
141 6735 : auto &proposal_round = state.at(0).hash.vote_round;
142 :
143 : /*
144 : * It is possible that a new peer with an outdated peers list may
145 : * collect an outcome from a smaller number of peers which are
146 : * included in set of `f` peers in the system. The new peer will not
147 : * accept our message with valid supermajority because he cannot apply
148 : * votes from unknown peers.
149 : */
150 6281 : if (state.size() > 1) {
151 : // some peer has already collected commit/reject, so it is sent
152 439 : if (vote_storage_.getProcessingState(proposal_round)
153 439 : == ProposalState::kNotSentNotProcessed) {
154 6 : vote_storage_.nextProcessingState(proposal_round);
155 6 : log_->info(
156 6 : "Received supermajority of votes for {}, skip propagation",
157 6 : proposal_round);
158 6 : }
159 439 : }
160 :
161 6283 : auto processing_state =
162 6283 : vote_storage_.getProcessingState(proposal_round);
163 :
164 : auto votes = [](const auto &state) { return state.votes; };
165 :
166 6283 : switch (processing_state) {
167 : case ProposalState::kNotSentNotProcessed:
168 3136 : vote_storage_.nextProcessingState(proposal_round);
169 3136 : log_->info("Propagate state {} to whole network", proposal_round);
170 3136 : this->propagateState(visit_in_place(answer, votes));
171 3136 : break;
172 : case ProposalState::kSentNotProcessed:
173 3140 : vote_storage_.nextProcessingState(proposal_round);
174 3140 : log_->info("Pass outcome for {} to pipeline", proposal_round);
175 3140 : this->closeRound();
176 3140 : notifier_.get_subscriber().on_next(answer);
177 3140 : break;
178 : case ProposalState::kSentProcessed:
179 5 : if (state.size() == 1) {
180 : this->findPeer(state.at(0)) | [&](const auto &from) {
181 1 : log_->info("Propagate state {} directly to {}",
182 3 : proposal_round,
183 1 : from->address());
184 1 : this->propagateStateDirectly(*from,
185 3 : visit_in_place(answer, votes));
186 1 : };
187 3 : }
188 5 : break;
189 : }
190 6281 : };
191 6735 : }
192 :
193 : // ------|Propagation|------
194 :
195 : void Yac::propagateState(const std::vector<VoteMessage> &msg) {
196 6719 : for (const auto &peer : cluster_order_.getPeers()) {
197 3583 : propagateStateDirectly(*peer, msg);
198 : }
199 3136 : }
200 :
201 : void Yac::propagateStateDirectly(const shared_model::interface::Peer &to,
202 : const std::vector<VoteMessage> &msg) {
203 3584 : network_->sendState(to, msg);
204 3584 : }
205 :
206 : } // namespace yac
207 : } // namespace consensus
208 : } // namespace iroha
|