LCOV - code coverage report
Current view: top level - irohad/synchronizer/impl - synchronizer_impl.cpp (source / functions) Hit Total Coverage
Test: cleared_cor.info Lines: 99 108 91.7 %
Date: 2019-03-07 14:46:43 Functions: 17 18 94.4 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include "synchronizer/impl/synchronizer_impl.hpp"
       7             : 
       8             : #include <utility>
       9             : 
      10             : #include "ametsuchi/block_query_factory.hpp"
      11             : #include "ametsuchi/mutable_storage.hpp"
      12             : #include "common/visitor.hpp"
      13             : #include "interfaces/iroha_internal/block.hpp"
      14             : #include "logger/logger.hpp"
      15             : 
      16             : namespace iroha {
      17             :   namespace synchronizer {
      18             : 
      19             :     SynchronizerImpl::SynchronizerImpl(
      20             :         std::shared_ptr<network::ConsensusGate> consensus_gate,
      21             :         std::shared_ptr<validation::ChainValidator> validator,
      22             :         std::shared_ptr<ametsuchi::MutableFactory> mutable_factory,
      23             :         std::shared_ptr<ametsuchi::BlockQueryFactory> block_query_factory,
      24             :         std::shared_ptr<network::BlockLoader> block_loader,
      25             :         logger::LoggerPtr log)
      26         260 :         : validator_(std::move(validator)),
      27         260 :           mutable_factory_(std::move(mutable_factory)),
      28         260 :           block_query_factory_(std::move(block_query_factory)),
      29         260 :           block_loader_(std::move(block_loader)),
      30         260 :           log_(std::move(log)) {
      31         260 :       consensus_gate->onOutcome().subscribe(
      32             :           subscription_, [this](consensus::GateObject object) {
      33        3143 :             this->processOutcome(object);
      34        3143 :           });
      35         260 :     }
      36             : 
      37             :     void SynchronizerImpl::processOutcome(consensus::GateObject object) {
      38        3143 :       log_->info("processing consensus outcome");
      39        3143 :       visit_in_place(
      40             :           object,
      41             :           [this](const consensus::PairValid &msg) { this->processNext(msg); },
      42             :           [this](const consensus::VoteOther &msg) {
      43           5 :             this->processDifferent(msg);
      44           5 :           },
      45             :           [this](const consensus::ProposalReject &msg) {
      46             :             // TODO: nickaleks IR-147 18.01.19 add peers
      47             :             // list from GateObject when it has one
      48           1 :             notifier_.get_subscriber().on_next(SynchronizationEvent{
      49           1 :                 rxcpp::observable<>::empty<
      50             :                     std::shared_ptr<shared_model::interface::Block>>(),
      51             :                 SynchronizationOutcomeType::kReject,
      52           1 :                 msg.round});
      53           1 :           },
      54             :           [this](const consensus::BlockReject &msg) {
      55             :             // TODO: nickaleks IR-147 18.01.19 add peers
      56             :             // list from GateObject when it has one
      57           1 :             notifier_.get_subscriber().on_next(SynchronizationEvent{
      58           1 :                 rxcpp::observable<>::empty<
      59             :                     std::shared_ptr<shared_model::interface::Block>>(),
      60             :                 SynchronizationOutcomeType::kReject,
      61           1 :                 msg.round});
      62           1 :           },
      63             :           [this](const consensus::AgreementOnNone &msg) {
      64             :             // TODO: nickaleks IR-147 18.01.19 add peers
      65             :             // list from GateObject when it has one
      66        2410 :             notifier_.get_subscriber().on_next(SynchronizationEvent{
      67        2410 :                 rxcpp::observable<>::empty<
      68             :                     std::shared_ptr<shared_model::interface::Block>>(),
      69             :                 SynchronizationOutcomeType::kNothing,
      70        2410 :                 msg.round});
      71        2410 :           });
      72        3143 :     }
      73             : 
      74             :     boost::optional<SynchronizationEvent>
      75             :     SynchronizerImpl::downloadMissingBlocks(
      76             :         const consensus::VoteOther &msg,
      77             :         std::unique_ptr<ametsuchi::MutableStorage> storage,
      78             :         const shared_model::interface::types::HeightType height) {
      79           5 :       auto expected_height = msg.round.block_round;
      80             : 
      81             :       // while blocks are not loaded and not committed
      82           5 :       while (true) {
      83             :         // TODO andrei 17.10.18 IR-1763 Add delay strategy for loading blocks
      84          15 :         for (const auto &public_key : msg.public_keys) {
      85             :           auto network_chain =
      86          10 :               block_loader_->retrieveBlocks(height, public_key);
      87             : 
      88          10 :           std::vector<std::shared_ptr<shared_model::interface::Block>> blocks;
      89          10 :           network_chain.as_blocking().subscribe(
      90             :               [&blocks](auto block) { blocks.push_back(block); });
      91          10 :           if (blocks.empty()) {
      92           1 :             log_->info("Downloaded an empty chain");
      93           1 :             continue;
      94             :           } else {
      95           9 :             log_->info("Successfully downloaded {} blocks", blocks.size());
      96             :           }
      97             : 
      98             :           auto chain =
      99           9 :               rxcpp::observable<>::iterate(blocks, rxcpp::identity_immediate());
     100             : 
     101           9 :           if (blocks.back()->height() >= expected_height
     102           9 :               and validator_->validateAndApply(chain, *storage)) {
     103           5 :             auto ledger_state = mutable_factory_->commit(std::move(storage));
     104             : 
     105           5 :             if (ledger_state) {
     106           3 :               return SynchronizationEvent{chain,
     107             :                                           SynchronizationOutcomeType::kCommit,
     108           3 :                                           msg.round,
     109           3 :                                           std::move(*ledger_state)};
     110             :             } else {
     111           2 :               return boost::none;
     112             :             }
     113           5 :           }
     114          14 :         }
     115             :       }
     116           5 :     }
     117             : 
     118             :     boost::optional<std::unique_ptr<ametsuchi::MutableStorage>>
     119             :     SynchronizerImpl::getStorage() {
     120          13 :       auto mutable_storage_var = mutable_factory_->createMutableStorage();
     121          13 :       if (auto e =
     122          13 :               boost::get<expected::Error<std::string>>(&mutable_storage_var)) {
     123           5 :         log_->error("could not create mutable storage: {}", e->error);
     124           5 :         return {};
     125             :       }
     126           8 :       return {std::move(
     127           8 :           boost::get<
     128             :               expected::Value<std::unique_ptr<ametsuchi::MutableStorage>>>(
     129           8 :               &mutable_storage_var)
     130           8 :               ->value)};
     131          13 :     }
     132             : 
     133             :     void SynchronizerImpl::processNext(const consensus::PairValid &msg) {
     134         726 :       log_->info("at handleNext");
     135         726 :       auto ledger_state = mutable_factory_->commitPrepared(*msg.block);
     136         726 :       if (ledger_state) {
     137         718 :         notifier_.get_subscriber().on_next(
     138         718 :             SynchronizationEvent{rxcpp::observable<>::just(msg.block),
     139             :                                  SynchronizationOutcomeType::kCommit,
     140         718 :                                  msg.round,
     141         718 :                                  std::move(*ledger_state)});
     142         718 :       } else {
     143           8 :         auto opt_storage = getStorage();
     144           8 :         if (opt_storage == boost::none) {
     145           5 :           return;
     146             :         }
     147             :         std::unique_ptr<ametsuchi::MutableStorage> storage =
     148           3 :             std::move(opt_storage.value());
     149           3 :         if (storage->apply(*msg.block)) {
     150           3 :           ledger_state = mutable_factory_->commit(std::move(storage));
     151           3 :           if (ledger_state) {
     152           1 :             notifier_.get_subscriber().on_next(
     153           1 :                 SynchronizationEvent{rxcpp::observable<>::just(msg.block),
     154             :                                      SynchronizationOutcomeType::kCommit,
     155           1 :                                      msg.round,
     156           1 :                                      std::move(*ledger_state)});
     157           1 :           } else {
     158           2 :             log_->error("failed to commit mutable storage");
     159             :           }
     160           3 :         } else {
     161           0 :           log_->warn("Block was not committed due to fail in mutable storage");
     162             :         }
     163          11 :       }
     164         726 :     }
     165             : 
     166             :     void SynchronizerImpl::processDifferent(const consensus::VoteOther &msg) {
     167           5 :       log_->info("at handleDifferent");
     168             : 
     169           5 :       shared_model::interface::types::HeightType top_block_height{0};
     170           5 :       if (auto block_query = block_query_factory_->createBlockQuery()) {
     171           5 :         top_block_height = (*block_query)->getTopBlockHeight();
     172           5 :       } else {
     173           0 :         log_->error(
     174           0 :             "Unable to create block query and retrieve top block height");
     175           0 :         return;
     176             :       }
     177             : 
     178           5 :       if (top_block_height >= msg.round.block_round) {
     179           0 :         log_->info(
     180           0 :             "Storage is already in synchronized state. Top block height is {}",
     181             :             top_block_height);
     182           0 :         return;
     183             :       }
     184             : 
     185           5 :       auto opt_storage = getStorage();
     186           5 :       if (opt_storage == boost::none) {
     187           0 :         return;
     188             :       }
     189             :       std::unique_ptr<ametsuchi::MutableStorage> storage =
     190           5 :           std::move(opt_storage.value());
     191             :       auto result =
     192           5 :           downloadMissingBlocks(msg, std::move(storage), top_block_height);
     193           5 :       if (result) {
     194           3 :         notifier_.get_subscriber().on_next(*result);
     195           3 :       }
     196           5 :     }
     197             : 
     198             :     rxcpp::observable<SynchronizationEvent>
     199             :     SynchronizerImpl::on_commit_chain() {
     200        1001 :       return notifier_.get_observable();
     201           0 :     }
     202             : 
     203             :     SynchronizerImpl::~SynchronizerImpl() {
     204         260 :       subscription_.unsubscribe();
     205         260 :     }
     206             : 
     207             :   }  // namespace synchronizer
     208             : }  // namespace iroha

Generated by: LCOV version 1.13