LCOV - code coverage report
Current view: top level - irohad/torii/impl - command_service_impl.cpp (source / functions) Hit Total Coverage
Test: cleared_cor.info Lines: 70 119 58.8 %
Date: 2019-03-07 14:46:43 Functions: 15 60 25.0 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include "torii/impl/command_service_impl.hpp"
       7             : 
       8             : #include <thread>
       9             : 
      10             : #include "ametsuchi/block_query.hpp"
      11             : #include "common/byteutils.hpp"
      12             : #include "common/is_any.hpp"
      13             : #include "common/visitor.hpp"
      14             : #include "interfaces/iroha_internal/transaction_batch.hpp"
      15             : #include "interfaces/transaction_responses/not_received_tx_response.hpp"
      16             : #include "logger/logger.hpp"
      17             : 
      18             : namespace iroha {
      19             :   namespace torii {
      20             : 
      21             :     CommandServiceImpl::CommandServiceImpl(
      22             :         std::shared_ptr<iroha::torii::TransactionProcessor> tx_processor,
      23             :         std::shared_ptr<iroha::ametsuchi::Storage> storage,
      24             :         std::shared_ptr<iroha::torii::StatusBus> status_bus,
      25             :         std::shared_ptr<shared_model::interface::TxStatusFactory>
      26             :             status_factory,
      27             :         std::shared_ptr<iroha::torii::CommandServiceImpl::CacheType> cache,
      28             :         std::shared_ptr<iroha::ametsuchi::TxPresenceCache> tx_presence_cache,
      29             :         logger::LoggerPtr log)
                               // Each collaborator is taken by value and moved into the matching
                               // member below.
      30         250 :         : tx_processor_(std::move(tx_processor)),
      31         250 :           storage_(std::move(storage)),
      32         250 :           status_bus_(std::move(status_bus)),
      33         250 :           cache_(std::move(cache)),
      34         250 :           status_factory_(std::move(status_factory)),
      35         250 :           tx_presence_cache_(std::move(tx_presence_cache)),
      36         250 :           log_(std::move(log)) {
      37             :       // Notifier for all clients
                             // The callback feeds statuses published on the status bus into
                             // cache_, subject to the priority check below. The subscription
                             // handle is stored in status_subscription_, which the destructor
                             // uses to unsubscribe.
                             // NOTE(review): the callback captures `this`, so it must not be
                             // invoked after this object has been destroyed.
      38         250 :       status_subscription_ =
      39             :           status_bus_->statuses().subscribe([this](auto response) {
      40             :             // find response for this tx in cache; if status of received
      41             :             // response isn't "greater" than cached one, dismiss received one
      42        2989 :             auto tx_hash = response->transactionHash();
      43        2989 :             auto cached_tx_state = cache_->findItem(tx_hash);
      44        2989 :             if (cached_tx_state
      45        2989 :                 and response->comparePriorities(**cached_tx_state)
      46        2202 :                     != shared_model::interface::TransactionResponse::
      47             :                            PrioritiesComparisonResult::kGreater) {
      48          18 :               return;
      49             :             }
                                   // Nothing cached for this hash, or the new status has a
                                   // greater priority: store it.
      50        2971 :             cache_->addItem(tx_hash, response);
      51        2989 :           });
      52         250 :     }
      53             : 
      54             :     CommandServiceImpl::~CommandServiceImpl() {
                             // Cancel the status-bus subscription made in the constructor; its
                             // callback captures `this`.
      55         250 :       status_subscription_.unsubscribe();
      56         250 :     }
      57             : 
      58             :     void CommandServiceImpl::handleTransactionBatch(
      59             :         std::shared_ptr<shared_model::interface::TransactionBatch> batch) {
                             // Thin entry point: the batch is handed to processBatch() as is.
      60         736 :       processBatch(batch);
      61         736 :     }
      62             : 
      63             :     std::shared_ptr<shared_model::interface::TransactionResponse>
      64             :     CommandServiceImpl::getStatus(const shared_model::crypto::Hash &request) {
                             // Looks up the status for the tx hash `request`: cache_ first, then
                             // the block query obtained from storage_. Lookup failures are logged
                             // and answered with a NotReceived status.
      65         782 :       auto cached = cache_->findItem(request);
      66         782 :       if (cached) {
      67         782 :         return cached.value();
      68             :       }
      69             : 
                             // Cache miss: ask storage_ for a block query.
      70           0 :       auto block_query = storage_->getBlockQuery();
      71           0 :       if (not block_query) {
      72             :         // TODO andrei 30.11.18 IR-51 Handle database error
      73           0 :         log_->warn("Could not create block query. Tx: {}", request.hex());
      74           0 :         return status_factory_->makeNotReceived(request);
      75             :       }
      76             : 
      77           0 :       auto status = block_query->checkTxPresence(request);
      78           0 :       if (not status) {
      79             :         // TODO andrei 30.11.18 IR-51 Handle database error
      80           0 :         log_->warn("Check tx presence database error. Tx: {}", request.hex());
      81           0 :         return status_factory_->makeNotReceived(request);
      82             :       }
      83             : 
      84           0 :       return iroha::visit_in_place(
      85           0 :           *status,
                                 // Missing alternative: warn and answer NotReceived.
      86             :           [this, &request](
      87             :               const iroha::ametsuchi::tx_cache_status_responses::Missing &)
      88             :               -> std::shared_ptr<shared_model::interface::TransactionResponse> {
      89           0 :             log_->warn("Asked non-existing tx: {}", request.hex());
      90           0 :             return status_factory_->makeNotReceived(request);
      91           0 :           },
                                 // Every other alternative is answered with Committed, and that
                                 // status is stored in cache_.
                                 // NOTE(review): processBatch() distinguishes a Rejected
                                 // alternative of tx_cache_status_responses; if checkTxPresence
                                 // can yield it as well, a rejected tx would be reported as
                                 // committed here - confirm.
      92             :           [this, &request](const auto &) {
      93             :             std::shared_ptr<shared_model::interface::TransactionResponse>
      94           0 :                 response = status_factory_->makeCommitted(request);
      95           0 :             cache_->addItem(request, response);
      96           0 :             return response;
      97           0 :           });
      98         782 :     }
      99             : 
     100             :     /**
     101             :      * Statuses considered final for streaming. Observable stops value emission
     102             :      * after receiving a value of one of the following types.
     103             :      * @tparam T concrete response type; it is passed through std::decay_t
                            * before being matched against the list below
     104             :      *
     105             :      * StatefulFailedTxResponse and MstExpiredResponse were removed from the
     106             :      * list of final statuses.
     107             :      *
     108             :      * StatefulFailedTxResponse is not a final status because the node might be
     109             :      * in a non-synchronized state and the transaction may be stateful valid
     110             :      * from the viewpoint of up-to-date nodes.
     111             :      *
     112             :      * MstExpiredResponse is not a final status in the general case because it
     113             :      * depends on the MST expiration timeout. The transaction might expire in
     114             :      * MST, but remain valid in terms of Iroha validation rules. Thus, it may be
     115             :      * resent and committed successfully. As a result, the final status may
     116             :      * differ from MstExpiredResponse.
     117             :      */
     118             :     template <typename T>
     119             :     constexpr bool FinalStatusValue =
     120             :         iroha::is_any<std::decay_t<T>,
     121             :                       shared_model::interface::StatelessFailedTxResponse,
     122             :                       shared_model::interface::CommittedTxResponse,
     123             :                       shared_model::interface::RejectedTxResponse>::value;
     124             : 
     125             :     rxcpp::observable<
     126             :         std::shared_ptr<shared_model::interface::TransactionResponse>>
     127             :     CommandServiceImpl::getStatusStream(
     128             :         const shared_model::crypto::Hash &hash) {
                             // Builds an observable of statuses for `hash`: an initial status,
                             // followed by matching statuses from the status bus, completed after
                             // the first status whose type satisfies FinalStatusValue.
     129             :       using ResponsePtrType =
     130             :           std::shared_ptr<shared_model::interface::TransactionResponse>;
                             // Initial status: the cached one, or NotReceived when nothing is
                             // cached.
                             // NOTE(review): the lambda is invoked on the spot (trailing `()`), so
                             // its result is computed before value_or() is called; the debug line
                             // is logged and a NotReceived response is built even when the cache
                             // already has the hash.
     131             :       auto initial_status = cache_->findItem(hash).value_or([&] {
     132           0 :         log_->debug("tx is not received: {}", hash);
     133           0 :         return status_factory_->makeNotReceived(hash);
     134           0 :       }());
     135           0 :       return status_bus_
     136           0 :           ->statuses()
     137             :           // prepend initial status
                                 // (start_with comes before filter and lift, so the initial
                                 // status goes through both of them as well)
     138           0 :           .start_with(initial_status)
     139             :           // select statuses with requested hash
                                 // (`hash` is captured by copy)
     140             :           .filter([hash](auto response) {
     141           0 :             return response->transactionHash() == hash;
     142             :           })
     143             :           // successfully complete the observable if final status is received.
     144             :           // final status is included in the observable
     145           0 :           .template lift<ResponsePtrType>(
     146             :               [](rxcpp::subscriber<ResponsePtrType> dest) {
     147           0 :                 return rxcpp::make_subscriber<ResponsePtrType>(
     148             :                     dest, [=](ResponsePtrType response) {
                                             // Forward the status first, then decide whether
                                             // to complete the stream.
     149           0 :                       dest.on_next(response);
     150           0 :                       iroha::visit_in_place(
     151           0 :                           response->get(),
                                                 // The two visitors have mutually exclusive
                                                 // enable_if_t return types: this one applies
                                                 // to final status types and completes `dest`.
     152             :                           [dest](const auto &resp)
     153             :                               -> std::enable_if_t<
     154             :                                   FinalStatusValue<decltype(resp)>> {
     155           0 :                             dest.on_completed();
     156           0 :                           },
                                                 // Non-final status types: nothing to do.
     157             :                           [](const auto &resp)
     158             :                               -> std::enable_if_t<
     159           0 :                                   not FinalStatusValue<decltype(resp)>>{});
     160           0 :                     });
     161           0 :               });
     162           0 :     }
     163             : 
     164             :     void CommandServiceImpl::pushStatus(
     165             :         const std::string &who,
     166             :         std::shared_ptr<shared_model::interface::TransactionResponse>
     167             :             response) {
                             // Logs the response at debug level, tagged with the issuer name
                             // `who`, and publishes it on the status bus.
                             // NOTE(review): the log text says "adding item to cache", but this
                             // function only publishes; the cache is updated by the status-bus
                             // subscriber installed in the constructor.
     168         749 :       log_->debug("{}: adding item to cache: {}", who, *response);
     169         749 :       status_bus_->publish(response);
     170         749 :     }
     171             : 
     172             :     void CommandServiceImpl::processBatch(
     173             :         std::shared_ptr<shared_model::interface::TransactionBatch> batch) {
                             // Steps: (1) look every tx of the batch up in cache_ and re-publish
                             // what is found, (2) drop the batch when that lookup marks it as a
                             // replay, (3) consult tx_presence_cache_ and drop the batch when a tx
                             // is already committed or rejected, (4) otherwise pass the batch to
                             // tx_processor_.
                             // NOTE(review): std::accumulate, std::tie and std::any_of are used
                             // below, but <numeric>, <tuple> and <algorithm> are not among this
                             // file's direct includes - presumably they arrive transitively;
                             // confirm.
     174         736 :       const auto status_issuer = "ToriiBatchProcessor";
     175         736 :       const auto &txs = batch->transactions();
     176             : 
                             // cache_has_at_least_one_tx: some tx of the batch was found in cache_.
                             // batch_has_mst_pending_tx: some cached status is MstPendingResponse.
     177         736 :       bool cache_has_at_least_one_tx{false};
     178         736 :       bool batch_has_mst_pending_tx{false};
     179         736 :       std::tie(cache_has_at_least_one_tx, batch_has_mst_pending_tx) =
     180             :           // this accumulate can be split into two parts to perform visit_in_place
     181             :           // two times - one for cache lookup with booleans initialization and
     182             :           // another for statuses pushing. That would allow moving a part of the
     183             :           // code to a separate method for simplification
     184         736 :           std::accumulate(
     185         736 :               txs.begin(),
     186         736 :               txs.end(),
     187         736 :               std::make_pair<bool, bool>(false, false),
     188             :               [this, &status_issuer](std::pair<bool, bool> lookup_result,
     189             :                                      const auto &tx) {
     190         741 :                 const auto &tx_hash = tx->hash();
     191         741 :                 if (auto found = cache_->findItem(tx_hash)) {
     192           8 :                   iroha::visit_in_place(
     193           8 :                       (*found)->get(),
                                             // Cached MstPending: re-publish it and remember
                                             // that the batch has a pending tx.
     194             :                       [this, &found, &lookup_result, &status_issuer](
     195             :                           const shared_model::interface::MstPendingResponse &) {
     196           8 :                         this->pushStatus(status_issuer, *found);
     197           8 :                         lookup_result.second = true;
     198           8 :                       },
     199             :                       [this, &tx_hash, &status_issuer](
     200             :                           const shared_model::interface::NotReceivedTxResponse
     201             :                               &) {
     202             :                         // This branch covers an impossible case (this cache
     203             :                         // cannot contain NotReceivedTxResponse, because the tx
     204             :                         // has reached processBatch method, which means that
     205             :                         // the tx already has StatelessValid status).
     206             :                         // That is why we are not updating its status inside
     207             :                         // internal cache, but still pushing to status bus.
     208           0 :                         this->pushStatus(
     209           0 :                             status_issuer,
     210           0 :                             status_factory_->makeStatelessValid(tx_hash));
     211           0 :                       },
                                             // Any other cached status is re-published as is.
     212             :                       [this, &found, &status_issuer](const auto &status) {
     213           0 :                         this->pushStatus(status_issuer, *found);
     214           0 :                       });
     215           8 :                   lookup_result.first = true;
     216           8 :                 }
     217         741 :                 return lookup_result;
     218           0 :               });
     219             : 
     220         736 :       if (cache_has_at_least_one_tx and not batch_has_mst_pending_tx) {
                               // Reached only when some tx is cached and none of the cached
                               // statuses is MstPending: the batch is logged as a replay and
                               // dropped. The paragraphs below describe the overall policy,
                               // including the cases that do not enter this branch.
     221             :         // If a non-persistent cache says that a transaction has pending status
     222             :         // that means we have to check persistent cache too.
     223             :         // Non-persistent cache might be overflowed and mst replay becomes
     224             :         // possible without checking persistent cache.
     225             : 
     226             :         // If there are no pending statuses and the transaction is found in
     227             :         // non-persistent cache, then it is considered as a replay and prevented
     228             :         // from further propagation.
     229             : 
     230             :         // If non-persistent cache does not contain any info about a
     231             :         // transaction, then we just check persistent cache.
     232           0 :         log_->warn("Replayed batch would not be served. {}", *batch);
     233           0 :         return;
     234             :       }
     235             : 
     236         736 :       auto cache_presence = tx_presence_cache_->check(*batch);
     237         736 :       if (not cache_presence) {
     238             :         // TODO andrei 30.11.18 IR-51 Handle database error
     239           0 :         log_->warn("Check tx presence database error. {}", *batch);
     240           0 :         return;
     241             :       }
                             // A batch is a replay when any of its txs is already Committed or
                             // Rejected. Each visited tx also gets a status pushed.
                             // NOTE(review): std::any_of may stop at the first tx for which the
                             // predicate returns true, so later txs of a replayed batch might get
                             // no status pushed - confirm this is acceptable.
     242         736 :       auto is_replay = std::any_of(
     243         736 :           cache_presence->begin(),
     244         736 :           cache_presence->end(),
     245             :           [this, &status_issuer](const auto &tx_status) {
     246         741 :             return iroha::visit_in_place(
     247         741 :                 tx_status,
                                       // Missing: publish StatelessValid; not a replay.
     248             :                 [this, &status_issuer](
     249             :                     const iroha::ametsuchi::tx_cache_status_responses::Missing
     250             :                         &status) {
     251         739 :                   this->pushStatus(
     252         739 :                       status_issuer,
     253         739 :                       status_factory_->makeStatelessValid(status.hash));
     254         739 :                   return false;
     255           0 :                 },
                                       // Committed: publish Committed; replay.
     256             :                 [this, &status_issuer](
     257             :                     const iroha::ametsuchi::tx_cache_status_responses::Committed
     258             :                         &status) {
     259           1 :                   this->pushStatus(status_issuer,
     260           1 :                                    status_factory_->makeCommitted(status.hash));
     261           1 :                   return true;
     262           0 :                 },
                                       // Rejected: publish Rejected; replay.
     263             :                 [this, &status_issuer](
     264             :                     const iroha::ametsuchi::tx_cache_status_responses::Rejected
     265             :                         &status) {
     266           1 :                   this->pushStatus(status_issuer,
     267           1 :                                    status_factory_->makeRejected(status.hash));
     268           1 :                   return true;
     269           0 :                 });
     270             :           });
     271         736 :       if (is_replay) {
     272           2 :         log_->warn("Replayed batch would not be served. {}", *batch);
     273           2 :         return;
     274             :       }
     275             : 
                             // Not a replay: pass the batch on for processing.
     276         734 :       tx_processor_->batchHandle(batch);
     277         736 :     }
     278             : 
     279             :   }  // namespace torii
     280             : }  // namespace iroha

Generated by: LCOV version 1.13