Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "ordering/impl/on_demand_ordering_gate.hpp"
7 :
8 : #include <boost/range/adaptor/filtered.hpp>
9 : #include <boost/range/adaptor/indexed.hpp>
10 : #include <boost/range/adaptor/transformed.hpp>
11 : #include <boost/range/empty.hpp>
12 : #include "ametsuchi/tx_presence_cache.hpp"
13 : #include "common/visitor.hpp"
14 : #include "interfaces/iroha_internal/transaction_batch_parser_impl.hpp"
15 : #include "logger/logger.hpp"
16 : #include "ordering/impl/on_demand_common.hpp"
17 :
18 : using namespace iroha;
19 : using namespace iroha::ordering;
20 :
// Builds the gate and subscribes to `events`; the resulting subscription is
// stored in events_subscription_.
//
// log, ordering_service, network_client, cache, factory and tx_cache are moved
// into the corresponding members; current_round_ starts at initial_round.
//
// For every event the subscription callback:
//  1. under an exclusive lock on mutex_, sets current_round_ to the round
//     carried by the event and logs it;
//  2. for a BlockEvent, removes block_event.hashes from cache_;
//  3. pops batches from cache_, appends them back to cache_ and, if there are
//     any, passes them to network_client_->onBatches for current_round_;
//  4. calls ordering_service_->onCollaborationOutcome(current_round_);
//  5. requests a proposal for current_round_ from network_client_, passes it
//     through processProposalRequest and emits a network::OrderingEvent on
//     proposal_notifier_.
//
// NOTE(review): the callback uses cache_ and, through processProposalRequest,
// tx_cache_ and proposal_factory_, all of which come after
// events_subscription_ in this initializer list. If the member declaration
// order matches this list and `events` can emit during subscribe(), they would
// be read before being initialized - confirm against the header.
// NOTE(review): after lock.unlock() current_round_ is read without mutex_
// held; this appears to rely on the callback being the only writer and never
// running concurrently with itself - TODO confirm.
20 : OnDemandOrderingGate::OnDemandOrderingGate(
21 : std::shared_ptr<OnDemandOrderingService> ordering_service,
22 : std::shared_ptr<transport::OdOsNotification> network_client,
23 : rxcpp::observable<BlockRoundEventType> events,
24 : std::shared_ptr<cache::OrderingGateCache> cache,
25 : std::shared_ptr<shared_model::interface::UnsafeProposalFactory> factory,
26 : std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
27 : consensus::Round initial_round,
28 : logger::LoggerPtr log)
29 256 : : log_(std::move(log)),
30 256 : ordering_service_(std::move(ordering_service)),
31 256 : network_client_(std::move(network_client)),
32 : events_subscription_(events.subscribe([this](auto event) {
33 : // exclusive lock: current_round_ is written below
34 3381 : std::unique_lock<std::shared_timed_mutex> lock(mutex_);
35 3381 : visit_in_place(event,
36 : [this](const BlockEvent &block_event) {
37 : // block committed: take the round carried by the event
38 970 : log_->debug("BlockEvent. {}", block_event.round);
39 970 : current_round_ = block_event.round;
40 970 : },
41 : [this](const EmptyEvent &empty_event) {
42 : // no block committed: take the round carried by the event
43 2411 : log_->debug("EmptyEvent");
44 2411 : current_round_ = empty_event.round;
45 2411 : });
46 3381 : log_->debug("Current: {}", current_round_);
// the lock is released here; everything below runs without mutex_ held
47 3381 : lock.unlock();
48 :
49 3381 : visit_in_place(event,
50 : [this](const BlockEvent &block_event) {
51 : // block committed, remove transactions from cache
52 970 : cache_->remove(block_event.hashes);
53 970 : },
54 : [this](const EmptyEvent &) {
55 : // no blocks committed, no transactions to remove
56 2411 : });
57 :
// take the batches out of the cache, append them back to it right away, and
// (if there are any) send the same batches to the network client for the
// current round
58 3381 : auto batches = cache_->pop();
59 :
60 3381 : cache_->addToBack(batches);
61 3381 : if (not batches.empty()) {
62 393 : network_client_->onBatches(
63 393 : current_round_,
64 393 : transport::OdOsNotification::CollectionType{batches.begin(),
65 393 : batches.end()});
66 393 : }
67 :
68 : // notify our ordering service about new round
69 3381 : ordering_service_->onCollaborationOutcome(current_round_);
70 :
71 : // request proposal for the current round
72 3381 : auto proposal = this->processProposalRequest(
73 3381 : network_client_->onRequestProposal(current_round_));
74 : // vote for the object received from the network
// (the proposal may be boost::none; an event is emitted in either case)
75 3381 : proposal_notifier_.get_subscriber().on_next(
76 3381 : network::OrderingEvent{std::move(proposal), current_round_});
77 3381 : })),
78 256 : cache_(std::move(cache)),
79 256 : proposal_factory_(std::move(factory)),
80 256 : tx_cache_(std::move(tx_cache)),
81 256 : current_round_(initial_round) {}
83 :
// Cancels the event subscription created in the constructor. The subscription
// callback captures `this`, so it is unsubscribed when the gate is destroyed.
83 : OnDemandOrderingGate::~OnDemandOrderingGate() {
84 256 : events_subscription_.unsubscribe();
85 256 : }
87 :
// Appends `batch` to cache_ and passes it to network_client_->onBatches
// together with the current round.
// @param batch - the transaction batch to store and send
87 : void OnDemandOrderingGate::propagateBatch(
88 : std::shared_ptr<shared_model::interface::TransactionBatch> batch) {
89 725 : cache_->addToBack({batch});
90 :
// shared (reader) lock: current_round_ is only read here; the lock stays held
// for the duration of the onBatches call
91 725 : std::shared_lock<std::shared_timed_mutex> lock(mutex_);
92 725 : network_client_->onBatches(
93 725 : current_round_, transport::OdOsNotification::CollectionType{batch});
94 725 : }
96 :
// Returns the observable side of proposal_notifier_, on which the event
// callback set up in the constructor emits one network::OrderingEvent per
// processed event.
96 : rxcpp::observable<network::OrderingEvent> OnDemandOrderingGate::onProposal() {
97 746 : return proposal_notifier_.get_observable();
98 0 : }
100 :
// Deprecated: always throws std::logic_error; the `pcs` argument is not used.
// The events observable has to be passed to the constructor instead.
// @throws std::logic_error unconditionally
100 : void OnDemandOrderingGate::setPcs(
101 : const iroha::network::PeerCommunicationService &pcs) {
102 0 : throw std::logic_error(
103 : "Method is deprecated. PCS observable should be set in ctor");
104 0 : }
106 :
// Filters a proposal received for the current round.
// @param proposal - optional proposal, e.g. the result of
//        network_client_->onRequestProposal
// @return boost::none when no proposal was given or when no transactions
//         remain after removeReplays; otherwise the proposal returned by
//         removeReplays
106 : boost::optional<std::shared_ptr<const shared_model::interface::Proposal>>
107 : OnDemandOrderingGate::processProposalRequest(
108 : boost::optional<
109 : std::shared_ptr<const OnDemandOrderingService::ProposalType>> proposal)
110 : const {
111 3381 : if (not proposal) {
112 2061 : return boost::none;
113 : }
114 1320 : auto proposal_without_replays = removeReplays(*std::move(proposal));
115 : // a proposal left without transactions is reported as boost::none
116 1320 : if (boost::empty(proposal_without_replays->transactions())) {
117 597 : return boost::none;
118 : }
119 723 : return proposal_without_replays;
120 3381 : }
122 :
// Drops from `proposal` every batch that contains at least one transaction
// which tx_cache_ does not report as Missing.
// @param proposal - the proposal to filter
// @return the same proposal object when every batch consists only of new
//         transactions; otherwise a proposal created by proposal_factory_ with
//         the same height and created time and only the transactions of the
//         batches that passed the check
122 : std::shared_ptr<const shared_model::interface::Proposal>
123 : OnDemandOrderingGate::removeReplays(
124 : std::shared_ptr<const shared_model::interface::Proposal> proposal) const {
// one flag per transaction, filled batch by batch below: true = keep
125 1320 : std::vector<bool> proposal_txs_validation_results;
// predicate: true only when the cache lookup succeeds and reports Missing
126 : auto tx_is_not_processed = [this](const auto &tx) {
127 1325 : auto tx_result = tx_cache_->check(tx.hash());
128 1325 : if (not tx_result) {
129 : // TODO andrei 30.11.18 IR-51 Handle database error
// until then a failed lookup is handled like an already processed transaction
130 82 : return false;
131 : }
132 1243 : return iroha::visit_in_place(
133 1243 : *tx_result,
134 : [](const ametsuchi::tx_cache_status_responses::Missing &) {
135 728 : return true;
136 : },
137 : [](const auto &status) {
138 : // TODO nickaleks 21.11.18: IR-1887 log replayed transactions
139 : // when log is added
// any status other than Missing counts as a replay; `status` itself is unused
140 514 : return false;
141 : });
142 1325 : };
143 :
144 1320 : shared_model::interface::TransactionBatchParserImpl batch_parser;
145 :
146 1320 : bool has_replays = false;
147 1320 : auto batches = batch_parser.parseBatches(proposal->transactions());
// a batch is kept or dropped as a whole: all of its transactions receive the
// same flag, which is true only if every transaction in the batch is new
148 2640 : for (auto &batch : batches) {
149 1320 : bool all_txs_are_new =
150 1320 : std::all_of(batch.begin(), batch.end(), tx_is_not_processed);
151 1320 : proposal_txs_validation_results.insert(
152 1320 : proposal_txs_validation_results.end(), batch.size(), all_txs_are_new);
153 1320 : has_replays |= not all_txs_are_new;
154 : }
155 :
// nothing to filter out: hand the original proposal back unchanged
156 1320 : if (not has_replays) {
157 723 : return std::move(proposal);
158 : }
159 :
// keep only the transactions whose flag is true, selected by their position
// in proposal->transactions()
// NOTE(review): assumes parseBatches returns the batches in proposal order and
// that together they cover every transaction, so that the flag indices line up
// with the transaction indices; otherwise .at() throws std::out_of_range -
// TODO confirm
160 : auto unprocessed_txs =
161 597 : proposal->transactions() | boost::adaptors::indexed()
162 597 : | boost::adaptors::filtered(
163 : [proposal_txs_validation_results =
164 597 : std::move(proposal_txs_validation_results)](const auto &el) {
165 597 : return proposal_txs_validation_results.at(el.index());
166 : })
167 597 : | boost::adaptors::transformed(
168 : [](const auto &el) -> decltype(auto) { return el.value(); });
169 :
170 597 : return proposal_factory_->unsafeCreateProposal(
171 597 : proposal->height(), proposal->createdTime(), unprocessed_txs);
172 1320 : }
|