How to set SpoutConfig values that differ from the default settings?


Problem description

I'm trying to get Facebook page data using the Graph API. Each post is larger than 1 MB, while Kafka's default fetch.message.max.bytes is 1 MB. I have raised the Kafka limits from 1 MB to about 3 MB by adding the lines below to Kafka's consumer.properties and server.properties files.

fetch.message.max.bytes=3048576 (consumer.properties)
message.max.bytes=3048576 (server.properties)
replica.fetch.max.bytes=3048576 (server.properties)

After adding these lines, messages of about 3 MB are written to the Kafka data logs. Storm, however, cannot process that data; it still reads only the default size of 1 MB. Which parameters should I add to the Storm topology so that it can read 3 MB messages from the Kafka topic? Do I need to increase buffer.size in Storm? I don't have a clear idea about it.

Here is my topology code.

    String argument = args[0];
    Config conf = new Config();
    conf.put(JDBC_CONF, map);
    conf.setDebug(true);
    conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
    // Set the number of workers
    conf.setNumWorkers(3);

    TopologyBuilder builder = new TopologyBuilder();

    // Set up the Kafka spout
    BrokerHosts hosts = new ZkHosts("localhost:2181");
    String topic = "year1234";
    String zkRoot = "";
    String consumerGroupId = "group1";
    SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId);
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

    // Wire the spout to the two bolts
    builder.setSpout("KafkaSpout", kafkaSpout, 1);
    builder.setBolt("user_details", new Parserspout(), 1).shuffleGrouping("KafkaSpout");
    builder.setBolt("bolts_user", new bolts_user(cp), 1).shuffleGrouping("user_details");

Thanks in advance.

Recommended answer

The class SpoutConfig extends KafkaConfig, which has all of the following settings (shown with their default values):

public int fetchSizeBytes = 1024 * 1024;
public int socketTimeoutMs = 10000;
public int fetchMaxWait = 10000;
public int bufferSizeBytes = 1024 * 1024;
public MultiScheme scheme = new RawMultiScheme();
public boolean ignoreZkOffsets = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
public long maxOffsetBehind = Long.MAX_VALUE;
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
public int metricsTimeBucketSizeInSecs = 60;

Notice that they are public, so you can change them directly on your spoutConfig object:

spoutConfig.fetchSizeBytes = 3048576;
spoutConfig.bufferSizeBytes = 3048576;

See here: http://grepcode.com/file/repo1.maven.org/maven2/org.apache.storm/storm-kafka/0.9.2-incubating/storm/kafka/KafkaConfig.java#KafkaConfig
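
To make the effect of the two assignments concrete, here is a minimal sketch that runs without the storm-kafka dependency. KafkaConfigSketch is a hypothetical stand-in that copies only the fetchSizeBytes and bufferSizeBytes fields and their defaults from the listing above, and SpoutSizeCheck with its fetchSizeCoversMessage helper is invented for illustration; neither is part of storm-kafka. The check assumes only that a fetch smaller than the largest message cannot return that message.

```java
// Hypothetical stand-in for storm-kafka's KafkaConfig: only the two
// size-related public fields, with the defaults from the listing above.
class KafkaConfigSketch {
    public int fetchSizeBytes = 1024 * 1024;
    public int bufferSizeBytes = 1024 * 1024;
}

class SpoutSizeCheck {
    // Value the question uses for message.max.bytes in server.properties.
    static final int BROKER_MESSAGE_MAX_BYTES = 3048576;

    // True when one fetch request is large enough to hold the largest
    // message the broker is configured to accept.
    static boolean fetchSizeCoversMessage(KafkaConfigSketch cfg, int messageMaxBytes) {
        return cfg.fetchSizeBytes >= messageMaxBytes;
    }

    public static void main(String[] args) {
        KafkaConfigSketch cfg = new KafkaConfigSketch();
        System.out.println("defaults cover 3048576-byte messages: "
                + fetchSizeCoversMessage(cfg, BROKER_MESSAGE_MAX_BYTES));

        // Same overrides as in the answer: plain assignments to public fields.
        cfg.fetchSizeBytes = 3048576;
        cfg.bufferSizeBytes = 3048576;
        System.out.println("overrides cover 3048576-byte messages: "
                + fetchSizeCoversMessage(cfg, BROKER_MESSAGE_MAX_BYTES));
    }
}
```

In the topology from the question, the equivalent assignments would presumably go right after spoutConfig is created and before it is passed to new KafkaSpout(spoutConfig), so the spout is built with the larger sizes.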
