LCOV - code coverage report
Current view: top level - irohad/torii/impl - query_service.cpp (source / functions) Hit Total Coverage
Test: cleared_cor.info Lines: 51 65 78.5 %
Date: 2019-03-07 14:46:43 Functions: 11 12 91.7 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include "torii/query_service.hpp"
       7             : 
       8             : #include "backend/protobuf/query_responses/proto_block_query_response.hpp"
       9             : #include "backend/protobuf/query_responses/proto_query_response.hpp"
      10             : #include "cryptography/default_hash_provider.hpp"
      11             : #include "interfaces/iroha_internal/abstract_transport_factory.hpp"
      12             : #include "logger/logger.hpp"
      13             : #include "validators/default_validator.hpp"
      14             : 
      15             : namespace iroha {
      16             :   namespace torii {
      17             : 
      18             :     QueryService::QueryService(
      19             :         std::shared_ptr<iroha::torii::QueryProcessor> query_processor,
      20             :         std::shared_ptr<QueryFactoryType> query_factory,
      21             :       logger::LoggerPtr log)
      22         262 :         : query_processor_{std::move(query_processor)},
      23         262 :           query_factory_{std::move(query_factory)},
      24         262 :           log_{std::move(log)} {}
      25             : 
      26             :     void QueryService::Find(iroha::protocol::Query const &request,
      27             :                             iroha::protocol::QueryResponse &response) {
      28         166 :       shared_model::crypto::Hash hash;
      29         166 :       auto blobPayload = shared_model::proto::makeBlob(request.payload());
      30         165 :       hash = shared_model::crypto::DefaultHashProvider::makeHash(blobPayload);
      31             : 
      32         166 :       if (cache_.findItem(hash)) {
      33             :         // Query was already processed
      34           1 :         response.mutable_error_response()->set_reason(
      35             :             iroha::protocol::ErrorResponse::STATELESS_INVALID);
      36           1 :         return;
      37             :       }
      38             : 
      39         165 :       query_factory_->build(request).match(
      40             :           [this, &hash, &response](
      41             :               const iroha::expected::Value<
      42             :                   std::unique_ptr<shared_model::interface::Query>> &query) {
      43             :             // Send query to iroha
      44         143 :             response = static_cast<shared_model::proto::QueryResponse &>(
      45         128 :                            *query_processor_->queryHandle(*query.value))
      46         143 :                            .getTransport();
      47             :             // TODO 18.02.2019 lebdron: IR-336 Replace cache
      48             :             // 0 is used as a dummy value
      49         143 :             cache_.addItem(hash, 0);
      50         143 :           },
      51             :           [&hash, &response](
      52             :               const iroha::expected::Error<QueryFactoryType::Error> &error) {
      53          22 :             response.set_query_hash(hash.hex());
      54          22 :             response.mutable_error_response()->set_reason(
      55             :                 iroha::protocol::ErrorResponse::STATELESS_INVALID);
      56          22 :             response.mutable_error_response()->set_message(
      57          22 :                 std::move(error.error.error));
      58          22 :           });
      59         166 :     }
      60             : 
      61             :     grpc::Status QueryService::Find(grpc::ServerContext *context,
      62             :                                     const iroha::protocol::Query *request,
      63             :                                     iroha::protocol::QueryResponse *response) {
      64         163 :       Find(*request, *response);
      65         163 :       return grpc::Status::OK;
      66             :     }
      67             : 
      68             :     grpc::Status QueryService::FetchCommits(
      69             :         grpc::ServerContext *context,
      70             :         const iroha::protocol::BlocksQuery *request,
      71             :         grpc::ServerWriter<iroha::protocol::BlockQueryResponse> *writer) {
      72           2 :       log_->debug("Fetching commits");
      73           2 :       shared_model::proto::TransportBuilder<
      74             :           shared_model::proto::BlocksQuery,
      75             :           shared_model::validation::DefaultSignedBlocksQueryValidator>()
      76           2 :           .build(*request)
      77           2 :           .match(
      78             :               [this, context, request, writer](
      79             :                   const iroha::expected::Value<shared_model::proto::BlocksQuery>
      80             :                       &query) {
      81           1 :                 rxcpp::composite_subscription sub;
      82           1 :                 query_processor_->blocksQueryHandle(query.value)
      83           1 :                     .as_blocking()
      84           1 :                     .subscribe(
      85             :                         sub,
      86             :                         [this, context, &sub, request, writer](
      87             :                             const std::shared_ptr<
      88             :                                 shared_model::interface::BlockQueryResponse>
      89             :                                 response) {
      90           1 :                           if (context->IsCancelled()) {
      91           0 :                             log_->debug("Unsubscribed");
      92           0 :                             sub.unsubscribe();
      93           0 :                           } else {
      94           1 :                             iroha::visit_in_place(
      95           1 :                                 response->get(),
      96             :                                 [this, writer, request](
      97             :                                     const shared_model::interface::BlockResponse
      98             :                                         &block_response) {
      99           1 :                                   log_->debug(
     100           1 :                                       "{} receives committed block",
     101           1 :                                       request->meta().creator_account_id());
     102           1 :                                   auto proto_block_response = static_cast<
     103             :                                       const shared_model::proto::BlockResponse
     104           1 :                                           &>(block_response);
     105           1 :                                   writer->Write(
     106           1 :                                       proto_block_response.getTransport());
     107           1 :                                 },
     108             :                                 [this, writer, request](
     109             :                                     const shared_model::interface::
     110             :                                         BlockErrorResponse
     111             :                                             &block_error_response) {
     112           0 :                                   log_->debug(
     113           0 :                                       "{} received error with message: {}",
     114           0 :                                       request->meta().creator_account_id(),
     115           0 :                                       block_error_response.message());
     116             :                                   auto proto_block_error_response =
     117           0 :                                       static_cast<const shared_model::proto::
     118             :                                                       BlockErrorResponse &>(
     119           0 :                                           block_error_response);
     120           0 :                                   writer->WriteLast(
     121           0 :                                       proto_block_error_response.getTransport(),
     122           0 :                                       grpc::WriteOptions());
     123           0 :                                 });
     124             :                           }
     125           1 :                         });
     126           1 :               },
     127             :               [this, writer](const auto &error) {
     128           1 :                 log_->debug("Stateless invalid: {}", error.error);
     129           1 :                 iroha::protocol::BlockQueryResponse response;
     130           1 :                 response.mutable_block_error_response()->set_message(
     131           1 :                     std::move(error.error));
     132           1 :                 writer->WriteLast(response, grpc::WriteOptions());
     133           1 :               });
     134             : 
     135           2 :       return grpc::Status::OK;
     136           0 :     }
     137             : 
     138             :   }  // namespace torii
     139             : }  // namespace iroha

Generated by: LCOV version 1.13