Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "simulator/impl/simulator.hpp"
7 :
8 : #include <boost/range/adaptor/transformed.hpp>
9 : #include "common/bind.hpp"
10 : #include "interfaces/iroha_internal/block.hpp"
11 : #include "interfaces/iroha_internal/proposal.hpp"
12 : #include "logger/logger.hpp"
13 :
14 : namespace iroha {
15 : namespace simulator {
16 :
17 : Simulator::Simulator(
18 : std::shared_ptr<network::OrderingGate> ordering_gate,
19 : std::shared_ptr<validation::StatefulValidator> statefulValidator,
20 : std::shared_ptr<ametsuchi::TemporaryFactory> factory,
21 : std::shared_ptr<ametsuchi::BlockQueryFactory> block_query_factory,
22 : std::shared_ptr<CryptoSignerType> crypto_signer,
23 : std::unique_ptr<shared_model::interface::UnsafeBlockFactory>
24 : block_factory,
25 : logger::LoggerPtr log)
26 251 : : validator_(std::move(statefulValidator)),
27 251 : ametsuchi_factory_(std::move(factory)),
28 251 : block_query_factory_(block_query_factory),
29 251 : crypto_signer_(std::move(crypto_signer)),
30 251 : block_factory_(std::move(block_factory)),
31 251 : log_(std::move(log)) {
32 251 : ordering_gate->onProposal().subscribe(
33 : proposal_subscription_, [this](const network::OrderingEvent &event) {
34 3376 : if (event.proposal) {
35 : auto validated_proposal_and_errors =
36 724 : this->processProposal(*getProposalUnsafe(event));
37 :
38 724 : if (validated_proposal_and_errors) {
39 722 : notifier_.get_subscriber().on_next(VerifiedProposalCreatorEvent{
40 722 : *validated_proposal_and_errors, event.round});
41 722 : }
42 724 : } else {
43 2652 : notifier_.get_subscriber().on_next(
44 2652 : VerifiedProposalCreatorEvent{boost::none, event.round});
45 : }
46 3376 : });
47 :
48 251 : notifier_.get_observable().subscribe(
49 251 : verified_proposal_subscription_,
50 : [this](const VerifiedProposalCreatorEvent &event) {
51 3374 : if (event.verified_proposal_result) {
52 722 : auto proposal_and_errors = getVerifiedProposalUnsafe(event);
53 722 : auto block = this->processVerifiedProposal(proposal_and_errors);
54 722 : if (block) {
55 722 : block_notifier_.get_subscriber().on_next(BlockCreatorEvent{
56 722 : RoundData{proposal_and_errors->verified_proposal, *block},
57 722 : event.round});
58 722 : }
59 722 : } else {
60 2652 : block_notifier_.get_subscriber().on_next(
61 2652 : BlockCreatorEvent{boost::none, event.round});
62 : }
63 3374 : });
64 251 : }
65 :
66 : Simulator::~Simulator() {
67 251 : proposal_subscription_.unsubscribe();
68 251 : verified_proposal_subscription_.unsubscribe();
69 251 : }
70 :
71 : rxcpp::observable<VerifiedProposalCreatorEvent>
72 : Simulator::onVerifiedProposal() {
73 497 : return notifier_.get_observable();
74 0 : }
75 :
76 : boost::optional<std::shared_ptr<validation::VerifiedProposalAndErrors>>
77 : Simulator::processProposal(
78 : const shared_model::interface::Proposal &proposal) {
79 725 : log_->info("process proposal");
80 :
81 : // Get last block from local ledger
82 725 : if (auto block_query_opt = block_query_factory_->createBlockQuery()) {
83 725 : auto block_var = block_query_opt.value()->getTopBlock();
84 725 : if (auto e = boost::get<expected::Error<std::string>>(&block_var)) {
85 1 : log_->warn("Could not fetch last block: " + e->error);
86 1 : return boost::none;
87 : }
88 :
89 724 : last_block = boost::get<expected::Value<
90 724 : std::shared_ptr<shared_model::interface::Block>>>(&block_var)
91 724 : ->value;
92 1449 : } else {
93 0 : log_->error("could not create block query");
94 0 : return boost::none;
95 : }
96 :
97 724 : if (last_block->height() + 1 != proposal.height()) {
98 1 : log_->warn("Last block height: {}, proposal height: {}",
99 1 : last_block->height(),
100 1 : proposal.height());
101 1 : return boost::none;
102 : }
103 :
104 723 : auto temporary_wsv_var = ametsuchi_factory_->createTemporaryWsv();
105 723 : if (auto e =
106 723 : boost::get<expected::Error<std::string>>(&temporary_wsv_var)) {
107 0 : log_->error("could not create temporary storage: {}", e->error);
108 0 : return boost::none;
109 : }
110 :
111 723 : auto storage = std::move(
112 723 : boost::get<expected::Value<std::unique_ptr<ametsuchi::TemporaryWsv>>>(
113 723 : &temporary_wsv_var)
114 723 : ->value);
115 :
116 : std::shared_ptr<iroha::validation::VerifiedProposalAndErrors>
117 723 : validated_proposal_and_errors =
118 723 : validator_->validate(proposal, *storage);
119 723 : ametsuchi_factory_->prepareBlock(std::move(storage));
120 :
121 723 : return validated_proposal_and_errors;
122 725 : }
123 :
124 : boost::optional<std::shared_ptr<shared_model::interface::Block>>
125 : Simulator::processVerifiedProposal(
126 : const std::shared_ptr<iroha::validation::VerifiedProposalAndErrors>
127 : &verified_proposal_and_errors) {
128 722 : log_->info("process verified proposal");
129 :
130 722 : auto height = block_query_factory_->createBlockQuery() |
131 : [&](const auto &block_query) {
132 722 : return block_query->getTopBlockHeight() + 1;
133 : };
134 722 : if (not height) {
135 0 : log_->error("Unable to query top block height");
136 0 : return boost::none;
137 : }
138 722 : const auto &proposal = verified_proposal_and_errors->verified_proposal;
139 722 : std::vector<shared_model::crypto::Hash> rejected_hashes;
140 781 : for (const auto &rejected_tx :
141 722 : verified_proposal_and_errors->rejected_transactions) {
142 59 : rejected_hashes.push_back(rejected_tx.tx_hash);
143 : }
144 722 : std::shared_ptr<shared_model::interface::Block> block =
145 722 : block_factory_->unsafeCreateBlock(height,
146 722 : last_block->hash(),
147 722 : proposal->createdTime(),
148 722 : proposal->transactions(),
149 722 : rejected_hashes);
150 722 : crypto_signer_->sign(*block);
151 :
152 722 : return block;
153 722 : }
154 :
155 : rxcpp::observable<BlockCreatorEvent> Simulator::onBlock() {
156 250 : return block_notifier_.get_observable();
157 0 : }
158 :
159 : } // namespace simulator
160 : } // namespace iroha
|