LCOV - code coverage report
Current view: top level - irohad/main - application.cpp (source / functions) Hit Total Coverage
Test: cleared_cor.info Lines: 331 351 94.3 %
Date: 2019-03-07 14:46:43 Functions: 33 41 80.5 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include "main/application.hpp"
       7             : 
       8             : #include "ametsuchi/impl/storage_impl.hpp"
       9             : #include "ametsuchi/impl/tx_presence_cache_impl.hpp"
      10             : #include "ametsuchi/impl/wsv_restorer_impl.hpp"
      11             : #include "backend/protobuf/common_objects/proto_common_objects_factory.hpp"
      12             : #include "backend/protobuf/proto_block_json_converter.hpp"
      13             : #include "backend/protobuf/proto_permission_to_string.hpp"
      14             : #include "backend/protobuf/proto_proposal_factory.hpp"
      15             : #include "backend/protobuf/proto_query_response_factory.hpp"
      16             : #include "backend/protobuf/proto_transport_factory.hpp"
      17             : #include "backend/protobuf/proto_tx_status_factory.hpp"
      18             : #include "common/bind.hpp"
      19             : #include "consensus/yac/consistency_model.hpp"
      20             : #include "cryptography/crypto_provider/crypto_model_signer.hpp"
      21             : #include "interfaces/iroha_internal/transaction_batch_factory_impl.hpp"
      22             : #include "interfaces/iroha_internal/transaction_batch_parser_impl.hpp"
      23             : #include "logger/logger.hpp"
      24             : #include "logger/logger_manager.hpp"
      25             : #include "main/server_runner.hpp"
      26             : #include "multi_sig_transactions/gossip_propagation_strategy.hpp"
      27             : #include "multi_sig_transactions/mst_processor_impl.hpp"
      28             : #include "multi_sig_transactions/mst_propagation_strategy_stub.hpp"
      29             : #include "multi_sig_transactions/mst_time_provider_impl.hpp"
      30             : #include "multi_sig_transactions/storage/mst_storage_impl.hpp"
      31             : #include "multi_sig_transactions/transport/mst_transport_grpc.hpp"
      32             : #include "multi_sig_transactions/transport/mst_transport_stub.hpp"
      33             : #include "network/impl/block_loader_impl.hpp"
      34             : #include "network/impl/peer_communication_service_impl.hpp"
      35             : #include "ordering/impl/on_demand_common.hpp"
      36             : #include "ordering/impl/on_demand_ordering_gate.hpp"
      37             : #include "pending_txs_storage/impl/pending_txs_storage_impl.hpp"
      38             : #include "simulator/impl/simulator.hpp"
      39             : #include "synchronizer/impl/synchronizer_impl.hpp"
      40             : #include "torii/impl/command_service_impl.hpp"
      41             : #include "torii/impl/command_service_transport_grpc.hpp"
      42             : #include "torii/impl/status_bus_impl.hpp"
      43             : #include "torii/processor/query_processor_impl.hpp"
      44             : #include "torii/processor/transaction_processor_impl.hpp"
      45             : #include "torii/query_service.hpp"
      46             : #include "validation/impl/chain_validator_impl.hpp"
      47             : #include "validation/impl/stateful_validator_impl.hpp"
      48             : #include "validators/default_validator.hpp"
      49             : #include "validators/field_validator.hpp"
      50             : #include "validators/protobuf/proto_block_validator.hpp"
      51             : #include "validators/protobuf/proto_proposal_validator.hpp"
      52             : #include "validators/protobuf/proto_query_validator.hpp"
      53             : #include "validators/protobuf/proto_transaction_validator.hpp"
      54             : 
      55             : using namespace iroha;
      56             : using namespace iroha::ametsuchi;
      57             : using namespace iroha::simulator;
      58             : using namespace iroha::validation;
      59             : using namespace iroha::network;
      60             : using namespace iroha::synchronizer;
      61             : using namespace iroha::torii;
      62             : using namespace iroha::consensus::yac;
      63             : 
      64             : using namespace std::chrono_literals;
      65             : 
      66             : /// Consensus consistency model type.
      67             : static constexpr iroha::consensus::yac::ConsistencyModel
      68             :     kConsensusConsistencyModel = iroha::consensus::yac::ConsistencyModel::kBft;
      69             : 
      70             : /**
      71             :  * Configuring iroha daemon
      72             :  */
      73             : Irohad::Irohad(const std::string &block_store_dir,
      74             :                const std::string &pg_conn,
      75             :                const std::string &listen_ip,
      76             :                size_t torii_port,
      77             :                size_t internal_port,
      78             :                size_t max_proposal_size,
      79             :                std::chrono::milliseconds proposal_delay,
      80             :                std::chrono::milliseconds vote_delay,
      81             :                std::chrono::minutes mst_expiration_time,
      82             :                const shared_model::crypto::Keypair &keypair,
      83             :                std::chrono::milliseconds max_rounds_delay,
      84             :                size_t stale_stream_max_rounds,
      85             :                logger::LoggerManagerTreePtr logger_manager,
      86             :                const boost::optional<GossipPropagationStrategyParams>
      87             :                    &opt_mst_gossip_params)
      88         247 :     : block_store_dir_(block_store_dir),
      89         247 :       pg_conn_(pg_conn),
      90         247 :       listen_ip_(listen_ip),
      91         247 :       torii_port_(torii_port),
      92         247 :       internal_port_(internal_port),
      93         247 :       max_proposal_size_(max_proposal_size),
      94         247 :       proposal_delay_(proposal_delay),
      95         247 :       vote_delay_(vote_delay),
      96         247 :       is_mst_supported_(opt_mst_gossip_params),
      97         247 :       mst_expiration_time_(mst_expiration_time),
      98         247 :       max_rounds_delay_(max_rounds_delay),
      99         247 :       stale_stream_max_rounds_(stale_stream_max_rounds),
     100         247 :       opt_mst_gossip_params_(opt_mst_gossip_params),
     101         247 :       keypair(keypair),
     102         247 :       ordering_init(logger_manager->getLogger()),
     103         247 :       log_manager_(std::move(logger_manager)),
     104         247 :       log_(log_manager_->getLogger()) {
     105         247 :   log_->info("created");
     106             :   // Initializing storage at this point in order to insert genesis block before
     107             :   // initialization of iroha daemon
     108         247 :   initStorage();
     109         247 : }
     110             : 
     111             : Irohad::~Irohad() {
     112         247 :   consensus_gate_events_subscription.unsubscribe();
     113         247 : }
     114             : 
     115             : /**
     116             :  * Initializing iroha daemon
     117             :  */
     118             : void Irohad::init() {
     119             :   // Recover WSV from the existing ledger to be sure it is consistent
     120         247 :   initWsvRestorer();
     121         247 :   restoreWsv();
     122             : 
     123         247 :   initCryptoProvider();
     124         247 :   initBatchParser();
     125         247 :   initValidators();
     126         247 :   initNetworkClient();
     127         247 :   initFactories();
     128         247 :   initPersistentCache();
     129         247 :   initOrderingGate();
     130         247 :   initSimulator();
     131         247 :   initConsensusCache();
     132         247 :   initBlockLoader();
     133         247 :   initConsensusGate();
     134         247 :   initSynchronizer();
     135         247 :   initPeerCommunicationService();
     136         247 :   initStatusBus();
     137         247 :   initMstProcessor();
     138         247 :   initPendingTxsStorage();
     139             : 
     140             :   // Torii
     141         247 :   initTransactionCommandService();
     142         247 :   initQueryService();
     143         247 : }
     144             : 
     145             : /**
     146             :  * Dropping iroha daemon storage
     147             :  */
     148             : void Irohad::dropStorage() {
     149           0 :   storage->reset();
     150           0 : }
     151             : 
     152             : /**
     153             :  * Initializing iroha daemon storage
     154             :  */
     155             : void Irohad::initStorage() {
     156         247 :   common_objects_factory_ =
     157         247 :       std::make_shared<shared_model::proto::ProtoCommonObjectsFactory<
     158             :           shared_model::validation::FieldValidator>>();
     159             :   auto perm_converter =
     160         247 :       std::make_shared<shared_model::proto::ProtoPermissionToString>();
     161             :   auto block_converter =
     162         247 :       std::make_shared<shared_model::proto::ProtoBlockJsonConverter>();
     163         247 :   auto storageResult = StorageImpl::create(block_store_dir_,
     164         247 :                                            pg_conn_,
     165         247 :                                            common_objects_factory_,
     166         247 :                                            std::move(block_converter),
     167         247 :                                            perm_converter,
     168         247 :                                            log_manager_->getChild("Storage"));
     169         247 :   storageResult.match(
     170             :       [&](expected::Value<std::shared_ptr<ametsuchi::StorageImpl>> &_storage) {
     171         247 :         storage = _storage.value;
     172         247 :       },
     173             :       [&](expected::Error<std::string> &error) { log_->error(error.error); });
     174             : 
     175         247 :   log_->info("[Init] => storage", logger::logBool(storage));
     176         247 : }
     177             : 
     178             : bool Irohad::restoreWsv() {
     179         247 :   return wsv_restorer_->restoreWsv(*storage).match(
     180             :       [](iroha::expected::Value<void> v) { return true; },
     181             :       [&](iroha::expected::Error<std::string> &error) {
     182           0 :         log_->error(error.error);
     183           0 :         return false;
     184             :       });
     185           0 : }
     186             : 
     187             : /**
     188             :  * Initializing crypto provider
     189             :  */
     190             : void Irohad::initCryptoProvider() {
     191         247 :   crypto_signer_ =
     192         247 :       std::make_shared<shared_model::crypto::CryptoModelSigner<>>(keypair);
     193             : 
     194         247 :   log_->info("[Init] => crypto provider");
     195         247 : }
     196             : 
     197             : void Irohad::initBatchParser() {
     198         247 :   batch_parser =
     199         247 :       std::make_shared<shared_model::interface::TransactionBatchParserImpl>();
     200             : 
     201         247 :   log_->info("[Init] => transaction batch parser");
     202         247 : }
     203             : 
     204             : /**
     205             :  * Initializing validators
     206             :  */
     207             : void Irohad::initValidators() {
     208         247 :   auto factory = std::make_unique<shared_model::proto::ProtoProposalFactory<
     209             :       shared_model::validation::DefaultProposalValidator>>();
     210         247 :   auto validators_log_manager = log_manager_->getChild("Validators");
     211         247 :   stateful_validator = std::make_shared<StatefulValidatorImpl>(
     212         247 :       std::move(factory),
     213         247 :       batch_parser,
     214         247 :       validators_log_manager->getChild("Stateful")->getLogger());
     215         247 :   chain_validator = std::make_shared<ChainValidatorImpl>(
     216         247 :       getSupermajorityChecker(kConsensusConsistencyModel),
     217         247 :       validators_log_manager->getChild("Chain")->getLogger());
     218             : 
     219         247 :   log_->info("[Init] => validators");
     220         247 : }
     221             : 
     222             : /**
     223             :  * Initializing network client
     224             :  */
     225             : void Irohad::initNetworkClient() {
     226         247 :   async_call_ =
     227         247 :       std::make_shared<network::AsyncGrpcClient<google::protobuf::Empty>>(
     228         247 :           log_manager_->getChild("AsyncNetworkClient")->getLogger());
     229         247 : }
     230             : 
     231             : void Irohad::initFactories() {
     232             :   // proposal factory
     233             :   std::shared_ptr<
     234             :       shared_model::validation::AbstractValidator<iroha::protocol::Transaction>>
     235         247 :       proto_transaction_validator = std::make_shared<
     236             :           shared_model::validation::ProtoTransactionValidator>();
     237             :   std::unique_ptr<shared_model::validation::AbstractValidator<
     238             :       shared_model::interface::Proposal>>
     239         247 :       proposal_validator = std::make_unique<
     240             :           shared_model::validation::DefaultProposalValidator>();
     241             :   std::unique_ptr<
     242             :       shared_model::validation::AbstractValidator<iroha::protocol::Proposal>>
     243         247 :       proto_proposal_validator =
     244         247 :           std::make_unique<shared_model::validation::ProtoProposalValidator>(
     245             :               proto_transaction_validator);
     246         247 :   proposal_factory =
     247         247 :       std::make_shared<shared_model::proto::ProtoTransportFactory<
     248             :           shared_model::interface::Proposal,
     249         247 :           shared_model::proto::Proposal>>(std::move(proposal_validator),
     250         247 :                                           std::move(proto_proposal_validator));
     251             : 
     252             :   // transaction factories
     253         247 :   transaction_batch_factory_ =
     254         247 :       std::make_shared<shared_model::interface::TransactionBatchFactoryImpl>();
     255             : 
     256             :   std::unique_ptr<shared_model::validation::AbstractValidator<
     257             :       shared_model::interface::Transaction>>
     258         247 :       transaction_validator =
     259         247 :           std::make_unique<shared_model::validation::
     260             :                                DefaultOptionalSignedTransactionValidator>();
     261         247 :   transaction_factory =
     262         247 :       std::make_shared<shared_model::proto::ProtoTransportFactory<
     263             :           shared_model::interface::Transaction,
     264             :           shared_model::proto::Transaction>>(
     265         247 :           std::move(transaction_validator),
     266         247 :           std::move(proto_transaction_validator));
     267             : 
     268             :   // query factories
     269         247 :   query_response_factory_ =
     270         247 :       std::make_shared<shared_model::proto::ProtoQueryResponseFactory>();
     271             : 
     272             :   std::unique_ptr<shared_model::validation::AbstractValidator<
     273             :       shared_model::interface::Query>>
     274         247 :       query_validator = std::make_unique<
     275             :           shared_model::validation::DefaultSignedQueryValidator>();
     276             :   std::unique_ptr<
     277             :       shared_model::validation::AbstractValidator<iroha::protocol::Query>>
     278         247 :       proto_query_validator =
     279         247 :           std::make_unique<shared_model::validation::ProtoQueryValidator>();
     280         247 :   query_factory = std::make_shared<
     281             :       shared_model::proto::ProtoTransportFactory<shared_model::interface::Query,
     282             :                                                  shared_model::proto::Query>>(
     283         247 :       std::move(query_validator), std::move(proto_query_validator));
     284             : 
     285         247 :   log_->info("[Init] => factories");
     286         247 : }
     287             : 
     288             : /**
     289             :  * Initializing persistent cache
     290             :  */
     291             : void Irohad::initPersistentCache() {
     292         247 :   persistent_cache = std::make_shared<TxPresenceCacheImpl>(storage);
     293             : 
     294         247 :   log_->info("[Init] => persistent cache");
     295         247 : }
     296             : 
     297             : /**
     298             :  * Initializing ordering gate
     299             :  */
     300             : void Irohad::initOrderingGate() {
     301         247 :   auto block_query = storage->createBlockQuery();
     302         247 :   if (not block_query) {
     303           0 :     log_->error("Failed to create block query");
     304           0 :     return;
     305             :   }
     306             :   // since delay is 2, it is required to get two more hashes from block store,
     307             :   // in addition to top block
     308         247 :   const size_t kNumBlocks = 3;
     309         247 :   auto blocks = (*block_query)->getTopBlocks(kNumBlocks);
     310         247 :   auto hash_stub = shared_model::interface::types::HashType{std::string(
     311         247 :       shared_model::crypto::DefaultCryptoAlgorithmType::kHashLength, '0')};
     312         247 :   auto hashes = std::accumulate(
     313         247 :       blocks.begin(),
     314         247 :       std::prev(blocks.end()),
     315             :       // add hash stubs if there are not enough blocks in storage
     316         247 :       std::vector<shared_model::interface::types::HashType>{
     317         247 :           kNumBlocks - blocks.size(), hash_stub},
     318             :       [](auto &acc, const auto &val) {
     319           1 :         acc.push_back(val->hash());
     320           1 :         return acc;
     321             :       });
     322             : 
     323         247 :   auto factory = std::make_unique<shared_model::proto::ProtoProposalFactory<
     324             :       shared_model::validation::DefaultProposalValidator>>();
     325             : 
     326         247 :   const uint64_t kCounter = 0, kMaxLocalCounter = 2;
     327             :   // reject_delay and local_counter are local mutable variables of lambda
     328         247 :   const auto kMaxDelay(max_rounds_delay_);
     329             :   const auto kMaxDelayIncrement(std::chrono::milliseconds(1000));
     330             :   auto delay = [reject_delay = std::chrono::milliseconds(0),
     331             :                 local_counter = kCounter,
     332             :                 // MSVC requires const variables to be captured
     333         247 :                 kMaxDelay,
     334         247 :                 kMaxDelayIncrement,
     335             :                 kMaxLocalCounter](const auto &commit) mutable {
     336             :     using iroha::synchronizer::SynchronizationOutcomeType;
     337        3373 :     if (commit.sync_outcome == SynchronizationOutcomeType::kReject
     338        3373 :         or commit.sync_outcome == SynchronizationOutcomeType::kNothing) {
     339             :       // Increment reject_counter each local_counter calls of function
     340        2409 :       ++local_counter;
     341        2409 :       if (local_counter == kMaxLocalCounter) {
     342        1132 :         local_counter = 0;
     343        1132 :         if (reject_delay < kMaxDelay) {
     344           0 :           reject_delay += std::min(kMaxDelay, kMaxDelayIncrement);
     345           0 :         }
     346        1132 :       }
     347        2409 :     } else {
     348         964 :       reject_delay = std::chrono::milliseconds(0);
     349             :     }
     350        3373 :     return reject_delay;
     351             :   };
     352             : 
     353         247 :   ordering_gate =
     354         247 :       ordering_init.initOrderingGate(max_proposal_size_,
     355         247 :                                      proposal_delay_,
     356         247 :                                      std::move(hashes),
     357         247 :                                      storage,
     358         247 :                                      transaction_factory,
     359         247 :                                      batch_parser,
     360         247 :                                      transaction_batch_factory_,
     361         247 :                                      async_call_,
     362         247 :                                      std::move(factory),
     363         247 :                                      proposal_factory,
     364         247 :                                      persistent_cache,
     365         247 :                                      {blocks.back()->height(), 1},
     366         247 :                                      delay,
     367         247 :                                      log_manager_->getChild("Ordering"));
     368         247 :   log_->info("[Init] => init ordering gate - [{}]",
     369         247 :              logger::logBool(ordering_gate));
     370         247 : }
     371             : 
     372             : /**
     373             :  * Initializing iroha verified proposal creator and block creator
     374             :  */
     375             : void Irohad::initSimulator() {
     376         247 :   auto block_factory = std::make_unique<shared_model::proto::ProtoBlockFactory>(
     377             :       //  Block factory in simulator uses UnsignedBlockValidator because it is
     378             :       //  not required to check signatures of block here, as they will be
     379             :       //  checked when supermajority of peers will sign the block. It is also
     380             :       //  not required to validate signatures of transactions here because they
     381             :       //  are validated in the ordering gate, where they are received from the
     382             :       //  ordering service.
     383         247 :       std::make_unique<
     384             :           shared_model::validation::DefaultUnsignedBlockValidator>(),
     385         247 :       std::make_unique<shared_model::validation::ProtoBlockValidator>());
     386         247 :   simulator = std::make_shared<Simulator>(
     387         247 :       ordering_gate,
     388         247 :       stateful_validator,
     389         247 :       storage,
     390         247 :       storage,
     391         247 :       crypto_signer_,
     392         247 :       std::move(block_factory),
     393         247 :       log_manager_->getChild("Simulator")->getLogger());
     394             : 
     395         247 :   log_->info("[Init] => init simulator");
     396         247 : }
     397             : 
     398             : /**
     399             :  * Initializing consensus block cache
     400             :  */
     401             : void Irohad::initConsensusCache() {
     402         247 :   consensus_result_cache_ = std::make_shared<consensus::ConsensusResultCache>();
     403             : 
     404         247 :   log_->info("[Init] => init consensus block cache");
     405         247 : }
     406             : 
     407             : /**
     408             :  * Initializing block loader
     409             :  */
     410             : void Irohad::initBlockLoader() {
     411         247 :   block_loader =
     412         247 :       loader_init.initBlockLoader(storage,
     413         247 :                                   storage,
     414         247 :                                   consensus_result_cache_,
     415         247 :                                   log_manager_->getChild("BlockLoader"));
     416             : 
     417         247 :   log_->info("[Init] => block loader");
     418         247 : }
     419             : 
     420             : /**
     421             :  * Initializing consensus gate
     422             :  */
     423             : void Irohad::initConsensusGate() {
     424         247 :   consensus_gate =
     425         247 :       yac_init.initConsensusGate(storage,
     426         247 :                                  simulator,
     427         247 :                                  block_loader,
     428         247 :                                  keypair,
     429         247 :                                  consensus_result_cache_,
     430         247 :                                  vote_delay_,
     431         247 :                                  async_call_,
     432         247 :                                  common_objects_factory_,
     433             :                                  kConsensusConsistencyModel,
     434         247 :                                  log_manager_->getChild("Consensus"));
     435         247 :   consensus_gate->onOutcome().subscribe(
     436         247 :       consensus_gate_events_subscription,
     437         247 :       consensus_gate_objects.get_subscriber());
     438         247 :   log_->info("[Init] => consensus gate");
     439         247 : }
     440             : 
     441             : /**
     442             :  * Initializing synchronizer
     443             :  */
     444             : void Irohad::initSynchronizer() {
     445         247 :   synchronizer = std::make_shared<SynchronizerImpl>(
     446         247 :       consensus_gate,
     447         247 :       chain_validator,
     448         247 :       storage,
     449         247 :       storage,
     450         247 :       block_loader,
     451         247 :       log_manager_->getChild("Synchronizer")->getLogger());
     452             : 
     453         247 :   log_->info("[Init] => synchronizer");
     454         247 : }
     455             : 
     456             : /**
     457             :  * Initializing peer communication service
     458             :  */
     459             : void Irohad::initPeerCommunicationService() {
     460         247 :   pcs = std::make_shared<PeerCommunicationServiceImpl>(
     461         247 :       ordering_gate,
     462         247 :       synchronizer,
     463         247 :       simulator,
     464         247 :       log_manager_->getChild("PeerCommunicationService")->getLogger());
     465             : 
     466             :   pcs->onProposal().subscribe([this](const auto &) {
     467        3373 :     log_->info("~~~~~~~~~| PROPOSAL ^_^ |~~~~~~~~~ ");
     468        3373 :   });
     469             : 
     470             :   pcs->on_commit().subscribe([this](const auto &event) {
     471             :     using iroha::synchronizer::SynchronizationOutcomeType;
     472        3126 :     switch (event.sync_outcome) {
     473             :       case SynchronizationOutcomeType::kCommit:
     474         717 :         log_->info(R"(~~~~~~~~~| COMMIT =^._.^= |~~~~~~~~~ )");
     475         717 :         break;
     476             :       case SynchronizationOutcomeType::kReject:
     477           0 :         log_->info(R"(~~~~~~~~~| REJECT \(*.*)/ |~~~~~~~~~ )");
     478           0 :         break;
     479             :       case SynchronizationOutcomeType::kNothing:
     480        2409 :         log_->info(R"(~~~~~~~~~| EMPTY (-_-)zzz |~~~~~~~~~ )");
     481        2409 :         break;
     482             :       default:
     483           0 :         break;
     484             :     }
     485        3126 :   });
     486             : 
     487         247 :   log_->info("[Init] => pcs");
     488         247 : }
     489             : 
     490             : void Irohad::initStatusBus() {
     491         247 :   status_bus_ = std::make_shared<StatusBusImpl>();
     492         247 :   log_->info("[Init] => Tx status bus");
     493         247 : }
     494             : 
     495             : void Irohad::initMstProcessor() {
     496             :   auto mst_logger_manager =
     497         247 :       log_manager_->getChild("MultiSignatureTransactions");
     498         247 :   auto mst_state_logger = mst_logger_manager->getChild("State")->getLogger();
     499         247 :   auto mst_completer = std::make_shared<DefaultCompleter>(mst_expiration_time_);
     500         247 :   auto mst_storage = std::make_shared<MstStorageStateImpl>(
     501             :       mst_completer,
     502             :       mst_state_logger,
     503         247 :       mst_logger_manager->getChild("Storage")->getLogger());
     504         247 :   std::shared_ptr<iroha::PropagationStrategy> mst_propagation;
     505         247 :   if (is_mst_supported_) {
     506           7 :     mst_transport = std::make_shared<iroha::network::MstTransportGrpc>(
     507           7 :         async_call_,
     508           7 :         transaction_factory,
     509           7 :         batch_parser,
     510           7 :         transaction_batch_factory_,
     511           7 :         persistent_cache,
     512             :         mst_completer,
     513           7 :         keypair.publicKey(),
     514           7 :         std::move(mst_state_logger),
     515           7 :         mst_logger_manager->getChild("Transport")->getLogger());
     516           7 :     mst_propagation = std::make_shared<GossipPropagationStrategy>(
     517           7 :         storage, rxcpp::observe_on_new_thread(), *opt_mst_gossip_params_);
     518           7 :   } else {
     519         240 :     mst_transport = std::make_shared<iroha::network::MstTransportStub>();
     520         240 :     mst_propagation = std::make_shared<iroha::PropagationStrategyStub>();
     521             :   }
     522             : 
     523         247 :   auto mst_time = std::make_shared<MstTimeProviderImpl>();
     524         247 :   auto fair_mst_processor = std::make_shared<FairMstProcessor>(
     525         247 :       mst_transport,
     526             :       mst_storage,
     527             :       mst_propagation,
     528             :       mst_time,
     529         247 :       mst_logger_manager->getChild("Processor")->getLogger());
     530         247 :   mst_processor = fair_mst_processor;
     531         247 :   mst_transport->subscribe(fair_mst_processor);
     532         247 :   log_->info("[Init] => MST processor");
     533         247 : }
     534             : 
     535             : void Irohad::initPendingTxsStorage() {
     536         247 :   pending_txs_storage_ = std::make_shared<PendingTransactionStorageImpl>(
     537         247 :       mst_processor->onStateUpdate(),
     538         247 :       mst_processor->onPreparedBatches(),
     539         247 :       mst_processor->onExpiredBatches());
     540         247 :   log_->info("[Init] => pending transactions storage");
     541         247 : }
     542             : 
     543             : /**
     544             :  * Initializing transaction command service
     545             :  */
     546             : void Irohad::initTransactionCommandService() {
     547         247 :   auto command_service_log_manager = log_manager_->getChild("CommandService");
     548             :   auto status_factory =
     549         247 :       std::make_shared<shared_model::proto::ProtoTxStatusFactory>();
     550         247 :   auto cs_cache = std::make_shared<::torii::CommandServiceImpl::CacheType>();
     551         247 :   auto tx_processor = std::make_shared<TransactionProcessorImpl>(
     552         247 :       pcs,
     553         247 :       mst_processor,
     554         247 :       status_bus_,
     555             :       status_factory,
     556         247 :       command_service_log_manager->getChild("Processor")->getLogger());
     557         247 :   command_service = std::make_shared<::torii::CommandServiceImpl>(
     558             :       tx_processor,
     559         247 :       storage,
     560         247 :       status_bus_,
     561             :       status_factory,
     562             :       cs_cache,
     563         247 :       persistent_cache,
     564         247 :       command_service_log_manager->getLogger());
     565         247 :   command_service_transport =
     566         247 :       std::make_shared<::torii::CommandServiceTransportGrpc>(
     567         247 :           command_service,
     568         247 :           status_bus_,
     569             :           status_factory,
     570         247 :           transaction_factory,
     571         247 :           batch_parser,
     572         247 :           transaction_batch_factory_,
     573             :           consensus_gate_objects.get_observable().map([](const auto &) {
     574           0 :             return ::torii::CommandServiceTransportGrpc::ConsensusGateEvent{};
     575             :           }),
     576         247 :           stale_stream_max_rounds_,
     577         247 :           command_service_log_manager->getChild("Transport")->getLogger());
     578             : 
     579         247 :   log_->info("[Init] => command service");
     580         247 : }
     581             : 
     582             : /**
     583             :  * Initializing query command service
     584             :  */
     585             : void Irohad::initQueryService() {
     586         247 :   auto query_service_log_manager = log_manager_->getChild("QueryService");
     587         247 :   auto query_processor = std::make_shared<QueryProcessorImpl>(
     588         247 :       storage,
     589         247 :       storage,
     590         247 :       pending_txs_storage_,
     591         247 :       query_response_factory_,
     592         247 :       query_service_log_manager->getChild("Processor")->getLogger());
     593             : 
     594         247 :   query_service = std::make_shared<::torii::QueryService>(
     595         247 :       query_processor, query_factory, query_service_log_manager->getLogger());
     596             : 
     597         247 :   log_->info("[Init] => query service");
     598         247 : }
     599             : 
     600             : void Irohad::initWsvRestorer() {
     601         247 :   wsv_restorer_ = std::make_shared<iroha::ametsuchi::WsvRestorerImpl>();
     602         247 : }
     603             : 
     604             : /**
     605             :  * Run iroha daemon
     606             :  */
     607             : Irohad::RunResult Irohad::run() {
     608             :   using iroha::expected::operator|;
     609             :   using iroha::operator|;
     610             : 
     611             :   // Initializing torii server
     612         247 :   torii_server = std::make_unique<ServerRunner>(
     613         247 :       listen_ip_ + ":" + std::to_string(torii_port_),
     614         247 :       log_manager_->getChild("ToriiServerRunner")->getLogger(),
     615         247 :       false);
     616             : 
     617             :   // Initializing internal server
     618         247 :   internal_server = std::make_unique<ServerRunner>(
     619         247 :       listen_ip_ + ":" + std::to_string(internal_port_),
     620         247 :       log_manager_->getChild("InternalServerRunner")->getLogger(),
     621         247 :       false);
     622             : 
     623             :   // Run torii server
     624         247 :   return (torii_server->append(command_service_transport)
     625         247 :               .append(query_service)
     626         247 :               .run()
     627         247 :           |
     628             :           [&](const auto &port) {
     629         247 :             log_->info("Torii server bound on port {}", port);
     630         247 :             if (is_mst_supported_) {
     631           7 :               internal_server->append(
     632           7 :                   std::static_pointer_cast<MstTransportGrpc>(mst_transport));
     633           7 :             }
     634             :             // Run internal server
     635         247 :             return internal_server->append(ordering_init.service)
     636         247 :                 .append(yac_init.getConsensusNetwork())
     637         247 :                 .append(loader_init.service)
     638         247 :                 .run();
     639           0 :           })
     640         247 :       .match(
     641             :           [&](const auto &port) -> RunResult {
     642         247 :             log_->info("Internal server bound on port {}", port.value);
     643         247 :             log_->info("===> iroha initialized");
     644             :             // initiate first round
     645         247 :             auto block_query = storage->createBlockQuery();
     646         247 :             if (not block_query) {
     647           0 :               return expected::makeError("Failed to create block query");
     648             :             }
     649         247 :             auto block_var = (*block_query)->getTopBlock();
     650         247 :             if (auto e = boost::get<expected::Error<std::string>>(&block_var)) {
     651           0 :               return expected::makeError("Failed to get the top block: "
     652           0 :                                          + e->error);
     653             :             }
     654             : 
     655         247 :             auto block = boost::get<expected::Value<
     656         247 :                 std::shared_ptr<shared_model::interface::Block>>>(&block_var)
     657         247 :                              ->value;
     658             : 
     659         247 :             pcs->on_commit().subscribe(ordering_init.notifier.get_subscriber());
     660             : 
     661         247 :             ordering_init.notifier.get_subscriber().on_next(
     662         247 :                 synchronizer::SynchronizationEvent{
     663         247 :                     rxcpp::observable<>::just(block),
     664             :                     SynchronizationOutcomeType::kCommit,
     665         247 :                     {block->height(), ordering::kFirstRejectRound}});
     666         247 :             return {};
     667         247 :           },
     668             :           [&](const expected::Error<std::string> &e) -> RunResult {
     669           0 :             log_->error(e.error);
     670           0 :             return e;
     671             :           });
     672           0 : }

Generated by: LCOV version 1.13