Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "pending_txs_storage/impl/pending_txs_storage_impl.hpp"
7 :
8 : #include "interfaces/transaction.hpp"
9 : #include "multi_sig_transactions/state/mst_state.hpp"
10 :
11 : namespace iroha {
12 :
13 : PendingTransactionStorageImpl::PendingTransactionStorageImpl(
14 : StateObservable updated_batches,
15 : BatchObservable prepared_batch,
16 253 : BatchObservable expired_batch) {
17 253 : updated_batches_subscription_ =
18 : updated_batches.subscribe([this](const SharedState &batches) {
19 17 : this->updatedBatchesHandler(batches);
20 17 : });
21 253 : prepared_batch_subscription_ =
22 : prepared_batch.subscribe([this](const SharedBatch &preparedBatch) {
23 4 : this->removeBatch(preparedBatch);
24 4 : });
25 253 : expired_batch_subscription_ =
26 : expired_batch.subscribe([this](const SharedBatch &expiredBatch) {
27 1 : this->removeBatch(expiredBatch);
28 1 : });
29 253 : }
30 :
31 : PendingTransactionStorageImpl::~PendingTransactionStorageImpl() {
32 253 : updated_batches_subscription_.unsubscribe();
33 253 : prepared_batch_subscription_.unsubscribe();
34 253 : expired_batch_subscription_.unsubscribe();
35 253 : }
36 :
37 : PendingTransactionStorageImpl::SharedTxsCollectionType
38 : PendingTransactionStorageImpl::getPendingTransactions(
39 : const AccountIdType &account_id) const {
40 15 : std::shared_lock<std::shared_timed_mutex> lock(mutex_);
41 15 : auto creator_it = storage_.index.find(account_id);
42 15 : if (storage_.index.end() != creator_it) {
43 15 : auto &batch_hashes = creator_it->second;
44 15 : SharedTxsCollectionType result;
45 15 : auto &batches = storage_.batches;
46 29 : for (const auto &batch_hash : batch_hashes) {
47 14 : auto batch_it = batches.find(batch_hash);
48 14 : if (batches.end() != batch_it) {
49 14 : auto &txs = batch_it->second->transactions();
50 14 : result.insert(result.end(), txs.begin(), txs.end());
51 14 : }
52 : }
53 15 : return result;
54 15 : }
55 0 : return {};
56 15 : }
57 :
58 : std::set<PendingTransactionStorageImpl::AccountIdType>
59 : PendingTransactionStorageImpl::batchCreators(const TransactionBatch &batch) {
60 20 : std::set<AccountIdType> creators;
61 45 : for (const auto &transaction : batch.transactions()) {
62 25 : creators.insert(transaction->creatorAccountId());
63 : }
64 20 : return creators;
65 20 : }
66 :
67 : void PendingTransactionStorageImpl::updatedBatchesHandler(
68 : const SharedState &updated_batches) {
69 17 : std::unique_lock<std::shared_timed_mutex> lock(mutex_);
70 36 : for (auto &batch : updated_batches->getBatches()) {
71 19 : auto hash = batch->reducedHash();
72 19 : auto it = storage_.batches.find(hash);
73 19 : if (storage_.batches.end() == it) {
74 33 : for (const auto &creator : batchCreators(*batch)) {
75 18 : storage_.index[creator].insert(hash);
76 : }
77 15 : }
78 19 : storage_.batches[hash] = batch;
79 19 : }
80 17 : }
81 :
82 : void PendingTransactionStorageImpl::removeBatch(const SharedBatch &batch) {
83 5 : auto creators = batchCreators(*batch);
84 5 : auto hash = batch->reducedHash();
85 5 : std::unique_lock<std::shared_timed_mutex> lock(mutex_);
86 5 : auto &batches = storage_.batches;
87 5 : auto batch_it = batches.find(hash);
88 5 : if (batches.end() != batch_it) {
89 5 : batches.erase(batch_it);
90 5 : }
91 10 : for (const auto &creator : creators) {
92 5 : auto &index = storage_.index;
93 5 : auto index_it = index.find(creator);
94 5 : if (index.end() != index_it) {
95 5 : auto &creator_set = index_it->second;
96 5 : auto creator_it = creator_set.find(hash);
97 5 : if (creator_set.end() != creator_it) {
98 5 : creator_set.erase(creator_it);
99 5 : }
100 5 : }
101 : }
102 5 : }
103 :
104 : } // namespace iroha
|