Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "common/default_constructible_unary_fn.hpp" // non-copyable value workaround
7 :
8 : #include "torii/impl/command_service_transport_grpc.hpp"
9 :
10 : #include <atomic>
11 : #include <condition_variable>
12 : #include <iterator>
13 :
14 : #include <boost/format.hpp>
15 : #include <boost/range/adaptor/filtered.hpp>
16 : #include <boost/range/adaptor/transformed.hpp>
17 : #include "backend/protobuf/transaction_responses/proto_tx_response.hpp"
18 : #include "common/combine_latest_until_first_completed.hpp"
19 : #include "interfaces/iroha_internal/transaction_batch.hpp"
20 : #include "interfaces/iroha_internal/transaction_batch_factory.hpp"
21 : #include "interfaces/iroha_internal/transaction_batch_parser.hpp"
22 : #include "interfaces/iroha_internal/tx_status_factory.hpp"
23 : #include "interfaces/transaction.hpp"
24 : #include "logger/logger.hpp"
25 : #include "torii/status_bus.hpp"
26 :
27 : namespace iroha {
28 : namespace torii {
29 :
30 : CommandServiceTransportGrpc::CommandServiceTransportGrpc(
31 : std::shared_ptr<CommandService> command_service,
32 : std::shared_ptr<iroha::torii::StatusBus> status_bus,
33 : std::shared_ptr<shared_model::interface::TxStatusFactory>
34 : status_factory,
35 : std::shared_ptr<TransportFactoryType> transaction_factory,
36 : std::shared_ptr<shared_model::interface::TransactionBatchParser>
37 : batch_parser,
38 : std::shared_ptr<shared_model::interface::TransactionBatchFactory>
39 : transaction_batch_factory,
40 : rxcpp::observable<ConsensusGateEvent> consensus_gate_objects,
41 : int maximum_rounds_without_update,
42 : logger::LoggerPtr log)
43 253 : : command_service_(std::move(command_service)),
44 253 : status_bus_(std::move(status_bus)),
45 253 : status_factory_(std::move(status_factory)),
46 253 : transaction_factory_(std::move(transaction_factory)),
47 253 : batch_parser_(std::move(batch_parser)),
48 253 : batch_factory_(std::move(transaction_batch_factory)),
49 253 : log_(std::move(log)),
50 253 : consensus_gate_objects_(std::move(consensus_gate_objects)),
51 253 : maximum_rounds_without_update_(maximum_rounds_without_update) {}
52 :
53 : grpc::Status CommandServiceTransportGrpc::Torii(
54 : grpc::ServerContext *context,
55 : const iroha::protocol::Transaction *request,
56 : google::protobuf::Empty *response) {
57 783 : iroha::protocol::TxList single_tx_list;
58 783 : *single_tx_list.add_transactions() = *request;
59 783 : return ListTorii(context, &single_tx_list, response);
60 783 : }
61 :
62 : namespace {
63 : /**
64 : * Form an error message, which is to be shared between all transactions,
65 : * if there are several of them, or individual message, if there's only
66 : * one
67 : * @param tx_hashes is non empty hash list to form error message from
68 : * @param error of those tx(s)
69 : * @return message
70 : */
71 : std::string formErrorMessage(
72 : const std::vector<shared_model::crypto::Hash> &tx_hashes,
73 : const std::string &error) {
74 1 : if (tx_hashes.size() == 1) {
75 1 : return (boost::format("Stateless invalid tx, error: %s, hash: %s")
76 1 : % error % tx_hashes[0].hex())
77 1 : .str();
78 : }
79 :
80 : std::string folded_hashes =
81 0 : std::accumulate(std::next(tx_hashes.begin()),
82 0 : tx_hashes.end(),
83 0 : tx_hashes[0].hex(),
84 : [](auto &&acc, const auto &h) -> std::string {
85 0 : return acc + ", " + h.hex();
86 0 : });
87 :
88 0 : return (boost::format(
89 : "Stateless invalid tx in transaction sequence, error: %s\n"
90 : "Hash list: [%s]")
91 0 : % error % folded_hashes)
92 0 : .str();
93 1 : }
94 : } // namespace
95 :
96 : shared_model::interface::types::SharedTxsCollectionType
97 : CommandServiceTransportGrpc::deserializeTransactions(
98 : const iroha::protocol::TxList *request) {
99 790 : shared_model::interface::types::SharedTxsCollectionType tx_collection;
100 1597 : for (const auto &tx : request->transactions()) {
101 807 : transaction_factory_->build(tx).match(
102 : [&tx_collection](
103 : iroha::expected::Value<
104 : std::unique_ptr<shared_model::interface::Transaction>> &v) {
105 748 : tx_collection.emplace_back(std::move(v).value);
106 748 : },
107 : [this](iroha::expected::Error<TransportFactoryType::Error> &error) {
108 59 : status_bus_->publish(status_factory_->makeStatelessFail(
109 59 : error.error.hash,
110 59 : shared_model::interface::TxStatusFactory::TransactionError{
111 59 : error.error.error, 0, 0}));
112 59 : });
113 : }
114 790 : return tx_collection;
115 790 : }
116 :
117 : grpc::Status CommandServiceTransportGrpc::ListTorii(
118 : grpc::ServerContext *context,
119 : const iroha::protocol::TxList *request,
120 : google::protobuf::Empty *response) {
121 790 : auto transactions = deserializeTransactions(request);
122 :
123 790 : auto batches = batch_parser_->parseBatches(transactions);
124 :
125 1533 : for (auto &batch : batches) {
126 743 : batch_factory_->createTransactionBatch(batch).match(
127 : [&](iroha::expected::Value<std::unique_ptr<
128 : shared_model::interface::TransactionBatch>> &value) {
129 742 : this->command_service_->handleTransactionBatch(
130 742 : std::move(value).value);
131 742 : },
132 : [&](iroha::expected::Error<std::string> &error) {
133 1 : std::vector<shared_model::crypto::Hash> hashes;
134 :
135 743 : std::transform(batch.begin(),
136 1 : batch.end(),
137 1 : std::back_inserter(hashes),
138 : [](const auto &tx) { return tx->hash(); });
139 :
140 1 : auto error_msg = formErrorMessage(hashes, error.error);
141 : // set error response for each transaction in a batch candidate
142 1 : std::for_each(
143 : hashes.begin(), hashes.end(), [this, &error_msg](auto &hash) {
144 1 : status_bus_->publish(status_factory_->makeStatelessFail(
145 1 : hash,
146 1 : shared_model::interface::TxStatusFactory::
147 1 : TransactionError{error_msg, 0, 0}));
148 1 : });
149 1 : });
150 : }
151 :
152 790 : return grpc::Status::OK;
153 790 : }
154 :
155 : grpc::Status CommandServiceTransportGrpc::Status(
156 : grpc::ServerContext *context,
157 : const iroha::protocol::TxStatusRequest *request,
158 : iroha::protocol::ToriiResponse *response) {
159 783 : *response =
160 783 : std::static_pointer_cast<shared_model::proto::TransactionResponse>(
161 783 : command_service_->getStatus(
162 783 : shared_model::crypto::Hash::fromHexString(
163 783 : request->tx_hash())))
164 783 : ->getTransport();
165 783 : return grpc::Status::OK;
166 0 : }
167 :
168 : namespace {
169 : void handleEvents(rxcpp::composite_subscription &subscription,
170 : rxcpp::schedulers::run_loop &run_loop) {
171 2 : std::condition_variable wait_cv;
172 :
173 2 : run_loop.set_notify_earlier_wakeup(
174 : [&wait_cv](const auto &) { wait_cv.notify_one(); });
175 :
176 2 : std::mutex wait_mutex;
177 2 : std::unique_lock<std::mutex> lock(wait_mutex);
178 4 : while (subscription.is_subscribed() or not run_loop.empty()) {
179 13 : while (not run_loop.empty()
180 13 : and run_loop.peek().when <= run_loop.now()) {
181 11 : run_loop.dispatch();
182 : }
183 :
184 2 : if (run_loop.empty()) {
185 : wait_cv.wait(lock, [&run_loop, &subscription]() {
186 2 : return not subscription.is_subscribed() or not run_loop.empty();
187 : });
188 2 : } else {
189 0 : wait_cv.wait_until(lock, run_loop.peek().when);
190 : }
191 : }
192 2 : }
193 : } // namespace
194 :
195 : grpc::Status CommandServiceTransportGrpc::StatusStream(
196 : grpc::ServerContext *context,
197 : const iroha::protocol::TxStatusRequest *request,
198 : grpc::ServerWriter<iroha::protocol::ToriiResponse> *response_writer) {
199 2 : rxcpp::schedulers::run_loop rl;
200 :
201 2 : auto current_thread = rxcpp::synchronize_in_one_worker(
202 2 : rxcpp::schedulers::make_run_loop(rl));
203 :
204 2 : rxcpp::composite_subscription subscription;
205 :
206 2 : auto hash = shared_model::crypto::Hash::fromHexString(request->tx_hash());
207 :
208 2 : auto client_id_format = boost::format("Peer: '%s', %s");
209 : std::string client_id =
210 2 : (client_id_format % context->peer() % hash.toString()).str();
211 2 : auto status_bus = command_service_->getStatusStream(hash);
212 : auto consensus_gate_observable =
213 2 : consensus_gate_objects_
214 : // a dummy start_with lets us don't wait for the consensus event
215 : // on further combine_latest
216 2 : .start_with(ConsensusGateEvent{});
217 :
218 2 : boost::optional<iroha::protocol::TxStatus> last_tx_status;
219 2 : auto rounds_counter{0};
220 2 : makeCombineLatestUntilFirstCompleted(
221 : status_bus,
222 : current_thread,
223 : [](auto status, auto) { return status; },
224 : consensus_gate_observable)
225 : // complete the observable if client is disconnected or too many
226 : // rounds have passed without tx status change
227 : .take_while([=, &rounds_counter, &last_tx_status](
228 : const auto &response) {
229 2 : const auto &proto_response =
230 2 : std::static_pointer_cast<
231 2 : shared_model::proto::TransactionResponse>(response)
232 2 : ->getTransport();
233 :
234 2 : if (context->IsCancelled()) {
235 2 : log_->debug("client unsubscribed, {}", client_id);
236 0 : return false;
237 : }
238 :
239 : // increment round counter when the same status arrived again.
240 2 : auto status = proto_response.tx_status();
241 2 : auto status_is_same =
242 2 : last_tx_status and (status == *last_tx_status);
243 2 : if (status_is_same) {
244 1 : ++rounds_counter;
245 1 : if (rounds_counter >= maximum_rounds_without_update_) {
246 : // we stop the stream when round counter is greater than
247 : // allowed.
248 0 : return false;
249 : }
250 : // omit the received status, but do not stop the stream
251 1 : return true;
252 : }
253 1 : rounds_counter = 0;
254 1 : last_tx_status = status;
255 :
256 : // write a new status to the stream
257 2 : if (not response_writer->Write(proto_response)) {
258 0 : log_->error("write to stream has failed to client {}", client_id);
259 0 : return false;
260 : }
261 1 : log_->debug("status written, {}", client_id);
262 1 : return true;
263 2 : })
264 2 : .subscribe(subscription,
265 : [](const auto &) {},
266 : [&](std::exception_ptr ep) {
267 0 : log_->error("something bad happened, client_id {}",
268 0 : client_id);
269 0 : },
270 : [&] { log_->debug("stream done, {}", client_id); });
271 :
272 : // run loop while subscription is active or there are pending events in
273 : // the queue
274 2 : handleEvents(subscription, rl);
275 :
276 2 : log_->debug("status stream done, {}", client_id);
277 2 : return grpc::Status::OK;
278 2 : }
279 : } // namespace torii
280 : } // namespace iroha
|