apache-kafka相关内容
我探索了一些设置kafka的选项,我知道Zookeeper必须启动并启动一个kafka。 我想知道如何才能我找到了下面的内容。 $ b 我的zookeeper实例的主机名和端口号---我检查了zoo.cfg,我只能找到ClientPort而不是主机名, hostname是我盒子的主机名? 2)检查ZooKeeper是否启动并正在运行---我试图做一个 ps - ef | grep“z
..
以下内容已经实现 Kafka Producer使用Spark Streaming从twitter中提取数据。 > Kafka Consumer将数据导入Hive External table(在HDFS上)。 虽然目前工作状况良好。 只有一个我正面临的问题,而我的应用程序将数据插入Hive表中时,它会为每个文件的每行数据创建一个小文件。下面是 代码 //定义从 中读取哪
..
我需要删除kafka-0.8.2.2.3中的一个主题。我已经使用下面的命令来删除主题: bin / kafka-topics.sh --zookeeper localhost:2181 - delete --topic DummyTopic 命令执行成功,但是当我运行命令列出主题时,可以看到该主题仍然存在,并显示标记为删除。 bin / kafka- topi
..
现在Golang Kafka图书馆(sarama)提供消费者群组功能,而无需任何外部图书馆帮助与kafka 10.我怎样才能获得消费者群体在任何给定时间处理的当前消息偏移? 以前我使用过kazoo-go( https://github.com/ wvanbergen / kazoo-go )以获取我的消费者群组消息,因为它存储在Zookeeper中。现在我使用sarama-cluster(
..
我打算为Kafka开源贡献一份力量。我把Kafka分成了我的git hub帐号。 然后我在本地克隆它。现在,我应该在主服务器上进行更改还是应该在本地创建一个以票证命名的分支? 解决方案 创建分支以实施功能或错误修正,将你的分支推送到你自己的仓库(你的分支),然后打开一个拉取请求来将你的分支合并到主分支(通常是 master )在官方Kafka仓库中。 以下是对GitHub上的项目进行贡献
..
我有一个Kafka集群,它基于源中的数据更改从源接收消息。在某些情况下,邮件将来会被处理。所以我有两个选择: 消费所有的消息,并将这些未来的消息传回到Kafka的不同主题下日期在主题名称),并有一个Storm拓扑结构,可以查找该日期名称的主题。这将确保消息只在其意图当天被处理。 将它存储在一个单独的数据库中,并构建一个调度程序,只在未来的日期才能读取消息和帖子到Kafka。 / li>
..
首先,这与当我重新运行Flink消费者时,Kafka再次消费最新消息,但它不一样。这个问题的答案似乎不能解决我的问题。如果我错过了那个答案,那么请重新整理答案,因为我清楚地错过了一些东西。 尽管如此,问题是完全一样的 - Flink(kafka连接器)重新运行在关闭之前看到的最后3-9个消息。 我的版本 Flink 1.1.2 Kafka 0.9.0.1 Scala 2
..
我正在尝试远程监控在Docker中运行的JVM。配置如下所示: 机器1:在ubuntu上的docker中运行JVM(在我的例子中运行kafka)机;该机IP为10.0.1.201;在docker运行的应用程序是172.17.0.85。 机器2:运行JMX监控 请注意,当我从机器2运行JMX监视时,它会失败并出现以下错误(注意:当我运行jconsole,jvisualvm,jmxt
..
我正在尝试让JMX监视工作来监视一个测试卡夫卡实例。 我通过boot2docker在docker中运行kafka(ches / kafka),但是我无法正确配置JMX监控。我做了一堆故障排除,我知道kafka实例运行正常(消费者和生产者工作)。当我尝试简单的JMX工具(jconsole和jvisualvm)并且都无法连接(不安全的连接错误,连接失败)时出现此问题。 注意事项的配置项:我
..
我有一个休息服务,在port 5000上的docker容器中运行,用于生成从docker容器运行的kafka主题消息。 我配置了以下属性的生产者客户: - bootstrap.servers = localhost:9093 我已经开始包含以下命令: - docker run -d -p 127.0.0.1:5000:5000
..
所以我的目标是以分散的方式设立几个卡夫卡经纪人的集群。但是我看不到让经纪人意识到的方法。 据我所知,每个代理在配置中需要一个单独的ID,不能保证或配置是否从kubernetes启动容器? 他们还需要有相同的advertised_host? 是否有任何缺失的参数,需要更改节点才能相互发现? 最终执行此类配置是否可行的Docker文件与脚本?和/或一个共享卷? 我正在尝试
..
我有两个Kafka消费者 ConsumerA 和 ConsumerB 。我想在相同的机器上运行这两个kafka消费者彼此独立。他们之间根本没有关系。 每个消费者都应该有一个不同的Properties对象。 每个消费者应该有一个不同的线程池配置,因为如果需要独立于其他消费者,它们可以以多线程方式运行(消费者组)。 以下是我的设计: 消费者类(抽象): public abs
..
使用 kafka 时,可以通过设置kafka.compression.codec属性来设置编码解码器 假设我在我的生产者中使用snappy压缩,当使用kafka消费者消费来自kafka的消息时,我应该做一些解码数据snappy或是kafka消费者的某些内置功能? 在相关文档我找不到任何与kafka消费者编码相关的属性(它只与生产者有关)。 有人可以清除这个? 解决方案 根
..
这是我的查询,如果Cassandra数据库中存在或不存在当前数据ID: row = session.execute(“SELECT * FROM articles where id =%s”,[id]) 在Kafka中解析的消息,然后确定此消息是否存在于Cassandra数据库中,如果它不存在,那么它应该执行插入操作,如果它存在,则不应该插入数据。 p> mess
..
我正在使用Spark流媒体,Apache kafka和Cassandra的项目。 我使用流式kafka集成。在kafka我有一个生产者使用这个配置发送数据: props.put(“metadata.broker.list”,KafkaProperties.ZOOKEEPER) ; props.put(“bootstrap.servers”,KafkaProperties.SERVER);
..
我通过教程apache的卡夫卡运行(Apache的卡夫卡网站),必须使用一个辅助教程(的 HTTP://janschulte.word$p$pss.com/2013/10/13/apache-kafka-0-8-在窗口/ ),并找到另一个答案(“从我在VMware中运行的Ubuntu时,卡夫卡VM无法识别选项'+ UseCom pressedOops'”搜索)只是为了在这里。 现在,我遇到错误:
..
我有一个使用火花流一个项目,我与'火花提交“运行它,但我打这个错误: 15/01/14 10点34分18秒错误ReceiverTracker:注销接收器,流0:开始接收0错误 - java.lang.AbstractMethodError 在org.apache.spark.Logging $ class.log(Logging.scala:52) 在org.apache.s
..
我从卡夫卡使用批处理流(maxRatePerPartition 10.000)数据流。因此,在每批我处理10.000卡夫卡的消息。 在这个批处理我通过创建一个数据框出RDD的处理每封邮件。处理之后,我每个处理的记录保存到使用相同的文件:dataFrame.write.mode(SaveMode.append)。 所以其附加的所有邮件到同一个文件。 这是确定的,只要它是一个批次运行中运行。但在
..
在我们的火花流工作,我们读卡夫卡流的消息。 对于这一点,我们使用它返回 JavaPairInputDStreamfrom 。的 KafkaUtils.createDirectStream API 该消息从卡夫卡读取(从三个主题 - 为test1,test2的,TEST3)以下列方式: 私有静态最后弦乐TOPICS =“TEST1,TEST2,TEST3” HashSet的<串GT; t
..
我有一个Spark流工作,从使用直接的方法卡夫卡集群读取数据。有一个在处理时间,我无法理解,而不是体现在星火UI度量一个周期性高峰。下图显示了这种模式(批处理时间为10秒): 这个问题每次作业运行时间是重复的。 有记录在卡夫卡没有数据被读取所以注意没有真正处理,执行。我期望线持平,接近最小值序列化和发送任务的执行者。 该模式是工作需要9秒(这有5秒调度延迟),接下来的工作需要5秒(无延迟调
..