Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "ordering/impl/on_demand_os_client_grpc.hpp"
7 :
8 : #include "backend/protobuf/proposal.hpp"
9 : #include "backend/protobuf/transaction.hpp"
10 : #include "interfaces/common_objects/peer.hpp"
11 : #include "interfaces/iroha_internal/transaction_batch.hpp"
12 : #include "logger/logger.hpp"
13 : #include "network/impl/grpc_channel_builder.hpp"
14 :
15 : using namespace iroha;
16 : using namespace iroha::ordering;
17 : using namespace iroha::ordering::transport;
18 :
19 : OnDemandOsClientGrpc::OnDemandOsClientGrpc(
20 : std::unique_ptr<proto::OnDemandOrdering::StubInterface> stub,
21 : std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
22 : async_call,
23 : std::shared_ptr<TransportFactoryType> proposal_factory,
24 : std::function<TimepointType()> time_provider,
25 : std::chrono::milliseconds proposal_request_timeout,
26 : logger::LoggerPtr log)
27 13495 : : log_(std::move(log)),
28 13495 : stub_(std::move(stub)),
29 13495 : async_call_(std::move(async_call)),
30 13495 : proposal_factory_(std::move(proposal_factory)),
31 13495 : time_provider_(std::move(time_provider)),
32 13495 : proposal_request_timeout_(proposal_request_timeout) {}
33 :
34 : void OnDemandOsClientGrpc::onBatches(consensus::Round round,
35 : CollectionType batches) {
36 3349 : proto::BatchesRequest request;
37 3349 : request.mutable_round()->set_block_round(round.block_round);
38 3349 : request.mutable_round()->set_reject_round(round.reject_round);
39 6698 : for (auto &batch : batches) {
40 6719 : for (auto &transaction : batch->transactions()) {
41 3370 : *request.add_transactions() = std::move(
42 3370 : static_cast<shared_model::proto::Transaction *>(transaction.get())
43 3370 : ->getTransport());
44 : }
45 : }
46 :
47 3349 : log_->debug("Propagating: '{}'", request.DebugString());
48 :
49 : async_call_->Call([&](auto context, auto cq) {
50 3349 : return stub_->AsyncSendBatches(context, request, cq);
51 : });
52 3349 : }
53 :
54 : boost::optional<std::shared_ptr<const OdOsNotification::ProposalType>>
55 : OnDemandOsClientGrpc::onRequestProposal(consensus::Round round) {
56 3375 : grpc::ClientContext context;
57 3375 : context.set_deadline(time_provider_() + proposal_request_timeout_);
58 3375 : proto::ProposalRequest request;
59 3375 : request.mutable_round()->set_block_round(round.block_round);
60 3375 : request.mutable_round()->set_reject_round(round.reject_round);
61 3375 : proto::ProposalResponse response;
62 3375 : auto status = stub_->RequestProposal(&context, request, &response);
63 3375 : if (not status.ok()) {
64 292 : log_->warn("RPC failed: {}", status.error_message());
65 292 : return boost::none;
66 : }
67 3083 : if (not response.has_proposal()) {
68 1765 : return boost::none;
69 : }
70 1318 : return proposal_factory_->build(response.proposal())
71 1318 : .match(
72 : [&](iroha::expected::Value<
73 : std::unique_ptr<shared_model::interface::Proposal>> &v) {
74 1318 : return boost::make_optional(
75 1318 : std::shared_ptr<const OdOsNotification::ProposalType>(
76 1318 : std::move(v).value));
77 0 : },
78 : [this](iroha::expected::Error<TransportFactoryType::Error> &error) {
79 0 : log_->info(error.error.error); // error
80 0 : return boost::optional<
81 : std::shared_ptr<const OdOsNotification::ProposalType>>();
82 : });
83 3375 : }
84 :
85 : OnDemandOsClientGrpcFactory::OnDemandOsClientGrpcFactory(
86 : std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
87 : async_call,
88 : std::shared_ptr<TransportFactoryType> proposal_factory,
89 : std::function<OnDemandOsClientGrpc::TimepointType()> time_provider,
90 : OnDemandOsClientGrpc::TimeoutType proposal_request_timeout,
91 : logger::LoggerPtr client_log)
92 247 : : async_call_(std::move(async_call)),
93 247 : proposal_factory_(std::move(proposal_factory)),
94 247 : time_provider_(time_provider),
95 247 : proposal_request_timeout_(proposal_request_timeout),
96 247 : client_log_(std::move(client_log)) {}
97 :
98 : std::unique_ptr<OdOsNotification> OnDemandOsClientGrpcFactory::create(
99 : const shared_model::interface::Peer &to) {
100 13492 : return std::make_unique<OnDemandOsClientGrpc>(
101 13492 : network::createClient<proto::OnDemandOrdering>(to.address()),
102 13492 : async_call_,
103 13492 : proposal_factory_,
104 13492 : time_provider_,
105 13492 : proposal_request_timeout_,
106 13492 : client_log_);
107 0 : }
|