在 rxjs 中实现 fromSubscriber [英] Implementing fromSubscriber in rxjs

查看:64
本文介绍了在 rxjs 中实现 fromSubscriber的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我今天遇到了一个有趣的问题.我正在开发一个可以上传文件的应用程序,我们想要实现一个进度条.该应用程序是使用 React/Redux/Redux-Observable 编写的.我想为上传进度调度操作.这是我为实现它所做的:

I ran into an interesting issue today. I'm working on an app where we have file uploads, and we want to implement a progress bar. The app is written using React/Redux/Redux-Observable. I want to dispatch actions for upload progress. Here's what I did to implement it:

withProgress(method, url, body = {}, headers = {}) {
    const progressSubscriber = Subscriber.create();

    return {
        Subscriber: progressSubscriber,
        Request:    this.ajax({ url, method, body, headers, progressSubscriber }),
    };
}

我有一个类,用于发出所有 ajax 请求.this.ajax 使用传入的参数调用 Observable.ajax.

I have a class that I use to make all my ajax requests. this.ajax calls Observable.ajax with the passed in parameters.

export const blobStorageUploadEpic = (action$) => {
    return action$.ofType(a.BLOB_STORAGE_UPLOAD)
    .mergeMap(({ payload }) => {
        const { url, valetKey, blobId, blobData, contentType } = payload;

        const { Subscriber, Request } = RxAjax.withProgress('PUT', `${url}?${valetKey}`, blobData, {
            'x-ms-blob-type': 'BlockBlob',
            'Content-Type':   contentType,
        });

        const requestObservable = Request
        .map(() => ({ type: a.BLOB_STORAGE_UPLOAD_SUCCESS, payload: { blobId } }))
        .catch((err) => Observable.of({ type: a.BLOB_STORAGE_UPLOAD_FAILURE, err }));

        return Observable.fromSubscriber(Subscriber)
        .map((e) => ({ percentage: (e.loaded / e.total) * 100 }))
        .map((percentage) => ({ type: a.BLOB_STORAGE_UPLOAD_PROGRESS, payload: { percentage} }))
        .merge(requestObservable);
    });
};

这是我的史诗.我让订阅者回来,我写了一个 Observable 的自定义静态方法来接收订阅者.然后我将它与 Request(它是一个 Observable)合并.

This is my epic. I get the subscriber back and I wrote a custom static method of Observable to take in a subscriber. I then merge that with the Request (which is an Observable).

Observable.fromSubscriber = function fromSubscriber(externalSubscriber) {
    return Observable.create((subscriber) => {
        externalSubscriber.next =     (val) => subscriber.next(val);
        externalSubscriber.error =    (err) => subscriber.error(err);
        externalSubscriber.complete = () => subscriber.complete();
    });
};

最后,这里是我在 Observable 上写的自定义静态方法.我写这个有两个原因.1. 作为其他人处理类似问题的例子(我花了很多时间试图弄清楚如何从 Subscriber 制作一个 Observable,然后再写我自己的) 和 2. 询问这是否是实现此目标的最佳方式.rxjs 很深,我认为有一种现有的方法可以做到这一点,但我找不到它.

Finally, here is the custom static method I wrote on Observable. I wrote this for two reasons. 1. As an example for anyone else dealing with a similar problem (I spent a lot of time trying to figure out how to make an Observable from a Subscriber before writing my own) and 2. To ask whether this is the best way to accomplish this goal. rxjs is deep, and I figure that there's an existing way to do this, but I just couldn't find it.

推荐答案

这本质上就是 主题 是为了,以下也应该适用:

That is essentially what a Subject is for, the following should work as well:

export const blobStorageUploadEpic = (action$) => {
    return action$.ofType(a.BLOB_STORAGE_UPLOAD)
    .mergeMap(({ payload }) => {
        const { url, valetKey, blobId, blobData, contentType } = payload;

        const progressSubscriber = new Rx.Subject();
        const request = Rx.Observable.ajax({
            method: 'PUT',
            url: `${url}?${valetKey}`,
            body: blobData,
            headers: {
                'x-ms-blob-type': 'BlockBlob',
                'Content-Type':   contentType,
            },
            progressSubscriber
        });

        const requestObservable = request
            .map(() => ({ type: a.BLOB_STORAGE_UPLOAD_SUCCESS, payload: { blobId } }))
            .catch((err) => Observable.of({ type: a.BLOB_STORAGE_UPLOAD_FAILURE, err }));

        return progressSubscriber
            .map((e) => ({ percentage: (e.loaded / e.total) * 100 }))
            .map((percentage) => ({ type: a.BLOB_STORAGE_UPLOAD_PROGRESS, payload: { percentage} }))
            .merge(requestObservable);
    });
};

<小时>

这是一个更通用的例子(现场@jsfiddle):

let data = "";
for (let c = 0; c < 100000; ++c) {
    data += "" + Math.random();
}

const progressSubscriber = new Rx.Subject();
const request = Rx.Observable.ajax({
  method: 'POST',
  url: "/echo/json/",
  body: JSON.stringify({ data }),
  progressSubscriber
});

progressSubscriber
  .merge(request)
  .subscribe(console.log);

这篇关于在 rxjs 中实现 fromSubscriber的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆