LCOV - code coverage report
Current view: top level - irohad/consensus/yac/transport/impl - network_impl.cpp (source / functions) Hit Total Coverage
Test: cleared_cor.info Lines: 30 34 88.2 %
Date: 2019-03-07 14:46:43 Functions: 7 7 100.0 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include "consensus/yac/transport/impl/network_impl.hpp"
       7             : 
       8             : #include <grpc++/grpc++.h>
       9             : #include <memory>
      10             : 
      11             : #include "consensus/yac/storage/yac_common.hpp"
      12             : #include "consensus/yac/transport/yac_pb_converters.hpp"
      13             : #include "consensus/yac/vote_message.hpp"
      14             : #include "interfaces/common_objects/peer.hpp"
      15             : #include "logger/logger.hpp"
      16             : #include "network/impl/grpc_channel_builder.hpp"
      17             : #include "yac.pb.h"
      18             : 
      19             : namespace iroha {
      20             :   namespace consensus {
      21             :     namespace yac {
      22             :       // ----------| Public API |----------
      23             :       // Constructs the network from an async gRPC caller and a logger; both are stored in members.
      24             :       NetworkImpl::NetworkImpl(
      25             :           std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
      26             :               async_call,  // copied into async_call_; sendState() issues its calls through it
      27             :           logger::LoggerPtr log)  // moved into log_
      28         498 :           : async_call_(async_call), log_(std::move(log)) {}
      29             :       // Registers the receiver of incoming vote states; the assignment replaces any earlier handler.
      30             :       void NetworkImpl::subscribe(
      31             :           std::shared_ptr<YacNetworkNotifications> handler) {
      32         250 :         handler_ = handler;  // SendState() calls handler_.lock(), so handler_ is presumably a weak_ptr -- confirm in the header
      33         250 :       }
      34             :       // Serializes `state` into a proto::State request and sends it to peer `to` through async_call_.
      35             :       void NetworkImpl::sendState(const shared_model::interface::Peer &to,
      36             :                                   const std::vector<VoteMessage> &state) {
      37        7555 :         createPeerConnection(to);  // inserts a client for to.address() if none is cached, so at() below can find it
      38             :         // Copy every vote into the protobuf request, one add_votes() entry per vote.
      39        7555 :         proto::State request;
      40       15970 :         for (const auto &vote : state) {
      41        8415 :           auto pb_vote = request.add_votes();
      42        8415 :           *pb_vote = PbConverters::serializeVote(vote);
      43             :         }
      44             :         // NOTE(review): the lambda captures `request` and `to` by reference; safe only if Call() runs it before sendState() returns -- confirm.
      45             :         async_call_->Call([&](auto context, auto cq) {
      46        7555 :           return peers_.at(to.address())->AsyncSendState(context, request, cq);
      47             :         });
      48             :         // Logged right after Call() returns; nothing here waits for or checks the outcome of the RPC.
      49        7555 :         log_->info(
      50        7555 :             "Send votes bundle[size={}] to {}", state.size(), to.address());
      51        7555 :       }
      52             :       // Handles an incoming State: deserializes its votes, checks them with sameKeys() and forwards them to the subscriber.
      53             :       grpc::Status NetworkImpl::SendState(
      54             :           ::grpc::ServerContext *context,  // used here only for peer() in the log line
      55             :           const ::iroha::consensus::yac::proto::State *request,  // source of the votes to deserialize
      56             :           ::google::protobuf::Empty *response) {  // never written in this function
      57        7548 :         std::vector<VoteMessage> state;
      58       15964 :         for (const auto &pb_vote : request->votes()) {
      59        8402 :           auto vote = *PbConverters::deserializeVote(pb_vote, log_);  // NOTE(review): dereferenced without a check -- confirm deserializeVote can never return an empty value
      60        8350 :           state.push_back(vote);
      61        8409 :         }
      62        7553 :         if (not sameKeys(state)) {  // a rejected state is logged and never reaches the subscriber
      63           0 :           log_->info(
      64           0 :               "Votes are stateless invalid: proposals are different, or empty "
      65             :               "collection");
      66           0 :           return grpc::Status::CANCELLED;
      67             :         }
      68             :         // From here on the state passed the sameKeys() check.
      69        7553 :         log_->info(
      70        7533 :             "Received votes[size={}] from {}", state.size(), context->peer());
      71             :         // handler_ is locked on every call; if that fails the state is dropped, yet OK is still returned below.
      72        7553 :         if (auto notifications = handler_.lock()) {
      73        7553 :           notifications->onState(std::move(state));
      74        7553 :         } else {
      75           0 :           log_->error("Unable to lock the subscriber");
      76             :         }
      77        7553 :         return grpc::Status::OK;
      78        7553 :       }
      79             :       // Creates and caches a proto::Yac client for the peer's address unless peers_ already holds one.
      80             :       void NetworkImpl::createPeerConnection(
      81             :           const shared_model::interface::Peer &peer) {
      82        7555 :         if (peers_.count(peer.address()) == 0) {  // an existing entry is reused and never replaced here
      83         251 :           peers_[peer.address()] =
      84         250 :               network::createClient<proto::Yac>(peer.address());
      85         251 :         }
      86        7555 :       }
      87             : 
      88             :     }  // namespace yac
      89             :   }    // namespace consensus
      90             : }  // namespace iroha

Generated by: LCOV version 1.13