LCOV - code coverage report
Current view: top level - irohad/consensus/yac/impl - yac.cpp (source / functions) Hit Total Coverage
Test: cleared_cor.info Lines: 92 93 98.9 %
Date: 2019-03-07 14:46:43 Functions: 27 27 100.0 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include "consensus/yac/yac.hpp"
       7             : 
       8             : #include <utility>
       9             : 
      10             : #include "common/bind.hpp"
      11             : #include "common/visitor.hpp"
      12             : #include "consensus/yac/cluster_order.hpp"
      13             : #include "consensus/yac/storage/yac_proposal_storage.hpp"
      14             : #include "consensus/yac/timer.hpp"
      15             : #include "consensus/yac/yac_crypto_provider.hpp"
      16             : #include "cryptography/public_key.hpp"
      17             : #include "cryptography/signed.hpp"
      18             : #include "interfaces/common_objects/peer.hpp"
      19             : #include "logger/logger.hpp"
      20             : 
      21             : namespace iroha {
      22             :   namespace consensus {
      23             :     namespace yac {
      24             : 
      25             :       template <typename T>
      26             :       static std::string cryptoError(const T &votes) {
      27           4 :         std::string result =
      28           4 :             "Crypto verification failed for message.\n Votes: ";
      29             :         result += logger::to_string(votes, [](const auto &vote) {
      30           4 :           std::string result = "(Public key: ";
      31           4 :           result += vote.signature->publicKey().hex();
      32           4 :           result += ", Signature: ";
      33           4 :           result += vote.signature->signedData().hex();
      34           4 :           result += ")\n";
      35           4 :           return result;
      36           4 :         });
      37           4 :         return result;
      38           4 :       }
      39             : 
      40             :       std::shared_ptr<Yac> Yac::create(
      41             :           YacVoteStorage vote_storage,
      42             :           std::shared_ptr<YacNetwork> network,
      43             :           std::shared_ptr<YacCryptoProvider> crypto,
      44             :           std::shared_ptr<Timer> timer,
      45             :           ClusterOrdering order,
      46             :           logger::LoggerPtr log) {
      47         273 :         return std::make_shared<Yac>(
      48         273 :             vote_storage, network, crypto, timer, order, std::move(log));
      49             :       }
      50             : 
      51             :       Yac::Yac(YacVoteStorage vote_storage,
      52             :                std::shared_ptr<YacNetwork> network,
      53             :                std::shared_ptr<YacCryptoProvider> crypto,
      54             :                std::shared_ptr<Timer> timer,
      55             :                ClusterOrdering order,
      56             :                logger::LoggerPtr log)
      57         273 :           : vote_storage_(std::move(vote_storage)),
      58         273 :             network_(std::move(network)),
      59         273 :             crypto_(std::move(crypto)),
      60         273 :             timer_(std::move(timer)),
      61         273 :             cluster_order_(order),
      62         273 :             log_(std::move(log)) {}
      63             : 
      64             :       // ------|Hash gate|------
      65             : 
      66             :       void Yac::vote(YacHash hash, ClusterOrdering order) {
      67        3140 :         log_->info("Order for voting: {}",
      68        3140 :                    logger::to_string(order.getPeers(),
      69             :                                      [](auto val) { return val->address(); }));
      70             : 
      71        3140 :         cluster_order_ = order;
      72        3140 :         auto vote = crypto_->getVote(hash);
      73             :         // TODO 10.06.2018 andrei: IR-1407 move YAC propagation strategy to a
      74             :         // separate entity
      75        3140 :         votingStep(vote);
      76        3140 :       }
      77             : 
      78             :       rxcpp::observable<Answer> Yac::onOutcome() {
      79         504 :         return notifier_.get_observable();
      80           0 :       }
      81             : 
      82             :       // ------|Network notifications|------
      83             : 
      84             :       void Yac::onState(std::vector<VoteMessage> state) {
      85        6738 :         std::lock_guard<std::mutex> guard(mutex_);
      86        6739 :         if (crypto_->verify(state)) {
      87        6108 :           applyState(state);
      88        6735 :         } else {
      89           4 :           log_->warn("{}", cryptoError(state));
      90             :         }
      91        6739 :       }
      92             : 
      93             :       // ------|Private interface|------
      94             : 
      95             :       void Yac::votingStep(VoteMessage vote) {
      96        3588 :         auto committed = vote_storage_.isCommitted(vote.hash.vote_round);
      97        3588 :         if (committed) {
      98           1 :           return;
      99             :         }
     100             : 
     101        3587 :         const auto &current_leader = cluster_order_.currentLeader();
     102             : 
     103        3587 :         log_->info("Vote for round {}, hash ({}, {}) to peer {}",
     104        3587 :                    vote.hash.vote_round,
     105        3587 :                    vote.hash.vote_hashes.proposal_hash,
     106        3587 :                    vote.hash.vote_hashes.block_hash,
     107        3587 :                    current_leader);
     108             : 
     109        3587 :         network_->sendState(current_leader, {vote});
     110        3587 :         cluster_order_.switchToNext();
     111        3587 :         if (cluster_order_.hasNext()) {
     112             :           timer_->invokeAfterDelay([this, vote] { this->votingStep(vote); });
     113         448 :         }
     114        3588 :       }
     115             : 
     116             :       void Yac::closeRound() {
     117        3140 :         timer_->deny();
     118        3140 :       }
     119             : 
     120             :       boost::optional<std::shared_ptr<shared_model::interface::Peer>>
     121             :       Yac::findPeer(const VoteMessage &vote) {
     122           3 :         auto peers = cluster_order_.getPeers();
     123             :         auto it =
     124             :             std::find_if(peers.begin(), peers.end(), [&](const auto &peer) {
     125          17 :               return peer->pubkey() == vote.signature->publicKey();
     126             :             });
     127           3 :         return it != peers.end() ? boost::make_optional(std::move(*it))
     128           2 :                                  : boost::none;
     129           3 :       }
     130             : 
     131             :       // ------|Apply data|------
     132             : 
     133             :       void Yac::applyState(const std::vector<VoteMessage> &state) {
     134             :         auto answer =
     135        6735 :             vote_storage_.store(state, cluster_order_.getNumberOfPeers());
     136             : 
     137             :         // TODO 10.06.2018 andrei: IR-1407 move YAC propagation strategy to a
     138             :         // separate entity
     139             : 
     140             :         answer | [&](const auto &answer) {
     141        6735 :           auto &proposal_round = state.at(0).hash.vote_round;
     142             : 
     143             :           /*
     144             :            * It is possible that a new peer with an outdated peers list may
     145             :            * collect an outcome from a smaller number of peers which are
     146             :            * included in set of `f` peers in the system. The new peer will not
     147             :            * accept our message with valid supermajority because he cannot apply
     148             :            * votes from unknown peers.
     149             :            */
     150        6281 :           if (state.size() > 1) {
     151             :             // some peer has already collected commit/reject, so it is sent
     152         439 :             if (vote_storage_.getProcessingState(proposal_round)
     153         439 :                 == ProposalState::kNotSentNotProcessed) {
     154           6 :               vote_storage_.nextProcessingState(proposal_round);
     155           6 :               log_->info(
     156           6 :                   "Received supermajority of votes for {}, skip propagation",
     157           6 :                   proposal_round);
     158           6 :             }
     159         439 :           }
     160             : 
     161        6283 :           auto processing_state =
     162        6283 :               vote_storage_.getProcessingState(proposal_round);
     163             : 
     164             :           auto votes = [](const auto &state) { return state.votes; };
     165             : 
     166        6283 :           switch (processing_state) {
     167             :             case ProposalState::kNotSentNotProcessed:
     168        3136 :               vote_storage_.nextProcessingState(proposal_round);
     169        3136 :               log_->info("Propagate state {} to whole network", proposal_round);
     170        3136 :               this->propagateState(visit_in_place(answer, votes));
     171        3136 :               break;
     172             :             case ProposalState::kSentNotProcessed:
     173        3140 :               vote_storage_.nextProcessingState(proposal_round);
     174        3140 :               log_->info("Pass outcome for {} to pipeline", proposal_round);
     175        3140 :               this->closeRound();
     176        3140 :               notifier_.get_subscriber().on_next(answer);
     177        3140 :               break;
     178             :             case ProposalState::kSentProcessed:
     179           5 :               if (state.size() == 1) {
     180             :                 this->findPeer(state.at(0)) | [&](const auto &from) {
     181           1 :                   log_->info("Propagate state {} directly to {}",
     182           3 :                              proposal_round,
     183           1 :                              from->address());
     184           1 :                   this->propagateStateDirectly(*from,
     185           3 :                                                visit_in_place(answer, votes));
     186           1 :                 };
     187           3 :               }
     188           5 :               break;
     189             :           }
     190        6281 :         };
     191        6735 :       }
     192             : 
     193             :       // ------|Propagation|------
     194             : 
     195             :       void Yac::propagateState(const std::vector<VoteMessage> &msg) {
     196        6719 :         for (const auto &peer : cluster_order_.getPeers()) {
     197        3583 :           propagateStateDirectly(*peer, msg);
     198             :         }
     199        3136 :       }
     200             : 
     201             :       void Yac::propagateStateDirectly(const shared_model::interface::Peer &to,
     202             :                                        const std::vector<VoteMessage> &msg) {
     203        3584 :         network_->sendState(to, msg);
     204        3584 :       }
     205             : 
     206             :     }  // namespace yac
     207             :   }    // namespace consensus
     208             : }  // namespace iroha

Generated by: LCOV version 1.13