/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.sql.kafka;

import com.google.common.base.Preconditions;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.apache.kafka.common.serialization.ByteBufferSerializer;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.RecordTranslator;
import org.apache.storm.spout.Scheme;
import org.apache.storm.sql.kafka.RecordTranslatorSchemeAdapter;
import org.apache.storm.sql.runtime.DataSourcesProvider;
import org.apache.storm.sql.runtime.FieldInfo;
import org.apache.storm.sql.runtime.IOutputSerializer;
import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
import org.apache.storm.sql.runtime.utils.SerdeUtils;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * {@link DataSourcesProvider} for tables backed by a Kafka topic.
 *
 * <p>The table location is a URI whose host part is used as the topic name and whose query
 * string must carry a {@code bootstrap-servers} parameter, for example
 * {@code kafka://mytopic?bootstrap-servers=localhost:9092}.
 */
public class KafkaDataSourcesProvider implements DataSourcesProvider {
    /** Key in the table properties under which the Kafka producer configuration map is stored. */
    private static final String CONFIG_KEY_PRODUCER = "producer";
    /** Name of the URI query parameter that carries the Kafka bootstrap servers. */
    private static final String URI_PARAMS_BOOTSTRAP_SERVERS = "bootstrap-servers";

    /**
     * Returns the scheme name handled by this provider.
     *
     * @return the constant {@code "kafka"}
     */
    public String scheme() {
        return "kafka";
    }

    /**
     * Builds a streams data source that reads from and writes to the Kafka topic named by
     * {@code uri}.
     *
     * @param uri table location; the host is the topic and the query must contain
     *     {@code bootstrap-servers}
     * @param inputFormatClass input format class name handed to {@code SerdeUtils.getScheme}
     * @param outputFormatClass output format class name handed to {@code SerdeUtils.getSerializer}
     * @param properties table properties; also handed to the serde factories and used later as
     *     the base of the producer configuration
     * @param fields table columns; at least one of them must be flagged as primary
     * @return a data source exposing a Kafka spout and a Kafka bolt for the topic
     * @throws IllegalStateException if no field is marked as primary
     * @throws NullPointerException if {@code bootstrap-servers} is missing from the query, or if
     *     the URI has no host component to use as the topic
     */
    public ISqlStreamsDataSource constructStreams(URI uri, String inputFormatClass, String outputFormatClass, Properties properties, List<FieldInfo> fields) {
        List<String> fieldNames = new ArrayList<>(fields.size());
        boolean hasPrimaryKey = false;
        for (FieldInfo field : fields) {
            fieldNames.add(field.name());
            if (field.isPrimary()) {
                hasPrimaryKey = true;
            }
        }
        Preconditions.checkState(hasPrimaryKey, "Kafka stream table must have a primary key");

        Scheme scheme = SerdeUtils.getScheme(inputFormatClass, properties, fieldNames);

        Map<String, String> uriParams = parseUriParams(uri.getQuery());
        String bootstrapServers = uriParams.get(URI_PARAMS_BOOTSTRAP_SERVERS);
        Preconditions.checkNotNull(bootstrapServers, "bootstrap-servers must be specified");

        // URI.getHost() is null when the authority is absent or is not a syntactically valid host
        // name. Fail here with a clear message instead of handing a null topic to Kafka.
        String topic = uri.getHost();
        Preconditions.checkNotNull(topic, "Kafka topic must be specified as the host part of the URI: " + uri);

        KafkaSpoutConfig<ByteBuffer, ByteBuffer> kafkaSpoutConfig =
            new KafkaSpoutConfig.Builder<ByteBuffer, ByteBuffer>(bootstrapServers, new String[] {topic})
                .setProp("key.deserializer", ByteBufferDeserializer.class)
                .setProp("value.deserializer", ByteBufferDeserializer.class)
                // A fresh random group id is generated for every constructed data source.
                .setProp("group.id", "storm-sql-kafka-" + UUID.randomUUID().toString())
                .setRecordTranslator(new RecordTranslatorSchemeAdapter(scheme))
                .build();

        IOutputSerializer serializer = SerdeUtils.getSerializer(outputFormatClass, properties, fieldNames);
        return new KafkaStreamsDataSource(kafkaSpoutConfig, bootstrapServers, topic, properties, serializer);
    }

    /**
     * Splits a URI query string into its {@code key=value} pairs.
     *
     * <p>Pairs are separated by {@code &}; each pair is split on the first {@code =} only, so
     * values may themselves contain {@code =}. Segments without {@code =} are ignored. When a key
     * occurs more than once the last occurrence wins.
     *
     * @param query the query string, may be {@code null}
     * @return a mutable map of the parsed parameters; empty if {@code query} is {@code null}
     */
    private static Map<String, String> parseUriParams(String query) {
        Map<String, String> params = new HashMap<>();
        if (query == null) {
            return params;
        }
        for (String pair : query.split("&")) {
            String[] keyValue = pair.split("=", 2);
            if (keyValue.length == 2) {
                params.put(keyValue[0], keyValue[1]);
            }
        }
        return params;
    }

    /** Streams data source that reads a Kafka topic through a spout and writes through a bolt. */
    private static class KafkaStreamsDataSource implements ISqlStreamsDataSource {
        private final KafkaSpoutConfig<ByteBuffer, ByteBuffer> kafkaSpoutConfig;
        private final String bootstrapServers;
        private final String topic;
        private final Properties props;
        private final IOutputSerializer serializer;

        KafkaStreamsDataSource(KafkaSpoutConfig<ByteBuffer, ByteBuffer> kafkaSpoutConfig, String bootstrapServers, String topic, Properties props, IOutputSerializer serializer) {
            this.kafkaSpoutConfig = kafkaSpoutConfig;
            this.bootstrapServers = bootstrapServers;
            this.topic = topic;
            this.props = props;
            this.serializer = serializer;
        }

        /**
         * Creates the spout that emits records read from the topic.
         *
         * @return a new {@link KafkaSpout} built from the stored spout configuration
         */
        public IRichSpout getProducer() {
            return new KafkaSpout<>(this.kafkaSpoutConfig);
        }

        /**
         * Creates the bolt that writes tuples to the topic.
         *
         * <p>The producer configuration is the table properties overlaid with the map stored
         * under the {@code "producer"} key, plus {@code bootstrap.servers} (taken from the table
         * URI) and {@code value.serializer} (always {@link ByteBufferSerializer}). The table
         * properties themselves are left untouched, so this method can be called repeatedly.
         *
         * @return a new {@link KafkaBolt} configured for the topic
         * @throws IllegalArgumentException if the table properties are empty or do not hold a map
         *     under the {@code "producer"} key
         * @throws IllegalStateException if the user-supplied configuration already defines
         *     {@code bootstrap.servers} or {@code value.serializer}
         */
        public IRichBolt getConsumer() {
            Preconditions.checkArgument(!this.props.isEmpty(), "Writable Kafka table " + this.topic + " must contain producer config");
            Object producerConfig = this.props.get(KafkaDataSourcesProvider.CONFIG_KEY_PRODUCER);
            // A missing or non-map entry used to surface as a bare NullPointerException or
            // ClassCastException; report it with the same message as the check above.
            Preconditions.checkArgument(producerConfig instanceof Map, "Writable Kafka table " + this.topic + " must contain producer config");

            // Configure a copy: writing bootstrap.servers / value.serializer into the shared
            // table properties would make the checks below fail on any later call.
            Properties producerProps = (Properties) this.props.clone();
            producerProps.putAll((Map<?, ?>) producerConfig);

            Preconditions.checkState(!producerProps.containsKey("bootstrap.servers"), "Writable Kafka table " + this.topic + " must not contain \"bootstrap.servers\" config, set it in the kafka URL instead");
            Preconditions.checkState(!producerProps.containsKey("value.serializer"), "Writable Kafka table " + this.topic + " must not contain value.serializer, it will be hardcoded to be " + String.valueOf(ByteBufferSerializer.class));
            producerProps.put("value.serializer", ByteBufferSerializer.class);
            producerProps.put("bootstrap.servers", this.bootstrapServers);

            SqlKafkaMapper mapper = new SqlKafkaMapper(this.serializer);
            return new KafkaBolt<Object, ByteBuffer>()
                .withTopicSelector(new DefaultTopicSelector(this.topic))
                .withProducerProperties(producerProps)
                .withTupleToKafkaMapper(mapper);
        }
    }

    /**
     * Maps a tuple to a Kafka record: the value at index 0 is the key and the {@link Values} at
     * index 1 is serialized into the message payload.
     */
    private static class SqlKafkaMapper implements TupleToKafkaMapper<Object, ByteBuffer> {
        private final IOutputSerializer serializer;

        private SqlKafkaMapper(IOutputSerializer serializer) {
            this.serializer = serializer;
        }

        /**
         * Returns the record key.
         *
         * @param tuple the tuple being written
         * @return the tuple value at index 0
         */
        public Object getKeyFromTuple(Tuple tuple) {
            return tuple.getValue(0);
        }

        /**
         * Returns the record payload.
         *
         * @param tuple the tuple being written; its value at index 1 must be a {@link Values}
         * @return the serializer's output for the row, with no target buffer supplied
         */
        public ByteBuffer getMessageFromTuple(Tuple tuple) {
            Values row = (Values) tuple.getValue(1);
            return this.serializer.write(row, null);
        }
    }
}

