How to override SpoutConfig's default settings?

Question

I'm trying to get Facebook page data using the Graph API. Each post is larger than 1 MB, while Kafka's default fetch.message.max.bytes is 1 MB. I changed the Kafka limits from 1 MB to 3 MB by adding the lines below to the Kafka consumer.properties and server.properties files.

fetch.message.max.bytes=3048576   (consumer.properties)
message.max.bytes=3048576         (server.properties)
replica.fetch.max.bytes=3048576   (server.properties)
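
These three values have to agree with one another. The Kafka configuration docs advise that the broker's replica.fetch.max.bytes and the consumer's fetch.message.max.bytes each be at least message.max.bytes; otherwise a message the broker accepted cannot be replicated or consumed. Below is a minimal sketch, using only java.util.Properties, that loads the values above and checks those two invariants. The class name KafkaSizeCheck and the single-string layout are assumptions for illustration; in a real deployment the values live in two separate files.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Kafka or Storm.
final class KafkaSizeCheck {

    /** Loads key=value lines into a Properties object. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // StringReader never actually throws, but load() declares IOException.
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns the problems found; an empty list means the sizes are consistent. */
    static List<String> check(Properties props) {
        // Assumes all three keys are present; parseLong(null) would throw otherwise.
        long messageMax = Long.parseLong(props.getProperty("message.max.bytes"));
        long replicaFetchMax = Long.parseLong(props.getProperty("replica.fetch.max.bytes"));
        long consumerFetchMax = Long.parseLong(props.getProperty("fetch.message.max.bytes"));

        List<String> problems = new ArrayList<>();
        if (replicaFetchMax < messageMax) {
            problems.add("replica.fetch.max.bytes < message.max.bytes: followers cannot replicate large messages");
        }
        if (consumerFetchMax < messageMax) {
            problems.add("fetch.message.max.bytes < message.max.bytes: consumers cannot read large messages");
        }
        return problems;
    }

    public static void main(String[] args) {
        // Values from the question, normally split across
        // consumer.properties and server.properties.
        String text = String.join("\n",
                "fetch.message.max.bytes=3048576",
                "message.max.bytes=3048576",
                "replica.fetch.max.bytes=3048576");
        List<String> problems = check(parse(text));
        System.out.println(problems.isEmpty() ? "sizes are consistent" : problems);
    }
}
```

Note that this only validates the Kafka side. A Storm spout with its own fetch size is a separate consumer and is not governed by consumer.properties.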

After adding the above lines, 3 MB messages are being written to the Kafka data logs. However, Storm cannot process that data; it can read only messages up to the default size, i.e. 1 MB. What parameters should I add to the Storm topology to read 3 MB messages from the Kafka topic? Do I need to increase buffer.size in Storm? I don't have a clear idea about it.

Here is my topology code:

    String argument = args[0];
    Config conf = new Config();
    conf.put(JDBC_CONF, map);
    conf.setDebug(true);
    conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
    // Set the number of workers
    conf.setNumWorkers(3);

    TopologyBuilder builder = new TopologyBuilder();
    // Set up the Kafka spout
    BrokerHosts hosts = new ZkHosts("localhost:2181");
    String topic = "year1234";
    String zkRoot = "";
    String consumerGroupId = "group1";
    SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId);
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

    // Wire the spout and bolts together
    builder.setSpout("KafkaSpout", kafkaSpout, 1);
    builder.setBolt("user_details", new Parserspout(), 1).shuffleGrouping("KafkaSpout");
    builder.setBolt("bolts_user", new bolts_user(cp), 1).shuffleGrouping("user_details");

Thanks in advance.

Answer

The SpoutConfig class extends KafkaConfig, which declares the following settings along with their default values:

public int fetchSizeBytes = 1024 * 1024;
public int socketTimeoutMs = 10000;
public int fetchMaxWait = 10000;
public int bufferSizeBytes = 1024 * 1024;
public MultiScheme scheme = new RawMultiScheme();
public boolean ignoreZkOffsets = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
public long maxOffsetBehind = Long.MAX_VALUE;
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
public int metricsTimeBucketSizeInSecs = 60;
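
Why the spout appears stuck at 1 MB: each fetch asks for at most fetchSizeBytes bytes starting at the current offset, and only complete messages in the response can be emitted. The sketch below is a simplified model of that behaviour, not storm-kafka's actual code. It assumes the older fetch semantics, in which the broker returns at most the requested number of bytes and an incomplete trailing message is discarded by the client. The log is modelled as a list of message sizes, and the class FetchModel and the sizes used are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of fetching from one partition.
final class FetchModel {

    /**
     * One fetch: starting at offset, collect the sizes of the complete
     * messages that fit within fetchSizeBytes. A message that would be
     * cut off is not returned.
     */
    static List<Integer> fetch(List<Integer> log, int offset, int fetchSizeBytes) {
        List<Integer> batch = new ArrayList<>();
        int used = 0;
        for (int i = offset; i < log.size(); i++) {
            int size = log.get(i);
            if (used + size > fetchSizeBytes) {
                break; // partial message: dropped, this fetch ends here
            }
            batch.add(size);
            used += size;
        }
        return batch;
    }

    /** Repeats fetches and returns the offset reached when progress stops. */
    static int drain(List<Integer> log, int fetchSizeBytes, int maxFetches) {
        int offset = 0;
        for (int n = 0; n < maxFetches && offset < log.size(); n++) {
            List<Integer> batch = fetch(log, offset, fetchSizeBytes);
            if (batch.isEmpty()) {
                break; // stuck: the next message never fits in one fetch
            }
            offset += batch.size();
        }
        return offset;
    }

    public static void main(String[] args) {
        // Hypothetical message sizes in bytes; the third "post" is 2.5 MB.
        List<Integer> log = Arrays.asList(200_000, 500_000, 2_500_000, 300_000);
        System.out.println("1 MiB fetch consumed: " + drain(log, 1024 * 1024, 10)); // 2
        System.out.println("3048576 fetch consumed: " + drain(log, 3048576, 10));   // 4
    }
}
```

With the default 1024 * 1024, the model consumes the first two messages and then stops at the 2.5 MB one; with a fetch size larger than that message, it drains the whole log.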

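Notice that these fields are public, so you can change them directly on your SpoutConfig. fetchSizeBytes is the maximum number of bytes the spout requests per partition in each fetch, so it must be at least as large as your largest message; bufferSizeBytes is the socket buffer size of the underlying SimpleConsumer. Raise both: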

spoutConfig.fetchSizeBytes = 3048576;
spoutConfig.bufferSizeBytes = 3048576;
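
A note on units: 3048576 is about 3.05 MB in decimal units but slightly less than 3 MiB (3 * 1024 * 1024 = 3145728 bytes). Writing the size in the same 1024 * 1024 style that KafkaConfig uses for its defaults makes the intent explicit. The sketch below uses a hypothetical stand-in class, FetchSettings, that mirrors only the two KafkaConfig fields involved, so it compiles without Storm on the classpath; with storm-kafka you would assign the same expressions to spoutConfig.fetchSizeBytes and spoutConfig.bufferSizeBytes. The canFetch check is an assumption of this sketch, not a storm-kafka method.

```java
// Hypothetical stand-in mirroring two public KafkaConfig fields and their defaults.
final class FetchSettings {
    public int fetchSizeBytes = 1024 * 1024;
    public int bufferSizeBytes = 1024 * 1024;

    /** True if a single fetch can hold the largest message the broker accepts. */
    boolean canFetch(int maxMessageBytes) {
        return fetchSizeBytes >= maxMessageBytes;
    }

    public static void main(String[] args) {
        int brokerMessageMaxBytes = 3048576; // message.max.bytes from server.properties

        FetchSettings settings = new FetchSettings();
        System.out.println("defaults OK? " + settings.canFetch(brokerMessageMaxBytes)); // false

        // Override the public fields, as with spoutConfig in the answer.
        settings.fetchSizeBytes = 3 * 1024 * 1024;  // 3145728 bytes = 3 MiB
        settings.bufferSizeBytes = 3 * 1024 * 1024;
        System.out.println("overridden OK? " + settings.canFetch(brokerMessageMaxBytes)); // true
    }
}
```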
