使用 RxJS 和 twitter-stream-api 模块订阅流 [英] Subscribe to a stream with RxJS and twitter-stream-api module

查看:18
本文介绍了使用 RxJS 和 twitter-stream-api 模块订阅流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

好的,所以我是 Rx 的完全初学者,不幸的是,我对 js 和 js 中的流也很陌生.我使用这个 https://github.com/trygve-lie/twitter-stream-api 连接到 twitters 流 api 并接收带有推文的 json 对象.到目前为止我有这个代码

var Rx = require('rxjs/Rx');var TwitterStream = require('twitter-stream-api'),fs = require('fs');var filter = 'tweet';变量键 = {消费者密钥:密钥",消费者秘密:秘密",令牌:令牌",token_secret :令牌秘密"};var Twitter = new TwitterStream(keys);Twitter.stream('状态/过滤器', {轨道:过滤器});Twitter.on('连接成功', 函数 (uri) {console.log('连接成功', uri);});Twitter.on('数据', 函数 (obj) {控制台日志(obj.text);});

我成功地将推文写入控制台,但我真正想学习的是使用流,尤其是 RxJS.我已经尝试了所有我能想到的方法来创建一个 observable.Rx.Observable.创建/从等...

我也尝试过 Twitter.resume() 因为它显然默认暂停以恢复流并观察它.我只收到诸如不能 .subscribe is not a function 之类的错误.根据我上面的内容,我如何使用 Rx.Observable 开始过滤和处理数据?

谢谢!

解决方案

RxJS 5 没有任何方法可以将流从 Observable 转换为 Observable,因此您需要自己完成.最好使用 Observable.create.

const Rx = require('rxjs');const Observable = Rx.Observable;var TwitterStream = require('twitter-stream-api'),...var source$ = Observable.create(observer => {var Twitter = new TwitterStream(keys);Twitter.stream('状态/过滤器', {轨道:过滤器});Twitter.on('数据', 函数 (obj) {观察者.下一个(对象);});返回 () =>{推特.close();};});

这使得一个冷的 Observable 只有在你订阅它时才会连接到 Twitter.Observable.create 静态方法让你将值推送给观察者,最后返回一个拆卸函数,然后关闭连接.当您取消订阅或 Observable 完成时调用此函数.

然后你可以将这个 Observable 与任何你想要的链接起来:

source$.filter(...).map(...)

请注意,还有方法 Observable.bindCallback()Observable.bindNodeCallback() 但这些方法在您的情况下对您没有多大帮助.>

阅读更多:

Ok, so I'm a complete beginner with Rx and unfortunately very new to js and streams in js as well. Im using this https://github.com/trygve-lie/twitter-stream-api to connect to twitters streaming api and receive json objects with tweets. So far I have this code

var Rx = require('rxjs/Rx');

var TwitterStream = require('twitter-stream-api'),
    fs = require('fs');
var filter = 'tweet';
var keys = {
    consumer_key : "key",
    consumer_secret : "secret",
    token : "token",
    token_secret : "tokensecret"
};

var Twitter = new TwitterStream(keys);
Twitter.stream('statuses/filter', {
    track: filter
});

Twitter.on('connection success', function (uri) {
    console.log('connection success', uri); 
});
Twitter.on('data', function (obj) {
    console.log(obj.text);
});

I am successfully writing tweets to the console but what I am really trying to learn is working with streams and in particular RxJS. I have tried all the ways I can think of to create an observable. Rx.Observable. create/from etc...

I have also tried Twitter.resume() as it is apparently paused by default to resume the stream and observe that. I only get errors such as cannot .subscribe is not a function. From what I have above, how can I use Rx.Observable to start filtering and playing around with the data?

Thanks!

解决方案

RxJS 5 doesn't have any methods to convert stream from/to an Observable so you'll need to do this by yourself. Ideally with Observable.create.

const Rx = require('rxjs');
const Observable = Rx.Observable;

var TwitterStream = require('twitter-stream-api'),

...

var source$ = Observable.create(observer => {
  var Twitter = new TwitterStream(keys);
  Twitter.stream('statuses/filter', {
    track: filter
  });

  Twitter.on('data', function (obj) {
    observer.next(obj);
  });

  return () => {
    Twitter.close();
  };
});

This makes a cold Observable that'll connect to Twitter only when you subscribe to it. The Observable.create static methods let's you push values to the observer and at the end return a tear down function then just closes the connection. This function is called when you unsubscribe or when the Observable completes.

Then you can chain this Observable with whatever you want:

source$.filter(...).map(...)

Note, that there're also methods Observable.bindCallback() and Observable.bindNodeCallback() but these wouldn't help you much in your situation.

Read more:

这篇关于使用 RxJS 和 twitter-stream-api 模块订阅流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆