RxJava + JavaFX属性 [英] RxJava + JavaFX Property
问题描述
我有一个名为 rxToProperty()
的方法,它将 Observable
转换为JavaFX 物业
。
I have a method called rxToProperty()
that turns an Observable
into a JavaFX Property
.
public static <T> ReadOnlyObjectProperty<T> rxToProperty(Observable<T> obs) {
ReadOnlyObjectWrapper<T> property = new ReadOnlyObjectWrapper<>();
obs.onBackpressureLatest().serialize().subscribe(v -> {
synchronized(property) {
property.set(v);
}
});
return property.getReadOnlyProperty();
}
如何确保返回的属性
总是是最新的,特别是当它绑定到JavaFX中的控件时?我注意到一些非常奇怪的随机行为似乎表明线程安全性已经被调度程序和JavaFX所破坏。
How do I ensure that the returned Property
is always up-to-date especially when it is bound to controls in JavaFX? I have noticed some very bizarre, random behaviors that seem to indicate thread safety has been compromised with the schedulers and JavaFX.
例如,我尝试使用此方法显示以几种不同方式安排的一个源 Observable
,以及当在 TableView
中使用时,结果是随机的和随意的,单元格在具有值和没有值之间交替,以及有时永远不会结束的线程活动。
Like for example, I try to use this method to show one source Observable
scheduled in several different ways, and the results are random and haphazard when used in a TableView
, with cells alternating between having values and not having values, as well as thread activity that sometimes will never end.
每当我尝试使用 Platform.invokeLater()
安排FX线程时,它只会使行为更加疯狂。我的 rxToProperty()
方法有什么问题?
Whenever I try to schedule on the FX thread using Platform.invokeLater()
it only makes the behavior more crazy. What is wrong with my rxToProperty()
method?
public class ReactiveTableViewTest extends Application {
@Override
public void start(Stage stage) throws Exception {
Group root = new Group();
Scene scene = new Scene(root);
root.getChildren().add(new ReactiveTable());
stage.setScene(scene);
stage.sizeToScene();
stage.show();
}
private static final class ReactiveRecord {
private final Observable<Number> obs = Observable.just(10,20,30,40,50,60).cast(Number.class);
public Observable<Number> subscribeImmediate() {
return obs;
}
public Observable<Number> subscribeComputation() {
return obs.subscribeOn(Schedulers.computation());
}
public Observable<Number> subscribeNewthread() {
return obs.subscribeOn(Schedulers.newThread());
}
public Observable<Number> subscribeIo() {
return obs.subscribeOn(Schedulers.io());
}
public Observable<Number> subscribeParallel() {
return obs.flatMap(i -> Observable.just(i).subscribeOn(Schedulers.computation()));
}
public Observable<Number> subscribeTrampoline() {
return obs.subscribeOn(Schedulers.trampoline());
}
public ImmutableList<Observable<Number>> getAll() {
return ImmutableList.of(subscribeImmediate(),subscribeComputation(),subscribeNewthread(),subscribeIo(),subscribeParallel(),subscribeTrampoline());
}
public ImmutableList<String> getHeaders() {
return ImmutableList.of("IMMEDIATE","COMPUTATION","NEW","IO","PARALLEL","TRAMPOLINE");
}
}
private static final class ReactiveTable extends TableView<ReactiveRecord> {
private ReactiveTable() {
ReactiveRecord record = new ReactiveRecord();
this.getItems().add(record);
ImmutableList<Observable<Number>> observables = record.getAll();
ImmutableList<String> headers = record.getHeaders();
for (int i = 0; i < observables.size(); i++) {
TableColumn<ReactiveRecord,Number> col = new TableColumn<>(headers.get(i));
final int index = i;
col.setCellValueFactory(cb -> rxToProperty(observables.get(index)));
this.getColumns().add(col);
}
}
}
public static <T> ReadOnlyObjectProperty<T> rxToProperty(Observable<T> obs) {
ReadOnlyObjectWrapper<T> property = new ReadOnlyObjectWrapper<>();
obs.onBackpressureLatest().serialize().subscribe(v -> {
synchronized(property) {
System.out.println("Emitting val " + v + " on " + Thread.currentThread().getName());
property.set(v);
}
});
return property.getReadOnlyProperty();
}
public static void main(String[] args) {
launch(args);
}
}
推荐答案
Tomas Mikula对ReactFX GitHub项目中的这种行为以及解决方案提供了一些非常有用的见解。
Tomas Mikula gave some very helpful insight to this behavior, as well as the solution, on the ReactFX GitHub project.
https://github.com/ TomasMikula / ReactFX / issues / 22
这篇关于RxJava + JavaFX属性的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!