Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "main/application.hpp"
7 :
8 : #include "ametsuchi/impl/storage_impl.hpp"
9 : #include "ametsuchi/impl/tx_presence_cache_impl.hpp"
10 : #include "ametsuchi/impl/wsv_restorer_impl.hpp"
11 : #include "backend/protobuf/common_objects/proto_common_objects_factory.hpp"
12 : #include "backend/protobuf/proto_block_json_converter.hpp"
13 : #include "backend/protobuf/proto_permission_to_string.hpp"
14 : #include "backend/protobuf/proto_proposal_factory.hpp"
15 : #include "backend/protobuf/proto_query_response_factory.hpp"
16 : #include "backend/protobuf/proto_transport_factory.hpp"
17 : #include "backend/protobuf/proto_tx_status_factory.hpp"
18 : #include "common/bind.hpp"
19 : #include "consensus/yac/consistency_model.hpp"
20 : #include "cryptography/crypto_provider/crypto_model_signer.hpp"
21 : #include "interfaces/iroha_internal/transaction_batch_factory_impl.hpp"
22 : #include "interfaces/iroha_internal/transaction_batch_parser_impl.hpp"
23 : #include "logger/logger.hpp"
24 : #include "logger/logger_manager.hpp"
25 : #include "main/server_runner.hpp"
26 : #include "multi_sig_transactions/gossip_propagation_strategy.hpp"
27 : #include "multi_sig_transactions/mst_processor_impl.hpp"
28 : #include "multi_sig_transactions/mst_propagation_strategy_stub.hpp"
29 : #include "multi_sig_transactions/mst_time_provider_impl.hpp"
30 : #include "multi_sig_transactions/storage/mst_storage_impl.hpp"
31 : #include "multi_sig_transactions/transport/mst_transport_grpc.hpp"
32 : #include "multi_sig_transactions/transport/mst_transport_stub.hpp"
33 : #include "network/impl/block_loader_impl.hpp"
34 : #include "network/impl/peer_communication_service_impl.hpp"
35 : #include "ordering/impl/on_demand_common.hpp"
36 : #include "ordering/impl/on_demand_ordering_gate.hpp"
37 : #include "pending_txs_storage/impl/pending_txs_storage_impl.hpp"
38 : #include "simulator/impl/simulator.hpp"
39 : #include "synchronizer/impl/synchronizer_impl.hpp"
40 : #include "torii/impl/command_service_impl.hpp"
41 : #include "torii/impl/command_service_transport_grpc.hpp"
42 : #include "torii/impl/status_bus_impl.hpp"
43 : #include "torii/processor/query_processor_impl.hpp"
44 : #include "torii/processor/transaction_processor_impl.hpp"
45 : #include "torii/query_service.hpp"
46 : #include "validation/impl/chain_validator_impl.hpp"
47 : #include "validation/impl/stateful_validator_impl.hpp"
48 : #include "validators/default_validator.hpp"
49 : #include "validators/field_validator.hpp"
50 : #include "validators/protobuf/proto_block_validator.hpp"
51 : #include "validators/protobuf/proto_proposal_validator.hpp"
52 : #include "validators/protobuf/proto_query_validator.hpp"
53 : #include "validators/protobuf/proto_transaction_validator.hpp"
54 :
55 : using namespace iroha;
56 : using namespace iroha::ametsuchi;
57 : using namespace iroha::simulator;
58 : using namespace iroha::validation;
59 : using namespace iroha::network;
60 : using namespace iroha::synchronizer;
61 : using namespace iroha::torii;
62 : using namespace iroha::consensus::yac;
63 :
64 : using namespace std::chrono_literals;
65 :
66 : /// Consensus consistency model type.
67 : static constexpr iroha::consensus::yac::ConsistencyModel
68 : kConsensusConsistencyModel = iroha::consensus::yac::ConsistencyModel::kBft;
69 :
70 : /**
71 : * Configuring iroha daemon
72 : */
73 : Irohad::Irohad(const std::string &block_store_dir,
74 : const std::string &pg_conn,
75 : const std::string &listen_ip,
76 : size_t torii_port,
77 : size_t internal_port,
78 : size_t max_proposal_size,
79 : std::chrono::milliseconds proposal_delay,
80 : std::chrono::milliseconds vote_delay,
81 : std::chrono::minutes mst_expiration_time,
82 : const shared_model::crypto::Keypair &keypair,
83 : std::chrono::milliseconds max_rounds_delay,
84 : size_t stale_stream_max_rounds,
85 : logger::LoggerManagerTreePtr logger_manager,
86 : const boost::optional<GossipPropagationStrategyParams>
87 : &opt_mst_gossip_params)
88 247 : : block_store_dir_(block_store_dir),
89 247 : pg_conn_(pg_conn),
90 247 : listen_ip_(listen_ip),
91 247 : torii_port_(torii_port),
92 247 : internal_port_(internal_port),
93 247 : max_proposal_size_(max_proposal_size),
94 247 : proposal_delay_(proposal_delay),
95 247 : vote_delay_(vote_delay),
96 247 : is_mst_supported_(opt_mst_gossip_params),
97 247 : mst_expiration_time_(mst_expiration_time),
98 247 : max_rounds_delay_(max_rounds_delay),
99 247 : stale_stream_max_rounds_(stale_stream_max_rounds),
100 247 : opt_mst_gossip_params_(opt_mst_gossip_params),
101 247 : keypair(keypair),
102 247 : ordering_init(logger_manager->getLogger()),
103 247 : log_manager_(std::move(logger_manager)),
104 247 : log_(log_manager_->getLogger()) {
105 247 : log_->info("created");
106 : // Initializing storage at this point in order to insert genesis block before
107 : // initialization of iroha daemon
108 247 : initStorage();
109 247 : }
110 :
111 : Irohad::~Irohad() {
112 247 : consensus_gate_events_subscription.unsubscribe();
113 247 : }
114 :
115 : /**
116 : * Initializing iroha daemon
117 : */
118 : void Irohad::init() {
119 : // Recover WSV from the existing ledger to be sure it is consistent
120 247 : initWsvRestorer();
121 247 : restoreWsv();
122 :
123 247 : initCryptoProvider();
124 247 : initBatchParser();
125 247 : initValidators();
126 247 : initNetworkClient();
127 247 : initFactories();
128 247 : initPersistentCache();
129 247 : initOrderingGate();
130 247 : initSimulator();
131 247 : initConsensusCache();
132 247 : initBlockLoader();
133 247 : initConsensusGate();
134 247 : initSynchronizer();
135 247 : initPeerCommunicationService();
136 247 : initStatusBus();
137 247 : initMstProcessor();
138 247 : initPendingTxsStorage();
139 :
140 : // Torii
141 247 : initTransactionCommandService();
142 247 : initQueryService();
143 247 : }
144 :
145 : /**
146 : * Dropping iroha daemon storage
147 : */
148 : void Irohad::dropStorage() {
149 0 : storage->reset();
150 0 : }
151 :
152 : /**
153 : * Initializing iroha daemon storage
154 : */
155 : void Irohad::initStorage() {
156 247 : common_objects_factory_ =
157 247 : std::make_shared<shared_model::proto::ProtoCommonObjectsFactory<
158 : shared_model::validation::FieldValidator>>();
159 : auto perm_converter =
160 247 : std::make_shared<shared_model::proto::ProtoPermissionToString>();
161 : auto block_converter =
162 247 : std::make_shared<shared_model::proto::ProtoBlockJsonConverter>();
163 247 : auto storageResult = StorageImpl::create(block_store_dir_,
164 247 : pg_conn_,
165 247 : common_objects_factory_,
166 247 : std::move(block_converter),
167 247 : perm_converter,
168 247 : log_manager_->getChild("Storage"));
169 247 : storageResult.match(
170 : [&](expected::Value<std::shared_ptr<ametsuchi::StorageImpl>> &_storage) {
171 247 : storage = _storage.value;
172 247 : },
173 : [&](expected::Error<std::string> &error) { log_->error(error.error); });
174 :
175 247 : log_->info("[Init] => storage", logger::logBool(storage));
176 247 : }
177 :
178 : bool Irohad::restoreWsv() {
179 247 : return wsv_restorer_->restoreWsv(*storage).match(
180 : [](iroha::expected::Value<void> v) { return true; },
181 : [&](iroha::expected::Error<std::string> &error) {
182 0 : log_->error(error.error);
183 0 : return false;
184 : });
185 0 : }
186 :
187 : /**
188 : * Initializing crypto provider
189 : */
190 : void Irohad::initCryptoProvider() {
191 247 : crypto_signer_ =
192 247 : std::make_shared<shared_model::crypto::CryptoModelSigner<>>(keypair);
193 :
194 247 : log_->info("[Init] => crypto provider");
195 247 : }
196 :
197 : void Irohad::initBatchParser() {
198 247 : batch_parser =
199 247 : std::make_shared<shared_model::interface::TransactionBatchParserImpl>();
200 :
201 247 : log_->info("[Init] => transaction batch parser");
202 247 : }
203 :
204 : /**
205 : * Initializing validators
206 : */
207 : void Irohad::initValidators() {
208 247 : auto factory = std::make_unique<shared_model::proto::ProtoProposalFactory<
209 : shared_model::validation::DefaultProposalValidator>>();
210 247 : auto validators_log_manager = log_manager_->getChild("Validators");
211 247 : stateful_validator = std::make_shared<StatefulValidatorImpl>(
212 247 : std::move(factory),
213 247 : batch_parser,
214 247 : validators_log_manager->getChild("Stateful")->getLogger());
215 247 : chain_validator = std::make_shared<ChainValidatorImpl>(
216 247 : getSupermajorityChecker(kConsensusConsistencyModel),
217 247 : validators_log_manager->getChild("Chain")->getLogger());
218 :
219 247 : log_->info("[Init] => validators");
220 247 : }
221 :
222 : /**
223 : * Initializing network client
224 : */
225 : void Irohad::initNetworkClient() {
226 247 : async_call_ =
227 247 : std::make_shared<network::AsyncGrpcClient<google::protobuf::Empty>>(
228 247 : log_manager_->getChild("AsyncNetworkClient")->getLogger());
229 247 : }
230 :
231 : void Irohad::initFactories() {
232 : // proposal factory
233 : std::shared_ptr<
234 : shared_model::validation::AbstractValidator<iroha::protocol::Transaction>>
235 247 : proto_transaction_validator = std::make_shared<
236 : shared_model::validation::ProtoTransactionValidator>();
237 : std::unique_ptr<shared_model::validation::AbstractValidator<
238 : shared_model::interface::Proposal>>
239 247 : proposal_validator = std::make_unique<
240 : shared_model::validation::DefaultProposalValidator>();
241 : std::unique_ptr<
242 : shared_model::validation::AbstractValidator<iroha::protocol::Proposal>>
243 247 : proto_proposal_validator =
244 247 : std::make_unique<shared_model::validation::ProtoProposalValidator>(
245 : proto_transaction_validator);
246 247 : proposal_factory =
247 247 : std::make_shared<shared_model::proto::ProtoTransportFactory<
248 : shared_model::interface::Proposal,
249 247 : shared_model::proto::Proposal>>(std::move(proposal_validator),
250 247 : std::move(proto_proposal_validator));
251 :
252 : // transaction factories
253 247 : transaction_batch_factory_ =
254 247 : std::make_shared<shared_model::interface::TransactionBatchFactoryImpl>();
255 :
256 : std::unique_ptr<shared_model::validation::AbstractValidator<
257 : shared_model::interface::Transaction>>
258 247 : transaction_validator =
259 247 : std::make_unique<shared_model::validation::
260 : DefaultOptionalSignedTransactionValidator>();
261 247 : transaction_factory =
262 247 : std::make_shared<shared_model::proto::ProtoTransportFactory<
263 : shared_model::interface::Transaction,
264 : shared_model::proto::Transaction>>(
265 247 : std::move(transaction_validator),
266 247 : std::move(proto_transaction_validator));
267 :
268 : // query factories
269 247 : query_response_factory_ =
270 247 : std::make_shared<shared_model::proto::ProtoQueryResponseFactory>();
271 :
272 : std::unique_ptr<shared_model::validation::AbstractValidator<
273 : shared_model::interface::Query>>
274 247 : query_validator = std::make_unique<
275 : shared_model::validation::DefaultSignedQueryValidator>();
276 : std::unique_ptr<
277 : shared_model::validation::AbstractValidator<iroha::protocol::Query>>
278 247 : proto_query_validator =
279 247 : std::make_unique<shared_model::validation::ProtoQueryValidator>();
280 247 : query_factory = std::make_shared<
281 : shared_model::proto::ProtoTransportFactory<shared_model::interface::Query,
282 : shared_model::proto::Query>>(
283 247 : std::move(query_validator), std::move(proto_query_validator));
284 :
285 247 : log_->info("[Init] => factories");
286 247 : }
287 :
288 : /**
289 : * Initializing persistent cache
290 : */
291 : void Irohad::initPersistentCache() {
292 247 : persistent_cache = std::make_shared<TxPresenceCacheImpl>(storage);
293 :
294 247 : log_->info("[Init] => persistent cache");
295 247 : }
296 :
297 : /**
298 : * Initializing ordering gate
299 : */
300 : void Irohad::initOrderingGate() {
301 247 : auto block_query = storage->createBlockQuery();
302 247 : if (not block_query) {
303 0 : log_->error("Failed to create block query");
304 0 : return;
305 : }
306 : // since delay is 2, it is required to get two more hashes from block store,
307 : // in addition to top block
308 247 : const size_t kNumBlocks = 3;
309 247 : auto blocks = (*block_query)->getTopBlocks(kNumBlocks);
310 247 : auto hash_stub = shared_model::interface::types::HashType{std::string(
311 247 : shared_model::crypto::DefaultCryptoAlgorithmType::kHashLength, '0')};
312 247 : auto hashes = std::accumulate(
313 247 : blocks.begin(),
314 247 : std::prev(blocks.end()),
315 : // add hash stubs if there are not enough blocks in storage
316 247 : std::vector<shared_model::interface::types::HashType>{
317 247 : kNumBlocks - blocks.size(), hash_stub},
318 : [](auto &acc, const auto &val) {
319 1 : acc.push_back(val->hash());
320 1 : return acc;
321 : });
322 :
323 247 : auto factory = std::make_unique<shared_model::proto::ProtoProposalFactory<
324 : shared_model::validation::DefaultProposalValidator>>();
325 :
326 247 : const uint64_t kCounter = 0, kMaxLocalCounter = 2;
327 : // reject_delay and local_counter are local mutable variables of lambda
328 247 : const auto kMaxDelay(max_rounds_delay_);
329 : const auto kMaxDelayIncrement(std::chrono::milliseconds(1000));
330 : auto delay = [reject_delay = std::chrono::milliseconds(0),
331 : local_counter = kCounter,
332 : // MSVC requires const variables to be captured
333 247 : kMaxDelay,
334 247 : kMaxDelayIncrement,
335 : kMaxLocalCounter](const auto &commit) mutable {
336 : using iroha::synchronizer::SynchronizationOutcomeType;
337 3373 : if (commit.sync_outcome == SynchronizationOutcomeType::kReject
338 3373 : or commit.sync_outcome == SynchronizationOutcomeType::kNothing) {
339 : // Increment reject_counter each local_counter calls of function
340 2409 : ++local_counter;
341 2409 : if (local_counter == kMaxLocalCounter) {
342 1132 : local_counter = 0;
343 1132 : if (reject_delay < kMaxDelay) {
344 0 : reject_delay += std::min(kMaxDelay, kMaxDelayIncrement);
345 0 : }
346 1132 : }
347 2409 : } else {
348 964 : reject_delay = std::chrono::milliseconds(0);
349 : }
350 3373 : return reject_delay;
351 : };
352 :
353 247 : ordering_gate =
354 247 : ordering_init.initOrderingGate(max_proposal_size_,
355 247 : proposal_delay_,
356 247 : std::move(hashes),
357 247 : storage,
358 247 : transaction_factory,
359 247 : batch_parser,
360 247 : transaction_batch_factory_,
361 247 : async_call_,
362 247 : std::move(factory),
363 247 : proposal_factory,
364 247 : persistent_cache,
365 247 : {blocks.back()->height(), 1},
366 247 : delay,
367 247 : log_manager_->getChild("Ordering"));
368 247 : log_->info("[Init] => init ordering gate - [{}]",
369 247 : logger::logBool(ordering_gate));
370 247 : }
371 :
372 : /**
373 : * Initializing iroha verified proposal creator and block creator
374 : */
375 : void Irohad::initSimulator() {
376 247 : auto block_factory = std::make_unique<shared_model::proto::ProtoBlockFactory>(
377 : // Block factory in simulator uses UnsignedBlockValidator because it is
378 : // not required to check signatures of block here, as they will be
379 : // checked when supermajority of peers will sign the block. It is also
380 : // not required to validate signatures of transactions here because they
381 : // are validated in the ordering gate, where they are received from the
382 : // ordering service.
383 247 : std::make_unique<
384 : shared_model::validation::DefaultUnsignedBlockValidator>(),
385 247 : std::make_unique<shared_model::validation::ProtoBlockValidator>());
386 247 : simulator = std::make_shared<Simulator>(
387 247 : ordering_gate,
388 247 : stateful_validator,
389 247 : storage,
390 247 : storage,
391 247 : crypto_signer_,
392 247 : std::move(block_factory),
393 247 : log_manager_->getChild("Simulator")->getLogger());
394 :
395 247 : log_->info("[Init] => init simulator");
396 247 : }
397 :
398 : /**
399 : * Initializing consensus block cache
400 : */
401 : void Irohad::initConsensusCache() {
402 247 : consensus_result_cache_ = std::make_shared<consensus::ConsensusResultCache>();
403 :
404 247 : log_->info("[Init] => init consensus block cache");
405 247 : }
406 :
407 : /**
408 : * Initializing block loader
409 : */
410 : void Irohad::initBlockLoader() {
411 247 : block_loader =
412 247 : loader_init.initBlockLoader(storage,
413 247 : storage,
414 247 : consensus_result_cache_,
415 247 : log_manager_->getChild("BlockLoader"));
416 :
417 247 : log_->info("[Init] => block loader");
418 247 : }
419 :
420 : /**
421 : * Initializing consensus gate
422 : */
423 : void Irohad::initConsensusGate() {
424 247 : consensus_gate =
425 247 : yac_init.initConsensusGate(storage,
426 247 : simulator,
427 247 : block_loader,
428 247 : keypair,
429 247 : consensus_result_cache_,
430 247 : vote_delay_,
431 247 : async_call_,
432 247 : common_objects_factory_,
433 : kConsensusConsistencyModel,
434 247 : log_manager_->getChild("Consensus"));
435 247 : consensus_gate->onOutcome().subscribe(
436 247 : consensus_gate_events_subscription,
437 247 : consensus_gate_objects.get_subscriber());
438 247 : log_->info("[Init] => consensus gate");
439 247 : }
440 :
441 : /**
442 : * Initializing synchronizer
443 : */
444 : void Irohad::initSynchronizer() {
445 247 : synchronizer = std::make_shared<SynchronizerImpl>(
446 247 : consensus_gate,
447 247 : chain_validator,
448 247 : storage,
449 247 : storage,
450 247 : block_loader,
451 247 : log_manager_->getChild("Synchronizer")->getLogger());
452 :
453 247 : log_->info("[Init] => synchronizer");
454 247 : }
455 :
456 : /**
457 : * Initializing peer communication service
458 : */
459 : void Irohad::initPeerCommunicationService() {
460 247 : pcs = std::make_shared<PeerCommunicationServiceImpl>(
461 247 : ordering_gate,
462 247 : synchronizer,
463 247 : simulator,
464 247 : log_manager_->getChild("PeerCommunicationService")->getLogger());
465 :
466 : pcs->onProposal().subscribe([this](const auto &) {
467 3373 : log_->info("~~~~~~~~~| PROPOSAL ^_^ |~~~~~~~~~ ");
468 3373 : });
469 :
470 : pcs->on_commit().subscribe([this](const auto &event) {
471 : using iroha::synchronizer::SynchronizationOutcomeType;
472 3126 : switch (event.sync_outcome) {
473 : case SynchronizationOutcomeType::kCommit:
474 717 : log_->info(R"(~~~~~~~~~| COMMIT =^._.^= |~~~~~~~~~ )");
475 717 : break;
476 : case SynchronizationOutcomeType::kReject:
477 0 : log_->info(R"(~~~~~~~~~| REJECT \(*.*)/ |~~~~~~~~~ )");
478 0 : break;
479 : case SynchronizationOutcomeType::kNothing:
480 2409 : log_->info(R"(~~~~~~~~~| EMPTY (-_-)zzz |~~~~~~~~~ )");
481 2409 : break;
482 : default:
483 0 : break;
484 : }
485 3126 : });
486 :
487 247 : log_->info("[Init] => pcs");
488 247 : }
489 :
490 : void Irohad::initStatusBus() {
491 247 : status_bus_ = std::make_shared<StatusBusImpl>();
492 247 : log_->info("[Init] => Tx status bus");
493 247 : }
494 :
495 : void Irohad::initMstProcessor() {
496 : auto mst_logger_manager =
497 247 : log_manager_->getChild("MultiSignatureTransactions");
498 247 : auto mst_state_logger = mst_logger_manager->getChild("State")->getLogger();
499 247 : auto mst_completer = std::make_shared<DefaultCompleter>(mst_expiration_time_);
500 247 : auto mst_storage = std::make_shared<MstStorageStateImpl>(
501 : mst_completer,
502 : mst_state_logger,
503 247 : mst_logger_manager->getChild("Storage")->getLogger());
504 247 : std::shared_ptr<iroha::PropagationStrategy> mst_propagation;
505 247 : if (is_mst_supported_) {
506 7 : mst_transport = std::make_shared<iroha::network::MstTransportGrpc>(
507 7 : async_call_,
508 7 : transaction_factory,
509 7 : batch_parser,
510 7 : transaction_batch_factory_,
511 7 : persistent_cache,
512 : mst_completer,
513 7 : keypair.publicKey(),
514 7 : std::move(mst_state_logger),
515 7 : mst_logger_manager->getChild("Transport")->getLogger());
516 7 : mst_propagation = std::make_shared<GossipPropagationStrategy>(
517 7 : storage, rxcpp::observe_on_new_thread(), *opt_mst_gossip_params_);
518 7 : } else {
519 240 : mst_transport = std::make_shared<iroha::network::MstTransportStub>();
520 240 : mst_propagation = std::make_shared<iroha::PropagationStrategyStub>();
521 : }
522 :
523 247 : auto mst_time = std::make_shared<MstTimeProviderImpl>();
524 247 : auto fair_mst_processor = std::make_shared<FairMstProcessor>(
525 247 : mst_transport,
526 : mst_storage,
527 : mst_propagation,
528 : mst_time,
529 247 : mst_logger_manager->getChild("Processor")->getLogger());
530 247 : mst_processor = fair_mst_processor;
531 247 : mst_transport->subscribe(fair_mst_processor);
532 247 : log_->info("[Init] => MST processor");
533 247 : }
534 :
535 : void Irohad::initPendingTxsStorage() {
536 247 : pending_txs_storage_ = std::make_shared<PendingTransactionStorageImpl>(
537 247 : mst_processor->onStateUpdate(),
538 247 : mst_processor->onPreparedBatches(),
539 247 : mst_processor->onExpiredBatches());
540 247 : log_->info("[Init] => pending transactions storage");
541 247 : }
542 :
543 : /**
544 : * Initializing transaction command service
545 : */
546 : void Irohad::initTransactionCommandService() {
547 247 : auto command_service_log_manager = log_manager_->getChild("CommandService");
548 : auto status_factory =
549 247 : std::make_shared<shared_model::proto::ProtoTxStatusFactory>();
550 247 : auto cs_cache = std::make_shared<::torii::CommandServiceImpl::CacheType>();
551 247 : auto tx_processor = std::make_shared<TransactionProcessorImpl>(
552 247 : pcs,
553 247 : mst_processor,
554 247 : status_bus_,
555 : status_factory,
556 247 : command_service_log_manager->getChild("Processor")->getLogger());
557 247 : command_service = std::make_shared<::torii::CommandServiceImpl>(
558 : tx_processor,
559 247 : storage,
560 247 : status_bus_,
561 : status_factory,
562 : cs_cache,
563 247 : persistent_cache,
564 247 : command_service_log_manager->getLogger());
565 247 : command_service_transport =
566 247 : std::make_shared<::torii::CommandServiceTransportGrpc>(
567 247 : command_service,
568 247 : status_bus_,
569 : status_factory,
570 247 : transaction_factory,
571 247 : batch_parser,
572 247 : transaction_batch_factory_,
573 : consensus_gate_objects.get_observable().map([](const auto &) {
574 0 : return ::torii::CommandServiceTransportGrpc::ConsensusGateEvent{};
575 : }),
576 247 : stale_stream_max_rounds_,
577 247 : command_service_log_manager->getChild("Transport")->getLogger());
578 :
579 247 : log_->info("[Init] => command service");
580 247 : }
581 :
582 : /**
583 : * Initializing query command service
584 : */
585 : void Irohad::initQueryService() {
586 247 : auto query_service_log_manager = log_manager_->getChild("QueryService");
587 247 : auto query_processor = std::make_shared<QueryProcessorImpl>(
588 247 : storage,
589 247 : storage,
590 247 : pending_txs_storage_,
591 247 : query_response_factory_,
592 247 : query_service_log_manager->getChild("Processor")->getLogger());
593 :
594 247 : query_service = std::make_shared<::torii::QueryService>(
595 247 : query_processor, query_factory, query_service_log_manager->getLogger());
596 :
597 247 : log_->info("[Init] => query service");
598 247 : }
599 :
600 : void Irohad::initWsvRestorer() {
601 247 : wsv_restorer_ = std::make_shared<iroha::ametsuchi::WsvRestorerImpl>();
602 247 : }
603 :
604 : /**
605 : * Run iroha daemon
606 : */
607 : Irohad::RunResult Irohad::run() {
608 : using iroha::expected::operator|;
609 : using iroha::operator|;
610 :
611 : // Initializing torii server
612 247 : torii_server = std::make_unique<ServerRunner>(
613 247 : listen_ip_ + ":" + std::to_string(torii_port_),
614 247 : log_manager_->getChild("ToriiServerRunner")->getLogger(),
615 247 : false);
616 :
617 : // Initializing internal server
618 247 : internal_server = std::make_unique<ServerRunner>(
619 247 : listen_ip_ + ":" + std::to_string(internal_port_),
620 247 : log_manager_->getChild("InternalServerRunner")->getLogger(),
621 247 : false);
622 :
623 : // Run torii server
624 247 : return (torii_server->append(command_service_transport)
625 247 : .append(query_service)
626 247 : .run()
627 247 : |
628 : [&](const auto &port) {
629 247 : log_->info("Torii server bound on port {}", port);
630 247 : if (is_mst_supported_) {
631 7 : internal_server->append(
632 7 : std::static_pointer_cast<MstTransportGrpc>(mst_transport));
633 7 : }
634 : // Run internal server
635 247 : return internal_server->append(ordering_init.service)
636 247 : .append(yac_init.getConsensusNetwork())
637 247 : .append(loader_init.service)
638 247 : .run();
639 0 : })
640 247 : .match(
641 : [&](const auto &port) -> RunResult {
642 247 : log_->info("Internal server bound on port {}", port.value);
643 247 : log_->info("===> iroha initialized");
644 : // initiate first round
645 247 : auto block_query = storage->createBlockQuery();
646 247 : if (not block_query) {
647 0 : return expected::makeError("Failed to create block query");
648 : }
649 247 : auto block_var = (*block_query)->getTopBlock();
650 247 : if (auto e = boost::get<expected::Error<std::string>>(&block_var)) {
651 0 : return expected::makeError("Failed to get the top block: "
652 0 : + e->error);
653 : }
654 :
655 247 : auto block = boost::get<expected::Value<
656 247 : std::shared_ptr<shared_model::interface::Block>>>(&block_var)
657 247 : ->value;
658 :
659 247 : pcs->on_commit().subscribe(ordering_init.notifier.get_subscriber());
660 :
661 247 : ordering_init.notifier.get_subscriber().on_next(
662 247 : synchronizer::SynchronizationEvent{
663 247 : rxcpp::observable<>::just(block),
664 : SynchronizationOutcomeType::kCommit,
665 247 : {block->height(), ordering::kFirstRejectRound}});
666 247 : return {};
667 247 : },
668 : [&](const expected::Error<std::string> &e) -> RunResult {
669 0 : log_->error(e.error);
670 0 : return e;
671 : });
672 0 : }
|