LCOV - code coverage report
Current view: top level - libs/common - combine_latest_until_first_completed.hpp (source / functions) Hit Total Coverage
Test: cleared_cor.info Lines: 45 49 91.8 %
Date: 2019-03-07 14:46:43 Functions: 345 362 95.3 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * Copyright (c) Microsoft Open Technologies, Inc. All rights reserved.
       4             :  * SPDX-License-Identifier: Apache-2.0
       5             :  */
       6             : 
       7             : #ifndef IROHA_COMBINE_LATEST_UNTIL_FIRST_COMPLETED_HPP
       8             : #define IROHA_COMBINE_LATEST_UNTIL_FIRST_COMPLETED_HPP
       9             : 
      10             : #include <rxcpp/operators/rx-combine_latest.hpp>
      11             : 
      12             : namespace iroha {
      13             : 
      14             :   /**
      15             :    * Operator that is mostly the same as rxcpp::operators::combine_latest.
      16             :    * The only difference is that the resulting observable completes as soon
      17             :    * as the first source observable completes, instead of after all of them.
      18             :    * Once every source has produced a value, each new item applies the
      19             :    * selector to the latest values and emits the result downstream.
      20             :    * @tparam Coordination the type of the scheduler
      21             :    * @tparam Selector the type of the aggregation function
      22             :    * @tparam ObservableN types of source observables
      23             :    */
      24             :   template <class Coordination, class Selector, class... ObservableN>
      25             :   struct combine_latest_until_first_completed
      26             :       : public rxcpp::operators::operator_base<rxcpp::util::value_type_t<
      27             :             rxcpp::operators::detail::combine_latest_traits<Coordination,
      28             :                                                             Selector,
      29             :                                                             ObservableN...>>> {
      30             :     typedef combine_latest_until_first_completed<Coordination,
      31             :                                                  Selector,
      32             :                                                  ObservableN...>
      33             :         this_type;
      34             : 
      35             :     typedef rxcpp::operators::detail::
      36             :         combine_latest_traits<Coordination, Selector, ObservableN...>
      37             :             traits;  // rxcpp's combine_latest traits supply the member types below
      38             : 
      39             :     typedef typename traits::tuple_source_type tuple_source_type;
      40             :     typedef typename traits::tuple_source_value_type tuple_source_value_type;
      41             : 
      42             :     typedef typename traits::selector_type selector_type;
      43             : 
      44             :     typedef typename traits::coordination_type coordination_type;
      45             :     typedef typename coordination_type::coordinator_type coordinator_type;
      46             : 
      47             :     struct values {  // inputs captured at construction: sources, selector, coordination
      48             :       values(tuple_source_type o, selector_type s, coordination_type sf)
      49           4 :           : source(std::move(o)),
      50           4 :             selector(std::move(s)),
      51           4 :             coordination(std::move(sf)) {}
      52             :       tuple_source_type source;
      53             :       selector_type selector;
      54             :       coordination_type coordination;
      55             :     };
      56             :     values initial;  // copied into the per-subscription state in on_subscribe
      57             : 
      58             :     combine_latest_until_first_completed(coordination_type sf,
      59             :                                          selector_type s,
      60             :                                          tuple_source_type ts)
      61           4 :         : initial(std::move(ts), std::move(s), std::move(sf)) {}
      62             : 
      63             :     template <int Index, class State>
      64             :     void subscribe_one(std::shared_ptr<State> state) const {  // subscribes to the Index-th source
      65             :       typedef typename std::tuple_element<Index,
      66             :                                           tuple_source_type>::type::value_type
      67             :           source_value_type;
      68             : 
      69           4 :       rxcpp::composite_subscription innercs;
      70             : 
      71             :       // when the out observer is unsubscribed all the
      72             :       // inner subscriptions are unsubscribed as well
      73           4 :       state->out.add(innercs);
      74             : 
      75           4 :       auto source = on_exception(
      76             :           [&]() {
      77           4 :             return state->coordinator.in(std::get<Index>(state->source));
      78           0 :           },
      79           4 :           state->out);
      80           4 :       if (source.empty()) {  // NOTE(review): presumably on_exception has already signalled state->out - confirm
      81           0 :         return;
      82             :       }
      83             : 
      84             :       // this subscribe does not share the observer subscription
      85             :       // so that when it is unsubscribed the observer can be called
      86             :       // until the inner subscriptions have finished
      87           4 :       auto sink = rxcpp::make_subscriber<source_value_type>(
      88           4 :           state->out,
      89             :           innercs,
      90             :           // on_next: store the item, then emit once every source has a value
      91             :           [state](source_value_type st) {
      92          33 :             auto &value = std::get<Index>(state->latest);
      93             : 
      94          33 :             if (value.empty()) {  // first item received from this source
      95           3 :               ++state->valuesSet;
      96           3 :             }
      97             : 
      98          33 :             value.reset(st);
      99             : 
     100          33 :             if (state->valuesSet == sizeof...(ObservableN)) {  // every source has a value
     101          30 :               auto values = rxcpp::util::surely(state->latest);
     102          30 :               auto selectedResult = rxcpp::util::apply(values, state->selector);
     103          30 :               state->out.on_next(selectedResult);
     104          30 :             }
     105          33 :           },
     106             :           // on_error: forwarded unchanged to the output subscriber
     107             :           [state](std::exception_ptr e) { state->out.on_error(e); },
     108             :           // on_completed: the output completes when this source does, without waiting for the others
     109             :           [state]() { state->out.on_completed(); });
     110           4 :       auto selectedSink = on_exception(
     111             :           [&]() { return state->coordinator.out(sink); }, state->out);
     112           4 :       if (selectedSink.empty()) {
     113           0 :         return;
     114             :       }
     115           4 :       source->subscribe(std::move(selectedSink.get()));
     116           4 :     }
     117             : 
     118             :     template <class State, int... IndexN>
     119             :     void subscribe_all(std::shared_ptr<State> state,
     120             :                        rxcpp::util::values<int, IndexN...>) const {
     121           4 :       bool subscribed[] = {(subscribe_one<IndexN>(state), true)...};  // one subscribe_one call per index in the pack
     122           4 :       subscribed[0] = (*subscribed);  // silence warning
     123           4 :     }
     124             : 
     125             :     template <class Subscriber>
     126             :     void on_subscribe(Subscriber scbr) const {  // builds fresh state for this subscriber, then subscribes to every source
     127             :       static_assert(rxcpp::is_subscriber<Subscriber>::value,
     128             :                     "subscribe must be passed a subscriber");
     129             : 
     130             :       typedef Subscriber output_type;
     131             : 
     132             :       struct combine_latest_until_first_completed_state_type
     133             :           : public std::enable_shared_from_this<
     134             :                 combine_latest_until_first_completed_state_type>,
     135             :             public values {
     136             :         combine_latest_until_first_completed_state_type(values i,
     137             :                                                         coordinator_type coor,
     138             :                                                         output_type oarg)
     139           4 :             : values(std::move(i)),
     140           4 :               valuesSet(0),
     141           4 :               coordinator(std::move(coor)),
     142           4 :               out(std::move(oarg)) {}
     143             : 
     144             :         mutable int valuesSet;  // number of sources that have produced at least one value
     145             :         mutable tuple_source_value_type latest;  // most recent value per source; an entry is empty until its source emits
     146             :         coordinator_type coordinator;
     147             :         output_type out;
     148             :       };
     149             : 
     150             :       auto coordinator =
     151           4 :           initial.coordination.create_coordinator(scbr.get_subscription());
     152             : 
     153             :       // take a copy of the values for each subscription
     154             :       auto state =
     155           4 :           std::make_shared<combine_latest_until_first_completed_state_type>(
     156           4 :               initial, std::move(coordinator), std::move(scbr));
     157             : 
     158           4 :       subscribe_all(
     159           4 :           state,
     160             :           typename rxcpp::util::values_from<int,
     161             :                                             sizeof...(ObservableN)>::type());
     162           4 :     }
     163             :   };
     164             : 
     165             :   template <  // factory: returns an observable backed by combine_latest_until_first_completed
     166             :       class Coordination,
     167             :       class Selector,
     168             :       class Observable,
     169             :       class... ObservableN,
     170             :       class Enabled = rxcpp::util::enable_if_all_true_type_t<  // NOTE(review): presumably a SFINAE guard on the three traits below - confirm against rxcpp
     171             :           rxcpp::is_coordination<Coordination>,
     172             :           rxcpp::operators::detail::
     173             :               is_combine_latest_selector<Selector, Observable, ObservableN...>,
     174             :           rxcpp::all_observables<Observable, ObservableN...>>,
     175             :       class ResolvedSelector = rxcpp::util::decay_t<Selector>,
     176             :       class combine_latest = combine_latest_until_first_completed<  // operator type instantiated with decayed argument types
     177             :           Coordination,
     178             :           ResolvedSelector,
     179             :           rxcpp::util::decay_t<Observable>,
     180             :           rxcpp::util::decay_t<ObservableN>...>,
     181             :       class Value = rxcpp::util::value_type_t<combine_latest>,
     182             :       class Result = rxcpp::observable<Value, combine_latest>>
     183             :   static Result makeCombineLatestUntilFirstCompleted(Observable &&o,  // first source observable
     184             :                                                      Coordination &&cn,  // passed to the operator as its coordination
     185             :                                                      Selector &&s,  // passed to the operator as its selector
     186             :                                                      ObservableN &&... on) {  // remaining source observables
     187           4 :     return Result(
     188           4 :         combine_latest(std::forward<Coordination>(cn),
     189           4 :                        std::forward<Selector>(s),
     190           4 :                        std::make_tuple(std::forward<Observable>(o),  // o followed by on... forms the source tuple
     191           4 :                                        std::forward<ObservableN>(on)...)));
     192           0 :   }
     193             : 
     194             : }  // namespace iroha
     195             : 
     196             : #endif  // IROHA_COMBINE_LATEST_UNTIL_FIRST_COMPLETED_HPP

Generated by: LCOV version 1.13