/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.kafka.trident;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.bolt.KafkaProducerTopology;
import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.Func;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutConfig;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutTransactional;
import org.apache.storm.kafka.trident.TridentKafkaConsumerTopology;
import org.apache.storm.trident.spout.ITridentDataSource;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * Command-line driver that submits three topologies to a Storm cluster:
 * one producer topology per named Kafka topic ({@code test-trident} and
 * {@code test-trident-1}), and one Trident consumer topology whose spout
 * subscribes to both topics.
 *
 * <p>Program arguments (both optional):
 * <ol>
 *   <li>{@code args[0]} - Kafka bootstrap servers; defaults to {@code localhost:9092}.</li>
 *   <li>{@code args[1]} - parsed with {@link Boolean#parseBoolean(String)}; when it yields
 *       {@code true} (or the argument is absent) an opaque Trident spout is used, otherwise a
 *       transactional one.</li>
 * </ol>
 *
 * <p>The {@code protected} methods are extension points for subclasses that want a different
 * spout configuration or submission sequence.
 */
public class TridentKafkaClientTopologyNamedTopics {
    private static final String TOPIC_1 = "test-trident";
    private static final String TOPIC_2 = "test-trident-1";
    private static final String KAFKA_LOCAL_BROKER = "localhost:9092";

    /** Shared record translator that keeps only the value of each consumed record. */
    private static final Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC = new JustValueFunc();

    /**
     * Wraps the given configuration in an opaque Trident Kafka spout.
     *
     * @param spoutConfig the spout configuration to use
     * @return a new opaque spout backed by {@code spoutConfig}
     */
    private KafkaTridentSpoutOpaque<String, String> newKafkaTridentSpoutOpaque(KafkaTridentSpoutConfig<String, String> spoutConfig) {
        return new KafkaTridentSpoutOpaque<String, String>(spoutConfig);
    }

    /**
     * Wraps the given configuration in a transactional Trident Kafka spout.
     *
     * @param spoutConfig the spout configuration to use
     * @return a new transactional spout backed by {@code spoutConfig}
     */
    private KafkaTridentSpoutTransactional<String, String> newKafkaTridentSpoutTransactional(KafkaTridentSpoutConfig<String, String> spoutConfig) {
        return new KafkaTridentSpoutTransactional<String, String>(spoutConfig);
    }

    /**
     * Builds the spout configuration used by the consumer topology. The configuration names both
     * topics explicitly, sets the {@code max.partition.fetch.bytes} consumer property to 200,
     * translates each record into a single-field tuple ({@code "str"}) carrying the record value,
     * and uses {@link FirstPollOffsetStrategy#EARLIEST} as the first-poll offset strategy.
     *
     * @param bootstrapServers the Kafka bootstrap servers string handed to the builder
     * @return the fully built spout configuration
     */
    protected KafkaTridentSpoutConfig<String, String> newKafkaSpoutConfig(String bootstrapServers) {
        // The builder is self-typed, so the chain keeps its <String, String> parameters without
        // the raw-type casts the decompiler had inserted between the calls.
        return KafkaTridentSpoutConfig.builder(bootstrapServers, TOPIC_1, TOPIC_2)
            .setProp("max.partition.fetch.bytes", 200)
            .setRecordTranslator(JUST_VALUE_FUNC, new Fields("str"))
            .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
            .build();
    }

    /**
     * Program entry point; delegates to {@link #run(String[])} on a fresh instance.
     *
     * @param args see the class documentation
     * @throws Exception if any topology submission fails
     */
    public static void main(String[] args) throws Exception {
        new TridentKafkaClientTopologyNamedTopics().run(args);
    }

    /**
     * Submits the two producer topologies followed by the Trident consumer topology. All three
     * submissions share one {@link Config} with debug enabled and max spout pending set to 5.
     *
     * @param args see the class documentation
     * @throws AlreadyAliveException declared for the {@code StormSubmitter.submitTopology} calls
     * @throws InvalidTopologyException declared for the {@code StormSubmitter.submitTopology} calls
     * @throws AuthorizationException declared for the {@code StormSubmitter.submitTopology} calls
     * @throws InterruptedException kept in the signature for compatibility with existing
     *         callers and overriders
     */
    protected void run(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, InterruptedException {
        final String brokerUrl = args.length > 0 ? args[0] : KAFKA_LOCAL_BROKER;
        // Opaque is the default; an explicit second argument overrides it.
        final boolean isOpaque = args.length <= 1 || Boolean.parseBoolean(args[1]);
        System.out.println("Running with broker url " + brokerUrl + " and isOpaque=" + isOpaque);

        final Config tpConf = new Config();
        tpConf.setDebug(true);
        tpConf.setMaxSpoutPending(5);

        // One producer topology per named topic.
        StormSubmitter.submitTopology("test-trident-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC_1));
        StormSubmitter.submitTopology("test-trident-1-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC_2));

        final KafkaTridentSpoutConfig<String, String> spoutConfig = newKafkaSpoutConfig(brokerUrl);
        // Typed as the common interface so no Object-to-ITridentDataSource downcast is needed.
        final ITridentDataSource spout;
        if (isOpaque) {
            spout = newKafkaTridentSpoutOpaque(spoutConfig);
        } else {
            spout = newKafkaTridentSpoutTransactional(spoutConfig);
        }
        StormSubmitter.submitTopology("topics-consumer", tpConf, TridentKafkaConsumerTopology.newTopology(spout));
    }

    /**
     * Record translator function that emits a one-element tuple containing only the record's
     * value; key, topic, partition and offset are not included.
     */
    private static class JustValueFunc
    implements Func<ConsumerRecord<String, String>, List<Object>>,
    Serializable {
        @Override
        public List<Object> apply(ConsumerRecord<String, String> record) {
            return new Values(record.value());
        }
    }
}

