c# - Is it possible to send a message not to all observers using .net Rx? -
i have situation there observable , let's 10 observers attached it. send message each new observer until observer somehow says observable recognizes message , process it. @ moment stop sending message other observers.
in other words each observer knows how process particular type of message , each 1 take , process message recognizes. others don't need receive after 1 recognizes started processing. how situation implemented reactive extensions? assume need sort of notification observable don't see how can done.
this came with. comments, ideas, suggestions , critics welcome. interesting part iobserver<tvalue, tresult>
public interface exists in rx library it's used in notification object. did, created iobservable<tvalue, tresult>
counterpart, selectivesubject take care of logic call observers until 1 of them returns true , toselective method extension. i'm surprised not done in library, @ least iobservable<tvalue, tresult>
part. after all, iobserver<tvalue, tresult>
existed.
public interface iobservable<out tvalue, in tresult> { idisposable subscribe(iobserver<tvalue, tresult> objobserver); } internal class selectivesubject<t> : iobserver<t>, iobservable<t, bool> { private readonly linkedlist<iobserver<t, bool>> _observerlist; public selectivesubject() { _observerlist = new linkedlist<iobserver<t, bool>>(); } public void onnext(t value) { lock(_observerlist) { foreach(iobserver<t, bool> objobserver in _observerlist) { if(objobserver.onnext(value)) { break; } } } } public void onerror(exception exception) { lock(_observerlist) { foreach(iobserver<t, bool> objobserver in _observerlist) { if(objobserver.onerror(exception)) { break; } } } } public void oncompleted() { lock(_observerlist) { foreach(iobserver<t, bool> objobserver in _observerlist) { if(objobserver.oncompleted()) { break; } } } } public idisposable subscribe(iobserver<t, bool> objobserver) { linkedlistnode<iobserver<t, bool>> objnode; lock(_observerlist) { objnode = _observerlist.addlast(objobserver); } return disposable.create(() => { lock(objnode.list) { objnode.list.remove(objnode); } }); } } public static iobservable<t, bool> toselective<t>(this iobservable<t> objthis) { var objselective = new selectivesubject<t>(); objthis.subscribe(objselective); return objselective; }
now usage simple this
iconnectableobservable<int> objgenerator = observable.generate(0, => < 100, => + 1, => i).publish(); iobservable<int, bool> objselective = objgenerator.toselective(); var objdisposablelist = new compositedisposable(2) { objselective.subscribe(i => { console.write("1"); if(i % 2 == 0) { console.write("!"); return true; } else { console.write("."); return false; } }), objselective.subscribe(i => { console.write("2"); console.write("!"); return true; }) }; objgenerator.connect(); objdisposablelist.dispose();
in example first subscriber takes care of every other value in sequence , second subscriber takes care of rest.
Comments
Post a Comment