/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.kafka.migration;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.storm.kafka.migration.MapUtil;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command-line tool that copies consumer offsets stored as JSON in Zookeeper
 * (under {@code <zookeeper.root>/<spout.id>}) into Kafka's own offset store,
 * committing them on behalf of the consumer group named by
 * {@code new.spout.consumer.group}.
 *
 * <p>Usage: {@code KafkaSpoutMigration confFile}, where {@code confFile} is a
 * configuration file resolvable by {@link Utils#findAndReadConfigFile(String)}
 * containing the keys read in {@link #readConfiguration(Map)}.
 */
public class KafkaSpoutMigration {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutMigration.class);

    /** Jackson type token for the JSON object stored in each partition node. */
    private static final TypeReference<Map<String, Object>> PARTITION_METADATA_TYPE =
        new TypeReference<Map<String, Object>>() { };

    /**
     * Reads the configuration file, collects the offsets stored in Zookeeper and
     * commits them to Kafka for the configured consumer group.
     *
     * @param args exactly one element: the name of the configuration file
     * @throws Exception if the configuration is incomplete, Zookeeper cannot be
     *     read, or the commit to Kafka fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("Args: confFile");
            System.exit(1);
        }
        Map<String, Object> conf = Utils.findAndReadConfigFile(args[0]);
        Configuration configuration = readConfiguration(conf);

        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = getOffsetsToCommit(configuration);
        if (offsetsToCommit.isEmpty()) {
            // Committing an empty map is a no-op; say so instead of reporting a successful migration.
            LOG.warn("No offsets found in Zookeeper, nothing to migrate to consumer group {}",
                configuration.newSpoutConsumerGroup);
            return;
        }
        LOG.info("Migrating offsets {}", offsetsToCommit);

        Properties props = new Properties();
        props.put("bootstrap.servers", configuration.kafkaBootstrapServers);
        props.put("group.id", configuration.newSpoutConsumerGroup);
        props.put("key.deserializer", ByteArrayDeserializer.class);
        props.put("value.deserializer", ByteArrayDeserializer.class);
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(offsetsToCommit.keySet());
            consumer.commitSync(offsetsToCommit);
        }
        LOG.info("Migrated offsets {} to consumer group {}", offsetsToCommit, configuration.newSpoutConsumerGroup);
    }

    /**
     * Builds a {@link Configuration} from the parsed configuration file. Every
     * key is mandatory; a missing key is reported by {@code MapUtil.getOrError}.
     */
    private static Configuration readConfiguration(Map<String, Object> conf) {
        Configuration configuration = new Configuration();
        configuration.zkHosts = (String) MapUtil.getOrError(conf, "zookeeper.servers");
        configuration.zkRoot = (String) MapUtil.getOrError(conf, "zookeeper.root");
        configuration.spoutId = (String) MapUtil.getOrError(conf, "spout.id");
        configuration.topic = (String) MapUtil.getOrError(conf, "topic");
        configuration.isWildcardTopic = (Boolean) MapUtil.getOrError(conf, "is.wildcard.topic");
        configuration.kafkaBootstrapServers = (String) MapUtil.getOrError(conf, "kafka.bootstrap.servers");
        configuration.newSpoutConsumerGroup = (String) MapUtil.getOrError(conf, "new.spout.consumer.group");
        configuration.zkSessionTimeoutMs = getInt(conf, "zookeeper.session.timeout.ms");
        configuration.zkConnectionTimeoutMs = getInt(conf, "zookeeper.connection.timeout.ms");
        configuration.zkRetryTimes = getInt(conf, "zookeeper.retry.times");
        configuration.zkRetryIntervalMs = getInt(conf, "zookeeper.retry.interval.ms");
        return configuration;
    }

    /**
     * Reads a mandatory integer setting, accepting any {@link Number} so that a
     * value parsed as {@code Long} does not fail with a {@link ClassCastException}.
     */
    private static int getInt(Map<String, Object> conf, String key) {
        Object value = MapUtil.getOrError(conf, key);
        if (!(value instanceof Number)) {
            throw new IllegalArgumentException("Setting " + key + " must be a number but was " + value);
        }
        return Math.toIntExact(((Number) value).longValue());
    }

    /**
     * Reads the offset of every partition node directly below {@code partitionsRoot}.
     * Each child node is expected to hold a JSON object with the fields
     * {@code topic}, {@code partition} and {@code offset}.
     *
     * @param curator a started Curator client
     * @param objectMapper mapper used to parse the node contents
     * @param partitionsRoot Zookeeper path whose children are partition nodes
     * @return the offsets found, keyed by topic partition
     * @throws RuntimeException if {@code partitionsRoot} does not exist or a node
     *     holds incomplete offset data
     * @throws Exception if Zookeeper access or JSON parsing fails
     */
    private static Map<TopicPartition, OffsetAndMetadata> getOffsetsAtPath(CuratorFramework curator, ObjectMapper objectMapper, String partitionsRoot) throws Exception {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        if (curator.checkExists().forPath(partitionsRoot) == null) {
            throw new RuntimeException("No such path " + partitionsRoot);
        }
        List<String> partitionPaths = curator.getChildren().forPath(partitionsRoot);
        for (String partitionPath : partitionPaths) {
            String absPartitionPath = partitionsRoot + "/" + partitionPath;
            LOG.info("Reading offset data from path {}", absPartitionPath);
            byte[] partitionBytes = curator.getData().forPath(absPartitionPath);
            Map<String, Object> partitionMetadata = objectMapper.readValue(partitionBytes, PARTITION_METADATA_TYPE);
            if (partitionMetadata == null) {
                throw new IllegalStateException("Offset data at " + absPartitionPath + " is not a JSON object");
            }
            String topic = requireField(partitionMetadata, "topic", String.class, absPartitionPath);
            int partition = requireField(partitionMetadata, "partition", Number.class, absPartitionPath).intValue();
            long offset = requireField(partitionMetadata, "offset", Number.class, absPartitionPath).longValue();
            offsets.put(new TopicPartition(topic, partition), new OffsetAndMetadata(offset));
        }
        return offsets;
    }

    /**
     * Returns {@code metadata.get(field)} as {@code type}, failing with a message
     * that names the Zookeeper path when the field is absent or of another type.
     */
    private static <T> T requireField(Map<String, Object> metadata, String field, Class<T> type, String path) {
        Object value = metadata.get(field);
        if (!type.isInstance(value)) {
            throw new IllegalStateException("Offset data at " + path + " has missing or invalid field '"
                + field + "': " + value);
        }
        return type.cast(value);
    }

    /**
     * Collects all offsets to migrate from Zookeeper.
     *
     * <p>With {@code is.wildcard.topic} set, the children of the spout root are
     * treated as topic directories and only those whose name fully matches the
     * {@code topic} regular expression are read. Otherwise the partition nodes
     * are read directly from the spout root.
     *
     * @throws RuntimeException if the spout root does not exist
     * @throws Exception if Zookeeper access or JSON parsing fails
     */
    private static Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit(Configuration configuration) throws Exception {
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        try (CuratorFramework curator = newCurator(configuration)) {
            curator.start();
            ObjectMapper objectMapper = new ObjectMapper();
            String spoutRoot = configuration.zkRoot + "/" + configuration.spoutId;
            if (curator.checkExists().forPath(spoutRoot) == null) {
                throw new RuntimeException("No such path " + spoutRoot);
            }
            if (configuration.isWildcardTopic) {
                LOG.info("Expecting wildcard topics, looking for topics in {}", spoutRoot);
                // Compile once instead of re-compiling the regex for every child via String.matches.
                Pattern topicPattern = Pattern.compile(configuration.topic);
                List<String> topicPaths = curator.getChildren().forPath(spoutRoot);
                for (String topicPath : topicPaths) {
                    if (!topicPattern.matcher(topicPath).matches()) {
                        LOG.info("Skipping directory {} because it doesn't match the topic pattern {}", topicPath, configuration.topic);
                        continue;
                    }
                    String absTopicPath = spoutRoot + "/" + topicPath;
                    LOG.info("Looking for partitions in {}", absTopicPath);
                    offsetsToCommit.putAll(getOffsetsAtPath(curator, objectMapper, absTopicPath));
                }
            } else {
                LOG.info("Expecting exact topic match, looking for offsets in {}", spoutRoot);
                offsetsToCommit.putAll(getOffsetsAtPath(curator, objectMapper, spoutRoot));
            }
        }
        return offsetsToCommit;
    }

    /**
     * Creates (but does not start) a Curator client from the Zookeeper settings
     * in {@code config}, retrying failed operations a fixed number of times.
     */
    private static CuratorFramework newCurator(Configuration config) throws Exception {
        return CuratorFrameworkFactory.newClient(
            config.zkHosts,
            config.zkSessionTimeoutMs,
            config.zkConnectionTimeoutMs,
            new RetryNTimes(config.zkRetryTimes, config.zkRetryIntervalMs));
    }

    /** Settings read from the configuration file; populated by {@link #readConfiguration(Map)}. */
    private static class Configuration {
        /** Zookeeper connection string ({@code zookeeper.servers}). */
        private String zkHosts;
        /** Zookeeper root path under which spout state is stored ({@code zookeeper.root}). */
        private String zkRoot;
        /** Spout id; child of {@link #zkRoot} ({@code spout.id}). */
        private String spoutId;
        /** Topic name, or a regular expression when {@link #isWildcardTopic} is set ({@code topic}). */
        private String topic;
        /** Whether {@link #topic} is a pattern matched against topic directories ({@code is.wildcard.topic}). */
        private boolean isWildcardTopic;
        /** Kafka bootstrap servers ({@code kafka.bootstrap.servers}). */
        private String kafkaBootstrapServers;
        /** Consumer group the offsets are committed for ({@code new.spout.consumer.group}). */
        private String newSpoutConsumerGroup;
        private int zkSessionTimeoutMs;
        private int zkConnectionTimeoutMs;
        private int zkRetryTimes;
        private int zkRetryIntervalMs;

        private Configuration() {
        }
    }
}

