/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.kafka.spout;

import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.bolt.KafkaProducerTopology;
import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
import org.apache.storm.kafka.spout.KafkaSpoutTestBolt;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class KafkaSpoutTopologyMainNamedTopics {
    private static final String TOPIC_2_STREAM = "test_2_stream";
    private static final String TOPIC_0_1_STREAM = "test_0_1_stream";
    private static final String KAFKA_LOCAL_BROKER = "localhost:9092";
    public static final String TOPIC_0 = "kafka-spout-test";
    public static final String TOPIC_1 = "kafka-spout-test-1";
    public static final String TOPIC_2 = "kafka-spout-test-2";

    public static void main(String[] args) throws Exception {
        new KafkaSpoutTopologyMainNamedTopics().runMain(args);
    }

    protected void runMain(String[] args) throws Exception {
        String brokerUrl = args.length > 0 ? args[0] : KAFKA_LOCAL_BROKER;
        System.out.println("Running with broker url: " + brokerUrl);
        Config tpConf = this.getConfig();
        StormSubmitter.submitTopology((String)"kafka-spout-test-producer", (Map)tpConf, (StormTopology)KafkaProducerTopology.newTopology(brokerUrl, TOPIC_0));
        StormSubmitter.submitTopology((String)"kafka-spout-test-1-producer", (Map)tpConf, (StormTopology)KafkaProducerTopology.newTopology(brokerUrl, TOPIC_1));
        StormSubmitter.submitTopology((String)"kafka-spout-test-2-producer", (Map)tpConf, (StormTopology)KafkaProducerTopology.newTopology(brokerUrl, TOPIC_2));
        StormSubmitter.submitTopology((String)"storm-kafka-client-spout-test", (Map)tpConf, (StormTopology)this.getTopologyKafkaSpout(this.getKafkaSpoutConfig(brokerUrl)));
    }

    protected Config getConfig() {
        Config config = new Config();
        config.setDebug(true);
        return config;
    }

    protected StormTopology getTopologyKafkaSpout(KafkaSpoutConfig<String, String> spoutConfig) {
        TopologyBuilder tp = new TopologyBuilder();
        tp.setSpout("kafka_spout", new KafkaSpout<String, String>(spoutConfig), (Number)1);
        ((BoltDeclarer)tp.setBolt("kafka_bolt", (IRichBolt)new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", TOPIC_0_1_STREAM)).shuffleGrouping("kafka_spout", TOPIC_2_STREAM);
        tp.setBolt("kafka_bolt_1", (IRichBolt)new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", TOPIC_2_STREAM);
        return tp.createTopology();
    }

    protected KafkaSpoutConfig<String, String> getKafkaSpoutConfig(String bootstrapServers) {
        ByTopicRecordTranslator trans = new ByTopicRecordTranslator(r -> new Values(new Object[]{r.topic(), r.partition(), r.offset(), r.key(), r.value()}), new Fields(new String[]{"topic", "partition", "offset", "key", "value"}), TOPIC_0_1_STREAM);
        trans.forTopic(TOPIC_2, r -> new Values(new Object[]{r.topic(), r.partition(), r.offset(), r.key(), r.value()}), new Fields(new String[]{"topic", "partition", "offset", "key", "value"}), TOPIC_2_STREAM);
        return ((KafkaSpoutConfig.Builder)((KafkaSpoutConfig.Builder)((KafkaSpoutConfig.Builder)KafkaSpoutConfig.builder(bootstrapServers, TOPIC_0, TOPIC_1, TOPIC_2).setProp("group.id", "kafkaSpoutTestGroup")).setRetry(this.getRetryService()).setRecordTranslator(trans)).setOffsetCommitPeriodMs(10000L).setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)).setMaxUncommittedOffsets(250).build();
    }

    protected KafkaSpoutRetryService getRetryService() {
        return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500L), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2L), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10L));
    }
}

