RxJava + JavaFX属性 [英] RxJava + JavaFX Property

查看:169
本文介绍了RxJava + JavaFX属性的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个名为 rxToProperty()的方法,它将 Observable 转换为JavaFX 物业

I have a method called rxToProperty() that turns an Observable into a JavaFX Property.

 public static <T> ReadOnlyObjectProperty<T> rxToProperty(Observable<T> obs) { 
        ReadOnlyObjectWrapper<T> property = new ReadOnlyObjectWrapper<>();

        obs.onBackpressureLatest().serialize().subscribe(v -> { 
            synchronized(property) { 
                property.set(v);
            }
        });

        return property.getReadOnlyProperty();
    }

如何确保返回的属性总是是最新的,特别是当它绑定到JavaFX中的控件时?我注意到一些非常奇怪的随机行为似乎表明线程安全性已经被调度程序和JavaFX所破坏。

How do I ensure that the returned Property is always up-to-date especially when it is bound to controls in JavaFX? I have noticed some very bizarre, random behaviors that seem to indicate thread safety has been compromised with the schedulers and JavaFX.

例如,我尝试使用此方法显示以几种不同方式安排的一个源 Observable ,以及当在 TableView 中使用时,结果是随机的和随意的,单元格在具有值和没有值之间交替,以及有时永远不会结束的线程活动。

Like for example, I try to use this method to show one source Observable scheduled in several different ways, and the results are random and haphazard when used in a TableView, with cells alternating between having values and not having values, as well as thread activity that sometimes will never end.

每当我尝试使用 Platform.invokeLater()安排FX线程时,它只会使行为更加疯狂。我的 rxToProperty()方法有什么问题?

Whenever I try to schedule on the FX thread using Platform.invokeLater() it only makes the behavior more crazy. What is wrong with my rxToProperty() method?

public class ReactiveTableViewTest extends Application {

    @Override
    public void start(Stage stage) throws Exception {

        Group root = new Group();
        Scene scene = new Scene(root);

        root.getChildren().add(new ReactiveTable());

        stage.setScene(scene);
        stage.sizeToScene();
        stage.show();
    }

    private static final class ReactiveRecord {

        private final Observable<Number> obs = Observable.just(10,20,30,40,50,60).cast(Number.class);

        public Observable<Number> subscribeImmediate() { 
            return obs;
        }
        public Observable<Number> subscribeComputation() { 
            return obs.subscribeOn(Schedulers.computation());
        }
        public Observable<Number> subscribeNewthread() { 
            return obs.subscribeOn(Schedulers.newThread());
        }
        public Observable<Number> subscribeIo() { 
            return obs.subscribeOn(Schedulers.io());
        }
        public Observable<Number> subscribeParallel() {
            return obs.flatMap(i -> Observable.just(i).subscribeOn(Schedulers.computation()));
        }
        public Observable<Number> subscribeTrampoline() { 
            return obs.subscribeOn(Schedulers.trampoline());
        }

        public ImmutableList<Observable<Number>> getAll() { 
            return ImmutableList.of(subscribeImmediate(),subscribeComputation(),subscribeNewthread(),subscribeIo(),subscribeParallel(),subscribeTrampoline());
        }
        public ImmutableList<String> getHeaders() {
            return ImmutableList.of("IMMEDIATE","COMPUTATION","NEW","IO","PARALLEL","TRAMPOLINE");
        }
    }

    private static final class ReactiveTable extends TableView<ReactiveRecord> {


        private ReactiveTable() { 
            ReactiveRecord record = new ReactiveRecord();

            this.getItems().add(record);

            ImmutableList<Observable<Number>> observables = record.getAll();
            ImmutableList<String> headers = record.getHeaders();

            for (int i = 0; i < observables.size(); i++) { 
                 TableColumn<ReactiveRecord,Number> col = new TableColumn<>(headers.get(i));
                 final int index = i;
                 col.setCellValueFactory(cb -> rxToProperty(observables.get(index)));
                 this.getColumns().add(col);
            }
        }
    }
    public static <T> ReadOnlyObjectProperty<T> rxToProperty(Observable<T> obs) { 
        ReadOnlyObjectWrapper<T> property = new ReadOnlyObjectWrapper<>();

        obs.onBackpressureLatest().serialize().subscribe(v -> { 
            synchronized(property) { 
                System.out.println("Emitting val " + v + " on " + Thread.currentThread().getName());
                property.set(v);
            }
        });

        return property.getReadOnlyProperty();
    }
    public static void main(String[] args) { 
        launch(args);
    }
}


推荐答案

Tomas Mikula对ReactFX GitHub项目中的这种行为以及解决方案提供了一些非常有用的见解。

Tomas Mikula gave some very helpful insight to this behavior, as well as the solution, on the ReactFX GitHub project.

https://github.com/ TomasMikula / ReactFX / issues / 22

这篇关于RxJava + JavaFX属性的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆