Problem description
I'm currently using RxJava 2 and have a use case where multiple Observables need to be consumed by a single Camel route subscriber. Using the solution from "RxJava - merged Observable that accepts more Observables at any time?" as a reference, I have a partly working solution.
I'm planning to use a PublishProcessor.
I'm currently stuck on how to add and manage the Flowables with it.
    PublishProcessor<T> publishProcessor = PublishProcessor.create();
    CamelReactiveStreamsService camelReactiveStreamsService = CamelReactiveStreams.get(camelContext);
    Subscriber<T> subscriber = camelReactiveStreamsService.streamSubscriber("t-class", T.class);

    Set<Flowable<T>> flowableSet =
        Collections.newSetFromMap(new ConcurrentHashMap<Flowable<T>, Boolean>());

    public void add(Flowable<T> flowableOrder) {
        flowableSet.add(flowableOrder);
    }

    public void subscribe() {
        // This is the part I'm stuck on.
        publishProcessor.flatMap(x -> flowableSet.forEach(/* TODO */))
            .subscribe(subscriber);
    }
Recommended answer
You can have a single processor and subscribe it to more than one observable stream. You need to manage the subscriptions yourself: create one when you add an Observable and dispose of it when you remove that Observable.
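    PublishProcessor<T> publishProcessor = PublishProcessor.create();
    Map<Flowable<T>, Disposable> subscriptions = new ConcurrentHashMap<>();

    void addObservable(Flowable<T> flowable) {
        // subscribe(Subscriber) returns void in RxJava 2, so use the lambda
        // overload, which returns a Disposable. Not forwarding onComplete keeps
        // the processor open when a single source finishes.
        subscriptions.computeIfAbsent(flowable,
            fKey -> flowable.subscribe(publishProcessor::onNext, publishProcessor::onError));
    }

    void removeObservable(Flowable<T> flowable) {
        Disposable d = subscriptions.remove(flowable);
        if (d != null) {
            d.dispose();
        }
    }

    void close() {
        for (Disposable d : subscriptions.values()) {
            d.dispose();
        }
    }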
Use the Flowable as the key to the map, and add or remove subscriptions as sources come and go.
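To see the map-of-subscriptions idea end to end, here is a minimal sketch, assuming RxJava 2 (`io.reactivex.rxjava2:rxjava`) is on the classpath. The class name `FlowableHub` and its `output()` method are hypothetical names chosen for illustration; they are not part of RxJava or Camel. The sketch also assumes sources may emit on different threads, so it wraps the processor with `toSerialized()`, and it uses a `TestSubscriber` in place of the Camel subscriber.

```java
import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import io.reactivex.processors.FlowableProcessor;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.subscribers.TestSubscriber;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: merges a changing set of Flowables into one stream.
public class FlowableHub<T> {
    // Serialized so that several sources can call onNext concurrently.
    private final FlowableProcessor<T> processor =
        PublishProcessor.<T>create().toSerialized();

    // One Disposable per source, keyed by the source itself (identity-based).
    private final Map<Flowable<T>, Disposable> subscriptions = new ConcurrentHashMap<>();

    // The merged stream; the downstream subscriber attaches here.
    public Flowable<T> output() {
        return processor;
    }

    public void add(Flowable<T> source) {
        // Forward items and errors only; a completing source must not
        // complete the shared processor.
        subscriptions.computeIfAbsent(source,
            key -> key.subscribe(processor::onNext, processor::onError));
    }

    public void remove(Flowable<T> source) {
        Disposable d = subscriptions.remove(source);
        if (d != null) {
            d.dispose();
        }
    }

    public void close() {
        subscriptions.values().forEach(Disposable::dispose);
        subscriptions.clear();
        processor.onComplete();
    }

    public static void main(String[] args) {
        FlowableHub<String> hub = new FlowableHub<>();
        TestSubscriber<String> downstream = hub.output().test();

        PublishProcessor<String> a = PublishProcessor.create();
        PublishProcessor<String> b = PublishProcessor.create();
        hub.add(a);
        hub.add(b);

        a.onNext("a1");
        b.onNext("b1");
        hub.remove(a);
        a.onNext("a2"); // not forwarded: a was removed from the hub
        b.onNext("b2");
        hub.close();

        downstream.assertValues("a1", "b1", "b2");
        downstream.assertComplete();
    }
}
```

In the Camel setup from the question, the subscriber obtained from `streamSubscriber` would presumably be attached with `hub.output().subscribe(subscriber)` instead of the `TestSubscriber`. Note that `PublishProcessor` does not buffer: items emitted before the downstream subscribes are dropped, so attach the subscriber before adding sources.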