LCOV - code coverage report
Current view: top level - irohad/ordering/impl - on_demand_ordering_gate.cpp (source / functions) Hit Total Coverage
Test: cleared_cor.info Lines: 78 81 96.3 %
Date: 2019-03-07 14:46:43 Functions: 22 25 88.0 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include "ordering/impl/on_demand_ordering_gate.hpp"
       7             : 
       8             : #include <boost/range/adaptor/filtered.hpp>
       9             : #include <boost/range/adaptor/indexed.hpp>
      10             : #include <boost/range/adaptor/transformed.hpp>
      11             : #include <boost/range/empty.hpp>
      12             : #include "ametsuchi/tx_presence_cache.hpp"
      13             : #include "common/visitor.hpp"
      14             : #include "interfaces/iroha_internal/transaction_batch_parser_impl.hpp"
      15             : #include "logger/logger.hpp"
      16             : #include "ordering/impl/on_demand_common.hpp"
      17             : 
      18             : using namespace iroha;
      19             : using namespace iroha::ordering;
      20             : 
      // Constructor. Stores the collaborators, starts from `initial_round`, and
      // subscribes to `events`. For every event the handler below:
      //   1. under an exclusive lock on mutex_, sets current_round_ to the
      //      round carried by the event (BlockEvent or EmptyEvent);
      //   2. for a BlockEvent, removes block_event.hashes from cache_;
      //   3. pops the batches from cache_, puts them back, and, if there are
      //      any, sends them to network_client_ for current_round_;
      //   4. reports current_round_ to ordering_service_;
      //   5. requests a proposal for current_round_ from network_client_,
      //      passes it through processProposalRequest(), and emits the result
      //      as a network::OrderingEvent on proposal_notifier_.
      21             : OnDemandOrderingGate::OnDemandOrderingGate(
      22             :     std::shared_ptr<OnDemandOrderingService> ordering_service,
      23             :     std::shared_ptr<transport::OdOsNotification> network_client,
      24             :     rxcpp::observable<BlockRoundEventType> events,
      25             :     std::shared_ptr<cache::OrderingGateCache> cache,
      26             :     std::shared_ptr<shared_model::interface::UnsafeProposalFactory> factory,
      27             :     std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
      28             :     consensus::Round initial_round,
      29             :     logger::LoggerPtr log)
      30         256 :     : log_(std::move(log)),
      31         256 :       ordering_service_(std::move(ordering_service)),
      32         256 :       network_client_(std::move(network_client)),
                             // NOTE(review): the subscription is written here before
                             // cache_, proposal_factory_, tx_cache_ and current_round_,
                             // but members are initialized in declaration order (header
                             // not visible here). If events_subscription_ is declared
                             // before those members and `events` can emit during
                             // subscribe(), the handler would touch uninitialized
                             // members -- confirm against the header.
      33             :       events_subscription_(events.subscribe([this](auto event) {
      34             :         // exclusive lock
      35        3381 :         std::unique_lock<std::shared_timed_mutex> lock(mutex_);
      36        3381 :         visit_in_place(event,
      37             :                        [this](const BlockEvent &block_event) {
      38             :                          // block committed, increment block round
      39         970 :                          log_->debug("BlockEvent. {}", block_event.round);
      40         970 :                          current_round_ = block_event.round;
      41         970 :                        },
      42             :                        [this](const EmptyEvent &empty_event) {
      43             :                          // no blocks committed, increment reject round
      44        2411 :                          log_->debug("EmptyEvent");
      45        2411 :                          current_round_ = empty_event.round;
      46        2411 :                        });
      47        3381 :         log_->debug("Current: {}", current_round_);
      48        3381 :         lock.unlock();
                               // NOTE(review): from here on current_round_ is read without
                               // holding mutex_. That is only safe if this handler is the
                               // sole writer and events are delivered one at a time --
                               // confirm against the scheduling of `events`.
      49             : 
      50        3381 :         visit_in_place(event,
      51             :                        [this](const BlockEvent &block_event) {
      52             :                          // block committed, remove transactions from cache
      53         970 :                          cache_->remove(block_event.hashes);
      54         970 :                        },
      55             :                        [this](const EmptyEvent &) {
      56             :                          // no blocks committed, no transactions to remove
      57        2411 :                        });
      58             : 
                               // Take the batches out of the cache and immediately put them
                               // back, so the same batches are both kept in cache_ and
                               // (re)sent to the network for the new round.
      59        3381 :         auto batches = cache_->pop();
      60             : 
      61        3381 :         cache_->addToBack(batches);
      62        3381 :         if (not batches.empty()) {
      63         393 :           network_client_->onBatches(
      64         393 :               current_round_,
      65         393 :               transport::OdOsNotification::CollectionType{batches.begin(),
      66         393 :                                                           batches.end()});
      67         393 :         }
      68             : 
      69             :         // notify our ordering service about new round
      70        3381 :         ordering_service_->onCollaborationOutcome(current_round_);
      71             : 
      72             :         // request proposal for the current round
      73        3381 :         auto proposal = this->processProposalRequest(
      74        3381 :             network_client_->onRequestProposal(current_round_));
      75             :         // vote for the object received from the network
                               // (proposal is boost::none when nothing usable was received)
      76        3381 :         proposal_notifier_.get_subscriber().on_next(
      77        3381 :             network::OrderingEvent{std::move(proposal), current_round_});
      78        3381 :       })),
      79         256 :       cache_(std::move(cache)),
      80         256 :       proposal_factory_(std::move(factory)),
      81         256 :       tx_cache_(std::move(tx_cache)),
      82         256 :       current_round_(initial_round) {}
      83             : 
      // Destructor: cancels the subscription created in the constructor.
      // The subscribed handler captures `this`, so presumably this is what
      // keeps it from running against a destroyed gate -- confirm that
      // unsubscribe() also waits for a handler that is already in flight.
      84             : OnDemandOrderingGate::~OnDemandOrderingGate() {
      85         256 :   events_subscription_.unsubscribe();
      86         256 : }
      87             : 
      // Appends `batch` to cache_ and forwards it to network_client_ tagged
      // with the current round.
      // @param batch - transaction batch to cache and send
      88             : void OnDemandOrderingGate::propagateBatch(
      89             :     std::shared_ptr<shared_model::interface::TransactionBatch> batch) {
      90         725 :   cache_->addToBack({batch});
      91             : 
                         // Shared (reader) lock: current_round_ is only read here, while the
                         // event handler in the constructor writes it under an exclusive lock.
                         // The lock is held for the duration of the onBatches() call.
      92         725 :   std::shared_lock<std::shared_timed_mutex> lock(mutex_);
      93         725 :   network_client_->onBatches(
      94         725 :       current_round_, transport::OdOsNotification::CollectionType{batch});
      95         725 : }
      96             : 
      // Returns the observable side of proposal_notifier_, i.e. the stream on
      // which the constructor's event handler emits one network::OrderingEvent
      // per processed round event.
      97             : rxcpp::observable<network::OrderingEvent> OnDemandOrderingGate::onProposal() {
      98         746 :   return proposal_notifier_.get_observable();
      99           0 : }
     100             : 
     // Deprecated: always throws std::logic_error and never uses `pcs`.
     // The round events are supplied through the constructor instead.
     // (This coverage run shows zero hits: the method was never called.)
     // @throws std::logic_error unconditionally
     101             : void OnDemandOrderingGate::setPcs(
     102             :     const iroha::network::PeerCommunicationService &pcs) {
     103           0 :   throw std::logic_error(
     104             :       "Method is deprecated. PCS observable should be set in ctor");
     105           0 : }
     106             : 
     // Post-processes a proposal received for the current round.
     // @param proposal - optional proposal as returned by the proposal request
     // @return boost::none if `proposal` is empty or if no transactions are
     //         left after removeReplays(); otherwise the proposal returned by
     //         removeReplays().
     107             : boost::optional<std::shared_ptr<const shared_model::interface::Proposal>>
     108             : OnDemandOrderingGate::processProposalRequest(
     109             :     boost::optional<
     110             :         std::shared_ptr<const OnDemandOrderingService::ProposalType>> proposal)
     111             :     const {
     112        3381 :   if (not proposal) {
     113        2061 :     return boost::none;
     114             :   }
     115        1320 :   auto proposal_without_replays = removeReplays(*std::move(proposal));
     116             :   // no need to check empty proposal
     117        1320 :   if (boost::empty(proposal_without_replays->transactions())) {
     118         597 :     return boost::none;
     119             :   }
     120         723 :   return proposal_without_replays;
     121        3381 : }
     122             : 
     // Drops from `proposal` every batch that contains at least one
     // transaction which tx_cache_ does not report as Missing.
     // @param proposal - proposal to filter
     // @return the original proposal when every batch passes; otherwise a new
     //         proposal built by proposal_factory_ with the same height and
     //         created time, containing only the transactions of the batches
     //         that passed.
     123             : std::shared_ptr<const shared_model::interface::Proposal>
     124             : OnDemandOrderingGate::removeReplays(
     125             :     std::shared_ptr<const shared_model::interface::Proposal> proposal) const {
                         // One flag per transaction: true = keep, false = drop.
     126        1320 :   std::vector<bool> proposal_txs_validation_results;
                         // Predicate: true only when the cache lookup succeeds and the status
                         // is Missing. A failed lookup (empty result) and any other status
                         // both yield false, so such a transaction is treated as a replay.
     127             :   auto tx_is_not_processed = [this](const auto &tx) {
     128        1325 :     auto tx_result = tx_cache_->check(tx.hash());
     129        1325 :     if (not tx_result) {
     130             :       // TODO andrei 30.11.18 IR-51 Handle database error
     131          82 :       return false;
     132             :     }
     133        1243 :     return iroha::visit_in_place(
     134        1243 :         *tx_result,
     135             :         [](const ametsuchi::tx_cache_status_responses::Missing &) {
     136         728 :           return true;
     137             :         },
     138             :         [](const auto &status) {
     139             :           // TODO nickaleks 21.11.18: IR-1887 log replayed transactions
     140             :           // when log is added
     141         514 :           return false;
     142             :         });
     143        1325 :   };
     144             : 
     145        1320 :   shared_model::interface::TransactionBatchParserImpl batch_parser;
     146             : 
     147        1320 :   bool has_replays = false;
     148        1320 :   auto batches = batch_parser.parseBatches(proposal->transactions());
                         // The verdict is per batch: batch.size() copies of the batch verdict
                         // are appended, so one failing transaction drops its whole batch.
                         // std::all_of stops at the first failing transaction.
                         // NOTE(review): the flags are later looked up by the transaction's
                         // index in proposal->transactions(); this assumes parseBatches()
                         // returns batches that cover the transactions in order, without
                         // gaps -- confirm.
     149        2640 :   for (auto &batch : batches) {
     150        1320 :     bool all_txs_are_new =
     151        1320 :         std::all_of(batch.begin(), batch.end(), tx_is_not_processed);
     152        1320 :     proposal_txs_validation_results.insert(
     153        1320 :         proposal_txs_validation_results.end(), batch.size(), all_txs_are_new);
     154        1320 :     has_replays |= not all_txs_are_new;
     155             :   }
     156             : 
                         // Nothing to drop: hand back the proposal that was passed in.
                         // NOTE(review): `proposal` is a by-value parameter, so it is already
                         // moved implicitly on return; the std::move is redundant.
     157        1320 :   if (not has_replays) {
     158         723 :     return std::move(proposal);
     159             :   }
     160             : 
                         // Pair each transaction with its index, keep those whose flag is
                         // true, then strip the index again. at() throws std::out_of_range
                         // if an index has no recorded flag.
     161             :   auto unprocessed_txs =
     162         597 :       proposal->transactions() | boost::adaptors::indexed()
     163         597 :       | boost::adaptors::filtered(
     164             :             [proposal_txs_validation_results =
     165         597 :                  std::move(proposal_txs_validation_results)](const auto &el) {
     166         597 :               return proposal_txs_validation_results.at(el.index());
     167             :             })
     168         597 :       | boost::adaptors::transformed(
     169             :             [](const auto &el) -> decltype(auto) { return el.value(); });
     170             : 
     171         597 :   return proposal_factory_->unsafeCreateProposal(
     172         597 :       proposal->height(), proposal->createdTime(), unprocessed_txs);
     173        1320 : }

Generated by: LCOV version 1.13