/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.examples.kafka;

import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.map.IMap;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import kafka.zk.EmbeddedZookeeper;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;

public class KafkaSource {
    private static final int MESSAGE_COUNT_PER_TOPIC = 1000000;
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String AUTO_OFFSET_RESET = "earliest";
    private static final String SINK_NAME = "sink";
    private EmbeddedZookeeper zkServer;
    private ZkUtils zkUtils;
    private KafkaServer kafkaServer;

    private static Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();
        p.readFrom(KafkaSources.kafka((Properties)KafkaSource.props("bootstrap.servers", BOOTSTRAP_SERVERS, "key.deserializer", StringDeserializer.class.getCanonicalName(), "value.deserializer", IntegerDeserializer.class.getCanonicalName(), "auto.offset.reset", AUTO_OFFSET_RESET), (String[])new String[]{"t1", "t2"})).withoutTimestamps().writeTo(Sinks.map((String)SINK_NAME));
        return p;
    }

    public static void main(String[] args) throws Exception {
        new KafkaSource().run();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void run() throws Exception {
        try {
            this.createKafkaCluster();
            this.fillTopics();
            JetInstance jet = Jet.bootstrappedInstance();
            IMap sinkMap = jet.getMap(SINK_NAME);
            Pipeline p = KafkaSource.buildPipeline();
            long start = System.nanoTime();
            Job job = jet.newJob(p);
            while (true) {
                int mapSize = sinkMap.size();
                System.out.format("Received %d entries in %d milliseconds.%n", mapSize, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
                if (mapSize == 2000000) {
                    job.cancel();
                    break;
                }
                Thread.sleep(100L);
            }
        }
        finally {
            Jet.shutdownAll();
            this.shutdownKafkaCluster();
        }
    }

    private void createKafkaCluster() throws IOException {
        this.zkServer = new EmbeddedZookeeper();
        String zkConnect = "localhost:" + this.zkServer.port();
        ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000, (ZkSerializer)ZKStringSerializer$.MODULE$);
        this.zkUtils = ZkUtils.apply((ZkClient)zkClient, (boolean)false);
        KafkaConfig config = new KafkaConfig((Map)KafkaSource.props("zookeeper.connect", zkConnect, "broker.id", "0", "log.dirs", Files.createTempDirectory("kafka-", new FileAttribute[0]).toAbsolutePath().toString(), "offsets.topic.replication.factor", "1", "listeners", "PLAINTEXT://localhost:9092"));
        MockTime mock = new MockTime();
        this.kafkaServer = TestUtils.createServer((KafkaConfig)config, (Time)mock);
    }

    private void fillTopics() {
        AdminUtils.createTopic((ZkUtils)this.zkUtils, (String)"t1", (int)32, (int)1, (Properties)new Properties(), (RackAwareMode)RackAwareMode.Disabled$.MODULE$);
        AdminUtils.createTopic((ZkUtils)this.zkUtils, (String)"t2", (int)64, (int)1, (Properties)new Properties(), (RackAwareMode)RackAwareMode.Disabled$.MODULE$);
        System.out.println("Filling Topics");
        Properties props = KafkaSource.props("bootstrap.servers", BOOTSTRAP_SERVERS, "key.serializer", StringSerializer.class.getName(), "value.serializer", IntegerSerializer.class.getName());
        try (KafkaProducer producer = new KafkaProducer(props);){
            for (int i = 1; i <= 1000000; ++i) {
                producer.send(new ProducerRecord("t1", (Object)("t1-" + i), (Object)i));
                producer.send(new ProducerRecord("t2", (Object)("t2-" + i), (Object)i));
            }
            System.out.println("Published 1000000 messages to topic t1");
            System.out.println("Published 1000000 messages to topic t2");
        }
    }

    private void shutdownKafkaCluster() {
        this.kafkaServer.shutdown();
        this.zkUtils.close();
        this.zkServer.shutdown();
    }

    private static Properties props(String ... kvs) {
        Properties props = new Properties();
        int i = 0;
        while (i < kvs.length) {
            props.setProperty(kvs[i++], kvs[i++]);
        }
        return props;
    }
}

