Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "torii/query_service.hpp"
7 :
8 : #include "backend/protobuf/query_responses/proto_block_query_response.hpp"
9 : #include "backend/protobuf/query_responses/proto_query_response.hpp"
10 : #include "cryptography/default_hash_provider.hpp"
11 : #include "interfaces/iroha_internal/abstract_transport_factory.hpp"
12 : #include "logger/logger.hpp"
13 : #include "validators/default_validator.hpp"
14 :
15 : namespace iroha {
16 : namespace torii {
17 :
18 : QueryService::QueryService(
19 : std::shared_ptr<iroha::torii::QueryProcessor> query_processor,
20 : std::shared_ptr<QueryFactoryType> query_factory,
21 : logger::LoggerPtr log)
22 262 : : query_processor_{std::move(query_processor)},
23 262 : query_factory_{std::move(query_factory)},
24 262 : log_{std::move(log)} {}
25 :
26 : void QueryService::Find(iroha::protocol::Query const &request,
27 : iroha::protocol::QueryResponse &response) {
28 166 : shared_model::crypto::Hash hash;
29 166 : auto blobPayload = shared_model::proto::makeBlob(request.payload());
30 165 : hash = shared_model::crypto::DefaultHashProvider::makeHash(blobPayload);
31 :
32 166 : if (cache_.findItem(hash)) {
33 : // Query was already processed
34 1 : response.mutable_error_response()->set_reason(
35 : iroha::protocol::ErrorResponse::STATELESS_INVALID);
36 1 : return;
37 : }
38 :
39 165 : query_factory_->build(request).match(
40 : [this, &hash, &response](
41 : const iroha::expected::Value<
42 : std::unique_ptr<shared_model::interface::Query>> &query) {
43 : // Send query to iroha
44 143 : response = static_cast<shared_model::proto::QueryResponse &>(
45 128 : *query_processor_->queryHandle(*query.value))
46 143 : .getTransport();
47 : // TODO 18.02.2019 lebdron: IR-336 Replace cache
48 : // 0 is used as a dummy value
49 143 : cache_.addItem(hash, 0);
50 143 : },
51 : [&hash, &response](
52 : const iroha::expected::Error<QueryFactoryType::Error> &error) {
53 22 : response.set_query_hash(hash.hex());
54 22 : response.mutable_error_response()->set_reason(
55 : iroha::protocol::ErrorResponse::STATELESS_INVALID);
56 22 : response.mutable_error_response()->set_message(
57 22 : std::move(error.error.error));
58 22 : });
59 166 : }
60 :
61 : grpc::Status QueryService::Find(grpc::ServerContext *context,
62 : const iroha::protocol::Query *request,
63 : iroha::protocol::QueryResponse *response) {
64 163 : Find(*request, *response);
65 163 : return grpc::Status::OK;
66 : }
67 :
68 : grpc::Status QueryService::FetchCommits(
69 : grpc::ServerContext *context,
70 : const iroha::protocol::BlocksQuery *request,
71 : grpc::ServerWriter<iroha::protocol::BlockQueryResponse> *writer) {
72 2 : log_->debug("Fetching commits");
73 2 : shared_model::proto::TransportBuilder<
74 : shared_model::proto::BlocksQuery,
75 : shared_model::validation::DefaultSignedBlocksQueryValidator>()
76 2 : .build(*request)
77 2 : .match(
78 : [this, context, request, writer](
79 : const iroha::expected::Value<shared_model::proto::BlocksQuery>
80 : &query) {
81 1 : rxcpp::composite_subscription sub;
82 1 : query_processor_->blocksQueryHandle(query.value)
83 1 : .as_blocking()
84 1 : .subscribe(
85 : sub,
86 : [this, context, &sub, request, writer](
87 : const std::shared_ptr<
88 : shared_model::interface::BlockQueryResponse>
89 : response) {
90 1 : if (context->IsCancelled()) {
91 0 : log_->debug("Unsubscribed");
92 0 : sub.unsubscribe();
93 0 : } else {
94 1 : iroha::visit_in_place(
95 1 : response->get(),
96 : [this, writer, request](
97 : const shared_model::interface::BlockResponse
98 : &block_response) {
99 1 : log_->debug(
100 1 : "{} receives committed block",
101 1 : request->meta().creator_account_id());
102 1 : auto proto_block_response = static_cast<
103 : const shared_model::proto::BlockResponse
104 1 : &>(block_response);
105 1 : writer->Write(
106 1 : proto_block_response.getTransport());
107 1 : },
108 : [this, writer, request](
109 : const shared_model::interface::
110 : BlockErrorResponse
111 : &block_error_response) {
112 0 : log_->debug(
113 0 : "{} received error with message: {}",
114 0 : request->meta().creator_account_id(),
115 0 : block_error_response.message());
116 : auto proto_block_error_response =
117 0 : static_cast<const shared_model::proto::
118 : BlockErrorResponse &>(
119 0 : block_error_response);
120 0 : writer->WriteLast(
121 0 : proto_block_error_response.getTransport(),
122 0 : grpc::WriteOptions());
123 0 : });
124 : }
125 1 : });
126 1 : },
127 : [this, writer](const auto &error) {
128 1 : log_->debug("Stateless invalid: {}", error.error);
129 1 : iroha::protocol::BlockQueryResponse response;
130 1 : response.mutable_block_error_response()->set_message(
131 1 : std::move(error.error));
132 1 : writer->WriteLast(response, grpc::WriteOptions());
133 1 : });
134 :
135 2 : return grpc::Status::OK;
136 0 : }
137 :
138 : } // namespace torii
139 : } // namespace iroha
|