Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "ametsuchi/impl/mutable_storage_impl.hpp"
7 :
8 : #include <boost/variant/apply_visitor.hpp>
9 : #include "ametsuchi/impl/peer_query_wsv.hpp"
10 : #include "ametsuchi/impl/postgres_block_index.hpp"
11 : #include "ametsuchi/impl/postgres_command_executor.hpp"
12 : #include "ametsuchi/impl/postgres_wsv_command.hpp"
13 : #include "ametsuchi/impl/postgres_wsv_query.hpp"
14 : #include "interfaces/commands/command.hpp"
15 : #include "interfaces/common_objects/common_objects_factory.hpp"
16 : #include "interfaces/iroha_internal/block.hpp"
17 : #include "logger/logger.hpp"
18 : #include "logger/logger_manager.hpp"
19 :
20 : namespace iroha {
21 : namespace ametsuchi {
22 : MutableStorageImpl::MutableStorageImpl(
23 : shared_model::interface::types::HashType top_hash,
24 : std::shared_ptr<PostgresCommandExecutor> cmd_executor,
25 : std::unique_ptr<soci::session> sql,
26 : std::shared_ptr<shared_model::interface::CommonObjectsFactory> factory,
27 : logger::LoggerManagerTreePtr log_manager)
28 550 : : top_hash_(top_hash),
29 550 : sql_(std::move(sql)),
30 550 : peer_query_(
31 550 : std::make_unique<PeerQueryWsv>(std::make_shared<PostgresWsvQuery>(
32 550 : *sql_,
33 550 : std::move(factory),
34 550 : log_manager->getChild("WsvQuery")->getLogger()))),
35 550 : block_index_(std::make_unique<PostgresBlockIndex>(
36 550 : *sql_, log_manager->getChild("PostgresBlockIndex")->getLogger())),
37 550 : command_executor_(std::move(cmd_executor)),
38 550 : committed(false),
39 550 : log_(log_manager->getLogger()) {
40 550 : *sql_ << "BEGIN";
41 550 : }
42 :
43 : bool MutableStorageImpl::apply(const shared_model::interface::Block &block,
44 : MutableStoragePredicate predicate) {
45 : auto execute_transaction = [this](auto &transaction) {
46 595 : command_executor_->setCreatorAccountId(transaction.creatorAccountId());
47 595 : command_executor_->doValidation(false);
48 :
49 : auto execute_command = [this](const auto &command) {
50 : auto command_applied =
51 4103 : boost::apply_visitor(*command_executor_, command.get());
52 :
53 4103 : return command_applied.match(
54 : [](expected::Value<void> &) { return true; },
55 : [&](expected::Error<CommandError> &e) {
56 0 : log_->error(e.error.toString());
57 0 : return false;
58 0 : });
59 4103 : };
60 :
61 595 : return std::all_of(transaction.commands().begin(),
62 595 : transaction.commands().end(),
63 595 : execute_command);
64 0 : };
65 :
66 555 : log_->info("Applying block: height {}, hash {}",
67 555 : block.height(),
68 555 : block.hash().hex());
69 :
70 555 : auto block_applied = predicate(block, *peer_query_, top_hash_)
71 555 : and std::all_of(block.transactions().begin(),
72 553 : block.transactions().end(),
73 553 : execute_transaction);
74 555 : if (block_applied) {
75 553 : block_store_.insert(std::make_pair(block.height(), clone(block)));
76 553 : block_index_->index(block);
77 :
78 553 : top_hash_ = block.hash();
79 553 : }
80 :
81 555 : return block_applied;
82 0 : }
83 :
84 : template <typename Function>
85 : bool MutableStorageImpl::withSavepoint(Function &&function) {
86 : try {
87 549 : *sql_ << "SAVEPOINT savepoint_";
88 :
89 549 : auto function_executed = std::forward<Function>(function)();
90 :
91 549 : if (function_executed) {
92 549 : *sql_ << "RELEASE SAVEPOINT savepoint_";
93 549 : } else {
94 2 : *sql_ << "ROLLBACK TO SAVEPOINT savepoint_";
95 : }
96 549 : return function_executed;
97 0 : } catch (std::exception &e) {
98 0 : log_->warn("Apply has failed. Reason: {}", e.what());
99 0 : return false;
100 0 : }
101 549 : }
102 :
103 : bool MutableStorageImpl::apply(
104 : const shared_model::interface::Block &block) {
105 : return withSavepoint([&] {
106 549 : return this->apply(
107 : block, [](const auto &, auto &, const auto &) { return true; });
108 0 : });
109 : }
110 :
111 : bool MutableStorageImpl::apply(
112 : rxcpp::observable<std::shared_ptr<shared_model::interface::Block>>
113 : blocks,
114 : MutableStoragePredicate predicate) {
115 : return withSavepoint([&] {
116 4 : return blocks
117 : .all([&](auto block) { return this->apply(*block, predicate); })
118 4 : .as_blocking()
119 4 : .first();
120 0 : });
121 : }
122 :
123 : MutableStorageImpl::~MutableStorageImpl() {
124 550 : if (not committed) {
125 : try {
126 4 : *sql_ << "ROLLBACK";
127 4 : } catch (std::exception &e) {
128 0 : log_->warn("Apply has been failed. Reason: {}", e.what());
129 0 : }
130 4 : }
131 550 : }
132 : } // namespace ametsuchi
133 : } // namespace iroha
|