使用Apache Flink连接到Twitter Streaming API时的IOExcpetion [英] IOExcpetion while connecting to Twitter Streaming API with Apache Flink
问题描述
我编写了一个小型Scala程序,该程序使用Apache Flink Streaming API读取Twitter推文.
I wrote a small Scala program which uses the Apache Flink Streaming API to read Twitter tweets.
object TwitterWordCount {
private val properties = "/home/twitter-login.properties"
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val twitterStream = env.addSource(new TwitterSource(properties))
val tweets = twitterStream
.flatMap(new JSONParseFlatMap[String, String] {
override def flatMap(in: String, out: Collector[String]): Unit = {
if (getString(in, "user.lang") == "en") {
out.collect(getString(in, "text"))
}
}
})
tweets.print
env.execute("tweets")
}
}
执行时遇到以下问题:
14:35:48,353 INFO com.twitter.hbc.httpclient.ClientBase - twitterSourceClient Establishing a connection
14:35:48,354 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection request: [route: {}->http://stream.twitter.com][total kept alive: 0; route allocated: 0 of 2; total allocated: 0 of 20]
14:35:48,354 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection leased: [id: 4][route: {}->http://stream.twitter.com][total kept alive: 0; route allocated: 1 of 2; total allocated: 1 of 20]
14:35:48,354 DEBUG org.apache.http.impl.conn.DefaultClientConnectionOperator - Connecting to stream.twitter.com:80
14:35:49,486 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Received message SendHeartbeat at akka://flink/user/taskmanager_1 from Actor[akka://flink/deadLetters].
14:35:49,486 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
14:35:49,487 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Handled message SendHeartbeat in 1 ms from Actor[akka://flink/deadLetters].
14:35:49,487 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Received message Heartbeat(cb51cdb1bd08879df10bd2198b8e043a,[B@4daaaf5f) at akka://flink/user/jobmanager from Actor[akka://flink/user/taskmanager_1#-64418449].
14:35:49,488 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Received hearbeat message from cb51cdb1bd08879df10bd2198b8e043a.
14:35:49,488 DEBUG org.apache.flink.runtime.instance.InstanceManager - Received heartbeat from TaskManager cb51cdb1bd08879df10bd2198b8e043a @ localhost - 8 slots - URL: akka://flink/user/taskmanager_1
14:35:49,488 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Handled message Heartbeat(cb51cdb1bd08879df10bd2198b8e043a,[B@4daaaf5f) in 0 ms from Actor[akka://flink/user/taskmanager_1#-64418449].
14:35:52,358 DEBUG org.apache.http.impl.conn.DefaultClientConnection - Connection org.apache.http.impl.conn.DefaultClientConnection@64c88f2d closed
14:35:52,358 DEBUG org.apache.http.impl.conn.DefaultClientConnection - Connection org.apache.http.impl.conn.DefaultClientConnection@64c88f2d shut down
14:35:52,358 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection [id: 4][route: {}->http://stream.twitter.com] can be kept alive for 9223372036854775807 MILLISECONDS
14:35:52,358 DEBUG org.apache.http.impl.conn.DefaultClientConnection - Connection org.apache.http.impl.conn.DefaultClientConnection@64c88f2d closed
14:35:52,358 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection released: [id: 4][route: {}->http://stream.twitter.com][total kept alive: 0; route allocated: 0 of 2; total allocated: 0 of 20]
14:35:52,359 WARN com.twitter.hbc.httpclient.ClientBase - twitterSourceClient IOException caught when establishing connection to https://stream.twitter.com/1.1/statuses/filter.json?delimited=length
14:35:53,613 WARN com.twitter.hbc.httpclient.ClientBase - twitterSourceClient failed to establish connection properly
14:35:53,613 INFO com.twitter.hbc.httpclient.ClientBase - twitterSourceClient Done processing, preparing to close connection
14:35:53,613 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection manager is shutting down
14:35:53,613 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection manager shut down
程序尝试重新建立连接.因此,这4行日志消息将继续发出.
The program tries to re-establish the connection. So this 4 lines of log message continue being emitted.
The strange thing about this is, when I run the example provided in the Apache Flink project everything works just fine (I pulled the latest version of master from GitHub). I even use the same properties file. If I copy that example class to my own project the problem state above occurs too.
我使用Flink原型创建了自己的项目.我在0.9.1版本和0.10-SNAPSHOT中进行了尝试.依赖项flink-scala
,flink-streaming-scala
,flink-clients
和flink-connector-twitter
在相应版本中使用.
I used the Flink archetype to create my own project. I tried in version 0.9.1 as well as 0.10-SNAPSHOT. The dependencies flink-scala
, flink-streaming-scala
, flink-clients
and flink-connector-twitter
are used in the corresponding version.
有人遇到过类似的问题并且可以让我步入正轨吗?
Does anyone have experienced a similar issue and can get me on the right track?
推荐答案
调试com.twitter.hbc.httpclient.ClientBase
导致以下异常:org.apache.http.conn.ConnectTimeoutException: Connect to stream.twitter.com:80 timed out
根据发布在Twitter开发人员论坛上发生的原因是Apaches HttpClient 4.2中的错误.实际上,解析我项目上的依赖关系树表明,flink运行时对com.amazonaws:aws-java-sdk:1.81具有依赖关系,而com.amazonaws:aws-java-sdk:1.81再次与org.apache.httpcomponents:httpclient:4.2具有依赖关系.
According to a post on the Twitter Developer forum this happens because of a bug in Apaches HttpClient 4.2. And in fact, resolving the dependency tree on my project shows that the flink-runtime has a dependency on com.amazonaws:aws-java-sdk:1.81 which again has a dependency on org.apache.httpcomponents:httpclient:4.2.
将HttpClient 4.2.6添加到我的项目的依赖项中可以暂时解决该问题.
Adding HttpClient 4.2.6 to the dependencies of my project solved the problem temporarily.
这篇关于使用Apache Flink连接到Twitter Streaming API时的IOExcpetion的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!