C# - Is there something like ThrottleOrMax in Rx?


Use case: I'm writing a thing that monitors changes and saves them automatically. I want to throttle so that I don't save more often than every 5 seconds. I also want to save every 30 seconds if there is a continuous stream of changes.

I could not find Observable.Throttle(mergeTime, maxTime) in the docs and could only think of ugly ways of writing my own, hence this question.
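To pin down the behaviour being asked for, here is a minimal sketch of the timing rule in plain C#, with no Rx involved. `ThrottleOrMaxModel` and `Simulate` are hypothetical names made up for illustration, not part of any library. It assumes sorted timestamps in arbitrary units, uses each change's timestamp as its value, treats a change landing exactly on a deadline as the start of the next window, and ignores stream completion.

```csharp
using System;
using System.Collections.Generic;

public static class ThrottleOrMaxModel
{
    // Hypothetical helper: for sorted change times, returns when a save would
    // fire and which change (identified by its timestamp) would be saved.
    public static List<(long EmitTime, long Value)> Simulate(
        IReadOnlyList<long> eventTimes, long throttle, long maxTime)
    {
        var result = new List<(long EmitTime, long Value)>();
        int i = 0;
        while (i < eventTimes.Count)
        {
            // The first change after a save opens a new window.
            long deadline = eventTimes[i] + maxTime; // hard limit for this window
            long last = eventTimes[i];
            i++;

            // Absorb later changes while they arrive before the quiet period
            // expires and before the window's deadline.
            while (i < eventTimes.Count
                   && eventTimes[i] < last + throttle
                   && eventTimes[i] < deadline)
            {
                last = eventTimes[i];
                i++;
            }

            // Save when the stream goes quiet or the deadline hits, whichever is first.
            result.Add((Math.Min(last + throttle, deadline), last));
        }
        return result;
    }
}
```

For example, with `throttle = 5` and `maxTime = 30`, changes arriving every 3 time units from 0 to 60 would be saved at times 30, 60 and 65 under this model: the first two saves are forced by the maximum, and the last one happens once the stream goes quiet.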

Here's a way to do it using GroupByUntil:

    public static IObservable<T> ThrottleWithMax_GroupBy<T>(this IObservable<T> source, TimeSpan throttle, TimeSpan maxTime, IScheduler scheduler = null)
    {
        return source
            .GroupByUntil(
                t => 0, // every element gets the same key
                t => t, // the element is the element
                g =>
                {
                    // expire the group when it slows down for throttle
                    // or when it exceeds maxTime
                    return g
                        .Throttle(throttle, scheduler ?? Scheduler.Default)
                        .Timeout(maxTime, Observable.Empty<T>(), scheduler ?? Scheduler.Default);
                })
            .SelectMany(g => g.LastAsync());
    }

And here's a way to do it using Window:

    public static IObservable<T> ThrottleWithMax_Window<T>(this IObservable<T> source, TimeSpan throttle, TimeSpan maxTime, IScheduler scheduler = null)
    {
        return source.Publish(p => p
                .Window(() =>
                {
                    // close the window when p slows down for throttle
                    // or when it exceeds maxTime.
                    // do not start throttling or the maxTime timer
                    // until the first p of the new window arrives
                    var throttleTimer = p.Throttle(throttle, scheduler ?? Scheduler.Default);
                    var timeoutTimer = p.Delay(maxTime, scheduler ?? Scheduler.Default);
                    // signal when either timer signals
                    return throttleTimer.Amb(timeoutTimer);
                })
                .SelectMany(w => w.TakeLast(1)));
    }

Here is an interactive marble diagram (drag the input marbles around):

    examples["throttleWithMax"] = {
      category: "custom",
      label: "throttleWithMax(5, 10)",
      inputs: [
        [1, 4, 8, 12, 20, 24, 28, 50].map(function(i) {
          return {
            d: i,
            t: i
          };
        }).concat([55])
      ],
      apply: function(inputs, scheduler, Rx) {
        Rx.Observable.prototype.throttleWithMax = function(throttle, maxTime, scheduler) {
          var s = scheduler || Rx.Scheduler.timeout;
          return this
            .publish(function(p) {
              return p
                .window(function() {
                  var throttleTimer = p.debounce(throttle, s);
                  var timeoutTimer = p.delay(maxTime, s);
                  return Rx.Observable.amb(throttleTimer, timeoutTimer);
                })
                .flatMap(function(w) {
                  return w.takeLast(1);
                });
            });
        };

        return inputs[0].throttleWithMax(5, 10, scheduler);
      }
    };

    var d = document.createElement("div");
    document.body.appendChild(d);
    d.innerHTML = '<rx-marbles key="throttleWithMax"></rx-marbles>';

    <script src="http://bman654.github.io/samples/rxmarbles-old/element.js"></script>

And here is a unit test that uses TestScheduler to control the clock and take the randomness of the system clock out of it:

    private const int _throttle = 50;
    private const int _timeout = 100;
    private const int _complete = 100000;

    [TestCase("groupby", new[] { 1, 10 }, new[] { 10 }, new[] { 10 + _throttle }, TestName = "g1")]
    [TestCase("groupby", new[] { 1, 10, 40, 60 }, new[] { 60 }, new[] { 1 + _timeout }, TestName = "g2")]
    [TestCase("groupby", new[] { 1, 45, 1000, 1040, 1080, 1110, }, new[] { 45, 1080, 1110 }, new[] { 45 + _throttle, 1000 + _timeout, 1110 + _throttle }, TestName = "g3")]
    [TestCase("window", new[] { 1, 10 }, new[] { 10 }, new[] { 10 + _throttle }, TestName = "w1")]
    [TestCase("window", new[] { 1, 10, 40, 60 }, new[] { 60 }, new[] { 1 + _timeout }, TestName = "w2")]
    [TestCase("window", new[] { 1, 45, 1000, 1040, 1080, 1110, }, new[] { 45, 1080, 1110 }, new[] { 45 + _throttle, 1000 + _timeout, 1110 + _throttle }, TestName = "w3")]
    public void Throttle(string which, int[] pattern, int[] expectedPattern, int[] expectedTimes)
    {
        var scheduler = new TestScheduler();
        var completeEvent = new[] { ReactiveTest.OnCompleted(_complete, _complete) };
        var source = scheduler.CreateColdObservable(pattern.Select(v => ReactiveTest.OnNext(v, v)).Concat(completeEvent).ToArray());
        var throttled = source.ThrottleWithMax(which, TimeSpan.FromTicks(_throttle), TimeSpan.FromTicks(_timeout), scheduler);
        var observer = scheduler.CreateObserver<int>();
        throttled.Subscribe(observer);

        // start the clock
        scheduler.Start();

        // check the results
        var expected = expectedPattern.Zip(expectedTimes, (v, t) => ReactiveTest.OnNext(t, v)).Concat(completeEvent).ToList();
        CollectionAssert.AreEqual(expected, observer.Messages);
    }

Here's the complete unit test code.

