LCOV - code coverage report
Current view: top level - irohad/ordering/impl - on_demand_ordering_service_impl.cpp (source / functions) Hit Total Coverage
Test: cleared_cor.info Lines: 112 115 97.4 %
Date: 2019-03-07 14:46:43 Functions: 15 15 100.0 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include "ordering/impl/on_demand_ordering_service_impl.hpp"
       7             : 
       8             : #include <unordered_set>
       9             : 
      10             : #include <boost/optional.hpp>
      11             : #include <boost/range/adaptor/filtered.hpp>
      12             : #include <boost/range/adaptor/indirected.hpp>
      13             : #include <boost/range/adaptor/transformed.hpp>
      14             : #include <boost/range/algorithm/for_each.hpp>
      15             : #include <boost/range/size.hpp>
      16             : #include "ametsuchi/tx_presence_cache.hpp"
      17             : #include "ametsuchi/tx_presence_cache_utils.hpp"
      18             : #include "common/visitor.hpp"
      19             : #include "datetime/time.hpp"
      20             : #include "interfaces/iroha_internal/proposal.hpp"
      21             : #include "interfaces/iroha_internal/transaction_batch.hpp"
      22             : #include "interfaces/transaction.hpp"
      23             : #include "logger/logger.hpp"
      24             : 
      25             : using namespace iroha;
      26             : using namespace iroha::ordering;
      27             : using TransactionBatchType = transport::OdOsNotification::TransactionBatchType;
      28             : 
      29             : OnDemandOrderingServiceImpl::OnDemandOrderingServiceImpl(  // stores config and dependencies, then processes initial_round
      30             :     size_t transaction_limit,  // requested transaction count per proposal (passed to getTransactions())
      31             :     std::shared_ptr<shared_model::interface::UnsafeProposalFactory>
      32             :         proposal_factory,  // used by packNextProposals() to build proposals
      33             :     std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,  // queried by batchAlreadyProcessed()
      34             :     logger::LoggerPtr log,
      35             :     size_t number_of_proposals,  // tryErase() drops stored proposals while more rounds than this are queued
      36             :     const consensus::Round &initial_round)  // first round handed to onCollaborationOutcome()
      37         257 :     : transaction_limit_(transaction_limit),
      38         257 :       number_of_proposals_(number_of_proposals),
      39         257 :       proposal_factory_(std::move(proposal_factory)),
      40         257 :       tx_cache_(std::move(tx_cache)),
      41         257 :       log_(std::move(log)) {
      42         257 :   onCollaborationOutcome(initial_round);  // opens the initial round queues via packNextProposals()
      43         257 : }
      44             : 
      45             : // -------------------------| OnDemandOrderingService |-------------------------
      46             : 
      47             : void OnDemandOrderingServiceImpl::onCollaborationOutcome(  // switch the service to `round`
      48             :     consensus::Round round) {
      49        3657 :   log_->info("onCollaborationOutcome => {}", round);
      50             :   // exclusive write lock, held until return: both calls below run under it
      51        3657 :   std::lock_guard<std::shared_timed_mutex> guard(lock_);
      52        3657 :   log_->debug("onCollaborationOutcome => write lock is acquired");
      53             :   // First close/open the round queues, then evict proposals beyond the limit.
      54        3657 :   packNextProposals(round);
      55        3657 :   tryErase();
      56        3657 : }
      57             : 
      58             : // ----------------------------| OdOsNotification |-----------------------------
      59             : 
      60             : void OnDemandOrderingServiceImpl::onBatches(consensus::Round round,
      61             :                                             CollectionType batches) {
      62             :   // Queue not-yet-processed batches for `round`. Only the read (shared) lock is taken; the round queues are passed to getTransactions() as tbb::concurrent_queue.
      63        2968 :   std::shared_lock<std::shared_timed_mutex> guard(lock_);
      64        2968 :   log_->info("onBatches => collection size = {}, {}", batches.size(), round);
      65             :   // Filtered view over `batches`: the predicate runs while the range is traversed by std::for_each below.
      66             :   auto unprocessed_batches =
      67             :       boost::adaptors::filter(batches, [this](const auto &batch) {
      68        3409 :         log_->info("check batch {} for already processed transactions",
      69        3386 :                    batch->reducedHash().hex());
      70        3409 :         return not this->batchAlreadyProcessed(*batch);
      71           0 :       });
      72        3180 :   auto it = current_proposals_.find(round);
      73        3238 :   if (it == current_proposals_.end()) {  // no queue for exactly this round: look for a substitute
      74         952 :     it =
      75         952 :         std::find_if(current_proposals_.begin(),
      76         863 :                      current_proposals_.end(),
      77             :                      [&round](const auto &p) {  // match: same reject round, or both reject rounds >= 2
      78        1897 :                        auto request_reject_round = round.reject_round;
      79        1897 :                        auto reject_round = p.first.reject_round;
      80        1904 :                        return request_reject_round == reject_round
      81        1897 :                            or (request_reject_round >= 2 and reject_round >= 2);
      82             :                      });
      83         970 :     if (it == current_proposals_.end()) {
      84           0 :       log_->critical("No place to store the batches!");
      85           0 :       assert(false);  // terminate if in debug build
      86             :       return;  // with assert compiled out: the batches are dropped without being queued
      87             :     }
      88         933 :     log_->debug("onBatches => collection will be inserted to {}", it->first);
      89         933 :   }
      90        3117 :   std::for_each(unprocessed_batches.begin(),
      91        3108 :                 unprocessed_batches.end(),
      92             :                 [&it](auto &obj) { it->second.push(std::move(obj)); });  // moves each accepted batch out of `batches`
      93        3262 :   log_->debug("onBatches => collection is inserted");
      94        3262 : }
      95             : 
      96             : boost::optional<
      97             :     std::shared_ptr<const OnDemandOrderingServiceImpl::ProposalType>>
      98             : OnDemandOrderingServiceImpl::onRequestProposal(consensus::Round round) {
      99             :   // Return the proposal stored for `round`, if any. Read (shared) lock: proposal_map_ is only looked up here.
     100        3110 :   std::shared_lock<std::shared_timed_mutex> guard(lock_);
     101        3110 :   auto proposal = proposal_map_.find(round);
     102             :   // space between '{}' and 'returning' is not missing, since either nothing, or
     103             :   // NOT with space is printed
     104        3110 :   log_->debug("onRequestProposal, {}, {}returning a proposal.",
     105             :               round,
     106        3110 :               (proposal == proposal_map_.end()) ? "NOT " : "");
     107        3110 :   if (proposal != proposal_map_.end()) {
     108        1333 :     return proposal->second;
     109             :   } else {
     110        1777 :     return boost::none;  // nothing stored for this round (never packed, or already removed by tryErase())
     111             :   }
     112        3110 : }
     113             : 
     114             : // ---------------------------------| Private |---------------------------------
     115             : 
     116             : /**
     117             :  * Get transactions from the given batches queue. Does not break batches -
     118             :  * continues getting all the transactions from the ongoing batch until the
     119             :  * required amount is collected.
     120             :  * @param requested_tx_amount - amount of transactions to get
     121             :  * @param tx_batches_queue - the queue to get transactions from
     122             :  * @param discarded_txs_amount - if set, the rest of the queue is drained and
     123             :  *   the number of transactions thrown away that way is written here
     124             :  * @return transactions
     125             :  */
     126             : static std::vector<std::shared_ptr<shared_model::interface::Transaction>>
     127             : getTransactions(size_t requested_tx_amount,
     128             :                 tbb::concurrent_queue<TransactionBatchType> &tx_batches_queue,
     129             :                 boost::optional<size_t &> discarded_txs_amount) {
     130        2179 :   TransactionBatchType batch;
     131        2179 :   std::vector<std::shared_ptr<shared_model::interface::Transaction>> collection;
     132        2179 :   std::unordered_set<std::string> inserted;  // hex reduced hashes of the batches already taken
     133             :   // NOTE(review): a popped batch whose hash is already in `inserted` ends the loop and is neither returned nor counted as discarded.
     134        4378 :   while (collection.size() < requested_tx_amount
     135        4378 :          and tx_batches_queue.try_pop(batch)
     136        2223 :          and inserted.insert(batch->reducedHash().hex()).second) {  // .second is false for a repeated hash
     137        2199 :     collection.insert(
     138        2199 :         std::end(collection),
     139        2199 :         std::make_move_iterator(std::begin(batch->transactions())),
     140        2199 :         std::make_move_iterator(std::end(batch->transactions())));  // whole batch is appended, so the result may exceed requested_tx_amount
     141             :   }
     142             :   // Drain whatever is still queued and count the transactions it held.
     143        2179 :   if (discarded_txs_amount) {
     144        2179 :     *discarded_txs_amount = 0;
     145        2974 :     while (tx_batches_queue.try_pop(batch)) {
     146         795 :       *discarded_txs_amount += boost::size(batch->transactions());
     147             :     }
     148        2179 :   }
     149             : 
     150        2179 :   return collection;
     151        2179 : }
     152             : 
     153             : void OnDemandOrderingServiceImpl::packNextProposals(  // close the rounds that end with `round`, open queues for upcoming ones; called under the write lock from onCollaborationOutcome()
     154             :     const consensus::Round &round) {
     155             :   auto close_round = [this](consensus::Round round) {
     156        4895 :     log_->debug("close {}", round);
     157             :     // If a queue exists for this round, pack its batches into a proposal and then remove the queue.
     158        4895 :     auto it = current_proposals_.find(round);
     159        4895 :     if (it != current_proposals_.end()) {
     160        3889 :       log_->debug("proposal found");
     161        3889 :       if (not it->second.empty()) {
     162        2179 :         log_->debug("Mutable proposal generation for round {}", round);
     163             :         size_t discarded_txs_amount;  // left uninitialized here; getTransactions() assigns it
     164        2179 :         auto txs = getTransactions(transaction_limit_, it->second, discarded_txs_amount);
     165        2179 :         if (not txs.empty()) {
     166        2179 :           log_->debug("Number of transactions in proposal = {}", txs.size());
     167        2179 :           auto proposal = proposal_factory_->unsafeCreateProposal(
     168        2179 :               round.block_round,
     169        2179 :               iroha::time::now(),
     170        2179 :               std::move(txs) | boost::adaptors::indirected);
     171        2179 :           proposal_map_.emplace(round, std::move(proposal));  // NOTE(review): if emplace keeps an existing entry, round is still queued below - confirm container semantics
     172        2179 :           log_->debug(
     173        2179 :               "packNextProposal: data has been fetched for {}. "
     174             :               "Discarded {} transactions.",
     175             :               round,
     176             :               discarded_txs_amount);
     177        2179 :           round_queue_.push(round);  // remembered so tryErase() can remove the proposal later
     178        2179 :         }
     179        2179 :       }
     180        3889 :       current_proposals_.erase(it);  // the queue is removed whether or not a proposal was produced
     181        3889 :     }
     182        4895 :   };
     183             :   // open_round: make sure current_proposals_ has an entry for the given round.
     184             :   auto open_round = [this](consensus::Round round) {
     185        6133 :     log_->debug("open {}", round);
     186        6133 :     current_proposals_[round];  // evaluated only for its create-if-missing effect (assuming map-like operator[])
     187        6133 :   };
     188             : 
     189             :   /*
     190             :    * The possible cases can be visualised as a diagram, where:
     191             :    * o - current round, x - next round, v - target round
     192             :    *
     193             :    *   0 1 2
     194             :    * 0 o x v
     195             :    * 1 x v .
     196             :    * 2 v . .
     197             :    *
     198             :    * Reject case:
     199             :    *
     200             :    *   0 1 2 3
     201             :    * 0 . o x v
     202             :    * 1 x v . .
     203             :    * 2 v . . .
     204             :    *
     205             :    * (0,1) - current round. Round (0,2) is closed for transactions.
     206             :    * Round (0,3) is now receiving transactions.
     207             :    * Rounds (1,) and (2,) do not change.
     208             :    *
     209             :    * Commit case:
     210             :    *
     211             :    *   0 1 2
     212             :    * 0 . . .
     213             :    * 1 o x v
     214             :    * 2 x v .
     215             :    * 3 v . .
     216             :    *
     217             :    * (1,0) - current round. The diagram is similar to the initial case.
     218             :    */
     219             : 
     220             :   // close next reject round
     221        3657 :   close_round({round.block_round, round.reject_round + 1});
     222             : 
     223        3657 :   if (round.reject_round == kFirstRejectRound) {
     224             :     // new block round
     225        1238 :     close_round({round.block_round + 1, round.reject_round});
     226             : 
     227             :     // remove current queues (any batches still held in them are dropped with the queues)
     228        1238 :     current_proposals_.clear();
     229             :     // initialize the 3 diagonal rounds from the commit case diagram
     230        1238 :     open_round({round.block_round + 1, kNextRejectRoundConsumer});
     231        1238 :     open_round({round.block_round + 2, kNextCommitRoundConsumer});
     232        1238 :   }
     233             : 
     234             :   // new reject round (opened on every call, for both the commit and the reject case)
     235        3657 :   open_round(
     236        3657 :       {round.block_round, currentRejectRoundConsumer(round.reject_round)});
     237        3657 : }
     238             : 
     239             : void OnDemandOrderingServiceImpl::tryErase() {  // drop stored proposals until at most number_of_proposals_ rounds remain queued
     240        5122 :   while (round_queue_.size() > number_of_proposals_) {
     241        1465 :     auto &round = round_queue_.front();  // reference into the queue: used for erase and log before pop() below
     242        1465 :     proposal_map_.erase(round);
     243        1465 :     log_->info("tryErase: erased {}", round);
     244        1465 :     round_queue_.pop();
     245             :   }
     246        3657 : }
     247             : 
     248             : bool OnDemandOrderingServiceImpl::batchAlreadyProcessed(  // true if the batch must not be queued again
     249             :     const shared_model::interface::TransactionBatch &batch) {
     250        3290 :   auto tx_statuses = tx_cache_->check(batch);
     251        3290 :   if (not tx_statuses) {
     252             :     // TODO andrei 30.11.18 IR-51 Handle database error
     253           3 :     log_->warn("Check tx presence database error. Batch: {}", batch);
     254           3 :     return true;  // lookup failed: reported as processed, so the filter in onBatches() drops the batch
     255             :   }
     256             :   // if any transaction is committed or rejected, batch was already processed
     257             :   // Note: any_of returns false for empty sequence
     258        3338 :   return std::any_of(
     259             :       tx_statuses->begin(), tx_statuses->end(), [this](const auto &tx_status) {
     260        3342 :         if (iroha::ametsuchi::isAlreadyProcessed(tx_status)) {
     261           2 :           log_->warn("Duplicate transaction: {}",
     262           2 :                      iroha::ametsuchi::getHash(tx_status).hex());
     263           2 :           return true;
     264             :         }
     265        3270 :         return false;
     266        3287 :       });
     267        3321 : }

Generated by: LCOV version 1.13