Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "common/default_constructible_unary_fn.hpp" // non-copyable value workaround
7 :
8 : #include "ordering/impl/on_demand_os_server_grpc.hpp"
9 :
10 : #include <boost/range/adaptor/filtered.hpp>
11 : #include <boost/range/adaptor/transformed.hpp>
12 : #include "backend/protobuf/proposal.hpp"
13 : #include "common/bind.hpp"
14 : #include "interfaces/iroha_internal/transaction_batch.hpp"
15 : #include "logger/logger.hpp"
16 :
17 : using namespace iroha::ordering;
18 : using namespace iroha::ordering::transport;
19 :
20 : OnDemandOsServerGrpc::OnDemandOsServerGrpc(
21 : std::shared_ptr<OdOsNotification> ordering_service,
22 : std::shared_ptr<TransportFactoryType> transaction_factory,
23 : std::shared_ptr<shared_model::interface::TransactionBatchParser>
24 : batch_parser,
25 : std::shared_ptr<shared_model::interface::TransactionBatchFactory>
26 : transaction_batch_factory,
27 : logger::LoggerPtr log)
28 250 : : ordering_service_(ordering_service),
29 250 : transaction_factory_(std::move(transaction_factory)),
30 250 : batch_parser_(std::move(batch_parser)),
31 250 : batch_factory_(std::move(transaction_batch_factory)),
32 250 : log_(std::move(log)) {}
33 :
34 : shared_model::interface::types::SharedTxsCollectionType
35 : OnDemandOsServerGrpc::deserializeTransactions(
36 : const proto::BatchesRequest *request) {
37 3298 : return boost::copy_range<
38 : shared_model::interface::types::SharedTxsCollectionType>(
39 3301 : request->transactions()
40 3301 : | boost::adaptors::transformed(
41 : [&](const auto &tx) { return transaction_factory_->build(tx); })
42 : | boost::adaptors::filtered([&](const auto &result) {
43 3328 : return result.match(
44 : [](const iroha::expected::Value<
45 : std::unique_ptr<shared_model::interface::Transaction>> &) {
46 3276 : return true;
47 : },
48 : [&](const iroha::expected::Error<TransportFactoryType::Error>
49 : &error) {
50 0 : log_->info("Transaction deserialization failed: hash {}, {}",
51 0 : error.error.hash,
52 0 : error.error.error);
53 0 : return false;
54 0 : });
55 : })
56 : | boost::adaptors::transformed([](auto result) {
57 3363 : return std::move(
58 3363 : boost::get<iroha::expected::ValueOf<decltype(result)>>(
59 3363 : result))
60 3363 : .value;
61 : }));
62 0 : }
63 :
64 : grpc::Status OnDemandOsServerGrpc::SendBatches(
65 : ::grpc::ServerContext *context,
66 : const proto::BatchesRequest *request,
67 : ::google::protobuf::Empty *response) {
68 1131 : consensus::Round round{request->round().block_round(),
69 1131 : request->round().reject_round()};
70 1131 : auto transactions = deserializeTransactions(request);
71 :
72 1131 : auto batch_candidates = batch_parser_->parseBatches(std::move(transactions));
73 :
74 2944 : auto batches = std::accumulate(
75 3240 : std::begin(batch_candidates),
76 3240 : std::end(batch_candidates),
77 2944 : OdOsNotification::CollectionType{},
78 : [this](auto &acc, const auto &cand) {
79 3239 : batch_factory_->createTransactionBatch(cand).match(
80 : [&](iroha::expected::Value<
81 : std::unique_ptr<shared_model::interface::TransactionBatch>>
82 3252 : &value) { acc.push_back(std::move(value).value); },
83 : [&](iroha::expected::Error<std::string> &error) {
84 0 : log_->warn("Batch deserialization failed: {}", error.error);
85 0 : });
86 3239 : return acc;
87 0 : });
88 :
89 3230 : ordering_service_->onBatches(round, std::move(batches));
90 :
91 3230 : return ::grpc::Status::OK;
92 3249 : }
93 :
94 : grpc::Status OnDemandOsServerGrpc::RequestProposal(
95 : ::grpc::ServerContext *context,
96 : const proto::ProposalRequest *request,
97 : proto::ProposalResponse *response) {
98 3083 : ordering_service_->onRequestProposal(
99 3083 : {request->round().block_round(), request->round().reject_round()})
100 : | [&](auto &&proposal) {
101 1318 : *response->mutable_proposal() = std::move(
102 1318 : static_cast<const shared_model::proto::Proposal *>(proposal.get())
103 1318 : ->getTransport());
104 1318 : };
105 3083 : return ::grpc::Status::OK;
106 0 : }
|