Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "torii/impl/command_service_impl.hpp"
7 :
8 : #include <thread>
9 :
10 : #include "ametsuchi/block_query.hpp"
11 : #include "common/byteutils.hpp"
12 : #include "common/is_any.hpp"
13 : #include "common/visitor.hpp"
14 : #include "interfaces/iroha_internal/transaction_batch.hpp"
15 : #include "interfaces/transaction_responses/not_received_tx_response.hpp"
16 : #include "logger/logger.hpp"
17 :
18 : namespace iroha {
19 : namespace torii {
20 :
21 : CommandServiceImpl::CommandServiceImpl(
22 : std::shared_ptr<iroha::torii::TransactionProcessor> tx_processor,
23 : std::shared_ptr<iroha::ametsuchi::Storage> storage,
24 : std::shared_ptr<iroha::torii::StatusBus> status_bus,
25 : std::shared_ptr<shared_model::interface::TxStatusFactory>
26 : status_factory,
27 : std::shared_ptr<iroha::torii::CommandServiceImpl::CacheType> cache,
28 : std::shared_ptr<iroha::ametsuchi::TxPresenceCache> tx_presence_cache,
29 : logger::LoggerPtr log)
30 250 : : tx_processor_(std::move(tx_processor)),
31 250 : storage_(std::move(storage)),
32 250 : status_bus_(std::move(status_bus)),
33 250 : cache_(std::move(cache)),
34 250 : status_factory_(std::move(status_factory)),
35 250 : tx_presence_cache_(std::move(tx_presence_cache)),
36 250 : log_(std::move(log)) {
37 : // Notifier for all clients
38 250 : status_subscription_ =
39 : status_bus_->statuses().subscribe([this](auto response) {
40 : // find response for this tx in cache; if status of received
41 : // response isn't "greater" than cached one, dismiss received one
42 2989 : auto tx_hash = response->transactionHash();
43 2989 : auto cached_tx_state = cache_->findItem(tx_hash);
44 2989 : if (cached_tx_state
45 2989 : and response->comparePriorities(**cached_tx_state)
46 2202 : != shared_model::interface::TransactionResponse::
47 : PrioritiesComparisonResult::kGreater) {
48 18 : return;
49 : }
50 2971 : cache_->addItem(tx_hash, response);
51 2989 : });
52 250 : }
53 :
54 : CommandServiceImpl::~CommandServiceImpl() {
55 250 : status_subscription_.unsubscribe();
56 250 : }
57 :
58 : void CommandServiceImpl::handleTransactionBatch(
59 : std::shared_ptr<shared_model::interface::TransactionBatch> batch) {
60 736 : processBatch(batch);
61 736 : }
62 :
63 : std::shared_ptr<shared_model::interface::TransactionResponse>
64 : CommandServiceImpl::getStatus(const shared_model::crypto::Hash &request) {
65 782 : auto cached = cache_->findItem(request);
66 782 : if (cached) {
67 782 : return cached.value();
68 : }
69 :
70 0 : auto block_query = storage_->getBlockQuery();
71 0 : if (not block_query) {
72 : // TODO andrei 30.11.18 IR-51 Handle database error
73 0 : log_->warn("Could not create block query. Tx: {}", request.hex());
74 0 : return status_factory_->makeNotReceived(request);
75 : }
76 :
77 0 : auto status = block_query->checkTxPresence(request);
78 0 : if (not status) {
79 : // TODO andrei 30.11.18 IR-51 Handle database error
80 0 : log_->warn("Check tx presence database error. Tx: {}", request.hex());
81 0 : return status_factory_->makeNotReceived(request);
82 : }
83 :
84 0 : return iroha::visit_in_place(
85 0 : *status,
86 : [this, &request](
87 : const iroha::ametsuchi::tx_cache_status_responses::Missing &)
88 : -> std::shared_ptr<shared_model::interface::TransactionResponse> {
89 0 : log_->warn("Asked non-existing tx: {}", request.hex());
90 0 : return status_factory_->makeNotReceived(request);
91 0 : },
92 : [this, &request](const auto &) {
93 : std::shared_ptr<shared_model::interface::TransactionResponse>
94 0 : response = status_factory_->makeCommitted(request);
95 0 : cache_->addItem(request, response);
96 0 : return response;
97 0 : });
98 782 : }
99 :
100 : /**
101 : * Statuses considered final for streaming. Observable stops value emission
102 : * after receiving a value of one of the following types
103 : * @tparam T concrete response type
104 : *
105 : * StatefulFailedTxResponse and MstExpiredResponse were removed from the
106 : * list of final statuses.
107 : *
108 : * StatefulFailedTxResponse is not a final status because the node might be
109 : * in non-synchronized state and the transaction may be stateful valid from
110 : * the viewpoint of up to date nodes.
111 : *
112 : * MstExpiredResponse is not a final status in general case because it will
113 : * depend on MST expiration timeout. The transaction might expire in MST,
114 : * but remain valid in terms of Iroha validation rules. Thus, it may be
115 : * resent and committed successfully. As the result the final status may
116 : * differ from MstExpiredResponse.
117 : */
118 : template <typename T>
119 : constexpr bool FinalStatusValue =
120 : iroha::is_any<std::decay_t<T>,
121 : shared_model::interface::StatelessFailedTxResponse,
122 : shared_model::interface::CommittedTxResponse,
123 : shared_model::interface::RejectedTxResponse>::value;
124 :
125 : rxcpp::observable<
126 : std::shared_ptr<shared_model::interface::TransactionResponse>>
127 : CommandServiceImpl::getStatusStream(
128 : const shared_model::crypto::Hash &hash) {
129 : using ResponsePtrType =
130 : std::shared_ptr<shared_model::interface::TransactionResponse>;
131 : auto initial_status = cache_->findItem(hash).value_or([&] {
132 0 : log_->debug("tx is not received: {}", hash);
133 0 : return status_factory_->makeNotReceived(hash);
134 0 : }());
135 0 : return status_bus_
136 0 : ->statuses()
137 : // prepend initial status
138 0 : .start_with(initial_status)
139 : // select statuses with requested hash
140 : .filter([hash](auto response) {
141 0 : return response->transactionHash() == hash;
142 : })
143 : // successfully complete the observable if final status is received.
144 : // final status is included in the observable
145 0 : .template lift<ResponsePtrType>(
146 : [](rxcpp::subscriber<ResponsePtrType> dest) {
147 0 : return rxcpp::make_subscriber<ResponsePtrType>(
148 : dest, [=](ResponsePtrType response) {
149 0 : dest.on_next(response);
150 0 : iroha::visit_in_place(
151 0 : response->get(),
152 : [dest](const auto &resp)
153 : -> std::enable_if_t<
154 : FinalStatusValue<decltype(resp)>> {
155 0 : dest.on_completed();
156 0 : },
157 : [](const auto &resp)
158 : -> std::enable_if_t<
159 0 : not FinalStatusValue<decltype(resp)>>{});
160 0 : });
161 0 : });
162 0 : }
163 :
164 : void CommandServiceImpl::pushStatus(
165 : const std::string &who,
166 : std::shared_ptr<shared_model::interface::TransactionResponse>
167 : response) {
168 749 : log_->debug("{}: adding item to cache: {}", who, *response);
169 749 : status_bus_->publish(response);
170 749 : }
171 :
172 : void CommandServiceImpl::processBatch(
173 : std::shared_ptr<shared_model::interface::TransactionBatch> batch) {
174 736 : const auto status_issuer = "ToriiBatchProcessor";
175 736 : const auto &txs = batch->transactions();
176 :
177 736 : bool cache_has_at_least_one_tx{false};
178 736 : bool batch_has_mst_pending_tx{false};
179 736 : std::tie(cache_has_at_least_one_tx, batch_has_mst_pending_tx) =
180 : // this accumulate can be split on two parts to perform visit_in_place
181 : // two times - one for cache lookup with booleans initialization and
182 : // another for statuses pushing. That can allow to move a part of code
183 : // to a separate method for simplification
184 736 : std::accumulate(
185 736 : txs.begin(),
186 736 : txs.end(),
187 736 : std::make_pair<bool, bool>(false, false),
188 : [this, &status_issuer](std::pair<bool, bool> lookup_result,
189 : const auto &tx) {
190 741 : const auto &tx_hash = tx->hash();
191 741 : if (auto found = cache_->findItem(tx_hash)) {
192 8 : iroha::visit_in_place(
193 8 : (*found)->get(),
194 : [this, &found, &lookup_result, &status_issuer](
195 : const shared_model::interface::MstPendingResponse &) {
196 8 : this->pushStatus(status_issuer, *found);
197 8 : lookup_result.second = true;
198 8 : },
199 : [this, &tx_hash, &status_issuer](
200 : const shared_model::interface::NotReceivedTxResponse
201 : &) {
202 : // This branch covers an impossible case (this cache
203 : // cannot contain NotReceivedTxResponse, because the tx
204 : // has reached processBatch method, which means that
205 : // the tx already has StatelessValid status).
206 : // That is why we are not updating its status inside
207 : // internal cache, but still pushing to status bus.
208 0 : this->pushStatus(
209 0 : status_issuer,
210 0 : status_factory_->makeStatelessValid(tx_hash));
211 0 : },
212 : [this, &found, &status_issuer](const auto &status) {
213 0 : this->pushStatus(status_issuer, *found);
214 0 : });
215 8 : lookup_result.first = true;
216 8 : }
217 741 : return lookup_result;
218 0 : });
219 :
220 736 : if (cache_has_at_least_one_tx and not batch_has_mst_pending_tx) {
221 : // If a non-persistent cache says that a transaction has pending status
222 : // that means we have to check persistent cache too.
223 : // Non-persistent cache might be overflowed and mst replay become
224 : // possible without checking persistent cache.
225 :
226 : // If there are no pending statuses and the transaction is found in
227 : // non-persistent cache, then it is considered as a replay and prevented
228 : // from further propagation.
229 :
230 : // If non-persistent cache does not contain any info about a
231 : // transaction, then we just check persistent cache.
232 0 : log_->warn("Replayed batch would not be served. {}", *batch);
233 0 : return;
234 : }
235 :
236 736 : auto cache_presence = tx_presence_cache_->check(*batch);
237 736 : if (not cache_presence) {
238 : // TODO andrei 30.11.18 IR-51 Handle database error
239 0 : log_->warn("Check tx presence database error. {}", *batch);
240 0 : return;
241 : }
242 736 : auto is_replay = std::any_of(
243 736 : cache_presence->begin(),
244 736 : cache_presence->end(),
245 : [this, &status_issuer](const auto &tx_status) {
246 741 : return iroha::visit_in_place(
247 741 : tx_status,
248 : [this, &status_issuer](
249 : const iroha::ametsuchi::tx_cache_status_responses::Missing
250 : &status) {
251 739 : this->pushStatus(
252 739 : status_issuer,
253 739 : status_factory_->makeStatelessValid(status.hash));
254 739 : return false;
255 0 : },
256 : [this, &status_issuer](
257 : const iroha::ametsuchi::tx_cache_status_responses::Committed
258 : &status) {
259 1 : this->pushStatus(status_issuer,
260 1 : status_factory_->makeCommitted(status.hash));
261 1 : return true;
262 0 : },
263 : [this, &status_issuer](
264 : const iroha::ametsuchi::tx_cache_status_responses::Rejected
265 : &status) {
266 1 : this->pushStatus(status_issuer,
267 1 : status_factory_->makeRejected(status.hash));
268 1 : return true;
269 0 : });
270 : });
271 736 : if (is_replay) {
272 2 : log_->warn("Replayed batch would not be served. {}", *batch);
273 2 : return;
274 : }
275 :
276 734 : tx_processor_->batchHandle(batch);
277 736 : }
278 :
279 : } // namespace torii
280 : } // namespace iroha
|