Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * Copyright (c) Microsoft Open Technologies, Inc. All rights reserved.
4 : * SPDX-License-Identifier: Apache-2.0
5 : */
6 :
7 : #ifndef IROHA_DELAY_HPP
8 : #define IROHA_DELAY_HPP
9 :
10 : #include <rxcpp/operators/rx-delay.hpp>
11 :
12 : namespace iroha {
13 :
14 : /**
15 : * This class is mostly the same as rxcpp::operators::delay,
16 : * the only change is that it accepts a selector lambda which generates
17 : * a duration based on observable value instead of a fixed duration
18 : * Return an observable that emits each item emitted by the source observable
19 : * after the specified delay
20 : * Delay is generated with selector from the last received value
21 : * @tparam T value type
22 : * @tparam Selector the type of the transforming function
23 : * which returns time interval
24 : * @tparam Coordination the type of the scheduler
25 : */
26 : template <class T, class Selector, class Coordination>
27 : struct delay {
28 : typedef rxcpp::util::decay_t<T> source_value_type;
29 : typedef rxcpp::util::decay_t<Coordination> coordination_type;
30 : typedef typename coordination_type::coordinator_type coordinator_type;
31 : typedef rxcpp::util::decay_t<Selector> select_type;
32 :
33 : struct delay_values {
34 : delay_values(select_type s, coordination_type c)
35 247 : : selector(std::move(s)), coordination(c) {}
36 :
37 : select_type selector;
38 : coordination_type coordination;
39 : };
40 : delay_values initial;
41 :
42 : delay(select_type s, coordination_type coordination)
43 247 : : initial(std::move(s), coordination) {}
44 :
45 : template <class Subscriber>
46 : struct delay_observer {
47 : typedef delay_observer<Subscriber> this_type;
48 : typedef rxcpp::util::decay_t<T> value_type;
49 : typedef rxcpp::util::decay_t<Subscriber> dest_type;
50 : typedef rxcpp::observer<T, this_type> observer_type;
51 :
52 : struct delay_subscriber_values : public delay_values {
53 : delay_subscriber_values(rxcpp::composite_subscription cs,
54 : dest_type d,
55 : delay_values v,
56 : coordinator_type c)
57 247 : : delay_values(v),
58 247 : cs(std::move(cs)),
59 247 : dest(std::move(d)),
60 247 : coordinator(std::move(c)),
61 247 : worker(coordinator.get_worker()),
62 247 : expected(worker.now()) {}
63 :
64 : rxcpp::composite_subscription cs;
65 : dest_type dest;
66 : coordinator_type coordinator;
67 : rxcpp::schedulers::worker worker;
68 : rxcpp::schedulers::scheduler::clock_type::time_point expected;
69 : };
70 : std::shared_ptr<delay_subscriber_values> state;
71 :
72 : delay_observer(rxcpp::composite_subscription cs,
73 : dest_type d,
74 : delay_values v,
75 : coordinator_type c)
76 247 : : state(std::make_shared<delay_subscriber_values>(
77 247 : delay_subscriber_values(
78 247 : std::move(cs), std::move(d), v, std::move(c)))) {
79 247 : auto localState = state;
80 :
81 : auto disposer = [=](const rxcpp::schedulers::schedulable &) {
82 247 : localState->cs.unsubscribe();
83 247 : localState->dest.unsubscribe();
84 247 : localState->worker.unsubscribe();
85 247 : };
86 247 : auto selectedDisposer = on_exception(
87 : [&]() { return localState->coordinator.act(disposer); },
88 247 : localState->dest);
89 247 : if (selectedDisposer.empty()) {
90 0 : return;
91 : }
92 :
93 247 : localState->dest.add(
94 : [=]() { localState->worker.schedule(selectedDisposer.get()); });
95 247 : localState->cs.add(
96 : [=]() { localState->worker.schedule(selectedDisposer.get()); });
97 247 : }
98 :
99 : template <class Value>
100 : void on_next(Value &&v) const {
101 3373 : auto localState = state;
102 :
103 3373 : auto selected = on_exception(
104 : [&]() { return localState->selector(std::forward<Value>(v)); },
105 3373 : localState->dest);
106 3373 : if (selected.empty()) {
107 0 : return;
108 : }
109 :
110 : auto work = [v, localState](const rxcpp::schedulers::schedulable &) {
111 3373 : localState->dest.on_next(v);
112 3373 : };
113 : auto selectedWork =
114 : on_exception([&]() { return localState->coordinator.act(work); },
115 3373 : localState->dest);
116 3373 : if (selectedWork.empty()) {
117 0 : return;
118 : }
119 3373 : localState->worker.schedule(localState->worker.now() + selected.get(),
120 3373 : selectedWork.get());
121 3373 : }
122 :
123 : void on_error(std::exception_ptr e) const {
124 0 : auto localState = state;
125 : auto work = [e, localState](const rxcpp::schedulers::schedulable &) {
126 0 : localState->dest.on_error(e);
127 0 : };
128 : auto selectedWork =
129 : on_exception([&]() { return localState->coordinator.act(work); },
130 0 : localState->dest);
131 0 : if (selectedWork.empty()) {
132 0 : return;
133 : }
134 0 : localState->worker.schedule(selectedWork.get());
135 0 : }
136 :
137 : void on_completed() const {
138 0 : auto localState = state;
139 : auto work = [localState](const rxcpp::schedulers::schedulable &) {
140 0 : localState->dest.on_completed();
141 0 : };
142 : auto selectedWork =
143 : on_exception([&]() { return localState->coordinator.act(work); },
144 0 : localState->dest);
145 0 : if (selectedWork.empty()) {
146 0 : return;
147 : }
148 0 : localState->worker.schedule(selectedWork.get());
149 0 : }
150 :
151 : static rxcpp::subscriber<T, observer_type> make(dest_type d,
152 : delay_values v) {
153 247 : auto cs = rxcpp::composite_subscription();
154 247 : auto coordinator = v.coordination.create_coordinator();
155 :
156 247 : return rxcpp::make_subscriber<T>(
157 : cs,
158 247 : observer_type(this_type(
159 247 : cs, std::move(d), std::move(v), std::move(coordinator))));
160 247 : }
161 : };
162 :
163 : template <class Subscriber>
164 : auto operator()(Subscriber dest) const
165 : -> decltype(delay_observer<Subscriber>::make(std::move(dest),
166 : initial)) {
167 247 : return delay_observer<Subscriber>::make(std::move(dest), initial);
168 0 : }
169 : };
170 :
171 : template <typename T,
172 : typename Selector,
173 : typename Coordination,
174 : class ResolvedSelector = rxcpp::util::decay_t<Selector>,
175 : class Duration = decltype(std::declval<ResolvedSelector>()(
176 : (std::declval<std::decay_t<T>>()))),
177 : class Enabled = rxcpp::util::enable_if_all_true_type_t<
178 : rxcpp::is_coordination<Coordination>,
179 : rxcpp::util::is_duration<Duration>>,
180 : class Delay =
181 : delay<T, ResolvedSelector, rxcpp::util::decay_t<Coordination>>>
182 : static auto makeDelay(Selector &&s, Coordination &&cn) {
183 247 : return Delay(std::forward<Selector>(s), std::forward<Coordination>(cn));
184 0 : }
185 :
186 : } // namespace iroha
187 :
188 : #endif // IROHA_DELAY_HPP
|