apache-kafka相关内容

如何检查ZooKeeper是否在命令提示符下运行或启动?

我探索了一些设置kafka的选项,我知道Zookeeper必须启动并启动一个kafka。 我想知道如何才能我找到了下面的内容。 $ b 我的zookeeper实例的主机名和端口号---我检查了zoo.cfg,我只能找到ClientPort而不是主机名, hostname是我盒子的主机名? 2)检查ZooKeeper是否启动并正在运行---我试图做一个 ps - ef | grep“z ..

Hive为HDFS中的每个插入创建多个小文件

以下内容已经实现 Kafka Producer使用Spark Streaming从twitter中提取数据。 > Kafka Consumer将数据导入Hive External table(在HDFS上)。 虽然目前工作状况良好。 只有一个我正面临的问题,而我的应用程序将数据插入Hive表中时,它会为每个文件的每行数据创建一个小文件。下面是 代码 //定义从 中读取哪 ..
发布时间:2018-05-31 18:42:33 分布式计算/Hadoop

如何删除apache kafka中的主题

我需要删除kafka-0.8.2.2.3中的一个主题。我已经使用下面的命令来删除主题: bin / kafka-topics.sh --zookeeper localhost:2181 - delete --topic DummyTopic 命令执行成功,但是当我运行命令列出主题时,可以看到该主题仍然存在,并显示标记为删除。 bin / kafka- topi ..
发布时间:2018-05-31 18:33:17 Java开发

如何在Golang Kafka 10中获得消费群组的分区补偿

现在Golang Kafka图书馆(sarama)提供消费者群组功能,而无需任何外部图书馆帮助与kafka 10.我怎样才能获得消费者群体在任何给定时间处理的当前消息偏移? 以前我使用过kazoo-go( https://github.com/ wvanbergen / kazoo-go )以获取我的消费者群组消息,因为它存储在Zookeeper中。现在我使用sarama-cluster( ..
发布时间:2018-05-02 18:07:34 其他开发语言

我应该在为apache kafka创建PR时创建分支吗?

我打算为Kafka开源贡献一份力量。我把Kafka分成了我的git hub帐号。 然后我在本地克隆它。现在,我应该在主服务器上进行更改还是应该在本地创建一个以票证命名的分支? 解决方案 创建分支以实施功能或错误修正,将你的分支推送到你自己的仓库(你的分支),然后打开一个拉取请求来将你的分支合并到主分支(通常是 master )在官方Kafka仓库中。 以下是对GitHub上的项目进行贡献 ..
发布时间:2018-04-28 12:18:06 其他开发

卡夫卡作为未来事件的数据存储

我有一个Kafka集群,它基于源中的数据更改从源接收消息。在某些情况下,邮件将来会被处理。所以我有两个选择: 消费所有的消息,并将这些未来的消息传回到Kafka的不同主题下日期在主题名称),并有一个Storm拓扑结构,可以查找该日期名称的主题。这将确保消息只在其意图当天被处理。 将它存储在一个单独的数据库中,并构建一个调度程序,只在未来的日期才能读取消息和帖子到Kafka。 / li> ..
发布时间:2017-09-03 03:04:06 Java开发

卡夫卡&重新启动时会重新复制邮件

首先,这与当我重新运行Flink消费者时,Kafka再次消费最新消息,但它不一样。这个问题的答案似乎不能解决我的问题。如果我错过了那个答案,那么请重新整理答案,因为我清楚地错过了一些东西。 尽管如此,问题是完全一样的 - Flink(kafka连接器)重新运行在关闭之前看到的最后3-9个消息。 我的版本 Flink 1.1.2 Kafka 0.9.0.1 Scala 2 ..
发布时间:2017-07-21 00:36:08 其他开发

如何从Dock外部访问JMX界面?

我正在尝试远程监控在Docker中运行的JVM。配置如下所示: 机器1:在ubuntu上的docker中运行JVM(在我的例子中运行kafka)机;该机IP为10.0.1.201;在docker运行的应用程序是172.17.0.85。 机器2:运行JMX监控 请注意,当我从机器2运行JMX监视时,它会失败并出现以下错误(注意:当我运行jconsole,jvisualvm,jmxt ..
发布时间:2017-06-10 21:17:58 Linux/Unix

在boot2docker中的docker中运行kafka时,使用JMX进行kafka监视

我正在尝试让JMX监视工作来监视一个测试卡夫卡实例。 我通过boot2docker在docker中运行kafka(ches / kafka),但是我无法正确配置JMX监控。我做了一堆故障排除,我知道kafka实例运行正常(消费者和生产者工作)。当我尝试简单的JMX工具(jconsole和jvisualvm)并且都无法连接(不安全的连接错误,连接失败)时出现此问题。 注意事项的配置项:我 ..
发布时间:2017-06-10 20:51:56 Linux/Unix

Kafka在Kubernetes多节点

所以我的目标是以分散的方式设立几个卡夫卡经纪人的集群。但是我看不到让经纪人意识到的方法。 据我所知,每个代理在配置中需要一个单独的ID,不能保证或配置是否从kubernetes启动容器? 他们还需要有相同的advertised_host? 是否有任何缺失的参数,需要更改节点才能相互发现? 最终执行此类配置是否可行的Docker文件与脚本?和/或一个共享卷? 我正在尝试 ..

如何在同一个盒子上独立运行多个kafka消费者?

我有两个Kafka消费者 ConsumerA 和 ConsumerB 。我想在相同的机器上运行这两个kafka消费者彼此独立。他们之间根本没有关系。 每个消费者都应该有一个不同的Properties对象。 每个消费者应该有一个不同的线程池配置,因为如果需要独立于其他消费者,它们可以以多线程方式运行(消费者组)。 以下是我的设计: 消费者类(抽象): public abs ..

Kafka消息编解码器 - 压缩和解压缩

使用 kafka 时,可以通过设置kafka.compression.codec属性来设置编码解码器 假设我在我的生产者中使用snappy压缩,当使用kafka消费者消费来自kafka的消息时,我应该做一些解码数据snappy或是kafka消费者的某些内置功能? 在相关文档我找不到任何与kafka消费者编码相关的属性(它只与生产者有关)。 有人可以清除这个? 解决方案 根 ..
发布时间:2016-12-25 12:39:16 其他开发

为什么我的数据插入在我的Cassandra数据库有时稳定,有时慢?

这是我的查询,如果Cassandra数据库中存在或不存在当前数据ID: row = session.execute(“SELECT * FROM articles where id =%s”,[id]) 在Kafka中解析的消息,然后确定此消息是否存在于Cassandra数据库中,如果它不存在,那么它应该执行插入操作,如果它存在,则不应该插入数据。 p> mess ..
发布时间:2016-11-13 16:24:46 Python

无法找到或加载主类org.apache.zookeeper.server.quorum.QuorumPeerMain

我通过教程apache的卡夫卡运行(Apache的卡夫卡网站),必须使用一个辅助教程(的 HTTP://janschulte.word$p$pss.com/2013/10/13/apache-kafka-0-8-在窗口/ ),并找到另一个答案(“从我在VMware中运行的Ubuntu时,卡夫卡VM无法识别选项'+ UseCom pressedOops'”搜索)只是为了在这里。 现在,我遇到错误: ..
发布时间:2016-08-04 09:06:06 Linux/Unix

卡夫卡星火批量涌入单个文件

我从卡夫卡使用批处理流(maxRatePerPartition 10.000)数据流。因此,在每批我处理10.000卡夫卡的消息。 在这个批处理我通过创建一个数据框出RDD的处理每封邮件。处理之后,我每个处理的记录保存到使用相同的文件:dataFrame.write.mode(SaveMode.append)。 所以其附加的所有邮件到同一个文件。 这是确定的,只要它是一个批次运行中运行。但在 ..
发布时间:2016-05-22 16:33:24 其他开发

从卡夫卡的火花得到消息的主题

在我们的火花流工作,我们读卡夫卡流的消息。 对于这一点,我们使用它返回 JavaPairInputDStreamfrom 。的 KafkaUtils.createDirectStream API 该消息从卡夫卡读取(从三个主题 - 为test1,test2的,TEST3)以下列方式: 私有静态最后弦乐TOPICS =“TEST1,TEST2,TEST3” HashSet的<串GT; t ..
发布时间:2016-05-22 16:30:05 其他开发

星火流卡夫卡直接流处理时的性能尖峰

我有一个Spark流工作,从使用直接的方法卡夫卡集群读取数据。有一个在处理时间,我无法理解,而不是体现在星火UI度量一个周期性高峰。下图显示了这种模式(批处理时间为10秒): 这个问题每次作业运行时间是重复的。 有记录在卡夫卡没有数据被读取所以注意没有真正处理,执行。我期望线持平,接近最小值序列化和发送任务的执行者。 该模式是工作需要9秒(这有5秒调度延迟),接下来的工作需要5秒(无延迟调 ..
发布时间:2016-05-22 16:19:57 其他开发