Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "torii/processor/transaction_processor_impl.hpp"
7 :
8 : #include <boost/format.hpp>
9 :
10 : #include "interfaces/iroha_internal/block.hpp"
11 : #include "interfaces/iroha_internal/proposal.hpp"
12 : #include "interfaces/iroha_internal/transaction_batch.hpp"
13 : #include "interfaces/iroha_internal/transaction_sequence.hpp"
14 : #include "logger/logger.hpp"
15 : #include "validation/stateful_validator_common.hpp"
16 :
17 : namespace iroha {
18 : namespace torii {
19 :
20 : using network::PeerCommunicationService;
21 :
22 : namespace {
23 : std::string composeErrorMessage(
24 : const validation::TransactionError &tx_hash_and_error) {
25 61 : const auto tx_hash = tx_hash_and_error.tx_hash.hex();
26 61 : const auto &cmd_error = tx_hash_and_error.error;
27 61 : if (not cmd_error.tx_passed_initial_validation) {
28 6 : return (boost::format(
29 : "Stateful validation error: transaction %s "
30 : "did not pass initial verification: "
31 : "checking '%s', error code '%d', query arguments: %s")
32 6 : % tx_hash % cmd_error.name % cmd_error.error_code
33 6 : % cmd_error.error_extra)
34 6 : .str();
35 : }
36 55 : return (boost::format(
37 : "Stateful validation error in transaction %s: "
38 : "command '%s' with index '%d' did not pass "
39 : "verification with code '%d', query arguments: %s")
40 55 : % tx_hash % cmd_error.name % cmd_error.index
41 55 : % cmd_error.error_code % cmd_error.error_extra)
42 55 : .str();
43 61 : }
44 : } // namespace
45 :
46 : TransactionProcessorImpl::TransactionProcessorImpl(
47 : std::shared_ptr<PeerCommunicationService> pcs,
48 : std::shared_ptr<MstProcessor> mst_processor,
49 : std::shared_ptr<iroha::torii::StatusBus> status_bus,
50 : std::shared_ptr<shared_model::interface::TxStatusFactory>
51 : status_factory,
52 : logger::LoggerPtr log)
53 255 : : pcs_(std::move(pcs)),
54 255 : mst_processor_(std::move(mst_processor)),
55 255 : status_bus_(std::move(status_bus)),
56 255 : status_factory_(std::move(status_factory)),
57 255 : log_(std::move(log)) {
58 : // process stateful validation results
59 255 : pcs_->onVerifiedProposal().subscribe(
60 : [this](const simulator::VerifiedProposalCreatorEvent &event) {
61 3376 : if (not event.verified_proposal_result) {
62 2652 : return;
63 : }
64 :
65 724 : const auto &proposal_and_errors = getVerifiedProposalUnsafe(event);
66 :
67 : // notify about failed txs
68 724 : const auto &errors = proposal_and_errors->rejected_transactions;
69 785 : for (const auto &tx_error : errors) {
70 61 : log_->info(composeErrorMessage(tx_error));
71 61 : this->publishStatus(TxStatusType::kStatefulFailed,
72 61 : tx_error.tx_hash,
73 61 : tx_error.error);
74 : }
75 : // notify about success txs
76 1404 : for (const auto &successful_tx :
77 724 : proposal_and_errors->verified_proposal->transactions()) {
78 680 : log_->info("VerifiedProposalCreatorEvent StatefulValid: {}",
79 680 : successful_tx.hash().hex());
80 680 : this->publishStatus(TxStatusType::kStatefulValid,
81 680 : successful_tx.hash());
82 : }
83 3376 : });
84 :
85 : // commit transactions
86 255 : pcs_->on_commit().subscribe(
87 : [this](synchronizer::SynchronizationEvent sync_event) {
88 3128 : bool has_at_least_one_committed = false;
89 3128 : sync_event.synced_blocks.subscribe(
90 : // on next
91 : [this, &has_at_least_one_committed](auto model_block) {
92 1394 : for (const auto &tx : model_block->transactions()) {
93 675 : const auto &hash = tx.hash();
94 675 : log_->info("SynchronizationEvent Committed: {}",
95 675 : hash.hex());
96 675 : this->publishStatus(TxStatusType::kCommitted, hash);
97 675 : has_at_least_one_committed = true;
98 : }
99 776 : for (const auto &rejected_tx_hash :
100 719 : model_block->rejected_transactions_hashes()) {
101 57 : log_->info("SynchronizationEvent Rejected: {}",
102 57 : rejected_tx_hash.hex());
103 57 : this->publishStatus(TxStatusType::kRejected,
104 57 : rejected_tx_hash);
105 : }
106 719 : },
107 : // on complete
108 : [this, &has_at_least_one_committed] {
109 3128 : if (not has_at_least_one_committed) {
110 2462 : log_->info("there are no transactions to be committed");
111 2462 : }
112 3128 : });
113 3128 : });
114 :
115 : mst_processor_->onStateUpdate().subscribe([this](auto &&state) {
116 9 : log_->info("MST state updated");
117 18 : for (auto &&batch : state->getBatches()) {
118 18 : for (auto &&tx : batch->transactions()) {
119 9 : this->publishStatus(TxStatusType::kMstPending, tx->hash());
120 : }
121 : }
122 9 : });
123 : mst_processor_->onPreparedBatches().subscribe([this](auto &&batch) {
124 4 : log_->info("MST batch prepared");
125 4 : this->publishEnoughSignaturesStatus(batch->transactions());
126 4 : this->pcs_->propagate_batch(batch);
127 4 : });
128 : mst_processor_->onExpiredBatches().subscribe([this](auto &&batch) {
129 1 : log_->info("MST batch {} is expired", batch->reducedHash());
130 2 : for (auto &&tx : batch->transactions()) {
131 1 : this->publishStatus(TxStatusType::kMstExpired, tx->hash());
132 : }
133 1 : });
134 255 : }
135 :
136 : void TransactionProcessorImpl::batchHandle(
137 : std::shared_ptr<shared_model::interface::TransactionBatch>
138 : transaction_batch) const {
139 751 : log_->info("handle batch");
140 751 : if (transaction_batch->hasAllSignatures()
141 751 : and not mst_processor_->batchInStorage(transaction_batch)) {
142 737 : log_->info("propagating batch to PCS");
143 737 : this->publishEnoughSignaturesStatus(transaction_batch->transactions());
144 737 : pcs_->propagate_batch(transaction_batch);
145 737 : } else {
146 14 : log_->info("propagating batch to MST");
147 14 : mst_processor_->propagateBatch(transaction_batch);
148 : }
149 751 : }
150 :
151 : void TransactionProcessorImpl::publishStatus(
152 : TxStatusType tx_status,
153 : const shared_model::crypto::Hash &hash,
154 : const validation::CommandError &cmd_error) const {
155 2233 : auto tx_error = cmd_error.name.empty()
156 2173 : ? shared_model::interface::TxStatusFactory::TransactionError{}
157 60 : : shared_model::interface::TxStatusFactory::TransactionError{
158 60 : cmd_error.name, cmd_error.index, cmd_error.error_code};
159 2233 : switch (tx_status) {
160 : case TxStatusType::kStatelessFailed: {
161 0 : status_bus_->publish(
162 0 : status_factory_->makeStatelessFail(hash, tx_error));
163 0 : return;
164 : };
165 : case TxStatusType::kStatelessValid: {
166 0 : status_bus_->publish(
167 0 : status_factory_->makeStatelessValid(hash, tx_error));
168 0 : return;
169 : };
170 : case TxStatusType::kStatefulFailed: {
171 61 : status_bus_->publish(
172 61 : status_factory_->makeStatefulFail(hash, tx_error));
173 61 : return;
174 : };
175 : case TxStatusType::kStatefulValid: {
176 680 : status_bus_->publish(
177 680 : status_factory_->makeStatefulValid(hash, tx_error));
178 680 : return;
179 : };
180 : case TxStatusType::kRejected: {
181 57 : status_bus_->publish(status_factory_->makeRejected(hash, tx_error));
182 57 : return;
183 : };
184 : case TxStatusType::kCommitted: {
185 675 : status_bus_->publish(status_factory_->makeCommitted(hash, tx_error));
186 675 : return;
187 : };
188 : case TxStatusType::kMstExpired: {
189 1 : status_bus_->publish(status_factory_->makeMstExpired(hash, tx_error));
190 1 : return;
191 : };
192 : case TxStatusType::kNotReceived: {
193 0 : status_bus_->publish(
194 0 : status_factory_->makeNotReceived(hash, tx_error));
195 0 : return;
196 : };
197 : case TxStatusType::kMstPending: {
198 9 : status_bus_->publish(status_factory_->makeMstPending(hash, tx_error));
199 9 : return;
200 : };
201 : case TxStatusType::kEnoughSignaturesCollected: {
202 750 : status_bus_->publish(
203 750 : status_factory_->makeEnoughSignaturesCollected(hash, tx_error));
204 750 : return;
205 : };
206 : }
207 2233 : }
208 :
209 : void TransactionProcessorImpl::publishEnoughSignaturesStatus(
210 : const shared_model::interface::types::SharedTxsCollectionType &txs)
211 : const {
212 1491 : for (const auto &tx : txs) {
213 750 : this->publishStatus(TxStatusType::kEnoughSignaturesCollected,
214 750 : tx->hash());
215 : }
216 741 : }
217 : } // namespace torii
218 : } // namespace iroha
|