Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
#include "main/impl/on_demand_ordering_init.hpp"

#include <cstdlib>

#include "common/bind.hpp"
#include "common/delay.hpp"
#include "cryptography/crypto_provider/crypto_defaults.hpp"
#include "datetime/time.hpp"
#include "interfaces/common_objects/peer.hpp"
#include "interfaces/common_objects/types.hpp"
#include "logger/logger.hpp"
#include "logger/logger_manager.hpp"
#include "ordering/impl/on_demand_common.hpp"
#include "ordering/impl/on_demand_connection_manager.hpp"
#include "ordering/impl/on_demand_ordering_gate.hpp"
#include "ordering/impl/on_demand_ordering_service_impl.hpp"
#include "ordering/impl/on_demand_os_client_grpc.hpp"
#include "ordering/impl/on_demand_os_server_grpc.hpp"
#include "ordering/impl/ordering_gate_cache/on_demand_cache.hpp"
23 :
24 : namespace {
25 : /// match event and call corresponding lambda depending on sync_outcome
26 : template <typename OnBlocks, typename OnNothing>
27 : auto matchEvent(const iroha::synchronizer::SynchronizationEvent &event,
28 : OnBlocks &&on_blocks,
29 : OnNothing &&on_nothing) {
30 : using iroha::synchronizer::SynchronizationOutcomeType;
31 3373 : switch (event.sync_outcome) {
32 : case SynchronizationOutcomeType::kCommit:
33 964 : return std::forward<OnBlocks>(on_blocks)(event);
34 : case SynchronizationOutcomeType::kReject:
35 : case SynchronizationOutcomeType::kNothing:
36 2409 : return std::forward<OnNothing>(on_nothing)(event);
37 : default:
38 0 : BOOST_ASSERT_MSG(false, "Unknown value");
39 : }
40 3373 : }
41 : } // namespace
42 :
43 : namespace iroha {
44 : namespace network {
45 :
46 : OnDemandOrderingInit::OnDemandOrderingInit(logger::LoggerPtr log)
47 247 : : log_(std::move(log)) {}
48 :
49 : auto OnDemandOrderingInit::createNotificationFactory(
50 : std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
51 : async_call,
52 : std::shared_ptr<TransportFactoryType> proposal_transport_factory,
53 : std::chrono::milliseconds delay,
54 : const logger::LoggerManagerTreePtr &ordering_log_manager) {
55 247 : return std::make_shared<ordering::transport::OnDemandOsClientGrpcFactory>(
56 247 : std::move(async_call),
57 247 : std::move(proposal_transport_factory),
58 : [] { return std::chrono::system_clock::now(); },
59 : delay,
60 247 : ordering_log_manager->getChild("NetworkClient")->getLogger());
61 0 : }
62 :
63 : auto OnDemandOrderingInit::createConnectionManager(
64 : std::shared_ptr<ametsuchi::PeerQueryFactory> peer_query_factory,
65 : std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
66 : async_call,
67 : std::shared_ptr<TransportFactoryType> proposal_transport_factory,
68 : std::chrono::milliseconds delay,
69 : std::vector<shared_model::interface::types::HashType> initial_hashes,
70 : const logger::LoggerManagerTreePtr &ordering_log_manager) {
71 : // since top block will be the first in notifier observable, hashes of
72 : // two previous blocks are prepended
73 247 : const size_t kBeforePreviousTop = 0, kPreviousTop = 1;
74 :
75 : // flat map hashes from committed blocks
76 247 : auto all_hashes = notifier.get_observable()
77 : .flat_map([](auto commit) {
78 10119 : return commit.synced_blocks.map(
79 : [](auto block) { return block->hash(); });
80 0 : })
81 : // prepend hashes for the first two rounds
82 247 : .start_with(initial_hashes.at(kBeforePreviousTop),
83 247 : initial_hashes.at(kPreviousTop));
84 :
85 : // emit last k + 1 hashes, where k is the delay parameter
86 : // current implementation assumes k = 2
87 : // first hash is used for kCurrentRound
88 : // second hash is used for kNextRound
89 : // third hash is used for kRoundAfterNext
90 : auto latest_hashes =
91 247 : all_hashes.zip(all_hashes.skip(1), all_hashes.skip(2));
92 :
93 : auto map_peers = [this, peer_query_factory](auto &&latest_data)
94 : -> ordering::OnDemandConnectionManager::CurrentPeers {
95 3373 : auto &latest_commit = std::get<0>(latest_data);
96 3373 : auto ¤t_hashes = std::get<1>(latest_data);
97 :
98 3373 : consensus::Round current_round = latest_commit.round;
99 :
100 : auto on_blocks = [this,
101 3373 : peer_query_factory,
102 3373 : current_hashes,
103 : ¤t_round](const auto &commit) {
104 964 : current_round = ordering::nextCommitRound(current_round);
105 :
106 : // retrieve peer list from database
107 : // TODO lebdron 08.11.2018 IR-1853 Refactor PeerQuery without
108 : // database access and optional
109 : peer_query_factory->createPeerQuery() | [](auto &&query) {
110 888 : return query->getLedgerPeers();
111 : } | [this](auto &&peers) { current_peers_ = std::move(peers); };
112 :
113 : // generate permutation of peers list from corresponding round
114 : // hash
115 : auto generate_permutation = [&](auto round) {
116 964 : auto &hash = std::get<round()>(current_hashes);
117 964 : log_->debug("Using hash: {}", hash.toString());
118 964 : auto &permutation = permutations_[round()];
119 :
120 964 : std::seed_seq seed(hash.blob().begin(), hash.blob().end());
121 964 : gen_.seed(seed);
122 :
123 964 : permutation.resize(current_peers_.size());
124 964 : std::iota(permutation.begin(), permutation.end(), 0);
125 :
126 964 : std::shuffle(permutation.begin(), permutation.end(), gen_);
127 964 : };
128 :
129 964 : generate_permutation(RoundTypeConstant<kCurrentRound>{});
130 964 : generate_permutation(RoundTypeConstant<kNextRound>{});
131 964 : generate_permutation(RoundTypeConstant<kRoundAfterNext>{});
132 964 : };
133 : auto on_nothing = [¤t_round](const auto &) {
134 2409 : current_round = ordering::nextRejectRound(current_round);
135 2409 : };
136 :
137 3373 : matchEvent(latest_commit, on_blocks, on_nothing);
138 :
139 : auto getOsPeer = [this, ¤t_round](auto block_round_advance,
140 : auto reject_round) {
141 13492 : auto &permutation = permutations_[block_round_advance];
142 : // since reject round can be greater than number of peers, wrap it
143 : // with number of peers
144 13492 : auto &peer =
145 13492 : current_peers_[permutation[reject_round % permutation.size()]];
146 13492 : log_->debug(
147 13492 : "For {}, using OS on peer: {}",
148 13492 : consensus::Round{current_round.block_round + block_round_advance,
149 13492 : reject_round},
150 13492 : *peer);
151 13492 : return peer;
152 0 : };
153 :
154 : using ordering::OnDemandConnectionManager;
155 3373 : OnDemandConnectionManager::CurrentPeers peers;
156 : /*
157 : * See detailed description in
158 : * irohad/ordering/impl/on_demand_connection_manager.cpp
159 : *
160 : * 0 1 2
161 : * 0 o x v
162 : * 1 x v .
163 : * 2 v . .
164 : *
165 : * v, round 0 - kCurrentRoundRejectConsumer
166 : * v, round 1 - kNextRoundRejectConsumer
167 : * v, round 2 - kNextRoundCommitConsumer
168 : * o, round 0 - kIssuer
169 : */
170 3373 : peers.peers.at(OnDemandConnectionManager::kCurrentRoundRejectConsumer) =
171 3373 : getOsPeer(kCurrentRound,
172 3373 : ordering::currentRejectRoundConsumer(
173 3373 : current_round.reject_round));
174 3373 : peers.peers.at(OnDemandConnectionManager::kNextRoundRejectConsumer) =
175 3373 : getOsPeer(kNextRound, ordering::kNextRejectRoundConsumer);
176 3373 : peers.peers.at(OnDemandConnectionManager::kNextRoundCommitConsumer) =
177 3373 : getOsPeer(kRoundAfterNext, ordering::kNextCommitRoundConsumer);
178 3373 : peers.peers.at(OnDemandConnectionManager::kIssuer) =
179 3373 : getOsPeer(kCurrentRound, current_round.reject_round);
180 3373 : return peers;
181 3373 : };
182 :
183 247 : auto peers = notifier.get_observable()
184 247 : .with_latest_from(latest_hashes)
185 247 : .map(map_peers);
186 :
187 247 : return std::make_shared<ordering::OnDemandConnectionManager>(
188 247 : createNotificationFactory(std::move(async_call),
189 247 : std::move(proposal_transport_factory),
190 247 : delay,
191 247 : ordering_log_manager),
192 : peers,
193 247 : ordering_log_manager->getChild("ConnectionManager")->getLogger());
194 247 : }
195 :
196 : auto OnDemandOrderingInit::createGate(
197 : std::shared_ptr<ordering::OnDemandOrderingService> ordering_service,
198 : std::shared_ptr<ordering::transport::OdOsNotification> network_client,
199 : std::shared_ptr<ordering::cache::OrderingGateCache> cache,
200 : std::shared_ptr<shared_model::interface::UnsafeProposalFactory>
201 : proposal_factory,
202 : std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
203 : consensus::Round initial_round,
204 : std::function<std::chrono::milliseconds(
205 : const synchronizer::SynchronizationEvent &)> delay_func,
206 : const logger::LoggerManagerTreePtr &ordering_log_manager) {
207 : auto map = [](auto commit) {
208 3373 : return matchEvent(
209 : commit,
210 : [](const auto &commit)
211 : -> ordering::OnDemandOrderingGate::BlockRoundEventType {
212 964 : ordering::cache::OrderingGateCache::HashesSetType hashes;
213 964 : commit.synced_blocks.as_blocking().subscribe(
214 : [&hashes](const auto &block) {
215 964 : const auto &committed = block->transactions();
216 964 : std::transform(committed.begin(),
217 964 : committed.end(),
218 964 : std::inserter(hashes, hashes.end()),
219 : [](const auto &transaction) {
220 914 : return transaction.hash();
221 : });
222 964 : const auto &rejected =
223 964 : block->rejected_transactions_hashes();
224 964 : std::copy(rejected.begin(),
225 964 : rejected.end(),
226 964 : std::inserter(hashes, hashes.end()));
227 964 : });
228 964 : return ordering::OnDemandOrderingGate::BlockEvent{
229 964 : ordering::nextCommitRound(commit.round), hashes};
230 964 : },
231 : [](const auto ¬hing)
232 : -> ordering::OnDemandOrderingGate::BlockRoundEventType {
233 2409 : return ordering::OnDemandOrderingGate::EmptyEvent{
234 2409 : ordering::nextRejectRound(nothing.round)};
235 : });
236 : };
237 :
238 247 : return std::make_shared<ordering::OnDemandOrderingGate>(
239 247 : std::move(ordering_service),
240 247 : std::move(network_client),
241 247 : notifier.get_observable()
242 247 : .lift<iroha::synchronizer::SynchronizationEvent>(
243 247 : iroha::makeDelay<iroha::synchronizer::SynchronizationEvent>(
244 247 : delay_func, rxcpp::identity_current_thread()))
245 247 : .map(map),
246 247 : std::move(cache),
247 247 : std::move(proposal_factory),
248 247 : std::move(tx_cache),
249 : initial_round,
250 247 : ordering_log_manager->getChild("Gate")->getLogger());
251 0 : }
252 :
253 : auto OnDemandOrderingInit::createService(
254 : size_t max_size,
255 : std::shared_ptr<shared_model::interface::UnsafeProposalFactory>
256 : proposal_factory,
257 : std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
258 : const logger::LoggerManagerTreePtr &ordering_log_manager) {
259 247 : return std::make_shared<ordering::OnDemandOrderingServiceImpl>(
260 : max_size,
261 247 : std::move(proposal_factory),
262 247 : std::move(tx_cache),
263 247 : ordering_log_manager->getChild("Service")->getLogger());
264 0 : }
265 :
266 : OnDemandOrderingInit::~OnDemandOrderingInit() {
267 247 : notifier.get_subscriber().unsubscribe();
268 247 : }
269 :
270 : std::shared_ptr<iroha::network::OrderingGate>
271 : OnDemandOrderingInit::initOrderingGate(
272 : size_t max_size,
273 : std::chrono::milliseconds delay,
274 : std::vector<shared_model::interface::types::HashType> initial_hashes,
275 : std::shared_ptr<ametsuchi::PeerQueryFactory> peer_query_factory,
276 : std::shared_ptr<
277 : ordering::transport::OnDemandOsServerGrpc::TransportFactoryType>
278 : transaction_factory,
279 : std::shared_ptr<shared_model::interface::TransactionBatchParser>
280 : batch_parser,
281 : std::shared_ptr<shared_model::interface::TransactionBatchFactory>
282 : transaction_batch_factory,
283 : std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
284 : async_call,
285 : std::shared_ptr<shared_model::interface::UnsafeProposalFactory>
286 : proposal_factory,
287 : std::shared_ptr<TransportFactoryType> proposal_transport_factory,
288 : std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
289 : consensus::Round initial_round,
290 : std::function<std::chrono::milliseconds(
291 : const synchronizer::SynchronizationEvent &)> delay_func,
292 : logger::LoggerManagerTreePtr ordering_log_manager) {
293 247 : auto ordering_service = createService(
294 247 : max_size, proposal_factory, tx_cache, ordering_log_manager);
295 247 : service = std::make_shared<ordering::transport::OnDemandOsServerGrpc>(
296 : ordering_service,
297 247 : std::move(transaction_factory),
298 247 : std::move(batch_parser),
299 247 : std::move(transaction_batch_factory),
300 247 : ordering_log_manager->getChild("Server")->getLogger());
301 247 : return createGate(
302 247 : ordering_service,
303 247 : createConnectionManager(std::move(peer_query_factory),
304 247 : std::move(async_call),
305 247 : std::move(proposal_transport_factory),
306 247 : delay,
307 247 : std::move(initial_hashes),
308 : ordering_log_manager),
309 247 : std::make_shared<ordering::cache::OnDemandCache>(),
310 247 : std::move(proposal_factory),
311 247 : std::move(tx_cache),
312 247 : initial_round,
313 247 : std::move(delay_func),
314 : ordering_log_manager);
315 247 : }
316 :
317 : } // namespace network
318 : } // namespace iroha
|