Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "synchronizer/impl/synchronizer_impl.hpp"
7 :
8 : #include <utility>
9 :
10 : #include "ametsuchi/block_query_factory.hpp"
11 : #include "ametsuchi/mutable_storage.hpp"
12 : #include "common/visitor.hpp"
13 : #include "interfaces/iroha_internal/block.hpp"
14 : #include "logger/logger.hpp"
15 :
16 : namespace iroha {
17 : namespace synchronizer {
18 :
19 : SynchronizerImpl::SynchronizerImpl(
20 : std::shared_ptr<network::ConsensusGate> consensus_gate,
21 : std::shared_ptr<validation::ChainValidator> validator,
22 : std::shared_ptr<ametsuchi::MutableFactory> mutable_factory,
23 : std::shared_ptr<ametsuchi::BlockQueryFactory> block_query_factory,
24 : std::shared_ptr<network::BlockLoader> block_loader,
25 : logger::LoggerPtr log)
26 260 : : validator_(std::move(validator)),
27 260 : mutable_factory_(std::move(mutable_factory)),
28 260 : block_query_factory_(std::move(block_query_factory)),
29 260 : block_loader_(std::move(block_loader)),
30 260 : log_(std::move(log)) {
31 260 : consensus_gate->onOutcome().subscribe(
32 : subscription_, [this](consensus::GateObject object) {
33 3143 : this->processOutcome(object);
34 3143 : });
35 260 : }
36 :
37 : void SynchronizerImpl::processOutcome(consensus::GateObject object) {
38 3143 : log_->info("processing consensus outcome");
39 3143 : visit_in_place(
40 : object,
41 : [this](const consensus::PairValid &msg) { this->processNext(msg); },
42 : [this](const consensus::VoteOther &msg) {
43 5 : this->processDifferent(msg);
44 5 : },
45 : [this](const consensus::ProposalReject &msg) {
46 : // TODO: nickaleks IR-147 18.01.19 add peers
47 : // list from GateObject when it has one
48 1 : notifier_.get_subscriber().on_next(SynchronizationEvent{
49 1 : rxcpp::observable<>::empty<
50 : std::shared_ptr<shared_model::interface::Block>>(),
51 : SynchronizationOutcomeType::kReject,
52 1 : msg.round});
53 1 : },
54 : [this](const consensus::BlockReject &msg) {
55 : // TODO: nickaleks IR-147 18.01.19 add peers
56 : // list from GateObject when it has one
57 1 : notifier_.get_subscriber().on_next(SynchronizationEvent{
58 1 : rxcpp::observable<>::empty<
59 : std::shared_ptr<shared_model::interface::Block>>(),
60 : SynchronizationOutcomeType::kReject,
61 1 : msg.round});
62 1 : },
63 : [this](const consensus::AgreementOnNone &msg) {
64 : // TODO: nickaleks IR-147 18.01.19 add peers
65 : // list from GateObject when it has one
66 2410 : notifier_.get_subscriber().on_next(SynchronizationEvent{
67 2410 : rxcpp::observable<>::empty<
68 : std::shared_ptr<shared_model::interface::Block>>(),
69 : SynchronizationOutcomeType::kNothing,
70 2410 : msg.round});
71 2410 : });
72 3143 : }
73 :
74 : boost::optional<SynchronizationEvent>
75 : SynchronizerImpl::downloadMissingBlocks(
76 : const consensus::VoteOther &msg,
77 : std::unique_ptr<ametsuchi::MutableStorage> storage,
78 : const shared_model::interface::types::HeightType height) {
79 5 : auto expected_height = msg.round.block_round;
80 :
81 : // while blocks are not loaded and not committed
82 5 : while (true) {
83 : // TODO andrei 17.10.18 IR-1763 Add delay strategy for loading blocks
84 15 : for (const auto &public_key : msg.public_keys) {
85 : auto network_chain =
86 10 : block_loader_->retrieveBlocks(height, public_key);
87 :
88 10 : std::vector<std::shared_ptr<shared_model::interface::Block>> blocks;
89 10 : network_chain.as_blocking().subscribe(
90 : [&blocks](auto block) { blocks.push_back(block); });
91 10 : if (blocks.empty()) {
92 1 : log_->info("Downloaded an empty chain");
93 1 : continue;
94 : } else {
95 9 : log_->info("Successfully downloaded {} blocks", blocks.size());
96 : }
97 :
98 : auto chain =
99 9 : rxcpp::observable<>::iterate(blocks, rxcpp::identity_immediate());
100 :
101 9 : if (blocks.back()->height() >= expected_height
102 9 : and validator_->validateAndApply(chain, *storage)) {
103 5 : auto ledger_state = mutable_factory_->commit(std::move(storage));
104 :
105 5 : if (ledger_state) {
106 3 : return SynchronizationEvent{chain,
107 : SynchronizationOutcomeType::kCommit,
108 3 : msg.round,
109 3 : std::move(*ledger_state)};
110 : } else {
111 2 : return boost::none;
112 : }
113 5 : }
114 14 : }
115 : }
116 5 : }
117 :
118 : boost::optional<std::unique_ptr<ametsuchi::MutableStorage>>
119 : SynchronizerImpl::getStorage() {
120 13 : auto mutable_storage_var = mutable_factory_->createMutableStorage();
121 13 : if (auto e =
122 13 : boost::get<expected::Error<std::string>>(&mutable_storage_var)) {
123 5 : log_->error("could not create mutable storage: {}", e->error);
124 5 : return {};
125 : }
126 8 : return {std::move(
127 8 : boost::get<
128 : expected::Value<std::unique_ptr<ametsuchi::MutableStorage>>>(
129 8 : &mutable_storage_var)
130 8 : ->value)};
131 13 : }
132 :
133 : void SynchronizerImpl::processNext(const consensus::PairValid &msg) {
134 726 : log_->info("at handleNext");
135 726 : auto ledger_state = mutable_factory_->commitPrepared(*msg.block);
136 726 : if (ledger_state) {
137 718 : notifier_.get_subscriber().on_next(
138 718 : SynchronizationEvent{rxcpp::observable<>::just(msg.block),
139 : SynchronizationOutcomeType::kCommit,
140 718 : msg.round,
141 718 : std::move(*ledger_state)});
142 718 : } else {
143 8 : auto opt_storage = getStorage();
144 8 : if (opt_storage == boost::none) {
145 5 : return;
146 : }
147 : std::unique_ptr<ametsuchi::MutableStorage> storage =
148 3 : std::move(opt_storage.value());
149 3 : if (storage->apply(*msg.block)) {
150 3 : ledger_state = mutable_factory_->commit(std::move(storage));
151 3 : if (ledger_state) {
152 1 : notifier_.get_subscriber().on_next(
153 1 : SynchronizationEvent{rxcpp::observable<>::just(msg.block),
154 : SynchronizationOutcomeType::kCommit,
155 1 : msg.round,
156 1 : std::move(*ledger_state)});
157 1 : } else {
158 2 : log_->error("failed to commit mutable storage");
159 : }
160 3 : } else {
161 0 : log_->warn("Block was not committed due to fail in mutable storage");
162 : }
163 11 : }
164 726 : }
165 :
166 : void SynchronizerImpl::processDifferent(const consensus::VoteOther &msg) {
167 5 : log_->info("at handleDifferent");
168 :
169 5 : shared_model::interface::types::HeightType top_block_height{0};
170 5 : if (auto block_query = block_query_factory_->createBlockQuery()) {
171 5 : top_block_height = (*block_query)->getTopBlockHeight();
172 5 : } else {
173 0 : log_->error(
174 0 : "Unable to create block query and retrieve top block height");
175 0 : return;
176 : }
177 :
178 5 : if (top_block_height >= msg.round.block_round) {
179 0 : log_->info(
180 0 : "Storage is already in synchronized state. Top block height is {}",
181 : top_block_height);
182 0 : return;
183 : }
184 :
185 5 : auto opt_storage = getStorage();
186 5 : if (opt_storage == boost::none) {
187 0 : return;
188 : }
189 : std::unique_ptr<ametsuchi::MutableStorage> storage =
190 5 : std::move(opt_storage.value());
191 : auto result =
192 5 : downloadMissingBlocks(msg, std::move(storage), top_block_height);
193 5 : if (result) {
194 3 : notifier_.get_subscriber().on_next(*result);
195 3 : }
196 5 : }
197 :
198 : rxcpp::observable<SynchronizationEvent>
199 : SynchronizerImpl::on_commit_chain() {
200 1001 : return notifier_.get_observable();
201 0 : }
202 :
203 : SynchronizerImpl::~SynchronizerImpl() {
204 260 : subscription_.unsubscribe();
205 260 : }
206 :
207 : } // namespace synchronizer
208 : } // namespace iroha
|