LCOV - code coverage report
Current view: top level - libs/common - delay.hpp (source / functions) Hit Total Coverage
Test: cleared_cor.info Lines: 41 62 66.1 %
Date: 2019-03-07 14:46:43 Functions: 61 79 77.2 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * Copyright (c) Microsoft Open Technologies, Inc. All rights reserved.
       4             :  * SPDX-License-Identifier: Apache-2.0
       5             :  */
       6             : 
       7             : #ifndef IROHA_DELAY_HPP
       8             : #define IROHA_DELAY_HPP
       9             : 
      10             : #include <rxcpp/operators/rx-delay.hpp>
      11             : 
      12             : namespace iroha {
      13             : 
      14             :   /**
      15             :    * This class is mostly the same as rxcpp::operators::delay;
      16             :    * the only change is that it accepts a selector lambda, which generates
      17             :    * a duration based on the observable value, instead of a fixed duration.
      18             :    * Returns an observable that emits each item emitted by the source
      19             :    * observable after the specified delay.
      20             :    * The delay for each item is generated by applying the selector to it.
      21             :    * @tparam T value type
      22             :    * @tparam Selector the type of the transforming function
      23             :    * which returns a time interval
      24             :    * @tparam Coordination the type of the scheduler
      25             :    */
      26             :   template <class T, class Selector, class Coordination>
      27             :   struct delay {  // operator functor: re-emits each item after a delay computed by the selector
      28             :     typedef rxcpp::util::decay_t<T> source_value_type;
      29             :     typedef rxcpp::util::decay_t<Coordination> coordination_type;
      30             :     typedef typename coordination_type::coordinator_type coordinator_type;
      31             :     typedef rxcpp::util::decay_t<Selector> select_type;
      32             : 
      33             :     struct delay_values {  // selector + coordination; base of the per-subscription state below
      34             :       delay_values(select_type s, coordination_type c)
      35         247 :           : selector(std::move(s)), coordination(c) {}
      36             : 
      37             :       select_type selector;  // called with each received value in on_next
      38             :       coordination_type coordination;  // used by make() to create a coordinator
      39             :     };
      40             :     delay_values initial;  // passed by value (copied) to make() on every operator() call
      41             : 
      42             :     delay(select_type s, coordination_type coordination)
      43         247 :         : initial(std::move(s), coordination) {}
      44             : 
      45             :     template <class Subscriber>
      46             :     struct delay_observer {  // forwards on_next/on_error/on_completed to dest via the worker
      47             :       typedef delay_observer<Subscriber> this_type;
      48             :       typedef rxcpp::util::decay_t<T> value_type;
      49             :       typedef rxcpp::util::decay_t<Subscriber> dest_type;
      50             :       typedef rxcpp::observer<T, this_type> observer_type;
      51             : 
      52             :       struct delay_subscriber_values : public delay_values {  // state shared by the scheduled lambdas
      53             :         delay_subscriber_values(rxcpp::composite_subscription cs,
      54             :                                 dest_type d,
      55             :                                 delay_values v,
      56             :                                 coordinator_type c)
      57         247 :             : delay_values(v),
      58         247 :               cs(std::move(cs)),  // argument is the constructor parameter, which shadows the member
      59         247 :               dest(std::move(d)),
      60         247 :               coordinator(std::move(c)),
      61         247 :               worker(coordinator.get_worker()),  // relies on coordinator being declared before worker
      62         247 :               expected(worker.now()) {}  // relies on worker being declared before expected
      63             : 
      64             :         rxcpp::composite_subscription cs;
      65             :         dest_type dest;
      66             :         coordinator_type coordinator;
      67             :         rxcpp::schedulers::worker worker;
      68             :         rxcpp::schedulers::scheduler::clock_type::time_point expected;  // NOTE(review): set once, not read anywhere in this struct
      69             :       };
      70             :       std::shared_ptr<delay_subscriber_values> state;
      71             : 
      72             :       delay_observer(rxcpp::composite_subscription cs,
      73             :                      dest_type d,
      74             :                      delay_values v,
      75             :                      coordinator_type c)
      76         247 :           : state(std::make_shared<delay_subscriber_values>(
      77         247 :                 delay_subscriber_values(
      78         247 :                     std::move(cs), std::move(d), v, std::move(c)))) {
      79         247 :         auto localState = state;  // copy of the shared_ptr, captured by value in the lambdas below
      80             : 
      81             :         auto disposer = [=](const rxcpp::schedulers::schedulable &) {  // unsubscribes cs, dest and worker
      82         247 :           localState->cs.unsubscribe();
      83         247 :           localState->dest.unsubscribe();
      84         247 :           localState->worker.unsubscribe();
      85         247 :         };
      86         247 :         auto selectedDisposer = on_exception(  // presumably reports an exception from act() to dest — confirm in rxcpp
      87             :             [&]() { return localState->coordinator.act(disposer); },
      88         247 :             localState->dest);
      89         247 :         if (selectedDisposer.empty()) {  // no wrapped disposer available: leave without registering it
      90           0 :           return;
      91             :         }
      92             : 
      93         247 :         localState->dest.add(  // presumably invoked when dest is unsubscribed — confirm in rxcpp
      94             :             [=]() { localState->worker.schedule(selectedDisposer.get()); });
      95         247 :         localState->cs.add(  // same disposer registered on the source-side subscription
      96             :             [=]() { localState->worker.schedule(selectedDisposer.get()); });
      97         247 :       }
      98             : 
      99             :       template <class Value>
     100             :       void on_next(Value &&v) const {  // schedules delivery of v to dest at now() + selector(v)
     101        3373 :         auto localState = state;
     102             : 
     103        3373 :         auto selected = on_exception(  // NOTE(review): v is forwarded here and copied into `work` below; if the selector consumes an rvalue, the copy may be of a moved-from value — confirm
     104             :             [&]() { return localState->selector(std::forward<Value>(v)); },
     105        3373 :             localState->dest);
     106        3373 :         if (selected.empty()) {  // no duration produced: drop this item
     107           0 :           return;
     108             :         }
     109             : 
     110             :         auto work = [v, localState](const rxcpp::schedulers::schedulable &) {  // v is copied into the closure
     111        3373 :           localState->dest.on_next(v);
     112        3373 :         };
     113             :         auto selectedWork =
     114             :             on_exception([&]() { return localState->coordinator.act(work); },
     115        3373 :                          localState->dest);
     116        3373 :         if (selectedWork.empty()) {
     117           0 :           return;
     118             :         }
     119        3373 :         localState->worker.schedule(localState->worker.now() + selected.get(),  // due time = current worker time + selected duration
     120        3373 :                                     selectedWork.get());
     121        3373 :       }
     122             : 
     123             :       void on_error(std::exception_ptr e) const {  // forwards e to dest through the worker, with no delay argument
     124           0 :         auto localState = state;
     125             :         auto work = [e, localState](const rxcpp::schedulers::schedulable &) {
     126           0 :           localState->dest.on_error(e);
     127           0 :         };
     128             :         auto selectedWork =
     129             :             on_exception([&]() { return localState->coordinator.act(work); },
     130           0 :                          localState->dest);
     131           0 :         if (selectedWork.empty()) {
     132           0 :           return;
     133             :         }
     134           0 :         localState->worker.schedule(selectedWork.get());  // no time point passed, unlike on_next
     135           0 :       }
     136             : 
     137             :       void on_completed() const {  // forwards completion to dest through the worker
     138           0 :         auto localState = state;
     139             :         auto work = [localState](const rxcpp::schedulers::schedulable &) {
     140           0 :           localState->dest.on_completed();
     141           0 :         };
     142             :         auto selectedWork =
     143             :             on_exception([&]() { return localState->coordinator.act(work); },
     144           0 :                          localState->dest);
     145           0 :         if (selectedWork.empty()) {
     146           0 :           return;
     147             :         }
     148           0 :         localState->worker.schedule(selectedWork.get());  // NOTE(review): scheduled with no delay, so it may run before still-pending delayed on_next work — confirm intended
     149           0 :       }
     150             : 
     151             :       static rxcpp::subscriber<T, observer_type> make(dest_type d,
     152             :                                                       delay_values v) {  // builds the subscriber wrapping a new delay_observer
     153         247 :         auto cs = rxcpp::composite_subscription();
     154         247 :         auto coordinator = v.coordination.create_coordinator();  // must run before v is moved below
     155             : 
     156         247 :         return rxcpp::make_subscriber<T>(
     157             :             cs,
     158         247 :             observer_type(this_type(
     159         247 :                 cs, std::move(d), std::move(v), std::move(coordinator))));
     160         247 :       }
     161             :     };
     162             : 
     163             :     template <class Subscriber>
     164             :     auto operator()(Subscriber dest) const  // wraps dest in a delaying subscriber using a copy of `initial`
     165             :         -> decltype(delay_observer<Subscriber>::make(std::move(dest),
     166             :                                                      initial)) {
     167         247 :       return delay_observer<Subscriber>::make(std::move(dest), initial);
     168           0 :     }
     169             :   };
     170             : 
     171             :   template <typename T,  // T must be given explicitly; it is not deducible from the arguments
     172             :             typename Selector,
     173             :             typename Coordination,
     174             :             class ResolvedSelector = rxcpp::util::decay_t<Selector>,
     175             :             class Duration = decltype(std::declval<ResolvedSelector>()(  // type the selector returns for an rvalue decayed T
     176             :                 (std::declval<std::decay_t<T>>()))),
     177             :             class Enabled = rxcpp::util::enable_if_all_true_type_t<  // presumably a SFINAE guard on the two traits below — confirm in rxcpp
     178             :                 rxcpp::is_coordination<Coordination>,
     179             :                 rxcpp::util::is_duration<Duration>>,
     180             :             class Delay =
     181             :                 delay<T, ResolvedSelector, rxcpp::util::decay_t<Coordination>>>
     182             :   static auto makeDelay(Selector &&s, Coordination &&cn) {  // factory: builds the delay operator from a selector and a coordination
     183         247 :     return Delay(std::forward<Selector>(s), std::forward<Coordination>(cn));
     184           0 :   }
     185             : 
     186             : }  // namespace iroha
     187             : 
     188             : #endif  // IROHA_DELAY_HPP

Generated by: LCOV version 1.13