使用RxJS和twitter-stream-api模块订阅流 [英] Subscribe to a stream with RxJS and twitter-stream-api module

查看:71
本文介绍了使用RxJS和twitter-stream-api模块订阅流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

好吧,所以我是Rx的一个完整的初学者,不幸的是,对于JS和JS中的流来说,它还是很新的.我正在使用此 https://github.com/trygve-lie/twitter-stream-api连接到Twitters流式api并通过tweet接收json对象.到目前为止,我已经有了这段代码

Ok, so I'm a complete beginner with Rx and unfortunately very new to js and streams in js as well. Im using this https://github.com/trygve-lie/twitter-stream-api to connect to twitters streaming api and receive json objects with tweets. So far I have this code

var Rx = require('rxjs/Rx');

var TwitterStream = require('twitter-stream-api'),
    fs = require('fs');
var filter = 'tweet';
var keys = {
    consumer_key : "key",
    consumer_secret : "secret",
    token : "token",
    token_secret : "tokensecret"
};

var Twitter = new TwitterStream(keys);
Twitter.stream('statuses/filter', {
    track: filter
});

Twitter.on('connection success', function (uri) {
    console.log('connection success', uri); 
});
Twitter.on('data', function (obj) {
    console.log(obj.text);
});

我已经成功地向控制台编写了推文,但我真正想学习的是使用流,尤其是RxJS.我尝试了所有可以想到的方法来创建可观察的对象.Rx可观察.创建/从等...

I am successfully writing tweets to the console but what I am really trying to learn is working with streams and in particular RxJS. I have tried all the ways I can think of to create an observable. Rx.Observable. create/from etc...

我还尝试了Twitter.resume(),因为默认情况下它显然已暂停以恢复流并观察到它.我只会收到诸如cannot之类的错误.subscribe不是函数.通过上面的内容,我该如何使用Rx.Observable开始过滤并处理数据?

I have also tried Twitter.resume() as it is apparently paused by default to resume the stream and observe that. I only get errors such as cannot .subscribe is not a function. From what I have above, how can I use Rx.Observable to start filtering and playing around with the data?

谢谢!

推荐答案

RxJS 5没有任何方法将流从Observable转换为Observable,因此您需要自己完成此操作.理想情况下,使用 Observable.create .

RxJS 5 doesn't have any methods to convert stream from/to an Observable so you'll need to do this by yourself. Ideally with Observable.create.

const Rx = require('rxjs');
const Observable = Rx.Observable;

var TwitterStream = require('twitter-stream-api'),

...

var source$ = Observable.create(observer => {
  var Twitter = new TwitterStream(keys);
  Twitter.stream('statuses/filter', {
    track: filter
  });

  Twitter.on('data', function (obj) {
    observer.next(obj);
  });

  return () => {
    Twitter.close();
  };
});

这将使Observable变得冷酷,仅当您订阅Twitter时,它才会连接到Twitter. Observable.create 静态方法使您可以将值推入观察器,最后返回拆解函数,然后仅关闭连接.取消订阅或Observable完成时将调用此函数.

This makes a cold Observable that'll connect to Twitter only when you subscribe to it. The Observable.create static methods let's you push values to the observer and at the end return a tear down function then just closes the connection. This function is called when you unsubscribe or when the Observable completes.

然后,您可以将此Observable与所需的对象链接起来

Then you can chain this Observable with whatever you want:

source$.filter(...).map(...)

请注意,还有方法 Observable.bindCallback() Observable.bindNodeCallback(),但这些方法在您遇到的情况中对您没有多大帮助.

Note, that there're also methods Observable.bindCallback() and Observable.bindNodeCallback() but these wouldn't help you much in your situation.

了解更多:

https://medium.com/@ benlesh/hot-vs-cold-observables-f8094ed53339#.scxgvqp49

https://medium.com/@benlesh/learning-observable-by-building-observable-d5da57405d87#.2099xwi13

这篇关于使用RxJS和twitter-stream-api模块订阅流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆