如何组成Observable以避免给定的嵌套和依赖回调? [英] How to compose Observables to avoid the given nested and dependent callbacks?
问题描述
在这个博客,他给了这个(复制/粘贴以下代码)回调地狱的例子。但是,没有提到如何使用Reactive Extensions消除该问题。
In this blog, he gives this (copy/pasted the following code) example for the callback hell. However, there is no mention of how the issue can be eliminated by using Reactive Extensions.
所以这里F3取决于F1完成,F4和F5取决于F2完成。
So here F3 depends upon F1 completion and F4 and F5 depend upon F2 completion.
- 想知道Rx中的功能等价物是什么。
- 如何在Rx中表示F1,F2,F3,F4和F5都应该异步拉动?
注意:我目前正试图绕过Rx,所以在问这个问题之前我没有尝试解决这个例子。
NOTE: I am currently trying to wrap my head around Rx so I didn't try solving this example before asking this question.
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class CallbackB {
/**
* Demonstration of nested callbacks which then need to composes their responses together.
* <p>
* Various different approaches for composition can be done but eventually they end up relying upon
* synchronization techniques such as the CountDownLatch used here or converge on callback design
* changes similar to <a href="https://github.com/Netflix/RxJava">Rx</a>.
*/
public static void run() throws Exception {
final ExecutorService executor = new ThreadPoolExecutor(4, 4, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
/* the following are used to synchronize and compose the asynchronous callbacks */
final CountDownLatch latch = new CountDownLatch(3);
final AtomicReference<String> f3Value = new AtomicReference<String>();
final AtomicReference<Integer> f4Value = new AtomicReference<Integer>();
final AtomicReference<Integer> f5Value = new AtomicReference<Integer>();
try {
// get f3 with dependent result from f1
executor.execute(new CallToRemoteServiceA(new Callback<String>() {
@Override
public void call(String f1) {
executor.execute(new CallToRemoteServiceC(new Callback<String>() {
@Override
public void call(String f3) {
// we have f1 and f3 now need to compose with others
System.out.println("intermediate callback: " + f3 + " => " + ("f4 * f5"));
// set to thread-safe variable accessible by external scope
f3Value.set(f3);
latch.countDown();
}
}, f1));
}
}));
// get f4/f5 after dependency f2 completes
executor.execute(new CallToRemoteServiceB(new Callback<Integer>() {
@Override
public void call(Integer f2) {
executor.execute(new CallToRemoteServiceD(new Callback<Integer>() {
@Override
public void call(Integer f4) {
// we have f2 and f4 now need to compose with others
System.out.println("intermediate callback: f3" + " => " + (f4 + " * f5"));
// set to thread-safe variable accessible by external scope
f4Value.set(f4);
latch.countDown();
}
}, f2));
executor.execute(new CallToRemoteServiceE(new Callback<Integer>() {
@Override
public void call(Integer f5) {
// we have f2 and f5 now need to compose with others
System.out.println("intermediate callback: f3" + " => " + ("f4 * " + f5));
// set to thread-safe variable accessible by external scope
f5Value.set(f5);
latch.countDown();
}
}, f2));
}
}));
/* we must wait for all callbacks to complete */
latch.await();
System.out.println(f3Value.get() + " => " + (f4Value.get() * f5Value.get()));
} finally {
executor.shutdownNow();
}
}
public static void main(String[] args) {
try {
run();
} catch (Exception e) {
e.printStackTrace();
}
}
private static final class CallToRemoteServiceA implements Runnable {
private final Callback<String> callback;
private CallToRemoteServiceA(Callback<String> callback) {
this.callback = callback;
}
@Override
public void run() {
// simulate fetching data from remote service
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
callback.call("responseA");
}
}
private static final class CallToRemoteServiceB implements Runnable {
private final Callback<Integer> callback;
private CallToRemoteServiceB(Callback<Integer> callback) {
this.callback = callback;
}
@Override
public void run() {
// simulate fetching data from remote service
try {
Thread.sleep(40);
} catch (InterruptedException e) {
e.printStackTrace();
}
callback.call(100);
}
}
private static final class CallToRemoteServiceC implements Runnable {
private final Callback<String> callback;
private final String dependencyFromA;
private CallToRemoteServiceC(Callback<String> callback, String dependencyFromA) {
this.callback = callback;
this.dependencyFromA = dependencyFromA;
}
@Override
public void run() {
// simulate fetching data from remote service
try {
Thread.sleep(60);
} catch (InterruptedException e) {
e.printStackTrace();
}
callback.call("responseB_" + dependencyFromA);
}
}
private static final class CallToRemoteServiceD implements Runnable {
private final Callback<Integer> callback;
private final Integer dependencyFromB;
private CallToRemoteServiceD(Callback<Integer> callback, Integer dependencyFromB) {
this.callback = callback;
this.dependencyFromB = dependencyFromB;
}
@Override
public void run() {
// simulate fetching data from remote service
try {
Thread.sleep(140);
} catch (InterruptedException e) {
e.printStackTrace();
}
callback.call(40 + dependencyFromB);
}
}
private static final class CallToRemoteServiceE implements Runnable {
private final Callback<Integer> callback;
private final Integer dependencyFromB;
private CallToRemoteServiceE(Callback<Integer> callback, Integer dependencyFromB) {
this.callback = callback;
this.dependencyFromB = dependencyFromB;
}
@Override
public void run() {
// simulate fetching data from remote service
try {
Thread.sleep(55);
} catch (InterruptedException e) {
e.printStackTrace();
}
callback.call(5000 + dependencyFromB);
}
}
private static interface Callback<T> {
public void call(T value);
}
}
推荐答案
我是关于回调和Java期货的引用博客文章的原作者。以下是使用flatMap,zip和merge异步进行服务组合的示例。
I'm the original author of the referenced blog post about callbacks and Java Futures. Here is an example of using flatMap, zip and merge to do service composition asynchronously.
它获取一个User对象,然后同时提取Social和PersonalizedCatalog数据,然后对于来自PersonalizedCatalog的每个视频同时提取书签,评级和元数据,将它们拉到一起,并且将所有响应合并为渐进式流输出作为服务器发送事件。
It fetches a User object, then concurrently fetches Social and PersonalizedCatalog data, then for each Video from the PersonalizedCatalog concurrently fetches a Bookmark, Rating and Metadata, zips those together, and merges all of the responses into a progressive stream output as Server-Sent Events.
return getUser(userId).flatMap(user -> {
Observable<Map<String, Object>> catalog = getPersonalizedCatalog(user)
.flatMap(catalogList -> catalogList.videos().<Map<String, Object>> flatMap(
video -> {
Observable<Bookmark> bookmark = getBookmark(video);
Observable<Rating> rating = getRatings(video);
Observable<VideoMetadata> metadata = getVideoMetadata(video);
return Observable.zip(bookmark, rating, metadata, (b, r, m) -> combineVideoData(video, b, r, m));
}));
Observable<Map<String, Object>> social = getSocial(user).map(s -> {
return s.getDataAsMap();
});
return Observable.merge(catalog, social);
}).flatMap(data -> {
String json = SimpleJson.mapToJson(data);
return response.writeStringAndFlush("data: " + json + "\n");
});
由于我无法在此提供所有信息,您还可以在 https://speakerdeck.com/benjchristensen/reactive-streams-with-rx-at-javaone-2014?slide = 32 。
Since I can't possibly provide all of the information here you can also find an explanation in presentation form (with link to video) at https://speakerdeck.com/benjchristensen/reactive-streams-with-rx-at-javaone-2014?slide=32.
这篇关于如何组成Observable以避免给定的嵌套和依赖回调?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!