Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * Copyright (c) Microsoft Open Technologies, Inc. All rights reserved.
4 : * SPDX-License-Identifier: Apache-2.0
5 : */
6 :
7 : #ifndef IROHA_COMBINE_LATEST_UNTIL_FIRST_COMPLETED_HPP
8 : #define IROHA_COMBINE_LATEST_UNTIL_FIRST_COMPLETED_HPP
9 :
10 : #include <rxcpp/operators/rx-combine_latest.hpp>
11 :
12 : namespace iroha {
13 :
14 : /**
15 : * This class is mostly the same as rxcpp::operators::combine_latest,
16 : * the only change is that it completes when the first of observables is
17 : * completed instead of all observables
18 : * For each item from all of the observables select a value to emit from the
19 : * new observable that is returned
20 : * @tparam Coordination the type of the scheduler
21 : * @tparam Selector the type of the aggregation function
22 : * @tparam ObservableN types of source observables
23 : */
24 : template <class Coordination, class Selector, class... ObservableN>
25 : struct combine_latest_until_first_completed
26 : : public rxcpp::operators::operator_base<rxcpp::util::value_type_t<
27 : rxcpp::operators::detail::combine_latest_traits<Coordination,
28 : Selector,
29 : ObservableN...>>> {
30 : typedef combine_latest_until_first_completed<Coordination,
31 : Selector,
32 : ObservableN...>
33 : this_type;
34 :
35 : typedef rxcpp::operators::detail::
36 : combine_latest_traits<Coordination, Selector, ObservableN...>
37 : traits;
38 :
39 : typedef typename traits::tuple_source_type tuple_source_type;
40 : typedef typename traits::tuple_source_value_type tuple_source_value_type;
41 :
42 : typedef typename traits::selector_type selector_type;
43 :
44 : typedef typename traits::coordination_type coordination_type;
45 : typedef typename coordination_type::coordinator_type coordinator_type;
46 :
47 : struct values {
48 : values(tuple_source_type o, selector_type s, coordination_type sf)
49 4 : : source(std::move(o)),
50 4 : selector(std::move(s)),
51 4 : coordination(std::move(sf)) {}
52 : tuple_source_type source;
53 : selector_type selector;
54 : coordination_type coordination;
55 : };
56 : values initial;
57 :
58 : combine_latest_until_first_completed(coordination_type sf,
59 : selector_type s,
60 : tuple_source_type ts)
61 4 : : initial(std::move(ts), std::move(s), std::move(sf)) {}
62 :
63 : template <int Index, class State>
64 : void subscribe_one(std::shared_ptr<State> state) const {
65 : typedef typename std::tuple_element<Index,
66 : tuple_source_type>::type::value_type
67 : source_value_type;
68 :
69 4 : rxcpp::composite_subscription innercs;
70 :
71 : // when the out observer is unsubscribed all the
72 : // inner subscriptions are unsubscribed as well
73 4 : state->out.add(innercs);
74 :
75 4 : auto source = on_exception(
76 : [&]() {
77 4 : return state->coordinator.in(std::get<Index>(state->source));
78 0 : },
79 4 : state->out);
80 4 : if (source.empty()) {
81 0 : return;
82 : }
83 :
84 : // this subscribe does not share the observer subscription
85 : // so that when it is unsubscribed the observer can be called
86 : // until the inner subscriptions have finished
87 4 : auto sink = rxcpp::make_subscriber<source_value_type>(
88 4 : state->out,
89 : innercs,
90 : // on_next
91 : [state](source_value_type st) {
92 33 : auto &value = std::get<Index>(state->latest);
93 :
94 33 : if (value.empty()) {
95 3 : ++state->valuesSet;
96 3 : }
97 :
98 33 : value.reset(st);
99 :
100 33 : if (state->valuesSet == sizeof...(ObservableN)) {
101 30 : auto values = rxcpp::util::surely(state->latest);
102 30 : auto selectedResult = rxcpp::util::apply(values, state->selector);
103 30 : state->out.on_next(selectedResult);
104 30 : }
105 33 : },
106 : // on_error
107 : [state](std::exception_ptr e) { state->out.on_error(e); },
108 : // on_completed
109 : [state]() { state->out.on_completed(); });
110 4 : auto selectedSink = on_exception(
111 : [&]() { return state->coordinator.out(sink); }, state->out);
112 4 : if (selectedSink.empty()) {
113 0 : return;
114 : }
115 4 : source->subscribe(std::move(selectedSink.get()));
116 4 : }
117 :
118 : template <class State, int... IndexN>
119 : void subscribe_all(std::shared_ptr<State> state,
120 : rxcpp::util::values<int, IndexN...>) const {
121 4 : bool subscribed[] = {(subscribe_one<IndexN>(state), true)...};
122 4 : subscribed[0] = (*subscribed); // silence warning
123 4 : }
124 :
125 : template <class Subscriber>
126 : void on_subscribe(Subscriber scbr) const {
127 : static_assert(rxcpp::is_subscriber<Subscriber>::value,
128 : "subscribe must be passed a subscriber");
129 :
130 : typedef Subscriber output_type;
131 :
132 : struct combine_latest_until_first_completed_state_type
133 : : public std::enable_shared_from_this<
134 : combine_latest_until_first_completed_state_type>,
135 : public values {
136 : combine_latest_until_first_completed_state_type(values i,
137 : coordinator_type coor,
138 : output_type oarg)
139 4 : : values(std::move(i)),
140 4 : valuesSet(0),
141 4 : coordinator(std::move(coor)),
142 4 : out(std::move(oarg)) {}
143 :
144 : mutable int valuesSet;
145 : mutable tuple_source_value_type latest;
146 : coordinator_type coordinator;
147 : output_type out;
148 : };
149 :
150 : auto coordinator =
151 4 : initial.coordination.create_coordinator(scbr.get_subscription());
152 :
153 : // take a copy of the values for each subscription
154 : auto state =
155 4 : std::make_shared<combine_latest_until_first_completed_state_type>(
156 4 : initial, std::move(coordinator), std::move(scbr));
157 :
158 4 : subscribe_all(
159 4 : state,
160 : typename rxcpp::util::values_from<int,
161 : sizeof...(ObservableN)>::type());
162 4 : }
163 : };
164 :
165 : template <
166 : class Coordination,
167 : class Selector,
168 : class Observable,
169 : class... ObservableN,
170 : class Enabled = rxcpp::util::enable_if_all_true_type_t<
171 : rxcpp::is_coordination<Coordination>,
172 : rxcpp::operators::detail::
173 : is_combine_latest_selector<Selector, Observable, ObservableN...>,
174 : rxcpp::all_observables<Observable, ObservableN...>>,
175 : class ResolvedSelector = rxcpp::util::decay_t<Selector>,
176 : class combine_latest = combine_latest_until_first_completed<
177 : Coordination,
178 : ResolvedSelector,
179 : rxcpp::util::decay_t<Observable>,
180 : rxcpp::util::decay_t<ObservableN>...>,
181 : class Value = rxcpp::util::value_type_t<combine_latest>,
182 : class Result = rxcpp::observable<Value, combine_latest>>
183 : static Result makeCombineLatestUntilFirstCompleted(Observable &&o,
184 : Coordination &&cn,
185 : Selector &&s,
186 : ObservableN &&... on) {
187 4 : return Result(
188 4 : combine_latest(std::forward<Coordination>(cn),
189 4 : std::forward<Selector>(s),
190 4 : std::make_tuple(std::forward<Observable>(o),
191 4 : std::forward<ObservableN>(on)...)));
192 0 : }
193 :
194 : } // namespace iroha
195 :
196 : #endif // IROHA_COMBINE_LATEST_UNTIL_FIRST_COMPLETED_HPP
|