rxjs - 如何在捕获和处理错误并发出某些内容后重试 [英] rxjs - How to retry after catching and processing an error with emitting something
问题描述
我使用的是 rxjs v5.4.3
,redux-observable v0.16.0
.
I'm using rxjs v5.4.3
, redux-observable v0.16.0
.
在我的应用程序中,我想实现以下目标:
in my application, I'd like to achieve below:
- 用户拥有身份验证令牌,并刷新令牌以重新生成身份验证令牌.
- 用户使用身份验证令牌请求.(通过发出请求操作)
- 如果失败,请使用刷新令牌请求重新生成身份验证令牌.
- 如果刷新,则发出 TOKEN_REFRESHED 操作以更新身份验证令牌,并且不发出 REQUEST_FAILURE.
- 如果刷新失败,发出 REQUEST_FAILURE
- 如果请求成功,发出 REQUEST_SUCCESS,如果失败,发出 REQUEST_FAILURE.
我想实现:
const fetchEpic = (action$: ActionsObservable<Action>, store: Store<IRootState>) => action$ .ofAction(actions.fetchPost) .mergeMap(({ payload: { postId } })) => { const { authToken, refreshToken } = store.getState().auth; return api.fetchPost({ postId, authToken }) // this returns Observable<ResponseJSON> .map(res => actions.fetchSuccess({ res })) // if success, just emit success-action with the response .catch(err => { if (isAuthTokenExpiredError(err) { return api.reAuthenticate({ refreshToken }) .map(res => actions.refreshTokenSuccess({ authToken: res.authToken }); .catch(actions.fetchFailure({ err })) // and retry fetchPost after re-authenticate! } return Observable.of(actions.fetchFailure({ err })) }) }
有什么解决办法吗?
推荐答案
有很多方法可以做到,但我建议将重新身份验证拆分为自己的史诗,以便更容易维护/测试/重用.
There are many ways to do it, but I would recommend splitting off the reauthentication into its own epic to make it easier to maintain/test/reuse.
这可能是这样的:
const reAuthenticateEpic = (action$, store) => action$.ofType(actions.reAuthenticate) .switchMap(() => { const { refreshToken } = store.getState().auth; return api.reAuthenticate({ refreshToken }) .map(res => actions.refreshTokenSuccess({ authToken: res.authToken })) .catch(err => Observable.of( actions.refreshTokenFailure({ err }) )); });
我们还想使用
Observable.defer
之类的东西,这样每次重试时,我们都会查找authToken 的最新 版本代码>:
We'll also want to use something like
Observable.defer
so that each time we retry, we look up the latest version of theauthToken
:Observable.defer(() => { const { authToken } = store.getState().auth; return api.fetchPost({ postId, authToken }); })
当我们在
fetchEpic
中捕获错误并检测到isAuthTokenExpiredError
时,我们返回一个 Observable 链:When we catch errors in
fetchEpic
and detectisAuthTokenExpiredError
we return an Observable chain that:- 开始监听单个
refreshTokenSuccess
,表示我们可以重试 - 万一重新认证本身失败,我们用
.takeUntil(action$.ofType(refreshTokenFailure))
监听它,这样我们就不会永远等待——你可能想要处理这种情况不同,你的电话. - mergeMap 到原始源,这是
catch
回调的第二个参数.源"是catch
之前的 Observable 链,由于 Observables 是惰性的,当我们收到refreshTokenSuccess
动作时,它将再次重新订阅该链,实际上是一个重试" - 将上述链与
reAuthenticate
操作的 Observable 合并.这用于启动实际的重新身份验证.
- Starts listening for a single
refreshTokenSuccess
, signalling we can retry - Just in case the reauthing itself fails, we listen for it with
.takeUntil(action$.ofType(refreshTokenFailure))
so that we aren't waiting around forever--you might want to handle this case differently, your call. - mergeMap it to the original source, which is the second argument of the
catch
callback. The "source" is the Observable chain before thecatch
, and since Observables are lazy, when we receive therefreshTokenSuccess
action it it will resubscribe to that chain again, effectively be a "retrying" - Merge the above chain with an Observable of an
reAuthenticate
action. This is used to kick off the actual reauth.
总结:我们从
catch
返回的 Observable 链将首先开始侦听refreshTokenSuccess
,然后它发出reAuthenticate
,然后当(和如果)我们收到refreshTokenSuccess
然后我们将重试"源,我们的api.fetchPost()
链位于catch
之上,我们包装在一个Observable.defer
.如果在我们收到refreshTokenSuccess
之前发出refreshTokenFailure
,我们完全放弃.To summarize: the Observable chain we return from
catch
will first starting listening forrefreshTokenSuccess
, then it emitsreAuthenticate
, then when (and if) we receiverefreshTokenSuccess
we will then "retry" the source, ourapi.fetchPost()
chain above thecatch
that we wrapped in anObservable.defer
. IfrefreshTokenFailure
is emitted before we receive ourrefreshTokenSuccess
, we give up entirely.const fetchEpic = (action$, store) => action$.ofType(actions.fetchPost) .mergeMap(({ payload: { postId } })) => Observable.defer(() => { const { authToken } = store.getState().auth; return api.fetchPost({ postId, authToken }); }) .map(res => actions.fetchSuccess({ res })) .catch((err, source) => { if (isAuthTokenExpiredError(err)) { // Start listening for refreshTokenSuccess, then kick off the reauth return action$.ofType(actions.refreshTokenSuccess) .takeUntil(action$.ofType(refreshTokenFailure)) .take(1) .mergeMapTo(source) // same as .mergeMap(() => source) .merge( Observable.of(action.reAuthenticate()) ); } else { return Observable.of(actions.fetchFailure({ err })); } }); );
这些示例未经测试,因此我可能会遇到一些小问题,但希望您能了解要点.也可能有一种更优雅的方式来做到这一点,但这至少可以让你畅通无阻.(如果可以降低复杂性,欢迎其他人编辑此答案)
These examples are untested, so I may have some minor issues but you hopefully get the gist. There's also probably a more elegant way to do this, but this will at least unblock you. (Others are more than welcome to edit this answer if they can decrease the complexity)
这会造成无限重试的可能性很小,这可能会导致用户浏览器或您的服务器出现严重问题.仅重试一定次数和/或在重试之间设置某种延迟可能是个好主意.在实践中,这可能不值得担心,你会最清楚的.
This creates the slight potential for infinite retries, which can cause nasty issues both in the person's browser or your servers. It might be a good idea to only retry a set number of times, and/or put some sort of delay in between your retries. In practice this might not be worth worrying about, you'll know best.
您(或稍后阅读本文的其他人)可能会想使用
.startWith(action.reAuthenticate())
而不是合并,但请注意startWith
只是concat
的简写,而不是merge
,这意味着它会在我们开始监听成功之前同步发出动作.通常这不是问题,因为 http 请求是异步的,但是之前它曾引起人们的错误.You (or someone else reading this later) may be tempted to use
.startWith(action.reAuthenticate())
instead of the merge, but be mindful that astartWith
is just shorthand for aconcat
, not amerge
, which means it would synchronously emit the action before we have started to listen for a success one. Usually that isn't an issue since http requests are async, but it's caused people bugs before.这篇关于rxjs - 如何在捕获和处理错误并发出某些内容后重试的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!