Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "torii/processor/query_processor_impl.hpp"
7 :
8 : #include <boost/range/size.hpp>
9 : #include "common/bind.hpp"
10 : #include "interfaces/queries/blocks_query.hpp"
11 : #include "interfaces/queries/query.hpp"
12 : #include "interfaces/query_responses/block_query_response.hpp"
13 : #include "interfaces/query_responses/block_response.hpp"
14 : #include "interfaces/query_responses/query_response.hpp"
15 : #include "logger/logger.hpp"
16 : #include "validation/utils.hpp"
17 :
18 : namespace iroha {
19 : namespace torii {
20 :
21 : QueryProcessorImpl::QueryProcessorImpl(
22 : std::shared_ptr<ametsuchi::Storage> storage,
23 : std::shared_ptr<ametsuchi::QueryExecutorFactory> qry_exec,
24 : std::shared_ptr<iroha::PendingTransactionStorage> pending_transactions,
25 : std::shared_ptr<shared_model::interface::QueryResponseFactory>
26 : response_factory,
27 : logger::LoggerPtr log)
28 262 : : storage_{std::move(storage)},
29 262 : qry_exec_{std::move(qry_exec)},
30 262 : pending_transactions_{std::move(pending_transactions)},
31 262 : response_factory_{std::move(response_factory)},
32 262 : log_{std::move(log)} {
33 262 : storage_->on_commit().subscribe(
34 : [this](std::shared_ptr<shared_model::interface::Block> block) {
35 : auto block_response =
36 727 : response_factory_->createBlockQueryResponse(clone(*block));
37 727 : blocks_query_subject_.get_subscriber().on_next(
38 727 : std::move(block_response));
39 727 : });
40 262 : }
41 :
42 : std::unique_ptr<shared_model::interface::QueryResponse>
43 : QueryProcessorImpl::queryHandle(const shared_model::interface::Query &qry) {
44 143 : auto executor = qry_exec_->createQueryExecutor(pending_transactions_,
45 140 : response_factory_);
46 143 : if (not executor) {
47 0 : log_->error("Cannot create query executor");
48 0 : return nullptr;
49 : }
50 :
51 143 : return executor.value()->validateAndExecute(qry, true);
52 143 : }
53 :
54 : rxcpp::observable<
55 : std::shared_ptr<shared_model::interface::BlockQueryResponse>>
56 : QueryProcessorImpl::blocksQueryHandle(
57 : const shared_model::interface::BlocksQuery &qry) {
58 2 : auto exec = qry_exec_->createQueryExecutor(pending_transactions_,
59 2 : response_factory_);
60 : if (not exec or not(exec | [&qry](const auto &executor) {
61 2 : return executor->validate(qry, true);
62 : })) {
63 1 : std::shared_ptr<shared_model::interface::BlockQueryResponse> response =
64 1 : response_factory_->createBlockQueryResponse("stateful invalid");
65 1 : return rxcpp::observable<>::just(std::move(response));
66 1 : }
67 1 : return blocks_query_subject_.get_observable();
68 2 : }
69 :
70 : } // namespace torii
71 : } // namespace iroha
|