Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "consensus/yac/transport/impl/network_impl.hpp"
7 :
8 : #include <grpc++/grpc++.h>
9 : #include <memory>
10 :
11 : #include "consensus/yac/storage/yac_common.hpp"
12 : #include "consensus/yac/transport/yac_pb_converters.hpp"
13 : #include "consensus/yac/vote_message.hpp"
14 : #include "interfaces/common_objects/peer.hpp"
15 : #include "logger/logger.hpp"
16 : #include "network/impl/grpc_channel_builder.hpp"
17 : #include "yac.pb.h"
18 :
19 : namespace iroha {
20 : namespace consensus {
21 : namespace yac {
22 : // ----------| Public API |----------
23 :
24 : NetworkImpl::NetworkImpl(
25 : std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
26 : async_call,
27 : logger::LoggerPtr log)
28 498 : : async_call_(async_call), log_(std::move(log)) {}
29 :
30 : void NetworkImpl::subscribe(
31 : std::shared_ptr<YacNetworkNotifications> handler) {
32 250 : handler_ = handler;
33 250 : }
34 :
35 : void NetworkImpl::sendState(const shared_model::interface::Peer &to,
36 : const std::vector<VoteMessage> &state) {
37 7555 : createPeerConnection(to);
38 :
39 7555 : proto::State request;
40 15970 : for (const auto &vote : state) {
41 8415 : auto pb_vote = request.add_votes();
42 8415 : *pb_vote = PbConverters::serializeVote(vote);
43 : }
44 :
45 : async_call_->Call([&](auto context, auto cq) {
46 7555 : return peers_.at(to.address())->AsyncSendState(context, request, cq);
47 : });
48 :
49 7555 : log_->info(
50 7555 : "Send votes bundle[size={}] to {}", state.size(), to.address());
51 7555 : }
52 :
53 : grpc::Status NetworkImpl::SendState(
54 : ::grpc::ServerContext *context,
55 : const ::iroha::consensus::yac::proto::State *request,
56 : ::google::protobuf::Empty *response) {
57 7548 : std::vector<VoteMessage> state;
58 15964 : for (const auto &pb_vote : request->votes()) {
59 8402 : auto vote = *PbConverters::deserializeVote(pb_vote, log_);
60 8350 : state.push_back(vote);
61 8409 : }
62 7553 : if (not sameKeys(state)) {
63 0 : log_->info(
64 0 : "Votes are stateless invalid: proposals are different, or empty "
65 : "collection");
66 0 : return grpc::Status::CANCELLED;
67 : }
68 :
69 7553 : log_->info(
70 7533 : "Received votes[size={}] from {}", state.size(), context->peer());
71 :
72 7553 : if (auto notifications = handler_.lock()) {
73 7553 : notifications->onState(std::move(state));
74 7553 : } else {
75 0 : log_->error("Unable to lock the subscriber");
76 : }
77 7553 : return grpc::Status::OK;
78 7553 : }
79 :
80 : void NetworkImpl::createPeerConnection(
81 : const shared_model::interface::Peer &peer) {
82 7555 : if (peers_.count(peer.address()) == 0) {
83 251 : peers_[peer.address()] =
84 250 : network::createClient<proto::Yac>(peer.address());
85 251 : }
86 7555 : }
87 :
88 : } // namespace yac
89 : } // namespace consensus
90 : } // namespace iroha
|