c# - Is it possible to send a message not to all observers using .net Rx? -


i have situation there observable , let's 10 observers attached it. send message each new observer until observer somehow says observable recognizes message , process it. @ moment stop sending message other observers.

in other words each observer knows how process particular type of message , each 1 take , process message recognizes. others don't need receive after 1 recognizes started processing. how situation implemented reactive extensions? assume need sort of notification observable don't see how can done.

this came with. comments, ideas, suggestions , critics welcome. interesting part iobserver<tvalue, tresult> public interface exists in rx library it's used in notification object. did, created iobservable<tvalue, tresult> counterpart, selectivesubject take care of logic call observers until 1 of them returns true , toselective method extension. i'm surprised not done in library, @ least iobservable<tvalue, tresult> part. after all, iobserver<tvalue, tresult> existed.

public interface iobservable<out tvalue, in tresult> {    idisposable subscribe(iobserver<tvalue, tresult> objobserver); }    internal class selectivesubject<t> : iobserver<t>, iobservable<t, bool> {    private readonly linkedlist<iobserver<t, bool>> _observerlist;     public selectivesubject() {       _observerlist = new linkedlist<iobserver<t, bool>>();    }     public void onnext(t value) {       lock(_observerlist) {          foreach(iobserver<t, bool> objobserver in _observerlist) {             if(objobserver.onnext(value)) {                break;             }          }       }    }     public void onerror(exception exception) {       lock(_observerlist) {          foreach(iobserver<t, bool> objobserver in _observerlist) {             if(objobserver.onerror(exception)) {                break;             }          }       }    }     public void oncompleted() {       lock(_observerlist) {          foreach(iobserver<t, bool> objobserver in _observerlist) {             if(objobserver.oncompleted()) {                break;             }          }       }    }     public idisposable subscribe(iobserver<t, bool> objobserver) {       linkedlistnode<iobserver<t, bool>> objnode;       lock(_observerlist) {          objnode = _observerlist.addlast(objobserver);       }       return disposable.create(() => {          lock(objnode.list) {             objnode.list.remove(objnode);          }       });    } }    public static iobservable<t, bool> toselective<t>(this iobservable<t> objthis) {    var objselective = new selectivesubject<t>();    objthis.subscribe(objselective);    return objselective; } 

now usage simple this

 iconnectableobservable<int> objgenerator = observable.generate(0, => < 100, => + 1, => i).publish();  iobservable<int, bool> objselective = objgenerator.toselective();   var objdisposablelist = new compositedisposable(2) {     objselective.subscribe(i => {        console.write("1");        if(i % 2 == 0) {           console.write("!");           return true;        }        else {           console.write(".");           return false;        }     }),     objselective.subscribe(i => {        console.write("2");        console.write("!");        return true;     })  };   objgenerator.connect();  objdisposablelist.dispose(); 

in example first subscriber takes care of every other value in sequence , second subscriber takes care of rest.


Comments

Popular posts from this blog

javascript - gulp-nodemon - nodejs restart after file change - Error: listen EADDRINUSE events.js:85 -

Fatal Python error: Py_Initialize: unable to load the file system codec. ImportError: No module named 'encodings' -

javascript - oscilloscope of speaker input stops rendering after a few seconds -