Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "common/default_constructible_unary_fn.hpp" // non-copyable value workaround
7 :
8 : #include "multi_sig_transactions/transport/mst_transport_grpc.hpp"
9 :
10 : #include <boost/range/adaptor/filtered.hpp>
11 : #include <boost/range/adaptor/transformed.hpp>
12 : #include "ametsuchi/tx_presence_cache.hpp"
13 : #include "backend/protobuf/transaction.hpp"
14 : #include "interfaces/iroha_internal/transaction_batch.hpp"
15 : #include "interfaces/transaction.hpp"
16 : #include "logger/logger.hpp"
17 : #include "validators/field_validator.hpp"
18 :
19 : using namespace iroha;
20 : using namespace iroha::network;
21 :
22 : using iroha::ConstRefState;
23 :
24 : void sendStateAsyncImpl(const shared_model::interface::Peer &to,
25 : ConstRefState state,
26 : const std::string &sender_key,
27 : AsyncGrpcClient<google::protobuf::Empty> &async_call);
28 :
29 : MstTransportGrpc::MstTransportGrpc(
30 : std::shared_ptr<AsyncGrpcClient<google::protobuf::Empty>> async_call,
31 : std::shared_ptr<TransportFactoryType> transaction_factory,
32 : std::shared_ptr<shared_model::interface::TransactionBatchParser>
33 : batch_parser,
34 : std::shared_ptr<shared_model::interface::TransactionBatchFactory>
35 : transaction_batch_factory,
36 : std::shared_ptr<iroha::ametsuchi::TxPresenceCache> tx_presence_cache,
37 : std::shared_ptr<Completer> mst_completer,
38 : shared_model::crypto::PublicKey my_key,
39 : logger::LoggerPtr mst_state_logger,
40 : logger::LoggerPtr log)
41 10 : : async_call_(std::move(async_call)),
42 10 : transaction_factory_(std::move(transaction_factory)),
43 10 : batch_parser_(std::move(batch_parser)),
44 10 : batch_factory_(std::move(transaction_batch_factory)),
45 10 : tx_presence_cache_(std::move(tx_presence_cache)),
46 10 : mst_completer_(std::move(mst_completer)),
47 10 : my_key_(shared_model::crypto::toBinaryString(my_key)),
48 10 : mst_state_logger_(std::move(mst_state_logger)),
49 10 : log_(std::move(log)) {}
50 :
51 : shared_model::interface::types::SharedTxsCollectionType
52 : MstTransportGrpc::deserializeTransactions(const transport::MstState *request) {
53 6 : return boost::copy_range<
54 : shared_model::interface::types::SharedTxsCollectionType>(
55 6 : request->transactions()
56 6 : | boost::adaptors::transformed(
57 : [&](const auto &tx) { return transaction_factory_->build(tx); })
58 : | boost::adaptors::filtered([&](const auto &result) {
59 10 : return result.match(
60 : [](const iroha::expected::Value<
61 : std::unique_ptr<shared_model::interface::Transaction>> &) {
62 9 : return true;
63 : },
64 : [&](const iroha::expected::Error<TransportFactoryType::Error>
65 : &error) {
66 1 : log_->info("Transaction deserialization failed: hash {}, {}",
67 1 : error.error.hash,
68 1 : error.error.error);
69 1 : return false;
70 0 : });
71 : })
72 : | boost::adaptors::transformed([&](auto result) {
73 9 : return std::move(
74 9 : boost::get<iroha::expected::ValueOf<decltype(result)>>(
75 9 : result))
76 9 : .value;
77 : }));
78 0 : }
79 :
80 : grpc::Status MstTransportGrpc::SendState(
81 : ::grpc::ServerContext *context,
82 : const ::iroha::network::transport::MstState *request,
83 : ::google::protobuf::Empty *response) {
84 5 : log_->info("MstState Received");
85 :
86 5 : auto transactions = deserializeTransactions(request);
87 :
88 5 : auto batches = batch_parser_->parseBatches(transactions);
89 :
90 6 : MstState new_state = MstState::empty(mst_state_logger_, mst_completer_);
91 :
92 13 : for (auto &batch : batches) {
93 7 : batch_factory_->createTransactionBatch(batch).match(
94 : [&](iroha::expected::Value<std::unique_ptr<
95 : shared_model::interface::TransactionBatch>> &value) {
96 7 : auto cache_presence = tx_presence_cache_->check(*value.value);
97 7 : if (not cache_presence) {
98 : // TODO andrei 30.11.18 IR-51 Handle database error
99 1 : log_->warn("Check tx presence database error. Batch: {}",
100 1 : *value.value);
101 1 : return;
102 : }
103 6 : auto is_replay = std::any_of(
104 6 : cache_presence->begin(),
105 6 : cache_presence->end(),
106 : [](const auto &tx_status) {
107 7 : return iroha::visit_in_place(
108 7 : tx_status,
109 : [](const iroha::ametsuchi::tx_cache_status_responses::
110 6 : Missing &) { return false; },
111 : [](const auto &) { return true; });
112 : });
113 :
114 6 : if (not is_replay) {
115 5 : new_state += std::move(value).value;
116 5 : }
117 7 : },
118 : [&](iroha::expected::Error<std::string> &error) {
119 0 : log_->warn("Batch deserialization failed: {}", error.error);
120 0 : });
121 : }
122 :
123 6 : log_->info("batches in MstState: {}", new_state.getBatches().size());
124 :
125 6 : shared_model::crypto::PublicKey source_key(request->source_peer_key());
126 : auto key_invalid_reason =
127 6 : shared_model::validation::validatePubkey(source_key);
128 6 : if (key_invalid_reason) {
129 1 : log_->info("Dropping received MST State due to invalid public key: {}",
130 1 : *key_invalid_reason);
131 1 : return grpc::Status::OK;
132 : }
133 :
134 5 : if (new_state.isEmpty()) {
135 2 : log_->info(
136 2 : "All transactions from received MST state have been processed already, "
137 : "nothing to propagate to MST processor");
138 2 : return grpc::Status::OK;
139 : }
140 :
141 3 : if (auto subscriber = subscriber_.lock()) {
142 3 : subscriber->onNewState(source_key, std::move(new_state));
143 3 : } else {
144 0 : log_->warn("No subscriber for MST SendState event is set");
145 : }
146 :
147 3 : return grpc::Status::OK;
148 6 : }
149 :
150 : void MstTransportGrpc::subscribe(
151 : std::shared_ptr<MstTransportNotification> notification) {
152 10 : subscriber_ = notification;
153 10 : }
154 :
155 : void MstTransportGrpc::sendState(const shared_model::interface::Peer &to,
156 : ConstRefState providing_state) {
157 3 : log_->info("Propagate MstState to peer {}", to.address());
158 3 : sendStateAsyncImpl(to, providing_state, my_key_, *async_call_);
159 3 : }
160 :
161 : void iroha::network::sendStateAsync(
162 : const shared_model::interface::Peer &to,
163 : ConstRefState state,
164 : const shared_model::crypto::PublicKey &sender_key,
165 : AsyncGrpcClient<google::protobuf::Empty> &async_call) {
166 0 : sendStateAsyncImpl(
167 0 : to, state, shared_model::crypto::toBinaryString(sender_key), async_call);
168 0 : }
169 :
170 : void sendStateAsyncImpl(const shared_model::interface::Peer &to,
171 : ConstRefState state,
172 : const std::string &sender_key,
173 : AsyncGrpcClient<google::protobuf::Empty> &async_call) {
174 3 : std::unique_ptr<transport::MstTransportGrpc::StubInterface> client =
175 3 : transport::MstTransportGrpc::NewStub(grpc::CreateChannel(
176 3 : to.address(), grpc::InsecureChannelCredentials()));
177 :
178 3 : transport::MstState protoState;
179 3 : protoState.set_source_peer_key(sender_key);
180 8 : for (auto &batch : state.getBatches()) {
181 10 : for (auto &tx : batch->transactions()) {
182 : // TODO (@l4l) 04/03/18 simplify with IR-1040
183 5 : *protoState.add_transactions() =
184 5 : std::static_pointer_cast<shared_model::proto::Transaction>(tx)
185 5 : ->getTransport();
186 : }
187 : }
188 :
189 : async_call.Call([&](auto context, auto cq) {
190 3 : return client->AsyncSendState(context, protoState, cq);
191 : });
192 3 : }
|