C# - Is there something like ThrottleOrMax in Rx?
Use case: I'm writing a thing that monitors changes and saves them automatically. I want to throttle so that I don't save more often than every 5 seconds. I also want to save every 30 seconds if there is a continuous stream of changes.
I could not find an Observable.Throttle(mergeTime, maxTime) in the docs and can only think of ugly ways of writing my own, hence this question.
Here's a way to do it using GroupByUntil:
    public static IObservable<T> ThrottleWithMax_GroupBy<T>(this IObservable<T> source, TimeSpan throttle, TimeSpan maxTime, IScheduler scheduler = null)
    {
        return source
            .GroupByUntil(
                t => 0, // every element gets the same key
                t => t, // the element is the element
                g =>
                {
                    // expire the group when it slows down for throttle
                    // or when it exceeds maxTime
                    return g
                        .Throttle(throttle, scheduler ?? Scheduler.Default)
                        .Timeout(maxTime, Observable.Empty<T>(), scheduler ?? Scheduler.Default);
                })
            .SelectMany(g => g.LastAsync());
    }
And here's a way to do it using Window:
    public static IObservable<T> ThrottleWithMax_Window<T>(this IObservable<T> source, TimeSpan throttle, TimeSpan maxTime, IScheduler scheduler = null)
    {
        return source.Publish(p => p
            .Window(() =>
            {
                // close the window when p slows down for throttle
                // or when it exceeds maxTime.
                // do not start the throttle or maxTime timer
                // until the first p of the new window arrives
                var throttleTimer = p.Throttle(throttle, scheduler ?? Scheduler.Default);
                var timeoutTimer = p.Delay(maxTime, scheduler ?? Scheduler.Default);
                // signal when either timer signals
                return throttleTimer.Amb(timeoutTimer);
            })
            .SelectMany(w => w.TakeLast(1)));
    }
Here is an interactive marble diagram (drag the input marbles around):
    examples["throttleWithMax"] = {
      category: "Custom",
      label: "throttleWithMax(5, 10)",
      inputs: [
        [1, 4, 8, 12, 20, 24, 28, 50].map(function(i) {
          return { d: i, t: i };
        }).concat([55])
      ],
      apply: function(inputs, scheduler, Rx) {
        Rx.Observable.prototype.throttleWithMax = function(throttle, maxTime, scheduler) {
          var s = scheduler || Rx.Scheduler.timeout;
          return this
            .publish(function(p) {
              return p
                .window(function() {
                  var throttleTimer = p.debounce(throttle, s);
                  var timeoutTimer = p.delay(maxTime, s);
                  return Rx.Observable.amb(throttleTimer, timeoutTimer);
                })
                .flatMap(function(w) { return w.takeLast(1); });
            });
        };
        return inputs[0].throttleWithMax(5, 10, scheduler);
      }
    };

    var d = document.createElement("div");
    document.body.appendChild(d);
    d.innerHTML = '<rx-marbles key="throttleWithMax"></rx-marbles>';
    <script src="http://bman654.github.io/samples/rxmarbles-old/element.js"></script>
And here is a unit test that uses TestScheduler to control the clock and take the randomness of the system clock out of it:
    private const int _throttle = 50;
    private const int _timeout = 100;
    private const int _complete = 100000;

    [TestCase("groupby", new[] { 1, 10 }, new[] { 10 }, new[] { 10 + _throttle }, TestName = "g1")]
    [TestCase("groupby", new[] { 1, 10, 40, 60 }, new[] { 60 }, new[] { 1 + _timeout }, TestName = "g2")]
    [TestCase("groupby", new[] { 1, 45, 1000, 1040, 1080, 1110, }, new[] { 45, 1080, 1110 }, new[] { 45 + _throttle, 1000 + _timeout, 1110 + _throttle }, TestName = "g3")]
    [TestCase("window", new[] { 1, 10 }, new[] { 10 }, new[] { 10 + _throttle }, TestName = "w1")]
    [TestCase("window", new[] { 1, 10, 40, 60 }, new[] { 60 }, new[] { 1 + _timeout }, TestName = "w2")]
    [TestCase("window", new[] { 1, 45, 1000, 1040, 1080, 1110, }, new[] { 45, 1080, 1110 }, new[] { 45 + _throttle, 1000 + _timeout, 1110 + _throttle }, TestName = "w3")]
    public void Throttle(string which, int[] pattern, int[] expectedPattern, int[] expectedTimes)
    {
        var scheduler = new TestScheduler();
        var completeEvent = new[] { ReactiveTest.OnCompleted(_complete, _complete) };
        var source = scheduler.CreateColdObservable(pattern.Select(v => ReactiveTest.OnNext(v, v)).Concat(completeEvent).ToArray());
        var throttled = source.ThrottleWithMax(which, TimeSpan.FromTicks(_throttle), TimeSpan.FromTicks(_timeout), scheduler);
        var observer = scheduler.CreateObserver<int>();
        throttled.Subscribe(observer);

        // start the clock
        scheduler.Start();

        // check the results
        var expected = expectedPattern.Zip(expectedTimes, (v, t) => ReactiveTest.OnNext(t, v)).Concat(completeEvent).ToList();
        CollectionAssert.AreEqual(expected, observer.Messages);
    }
Here's the complete unit test code.
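If you want to check the expected times in those test cases by hand, the rule that both implementations encode can be sketched as a small virtual-time simulation in plain JavaScript, with no Rx involved. This is only an illustration, not part of the operators above: `simulateThrottleWithMax` and `at` are made-up names, the source is assumed never to complete early, and an input that arrives exactly when a window is due to close is assumed to fall into the next window (the real operators may order such ties differently).

```javascript
// Virtual-time sketch of "throttle, but emit at least every maxTime".
// Assumptions (not from the answer above): inputs are {value, time} pairs
// sorted by time, the source does not complete early, and an input arriving
// exactly at a window's close time belongs to the next window.
function simulateThrottleWithMax(inputs, throttle, maxTime) {
  var outputs = [];
  var windowStart = null; // time of the first input in the open window
  var last = null;        // most recent input in the open window

  function closeTime() {
    // The window closes once the input has been quiet for `throttle`,
    // or `maxTime` after its first input, whichever comes first.
    return Math.min(last.time + throttle, windowStart + maxTime);
  }

  inputs.forEach(function (input) {
    if (last !== null && input.time >= closeTime()) {
      // The open window closed before this input arrived: emit its last value.
      outputs.push({ value: last.value, time: closeTime() });
      last = null;
    }
    if (last === null) {
      windowStart = input.time; // this input opens a new window
    }
    last = input;
  });

  if (last !== null) {
    // Flush the final window at its scheduled close time.
    outputs.push({ value: last.value, time: closeTime() });
  }
  return outputs;
}

// Helper: each value arrives at the time equal to its value,
// mirroring ReactiveTest.OnNext(v, v) in the unit test.
function at(times) {
  return times.map(function (t) { return { value: t, time: t }; });
}

// Inputs from test cases g3/w3, with throttle = 50 and maxTime = 100.
var result = simulateThrottleWithMax(at([1, 45, 1000, 1040, 1080, 1110]), 50, 100);
console.log(JSON.stringify(result));
// prints [{"value":45,"time":95},{"value":1080,"time":1100},{"value":1110,"time":1160}]
```

The three emissions line up with the expected times in g3/w3: 45 + throttle, 1000 + timeout, and 1110 + throttle. The second one is the interesting case, because the stream never goes quiet for 50 ticks after 1000, so the maxTime limit closes the window instead.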