Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "network/impl/block_loader_impl.hpp"
7 :
8 : #include <grpc++/create_channel.h>
9 :
10 : #include "backend/protobuf/block.hpp"
11 : #include "builders/protobuf/transport_builder.hpp"
12 : #include "common/bind.hpp"
13 : #include "interfaces/common_objects/peer.hpp"
14 : #include "logger/logger.hpp"
15 : #include "network/impl/grpc_channel_builder.hpp"
16 :
17 : using namespace iroha::ametsuchi;
18 : using namespace iroha::network;
19 : using namespace shared_model::crypto;
20 : using namespace shared_model::interface;
21 :
22 : namespace {
23 : const char *kPeerNotFound = "Cannot find peer";
24 : const char *kPeerRetrieveFail = "Failed to retrieve peers";
25 : const char *kPeerFindFail = "Failed to find requested peer";
26 : } // namespace
27 :
28 : BlockLoaderImpl::BlockLoaderImpl(
29 : std::shared_ptr<PeerQueryFactory> peer_query_factory,
30 : shared_model::proto::ProtoBlockFactory factory,
31 : logger::LoggerPtr log)
32 254 : : peer_query_factory_(std::move(peer_query_factory)),
33 254 : block_factory_(std::move(factory)),
34 254 : log_(std::move(log)) {}
35 :
36 : rxcpp::observable<std::shared_ptr<Block>> BlockLoaderImpl::retrieveBlocks(
37 : const shared_model::interface::types::HeightType height,
38 : const PublicKey &peer_pubkey) {
39 3 : return rxcpp::observable<>::create<std::shared_ptr<Block>>(
40 : [this, height, &peer_pubkey](auto subscriber) {
41 3 : auto peer = this->findPeer(peer_pubkey);
42 3 : if (not peer) {
43 0 : log_->error(kPeerNotFound);
44 0 : subscriber.on_completed();
45 0 : return;
46 : }
47 :
48 3 : proto::BlocksRequest request;
49 3 : grpc::ClientContext context;
50 3 : protocol::Block block;
51 :
52 : // request next block to our top
53 3 : request.set_height(height + 1);
54 :
55 : auto reader =
56 3 : this->getPeerStub(**peer).retrieveBlocks(&context, request);
57 6 : while (reader->Read(&block)) {
58 3 : auto proto_block = block_factory_.createBlock(std::move(block));
59 3 : proto_block.match(
60 : [&subscriber](
61 : iroha::expected::Value<std::unique_ptr<Block>> &result) {
62 3 : subscriber.on_next(std::move(result.value));
63 3 : },
64 : [this,
65 : &context](const iroha::expected::Error<std::string> &error) {
66 0 : log_->error(error.error);
67 0 : context.TryCancel();
68 0 : });
69 3 : }
70 3 : reader->Finish();
71 3 : subscriber.on_completed();
72 3 : });
73 0 : }
74 :
75 : boost::optional<std::shared_ptr<Block>> BlockLoaderImpl::retrieveBlock(
76 : const PublicKey &peer_pubkey, const types::HashType &block_hash) {
77 4 : auto peer = findPeer(peer_pubkey);
78 4 : if (not peer) {
79 0 : log_->error(kPeerNotFound);
80 0 : return boost::none;
81 : }
82 :
83 4 : proto::BlockRequest request;
84 4 : grpc::ClientContext context;
85 4 : protocol::Block block;
86 :
87 : // request block with specified hash
88 4 : request.set_hash(toBinaryString(block_hash));
89 :
90 4 : auto status = getPeerStub(**peer).retrieveBlock(&context, request, &block);
91 4 : if (not status.ok()) {
92 1 : log_->warn(status.error_message());
93 1 : return boost::none;
94 : }
95 :
96 3 : auto result = block_factory_.createBlock(std::move(block));
97 :
98 3 : return result.match(
99 : [](iroha::expected::Value<std::unique_ptr<Block>> &v) {
100 3 : return boost::make_optional(std::shared_ptr<Block>(std::move(v.value)));
101 0 : },
102 : [this](const iroha::expected::Error<std::string> &e)
103 : -> boost::optional<std::shared_ptr<Block>> {
104 0 : log_->error(e.error);
105 0 : return boost::none;
106 : });
107 4 : }
108 :
109 : boost::optional<std::shared_ptr<shared_model::interface::Peer>>
110 : BlockLoaderImpl::findPeer(const shared_model::crypto::PublicKey &pubkey) {
111 7 : auto peers = peer_query_factory_->createPeerQuery() |
112 : [](const auto &query) { return query->getLedgerPeers(); };
113 7 : if (not peers) {
114 0 : log_->error(kPeerRetrieveFail);
115 0 : return boost::none;
116 : }
117 :
118 7 : auto &blob = pubkey.blob();
119 7 : auto it = std::find_if(
120 : peers.value().begin(), peers.value().end(), [&blob](const auto &peer) {
121 7 : return peer->pubkey().blob() == blob;
122 : });
123 7 : if (it == peers.value().end()) {
124 0 : log_->error(kPeerFindFail);
125 0 : return boost::none;
126 : }
127 7 : return *it;
128 7 : }
129 :
130 : proto::Loader::Stub &BlockLoaderImpl::getPeerStub(
131 : const shared_model::interface::Peer &peer) {
132 7 : auto it = peer_connections_.find(peer.address());
133 7 : if (it == peer_connections_.end()) {
134 7 : it = peer_connections_
135 7 : .insert(std::make_pair(
136 7 : peer.address(),
137 7 : network::createClient<proto::Loader>(peer.address())))
138 7 : .first;
139 7 : }
140 7 : return *it->second;
141 0 : }
|