Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "ordering/impl/on_demand_ordering_service_impl.hpp"
7 :
8 : #include <unordered_set>
9 :
10 : #include <boost/optional.hpp>
11 : #include <boost/range/adaptor/filtered.hpp>
12 : #include <boost/range/adaptor/indirected.hpp>
13 : #include <boost/range/adaptor/transformed.hpp>
14 : #include <boost/range/algorithm/for_each.hpp>
15 : #include <boost/range/size.hpp>
16 : #include "ametsuchi/tx_presence_cache.hpp"
17 : #include "ametsuchi/tx_presence_cache_utils.hpp"
18 : #include "common/visitor.hpp"
19 : #include "datetime/time.hpp"
20 : #include "interfaces/iroha_internal/proposal.hpp"
21 : #include "interfaces/iroha_internal/transaction_batch.hpp"
22 : #include "interfaces/transaction.hpp"
23 : #include "logger/logger.hpp"
24 :
25 : using namespace iroha;
26 : using namespace iroha::ordering;
27 : using TransactionBatchType = transport::OdOsNotification::TransactionBatchType;
28 :
29 : OnDemandOrderingServiceImpl::OnDemandOrderingServiceImpl(
30 : size_t transaction_limit,
31 : std::shared_ptr<shared_model::interface::UnsafeProposalFactory>
32 : proposal_factory,
33 : std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
34 : logger::LoggerPtr log,
35 : size_t number_of_proposals,
36 : const consensus::Round &initial_round)
37 257 : : transaction_limit_(transaction_limit),
38 257 : number_of_proposals_(number_of_proposals),
39 257 : proposal_factory_(std::move(proposal_factory)),
40 257 : tx_cache_(std::move(tx_cache)),
41 257 : log_(std::move(log)) {
42 257 : onCollaborationOutcome(initial_round);
43 257 : }
44 :
45 : // -------------------------| OnDemandOrderingService |-------------------------
46 :
47 : void OnDemandOrderingServiceImpl::onCollaborationOutcome(
48 : consensus::Round round) {
49 3657 : log_->info("onCollaborationOutcome => {}", round);
50 : // exclusive write lock
51 3657 : std::lock_guard<std::shared_timed_mutex> guard(lock_);
52 3657 : log_->debug("onCollaborationOutcome => write lock is acquired");
53 :
54 3657 : packNextProposals(round);
55 3657 : tryErase();
56 3657 : }
57 :
58 : // ----------------------------| OdOsNotification |-----------------------------
59 :
60 : void OnDemandOrderingServiceImpl::onBatches(consensus::Round round,
61 : CollectionType batches) {
62 : // read lock
63 2968 : std::shared_lock<std::shared_timed_mutex> guard(lock_);
64 2968 : log_->info("onBatches => collection size = {}, {}", batches.size(), round);
65 :
66 : auto unprocessed_batches =
67 : boost::adaptors::filter(batches, [this](const auto &batch) {
68 3409 : log_->info("check batch {} for already processed transactions",
69 3386 : batch->reducedHash().hex());
70 3409 : return not this->batchAlreadyProcessed(*batch);
71 0 : });
72 3180 : auto it = current_proposals_.find(round);
73 3238 : if (it == current_proposals_.end()) {
74 952 : it =
75 952 : std::find_if(current_proposals_.begin(),
76 863 : current_proposals_.end(),
77 : [&round](const auto &p) {
78 1897 : auto request_reject_round = round.reject_round;
79 1897 : auto reject_round = p.first.reject_round;
80 1904 : return request_reject_round == reject_round
81 1897 : or (request_reject_round >= 2 and reject_round >= 2);
82 : });
83 970 : if (it == current_proposals_.end()) {
84 0 : log_->critical("No place to store the batches!");
85 0 : assert(false); // terminate if in debug build
86 : return;
87 : }
88 933 : log_->debug("onBatches => collection will be inserted to {}", it->first);
89 933 : }
90 3117 : std::for_each(unprocessed_batches.begin(),
91 3108 : unprocessed_batches.end(),
92 : [&it](auto &obj) { it->second.push(std::move(obj)); });
93 3262 : log_->debug("onBatches => collection is inserted");
94 3262 : }
95 :
96 : boost::optional<
97 : std::shared_ptr<const OnDemandOrderingServiceImpl::ProposalType>>
98 : OnDemandOrderingServiceImpl::onRequestProposal(consensus::Round round) {
99 : // read lock
100 3110 : std::shared_lock<std::shared_timed_mutex> guard(lock_);
101 3110 : auto proposal = proposal_map_.find(round);
102 : // space between '{}' and 'returning' is not missing, since either nothing, or
103 : // NOT with space is printed
104 3110 : log_->debug("onRequestProposal, {}, {}returning a proposal.",
105 : round,
106 3110 : (proposal == proposal_map_.end()) ? "NOT " : "");
107 3110 : if (proposal != proposal_map_.end()) {
108 1333 : return proposal->second;
109 : } else {
110 1777 : return boost::none;
111 : }
112 3110 : }
113 :
114 : // ---------------------------------| Private |---------------------------------
115 :
116 : /**
117 : * Get transactions from the given batches queue. Does not break batches -
118 : * continues getting all the transactions from the ongoing batch until the
119 : * required amount is collected.
120 : * @param requested_tx_amount - amount of transactions to get
121 : * @param tx_batches_queue - the queue to get transactions from
122 : * @param discarded_txs_amount - the amount of discarded txs
123 :
124 : * @return transactions
125 : */
126 : static std::vector<std::shared_ptr<shared_model::interface::Transaction>>
127 : getTransactions(size_t requested_tx_amount,
128 : tbb::concurrent_queue<TransactionBatchType> &tx_batches_queue,
129 : boost::optional<size_t &> discarded_txs_amount) {
130 2179 : TransactionBatchType batch;
131 2179 : std::vector<std::shared_ptr<shared_model::interface::Transaction>> collection;
132 2179 : std::unordered_set<std::string> inserted;
133 :
134 4378 : while (collection.size() < requested_tx_amount
135 4378 : and tx_batches_queue.try_pop(batch)
136 2223 : and inserted.insert(batch->reducedHash().hex()).second) {
137 2199 : collection.insert(
138 2199 : std::end(collection),
139 2199 : std::make_move_iterator(std::begin(batch->transactions())),
140 2199 : std::make_move_iterator(std::end(batch->transactions())));
141 : }
142 :
143 2179 : if (discarded_txs_amount) {
144 2179 : *discarded_txs_amount = 0;
145 2974 : while (tx_batches_queue.try_pop(batch)) {
146 795 : *discarded_txs_amount += boost::size(batch->transactions());
147 : }
148 2179 : }
149 :
150 2179 : return collection;
151 2179 : }
152 :
153 : void OnDemandOrderingServiceImpl::packNextProposals(
154 : const consensus::Round &round) {
155 : auto close_round = [this](consensus::Round round) {
156 4895 : log_->debug("close {}", round);
157 :
158 4895 : auto it = current_proposals_.find(round);
159 4895 : if (it != current_proposals_.end()) {
160 3889 : log_->debug("proposal found");
161 3889 : if (not it->second.empty()) {
162 2179 : log_->debug("Mutable proposal generation for round {}", round);
163 : size_t discarded_txs_amount;
164 2179 : auto txs = getTransactions(transaction_limit_, it->second, discarded_txs_amount);
165 2179 : if (not txs.empty()) {
166 2179 : log_->debug("Number of transactions in proposal = {}", txs.size());
167 2179 : auto proposal = proposal_factory_->unsafeCreateProposal(
168 2179 : round.block_round,
169 2179 : iroha::time::now(),
170 2179 : std::move(txs) | boost::adaptors::indirected);
171 2179 : proposal_map_.emplace(round, std::move(proposal));
172 2179 : log_->debug(
173 2179 : "packNextProposal: data has been fetched for {}. "
174 : "Discarded {} transactions.",
175 : round,
176 : discarded_txs_amount);
177 2179 : round_queue_.push(round);
178 2179 : }
179 2179 : }
180 3889 : current_proposals_.erase(it);
181 3889 : }
182 4895 : };
183 :
184 : auto open_round = [this](consensus::Round round) {
185 6133 : log_->debug("open {}", round);
186 6133 : current_proposals_[round];
187 6133 : };
188 :
189 : /*
190 : * The possible cases can be visualised as a diagram, where:
191 : * o - current round, x - next round, v - target round
192 : *
193 : * 0 1 2
194 : * 0 o x v
195 : * 1 x v .
196 : * 2 v . .
197 : *
198 : * Reject case:
199 : *
200 : * 0 1 2 3
201 : * 0 . o x v
202 : * 1 x v . .
203 : * 2 v . . .
204 : *
205 : * (0,1) - current round. Round (0,2) is closed for transactions.
206 : * Round (0,3) is now receiving transactions.
207 : * Rounds (1,) and (2,) do not change.
208 : *
209 : * Commit case:
210 : *
211 : * 0 1 2
212 : * 0 . . .
213 : * 1 o x v
214 : * 2 x v .
215 : * 3 v . .
216 : *
217 : * (1,0) - current round. The diagram is similar to the initial case.
218 : */
219 :
220 : // close next reject round
221 3657 : close_round({round.block_round, round.reject_round + 1});
222 :
223 3657 : if (round.reject_round == kFirstRejectRound) {
224 : // new block round
225 1238 : close_round({round.block_round + 1, round.reject_round});
226 :
227 : // remove current queues
228 1238 : current_proposals_.clear();
229 : // initialize the 3 diagonal rounds from the commit case diagram
230 1238 : open_round({round.block_round + 1, kNextRejectRoundConsumer});
231 1238 : open_round({round.block_round + 2, kNextCommitRoundConsumer});
232 1238 : }
233 :
234 : // new reject round
235 3657 : open_round(
236 3657 : {round.block_round, currentRejectRoundConsumer(round.reject_round)});
237 3657 : }
238 :
239 : void OnDemandOrderingServiceImpl::tryErase() {
240 5122 : while (round_queue_.size() > number_of_proposals_) {
241 1465 : auto &round = round_queue_.front();
242 1465 : proposal_map_.erase(round);
243 1465 : log_->info("tryErase: erased {}", round);
244 1465 : round_queue_.pop();
245 : }
246 3657 : }
247 :
248 : bool OnDemandOrderingServiceImpl::batchAlreadyProcessed(
249 : const shared_model::interface::TransactionBatch &batch) {
250 3290 : auto tx_statuses = tx_cache_->check(batch);
251 3290 : if (not tx_statuses) {
252 : // TODO andrei 30.11.18 IR-51 Handle database error
253 3 : log_->warn("Check tx presence database error. Batch: {}", batch);
254 3 : return true;
255 : }
256 : // if any transaction is commited or rejected, batch was already processed
257 : // Note: any_of returns false for empty sequence
258 3338 : return std::any_of(
259 : tx_statuses->begin(), tx_statuses->end(), [this](const auto &tx_status) {
260 3342 : if (iroha::ametsuchi::isAlreadyProcessed(tx_status)) {
261 2 : log_->warn("Duplicate transaction: {}",
262 2 : iroha::ametsuchi::getHash(tx_status).hex());
263 2 : return true;
264 : }
265 3270 : return false;
266 3287 : });
267 3321 : }
|