Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include <utility>
7 :
8 : #include "logger/logger.hpp"
9 : #include "multi_sig_transactions/mst_processor_impl.hpp"
10 :
11 : namespace iroha {
12 :
13 : FairMstProcessor::FairMstProcessor(
14 : std::shared_ptr<iroha::network::MstTransport> transport,
15 : std::shared_ptr<MstStorage> storage,
16 : std::shared_ptr<PropagationStrategy> strategy,
17 : std::shared_ptr<MstTimeProvider> time_provider,
18 : logger::LoggerPtr log)
19 255 : : MstProcessor(log), // use the same logger in base class
20 255 : transport_(std::move(transport)),
21 255 : storage_(std::move(storage)),
22 255 : strategy_(std::move(strategy)),
23 255 : time_provider_(std::move(time_provider)),
24 255 : propagation_subscriber_(strategy_->emitter().subscribe(
25 : [this](auto data) { this->onPropagate(data); })),
26 255 : log_(std::move(log)) {}
27 :
28 : FairMstProcessor::~FairMstProcessor() {
29 255 : propagation_subscriber_.unsubscribe();
30 255 : }
31 :
32 : // -------------------------| MstProcessor override |-------------------------
33 :
34 : auto FairMstProcessor::propagateBatchImpl(const iroha::DataType &batch)
35 : -> decltype(propagateBatch(batch)) {
36 23 : auto state_update = storage_->updateOwnState(batch);
37 23 : completedBatchesNotify(*state_update.completed_state_);
38 23 : updatedBatchesNotify(*state_update.updated_state_);
39 23 : expiredBatchesNotify(
40 23 : storage_->getExpiredTransactions(time_provider_->getCurrentTime()));
41 23 : }
42 :
43 : auto FairMstProcessor::onStateUpdateImpl() const
44 : -> decltype(onStateUpdate()) {
45 500 : return state_subject_.get_observable();
46 0 : }
47 :
48 : auto FairMstProcessor::onPreparedBatchesImpl() const
49 : -> decltype(onPreparedBatches()) {
50 500 : return batches_subject_.get_observable();
51 0 : }
52 :
53 : auto FairMstProcessor::onExpiredBatchesImpl() const
54 : -> decltype(onExpiredBatches()) {
55 500 : return expired_subject_.get_observable();
56 0 : }
57 :
58 : // TODO [IR-1687] Akvinikym 10.09.18: three methods below should be one
59 : void FairMstProcessor::completedBatchesNotify(ConstRefState state) const {
60 24 : if (not state.isEmpty()) {
61 5 : auto completed_batches = state.getBatches();
62 5 : std::for_each(completed_batches.begin(),
63 5 : completed_batches.end(),
64 : [this](const auto &batch) {
65 5 : batches_subject_.get_subscriber().on_next(batch);
66 5 : });
67 5 : }
68 24 : }
69 :
70 : void FairMstProcessor::updatedBatchesNotify(ConstRefState state) const {
71 24 : if (not state.isEmpty()) {
72 18 : state_subject_.get_subscriber().on_next(
73 18 : std::make_shared<MstState>(state));
74 18 : }
75 24 : }
76 :
77 : void FairMstProcessor::expiredBatchesNotify(ConstRefState state) const {
78 24 : if (not state.isEmpty()) {
79 1 : auto expired_batches = state.getBatches();
80 1 : std::for_each(expired_batches.begin(),
81 1 : expired_batches.end(),
82 : [this](const auto &batch) {
83 1 : expired_subject_.get_subscriber().on_next(batch);
84 1 : });
85 1 : }
86 24 : }
87 :
88 : bool FairMstProcessor::batchInStorageImpl(const DataType &batch) const {
89 722 : return storage_->batchInStorage(batch);
90 : }
91 :
92 : // -------------------| MstTransportNotification override |-------------------
93 :
94 : void FairMstProcessor::onNewState(const shared_model::crypto::PublicKey &from,
95 : ConstRefState new_state) {
96 1 : log_->info("Applying new state");
97 1 : auto current_time = time_provider_->getCurrentTime();
98 :
99 1 : auto state_update = storage_->apply(from, new_state);
100 :
101 : // updated batches
102 1 : updatedBatchesNotify(*state_update.updated_state_);
103 1 : log_->info("New batches size: {}",
104 1 : state_update.updated_state_->getBatches().size());
105 :
106 : // completed batches
107 1 : completedBatchesNotify(*state_update.completed_state_);
108 :
109 : // expired batches
110 1 : expiredBatchesNotify(storage_->getDiffState(from, current_time));
111 1 : }
112 :
113 : // -----------------------------| private api |-----------------------------
114 :
115 : void FairMstProcessor::onPropagate(
116 : const PropagationStrategy::PropagationData &data) {
117 10 : auto current_time = time_provider_->getCurrentTime();
118 10 : auto size = data.size();
119 10 : std::for_each(data.begin(),
120 10 : data.end(),
121 : [this, ¤t_time, size](const auto &dst_peer) {
122 19 : auto diff = storage_->getDiffState(dst_peer->pubkey(),
123 19 : current_time);
124 19 : if (not diff.isEmpty()) {
125 4 : log_->info("Propagate new data[{}]", size);
126 4 : transport_->sendState(*dst_peer, diff);
127 4 : }
128 19 : });
129 10 : }
130 :
131 : } // namespace iroha
|