使用 RxJS 和 twitter-stream-api 模块订阅流 [英] Subscribe to a stream with RxJS and twitter-stream-api module
问题描述
好的,所以我是 Rx 的完全初学者,不幸的是,我对 js 和 js 中的流也很陌生.我使用这个 https://github.com/trygve-lie/twitter-stream-api 连接到 twitters 流 api 并接收带有推文的 json 对象.到目前为止我有这个代码
var Rx = require('rxjs/Rx');var TwitterStream = require('twitter-stream-api'),fs = require('fs');var filter = 'tweet';变量键 = {消费者密钥:密钥",消费者秘密:秘密",令牌:令牌",token_secret :令牌秘密"};var Twitter = new TwitterStream(keys);Twitter.stream('状态/过滤器', {轨道:过滤器});Twitter.on('连接成功', 函数 (uri) {console.log('连接成功', uri);});Twitter.on('数据', 函数 (obj) {控制台日志(obj.text);});
我成功地将推文写入控制台,但我真正想学习的是使用流,尤其是 RxJS.我已经尝试了所有我能想到的方法来创建一个 observable.Rx.Observable.创建/从等...
我也尝试过 Twitter.resume() 因为它显然默认暂停以恢复流并观察它.我只收到诸如不能 .subscribe is not a function 之类的错误.根据我上面的内容,我如何使用 Rx.Observable 开始过滤和处理数据?
谢谢!
RxJS 5 没有任何方法可以将流从 Observable 转换为 Observable,因此您需要自己完成.最好使用 Observable.create
.
const Rx = require('rxjs');const Observable = Rx.Observable;var TwitterStream = require('twitter-stream-api'),...var source$ = Observable.create(observer => {var Twitter = new TwitterStream(keys);Twitter.stream('状态/过滤器', {轨道:过滤器});Twitter.on('数据', 函数 (obj) {观察者.下一个(对象);});返回 () =>{推特.close();};});
这使得一个冷的 Observable 只有在你订阅它时才会连接到 Twitter.Observable.create
静态方法让你将值推送给观察者,最后返回一个拆卸函数,然后关闭连接.当您取消订阅或 Observable 完成时调用此函数.
然后你可以将这个 Observable 与任何你想要的链接起来:
source$.filter(...).map(...)
请注意,还有方法 Observable.bindCallback()
和 Observable.bindNodeCallback()
但这些方法在您的情况下对您没有多大帮助.>
阅读更多:
http://reactx.io/rxjs/class/es6/Observable.js~Observable.html#static-method-create
https://medium.com/@benlesh/hot-vs-cold-observables-f8094ed53339#.scxgvqp49
https://medium.com/@benlesh/learning-observable-by-building-observable-d5da57405d87#.2099xwi13
Ok, so I'm a complete beginner with Rx and unfortunately very new to js and streams in js as well. Im using this https://github.com/trygve-lie/twitter-stream-api to connect to twitters streaming api and receive json objects with tweets. So far I have this code
var Rx = require('rxjs/Rx');
var TwitterStream = require('twitter-stream-api'),
fs = require('fs');
var filter = 'tweet';
var keys = {
consumer_key : "key",
consumer_secret : "secret",
token : "token",
token_secret : "tokensecret"
};
var Twitter = new TwitterStream(keys);
Twitter.stream('statuses/filter', {
track: filter
});
Twitter.on('connection success', function (uri) {
console.log('connection success', uri);
});
Twitter.on('data', function (obj) {
console.log(obj.text);
});
I am successfully writing tweets to the console but what I am really trying to learn is working with streams and in particular RxJS. I have tried all the ways I can think of to create an observable. Rx.Observable. create/from etc...
I have also tried Twitter.resume() as it is apparently paused by default to resume the stream and observe that. I only get errors such as cannot .subscribe is not a function. From what I have above, how can I use Rx.Observable to start filtering and playing around with the data?
Thanks!
RxJS 5 doesn't have any methods to convert stream from/to an Observable so you'll need to do this by yourself. Ideally with Observable.create
.
const Rx = require('rxjs');
const Observable = Rx.Observable;
var TwitterStream = require('twitter-stream-api'),
...
var source$ = Observable.create(observer => {
var Twitter = new TwitterStream(keys);
Twitter.stream('statuses/filter', {
track: filter
});
Twitter.on('data', function (obj) {
observer.next(obj);
});
return () => {
Twitter.close();
};
});
This makes a cold Observable that'll connect to Twitter only when you subscribe to it. The Observable.create
static methods let's you push values to the observer and at the end return a tear down function then just closes the connection. This function is called when you unsubscribe or when the Observable completes.
Then you can chain this Observable with whatever you want:
source$.filter(...).map(...)
Note, that there're also methods Observable.bindCallback()
and Observable.bindNodeCallback()
but these wouldn't help you much in your situation.
Read more:
http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-create
https://medium.com/@benlesh/hot-vs-cold-observables-f8094ed53339#.scxgvqp49
https://medium.com/@benlesh/learning-observable-by-building-observable-d5da57405d87#.2099xwi13
这篇关于使用 RxJS 和 twitter-stream-api 模块订阅流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!