300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > storm之spout

storm之spout

时间:2018-11-13 22:14:26

相关推荐

storm之spout

一、什么是spout

spout:喷嘴、喷口。即数据从这里发出。

spout是storm的数据来源,而spout的数据来源又是从其他地方,比如数据库或者消息中间件中流入的。

以Kafka为例,spout先从kafka中拉取数据,然后封装为一个tuple,发给下游的bolt进行处理。对于Kafka来说,spout是消费者;对于bolt来说spout是生产者。

为什么要用spout去拉取消息,而不是直接由bolt接收推送的数据呢,这中拉模式有什么好处呢?

如果,将数据直接推送给bolt,当数据量突然增加的时候,可能导致某一个bolt瘫痪,继而影响整个topology运行;而当没有数据的时候,整个topolog又处于空闲状态,浪费资源。而由spout去拉取消息则不会出现这样的问题。

二、KafkaSpout

KafkaSpout实现了从Kafka拉取数据为storm提供数据源。并且重新实现了ack机制。一般的我们通过简单的配置就可以使用了。//kafkaSpout配置private KafkaSpoutConfig<String, String> kafkaSpoutConfig() {final Fields outputFields = new Fields("topic", "partition", "offset", "timestamp", "key", "msg_from_kafka");KafkaSpoutConfig<String, String> config;//consumer的配置Properties props = new Properties();//默认由kafkaSpout进行ack后才提交(false),如果自动提交,则kafkaspout的ack失效,可能丢失或重复数据props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");KafkaSpoutRetryService kafkaSpoutRetryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),TimeInterval.milliSeconds(2),1,TimeInterval.seconds(10));config = KafkaSpoutConfig.builder("ip:9092", "topic_test")//首次消费消息的offset.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)//最后一个参数为输出字段.setRecordTranslator((r) -> new Values(r.topic(), r.partition(), r.offset(), r.timestamp(), r.key(), r.value()), outputFields)//offset自动提交时间间隔,如果设置了mit=true则无效.setOffsetCommitPeriodMs(1_000)//1秒//达到这个值后向提交offset.setMaxUncommittedOffsets(1_000_000)//10万//group.setGroupId("test-w")//kafka consumer配置.setProp(props).setRetry(kafkaSpoutRetryService).build();return config;}//拓扑结构private StormTopology stormTopology() {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("spout", new ProducerSpout(kafkaSpoutConfig()), 1);builder.setBolt("bolt1", new BoltTest(), 1).shuffleGrouping("spout");return builder.createTopology();}

kafkaspout的所有配置项:

public static final long DEFAULT_POLL_TIMEOUT_MS = 200L;public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30000L;public static final int DEFAULT_MAX_RETRIES = 2147483647;public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10000000;public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2000L;public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0L), TimeInterval.milliSeconds(2L), 2147483647, TimeInterval.seconds(10L));public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0L), TimeInterval.milliSeconds(0L), 2147483647, TimeInterval.milliSeconds(0L));private final Map<String, Object> kafkaProps;private final Subscription subscription;private final SerializableDeserializer<K> keyDes;private final Class<? extends Deserializer<K>> keyDesClazz;private final SerializableDeserializer<V> valueDes;private final Class<? extends Deserializer<V>> valueDesClazz;private final long pollTimeoutMs;private final RecordTranslator<K, V> translator;private final long offsetCommitPeriodMs;private final int maxUncommittedOffsets;private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy;private final KafkaSpoutRetryService retryService;private final long partitionRefreshPeriodMs;private final boolean emitNullTuples;

具体含义在后面会总结。

参考资料:

《storm技术内幕与大数据实战》

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。