LCOV - code coverage report
Current view: top level - irohad/torii/processor/impl - transaction_processor_impl.cpp (source / functions) Hit Total Coverage
Test: cleared_cor.info Lines: 106 115 92.2 %
Date: 2019-03-07 14:46:43 Functions: 13 16 81.2 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include "torii/processor/transaction_processor_impl.hpp"
       7             : 
       8             : #include <boost/format.hpp>
       9             : 
      10             : #include "interfaces/iroha_internal/block.hpp"
      11             : #include "interfaces/iroha_internal/proposal.hpp"
      12             : #include "interfaces/iroha_internal/transaction_batch.hpp"
      13             : #include "interfaces/iroha_internal/transaction_sequence.hpp"
      14             : #include "logger/logger.hpp"
      15             : #include "validation/stateful_validator_common.hpp"
      16             : 
      17             : namespace iroha {
      18             :   namespace torii {
      19             : 
      20             :     using network::PeerCommunicationService;
      21             : 
      22             :     namespace {
      23             :       std::string composeErrorMessage(
      24             :           const validation::TransactionError &tx_hash_and_error) {
      25          61 :         const auto tx_hash = tx_hash_and_error.tx_hash.hex();
      26          61 :         const auto &cmd_error = tx_hash_and_error.error;
      27          61 :         if (not cmd_error.tx_passed_initial_validation) {
      28           6 :           return (boost::format(
      29             :                       "Stateful validation error: transaction %s "
      30             :                       "did not pass initial verification: "
      31             :                       "checking '%s', error code '%d', query arguments: %s")
      32           6 :                   % tx_hash % cmd_error.name % cmd_error.error_code
      33           6 :                   % cmd_error.error_extra)
      34           6 :               .str();
      35             :         }
      36          55 :         return (boost::format(
      37             :                     "Stateful validation error in transaction %s: "
      38             :                     "command '%s' with index '%d' did not pass "
      39             :                     "verification with code '%d', query arguments: %s")
      40          55 :                 % tx_hash % cmd_error.name % cmd_error.index
      41          55 :                 % cmd_error.error_code % cmd_error.error_extra)
      42          55 :             .str();
      43          61 :       }
      44             :     }  // namespace
      45             : 
      46             :     TransactionProcessorImpl::TransactionProcessorImpl(
      47             :         std::shared_ptr<PeerCommunicationService> pcs,
      48             :         std::shared_ptr<MstProcessor> mst_processor,
      49             :         std::shared_ptr<iroha::torii::StatusBus> status_bus,
      50             :         std::shared_ptr<shared_model::interface::TxStatusFactory>
      51             :             status_factory,
      52             :         logger::LoggerPtr log)
      53         255 :         : pcs_(std::move(pcs)),
      54         255 :           mst_processor_(std::move(mst_processor)),
      55         255 :           status_bus_(std::move(status_bus)),
      56         255 :           status_factory_(std::move(status_factory)),
      57         255 :           log_(std::move(log)) {
      58             :       // process stateful validation results
      59         255 :       pcs_->onVerifiedProposal().subscribe(
      60             :           [this](const simulator::VerifiedProposalCreatorEvent &event) {
      61        3376 :             if (not event.verified_proposal_result) {
      62        2652 :               return;
      63             :             }
      64             : 
      65         724 :             const auto &proposal_and_errors = getVerifiedProposalUnsafe(event);
      66             : 
      67             :             // notify about failed txs
      68         724 :             const auto &errors = proposal_and_errors->rejected_transactions;
      69         785 :             for (const auto &tx_error : errors) {
      70          61 :               log_->info(composeErrorMessage(tx_error));
      71          61 :               this->publishStatus(TxStatusType::kStatefulFailed,
      72          61 :                                   tx_error.tx_hash,
      73          61 :                                   tx_error.error);
      74             :             }
      75             :             // notify about success txs
      76        1404 :             for (const auto &successful_tx :
      77         724 :                  proposal_and_errors->verified_proposal->transactions()) {
      78         680 :               log_->info("VerifiedProposalCreatorEvent StatefulValid: {}",
      79         680 :                          successful_tx.hash().hex());
      80         680 :               this->publishStatus(TxStatusType::kStatefulValid,
      81         680 :                                   successful_tx.hash());
      82             :             }
      83        3376 :           });
      84             : 
      85             :       // commit transactions
      86         255 :       pcs_->on_commit().subscribe(
      87             :           [this](synchronizer::SynchronizationEvent sync_event) {
      88        3128 :             bool has_at_least_one_committed = false;
      89        3128 :             sync_event.synced_blocks.subscribe(
      90             :                 // on next
      91             :                 [this, &has_at_least_one_committed](auto model_block) {
      92        1394 :                   for (const auto &tx : model_block->transactions()) {
      93         675 :                     const auto &hash = tx.hash();
      94         675 :                     log_->info("SynchronizationEvent Committed: {}",
      95         675 :                                hash.hex());
      96         675 :                     this->publishStatus(TxStatusType::kCommitted, hash);
      97         675 :                     has_at_least_one_committed = true;
      98             :                   }
      99         776 :                   for (const auto &rejected_tx_hash :
     100         719 :                        model_block->rejected_transactions_hashes()) {
     101          57 :                     log_->info("SynchronizationEvent Rejected: {}",
     102          57 :                                rejected_tx_hash.hex());
     103          57 :                     this->publishStatus(TxStatusType::kRejected,
     104          57 :                                         rejected_tx_hash);
     105             :                   }
     106         719 :                 },
     107             :                 // on complete
     108             :                 [this, &has_at_least_one_committed] {
     109        3128 :                   if (not has_at_least_one_committed) {
     110        2462 :                     log_->info("there are no transactions to be committed");
     111        2462 :                   }
     112        3128 :                 });
     113        3128 :           });
     114             : 
     115             :       mst_processor_->onStateUpdate().subscribe([this](auto &&state) {
     116           9 :         log_->info("MST state updated");
     117          18 :         for (auto &&batch : state->getBatches()) {
     118          18 :           for (auto &&tx : batch->transactions()) {
     119           9 :             this->publishStatus(TxStatusType::kMstPending, tx->hash());
     120             :           }
     121             :         }
     122           9 :       });
     123             :       mst_processor_->onPreparedBatches().subscribe([this](auto &&batch) {
     124           4 :         log_->info("MST batch prepared");
     125           4 :         this->publishEnoughSignaturesStatus(batch->transactions());
     126           4 :         this->pcs_->propagate_batch(batch);
     127           4 :       });
     128             :       mst_processor_->onExpiredBatches().subscribe([this](auto &&batch) {
     129           1 :         log_->info("MST batch {} is expired", batch->reducedHash());
     130           2 :         for (auto &&tx : batch->transactions()) {
     131           1 :           this->publishStatus(TxStatusType::kMstExpired, tx->hash());
     132             :         }
     133           1 :       });
     134         255 :     }
     135             : 
     136             :     void TransactionProcessorImpl::batchHandle(
     137             :         std::shared_ptr<shared_model::interface::TransactionBatch>
     138             :             transaction_batch) const {
     139         751 :       log_->info("handle batch");
     140         751 :       if (transaction_batch->hasAllSignatures()
     141         751 :           and not mst_processor_->batchInStorage(transaction_batch)) {
     142         737 :         log_->info("propagating batch to PCS");
     143         737 :         this->publishEnoughSignaturesStatus(transaction_batch->transactions());
     144         737 :         pcs_->propagate_batch(transaction_batch);
     145         737 :       } else {
     146          14 :         log_->info("propagating batch to MST");
     147          14 :         mst_processor_->propagateBatch(transaction_batch);
     148             :       }
     149         751 :     }
     150             : 
     151             :     void TransactionProcessorImpl::publishStatus(
     152             :         TxStatusType tx_status,
     153             :         const shared_model::crypto::Hash &hash,
     154             :         const validation::CommandError &cmd_error) const {
     155        2233 :       auto tx_error = cmd_error.name.empty()
     156        2173 :           ? shared_model::interface::TxStatusFactory::TransactionError{}
     157          60 :           : shared_model::interface::TxStatusFactory::TransactionError{
     158          60 :                 cmd_error.name, cmd_error.index, cmd_error.error_code};
     159        2233 :       switch (tx_status) {
     160             :         case TxStatusType::kStatelessFailed: {
     161           0 :           status_bus_->publish(
     162           0 :               status_factory_->makeStatelessFail(hash, tx_error));
     163           0 :           return;
     164             :         };
     165             :         case TxStatusType::kStatelessValid: {
     166           0 :           status_bus_->publish(
     167           0 :               status_factory_->makeStatelessValid(hash, tx_error));
     168           0 :           return;
     169             :         };
     170             :         case TxStatusType::kStatefulFailed: {
     171          61 :           status_bus_->publish(
     172          61 :               status_factory_->makeStatefulFail(hash, tx_error));
     173          61 :           return;
     174             :         };
     175             :         case TxStatusType::kStatefulValid: {
     176         680 :           status_bus_->publish(
     177         680 :               status_factory_->makeStatefulValid(hash, tx_error));
     178         680 :           return;
     179             :         };
     180             :         case TxStatusType::kRejected: {
     181          57 :           status_bus_->publish(status_factory_->makeRejected(hash, tx_error));
     182          57 :           return;
     183             :         };
     184             :         case TxStatusType::kCommitted: {
     185         675 :           status_bus_->publish(status_factory_->makeCommitted(hash, tx_error));
     186         675 :           return;
     187             :         };
     188             :         case TxStatusType::kMstExpired: {
     189           1 :           status_bus_->publish(status_factory_->makeMstExpired(hash, tx_error));
     190           1 :           return;
     191             :         };
     192             :         case TxStatusType::kNotReceived: {
     193           0 :           status_bus_->publish(
     194           0 :               status_factory_->makeNotReceived(hash, tx_error));
     195           0 :           return;
     196             :         };
     197             :         case TxStatusType::kMstPending: {
     198           9 :           status_bus_->publish(status_factory_->makeMstPending(hash, tx_error));
     199           9 :           return;
     200             :         };
     201             :         case TxStatusType::kEnoughSignaturesCollected: {
     202         750 :           status_bus_->publish(
     203         750 :               status_factory_->makeEnoughSignaturesCollected(hash, tx_error));
     204         750 :           return;
     205             :         };
     206             :       }
     207        2233 :     }
     208             : 
     209             :     void TransactionProcessorImpl::publishEnoughSignaturesStatus(
     210             :         const shared_model::interface::types::SharedTxsCollectionType &txs)
     211             :         const {
     212        1491 :       for (const auto &tx : txs) {
     213         750 :         this->publishStatus(TxStatusType::kEnoughSignaturesCollected,
     214         750 :                             tx->hash());
     215             :       }
     216         741 :     }
     217             :   }  // namespace torii
     218             : }  // namespace iroha

Generated by: LCOV version 1.13