LCOV - code coverage report
Current view: top level - irohad/multi_sig_transactions/impl - mst_processor_impl.cpp (source / functions) Hit Total Coverage
Test: cleared_cor.info Lines: 61 64 95.3 %
Date: 2019-03-07 14:46:43 Functions: 18 19 94.7 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include <utility>
       7             : 
       8             : #include "logger/logger.hpp"
       9             : #include "multi_sig_transactions/mst_processor_impl.hpp"
      10             : 
      11             : namespace iroha {
      12             : 
      13             :   FairMstProcessor::FairMstProcessor(
      14             :       std::shared_ptr<iroha::network::MstTransport> transport,
      15             :       std::shared_ptr<MstStorage> storage,
      16             :       std::shared_ptr<PropagationStrategy> strategy,
      17             :       std::shared_ptr<MstTimeProvider> time_provider,
      18             :       logger::LoggerPtr log)
      19         255 :       : MstProcessor(log),  // use the same logger in base class
      20         255 :         transport_(std::move(transport)),
      21         255 :         storage_(std::move(storage)),
      22         255 :         strategy_(std::move(strategy)),
      23         255 :         time_provider_(std::move(time_provider)),
      24         255 :         propagation_subscriber_(strategy_->emitter().subscribe(
      25             :             [this](auto data) { this->onPropagate(data); })),
      26         255 :         log_(std::move(log)) {}
      27             : 
      28             :   FairMstProcessor::~FairMstProcessor() {
      29         255 :     propagation_subscriber_.unsubscribe();
      30         255 :   }
      31             : 
      32             :   // -------------------------| MstProcessor override |-------------------------
      33             : 
      34             :   auto FairMstProcessor::propagateBatchImpl(const iroha::DataType &batch)
      35             :       -> decltype(propagateBatch(batch)) {
      36          23 :     auto state_update = storage_->updateOwnState(batch);
      37          23 :     completedBatchesNotify(*state_update.completed_state_);
      38          23 :     updatedBatchesNotify(*state_update.updated_state_);
      39          23 :     expiredBatchesNotify(
      40          23 :         storage_->getExpiredTransactions(time_provider_->getCurrentTime()));
      41          23 :   }
      42             : 
      43             :   auto FairMstProcessor::onStateUpdateImpl() const
      44             :       -> decltype(onStateUpdate()) {
      45         500 :     return state_subject_.get_observable();
      46           0 :   }
      47             : 
      48             :   auto FairMstProcessor::onPreparedBatchesImpl() const
      49             :       -> decltype(onPreparedBatches()) {
      50         500 :     return batches_subject_.get_observable();
      51           0 :   }
      52             : 
      53             :   auto FairMstProcessor::onExpiredBatchesImpl() const
      54             :       -> decltype(onExpiredBatches()) {
      55         500 :     return expired_subject_.get_observable();
      56           0 :   }
      57             : 
      58             :   // TODO [IR-1687] Akvinikym 10.09.18: three methods below should be one
      59             :   void FairMstProcessor::completedBatchesNotify(ConstRefState state) const {
      60          24 :     if (not state.isEmpty()) {
      61           5 :       auto completed_batches = state.getBatches();
      62           5 :       std::for_each(completed_batches.begin(),
      63           5 :                     completed_batches.end(),
      64             :                     [this](const auto &batch) {
      65           5 :                       batches_subject_.get_subscriber().on_next(batch);
      66           5 :                     });
      67           5 :     }
      68          24 :   }
      69             : 
      70             :   void FairMstProcessor::updatedBatchesNotify(ConstRefState state) const {
      71          24 :     if (not state.isEmpty()) {
      72          18 :       state_subject_.get_subscriber().on_next(
      73          18 :           std::make_shared<MstState>(state));
      74          18 :     }
      75          24 :   }
      76             : 
      77             :   void FairMstProcessor::expiredBatchesNotify(ConstRefState state) const {
      78          24 :     if (not state.isEmpty()) {
      79           1 :       auto expired_batches = state.getBatches();
      80           1 :       std::for_each(expired_batches.begin(),
      81           1 :                     expired_batches.end(),
      82             :                     [this](const auto &batch) {
      83           1 :                       expired_subject_.get_subscriber().on_next(batch);
      84           1 :                     });
      85           1 :     }
      86          24 :   }
      87             : 
      88             :   bool FairMstProcessor::batchInStorageImpl(const DataType &batch) const {
      89         722 :     return storage_->batchInStorage(batch);
      90             :   }
      91             : 
      92             :   // -------------------| MstTransportNotification override |-------------------
      93             : 
      94             :   void FairMstProcessor::onNewState(const shared_model::crypto::PublicKey &from,
      95             :                                     ConstRefState new_state) {
      96           1 :     log_->info("Applying new state");
      97           1 :     auto current_time = time_provider_->getCurrentTime();
      98             : 
      99           1 :     auto state_update = storage_->apply(from, new_state);
     100             : 
     101             :     // updated batches
     102           1 :     updatedBatchesNotify(*state_update.updated_state_);
     103           1 :     log_->info("New batches size: {}",
     104           1 :                state_update.updated_state_->getBatches().size());
     105             : 
     106             :     // completed batches
     107           1 :     completedBatchesNotify(*state_update.completed_state_);
     108             : 
     109             :     // expired batches
     110           1 :     expiredBatchesNotify(storage_->getDiffState(from, current_time));
     111           1 :   }
     112             : 
     113             :   // -----------------------------| private api |-----------------------------
     114             : 
     115             :   void FairMstProcessor::onPropagate(
     116             :       const PropagationStrategy::PropagationData &data) {
     117          10 :     auto current_time = time_provider_->getCurrentTime();
     118          10 :     auto size = data.size();
     119          10 :     std::for_each(data.begin(),
     120          10 :                   data.end(),
     121             :                   [this, &current_time, size](const auto &dst_peer) {
     122          19 :                     auto diff = storage_->getDiffState(dst_peer->pubkey(),
     123          19 :                                                        current_time);
     124          19 :                     if (not diff.isEmpty()) {
     125           4 :                       log_->info("Propagate new data[{}]", size);
     126           4 :                       transport_->sendState(*dst_peer, diff);
     127           4 :                     }
     128          19 :                   });
     129          10 :   }
     130             : 
     131             : }  // namespace iroha

Generated by: LCOV version 1.13