Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #ifndef IROHA_ASYNC_GRPC_CLIENT_HPP
7 : #define IROHA_ASYNC_GRPC_CLIENT_HPP
8 :
9 : #include <ciso646>
10 : #include <thread>
11 :
12 : #include <google/protobuf/empty.pb.h>
13 : #include <grpc++/grpc++.h>
14 : #include <grpcpp/impl/codegen/async_unary_call.h>
15 : #include "logger/logger.hpp"
16 :
17 : namespace iroha {
18 : namespace network {
19 :
20 : /**
21 : * Asynchronous gRPC client which does no processing of server responses
22 : * @tparam Response type of server response
23 : */
24 : template <typename Response>
25 : class AsyncGrpcClient {
26 : public:
27 : explicit AsyncGrpcClient(logger::LoggerPtr log)
28 503 : : thread_(&AsyncGrpcClient::asyncCompleteRpc, this),
29 503 : log_(std::move(log)) {}
30 :
31 : /**
32 : * Listen to gRPC server responses
33 : */
34 : void asyncCompleteRpc() {
35 : void *got_tag;
36 503 : auto ok = false;
37 11409 : while (cq_.Next(&got_tag, &ok)) {
38 10906 : auto call = static_cast<AsyncClientCall *>(got_tag);
39 10906 : if (not call->status.ok()) {
40 85 : log_->warn("RPC failed: {}", call->status.error_message());
41 85 : }
42 10906 : delete call;
43 : }
44 503 : }
45 :
46 : ~AsyncGrpcClient() {
47 503 : cq_.Shutdown();
48 503 : if (thread_.joinable()) {
49 503 : thread_.join();
50 503 : }
51 503 : }
52 :
53 : grpc::CompletionQueue cq_;
54 : std::thread thread_;
55 :
56 : /**
57 : * State and data information of gRPC call
58 : */
59 : struct AsyncClientCall {
60 : Response reply;
61 :
62 : grpc::ClientContext context;
63 :
64 : grpc::Status status;
65 :
66 : std::unique_ptr<grpc::ClientAsyncResponseReaderInterface<Response>>
67 : response_reader;
68 : };
69 :
70 : /**
71 : * Universal method to perform all needed sends
72 : * @tparam lambda which must return unique pointer to
73 : * ClientAsyncResponseReader<Response> object
74 : */
75 : template <typename F>
76 : void Call(F &&lambda) {
77 10907 : auto call = new AsyncClientCall;
78 10907 : call->response_reader = lambda(&call->context, &cq_);
79 10907 : call->response_reader->Finish(&call->reply, &call->status, call);
80 10907 : }
81 :
82 : private:
83 : logger::LoggerPtr log_;
84 : };
85 : } // namespace network
86 : } // namespace iroha
87 :
88 : #endif // IROHA_ASYNC_GRPC_CLIENT_HPP
|