/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.tests;

import java.io.File;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.tests.SmokeTestUtil;

/**
 * Test client that runs a Kafka Streams application with the
 * {@code exactly_once} processing guarantee.
 *
 * <p>The topology reads the {@code data} topic, echoes it to {@code echo},
 * and writes per-key minimum and sum aggregates to the {@code min} and
 * {@code sum} topics. When repartitioning is enabled, the data is additionally
 * piped through the {@code repartition} topic and per-key maximum and count
 * aggregates are written to {@code max} and {@code cnt}.
 *
 * <p>{@link #start()} keeps the application running: whenever a stream thread
 * reports an uncaught exception, the current {@link KafkaStreams} instance is
 * closed and a fresh one is created on the next loop iteration.
 *
 * <p>The fields {@code streams}, {@code uncaughtException} and
 * {@code isRunning} are accessed from the thread running {@link #start()},
 * from the shutdown hook thread and from the thread invoking the
 * uncaught-exception handler, hence they are declared {@code volatile}.
 */
public class EosTestClient
extends SmokeTestUtil {
    static final String APP_ID = "EosTest";
    private final String kafka;
    private final File stateDir;
    private final boolean withRepartitioning;
    // Written by the start() loop, read by the shutdown hook.
    private volatile KafkaStreams streams;
    // Set by the uncaught-exception handler, read by start() and the shutdown hook.
    private volatile boolean uncaughtException;
    // Cleared by the shutdown hook, polled by the start() loop.
    private volatile boolean isRunning = true;

    /**
     * @param kafka              value used for the {@code bootstrap.servers} setting
     * @param stateDir           directory used for the {@code state.dir} setting
     * @param withRepartitioning whether to add the {@code repartition} branch
     *                           (max / count aggregates) to the topology
     */
    EosTestClient(String kafka, File stateDir, boolean withRepartitioning) {
        this.kafka = kafka;
        this.stateDir = stateDir;
        this.withRepartitioning = withRepartitioning;
    }

    /**
     * Registers a shutdown hook and then blocks, (re-)creating and starting the
     * streams instance as needed, until the shutdown hook clears the running flag.
     */
    public void start() {
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable(){

            @Override
            public void run() {
                EosTestClient.this.isRunning = false;
                // Snapshot the field: the start() loop may null it concurrently, and it
                // is still null if shutdown happens before the first instance is created.
                KafkaStreams current = EosTestClient.this.streams;
                if (current != null) {
                    current.close(5L, TimeUnit.SECONDS);
                }
                // Marker line is printed only for a shutdown without a pending exception.
                if (!EosTestClient.this.uncaughtException) {
                    System.out.println("EOS-TEST-CLIENT-CLOSED");
                }
            }
        }));
        while (this.isRunning) {
            if (this.streams == null) {
                this.uncaughtException = false;
                KafkaStreams created = this.createKafkaStreams(this.stateDir, this.kafka);
                created.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

                    @Override
                    public void uncaughtException(Thread t, Throwable e) {
                        System.out.println("EOS-TEST-CLIENT-EXCEPTION");
                        e.printStackTrace();
                        EosTestClient.this.uncaughtException = true;
                    }
                });
                this.streams = created;
                created.start();
            }
            if (this.uncaughtException) {
                // Tear down the failed instance; a new one is built on the next iteration.
                this.streams.close(5L, TimeUnit.SECONDS);
                this.streams = null;
            }
            EosTestClient.sleep(1000L);
        }
    }

    /**
     * Builds the streams configuration and topology and returns a new,
     * not-yet-started {@link KafkaStreams} instance.
     *
     * <p>Raw stream types and the explicit casts are retained as found in this file.
     *
     * @param stateDir directory used for the {@code state.dir} setting
     * @param kafka    value used for the {@code bootstrap.servers} setting
     * @return the configured streams instance
     */
    private KafkaStreams createKafkaStreams(File stateDir, String kafka) {
        Properties props = new Properties();
        props.put("application.id", APP_ID);
        props.put("state.dir", stateDir.toString());
        props.put("bootstrap.servers", kafka);
        props.put("num.stream.threads", (Object)2);
        props.put("num.standby.replicas", (Object)2);
        props.put("replication.factor", (Object)3);
        props.put("processing.guarantee", "exactly_once");
        props.put("cache.max.bytes.buffering", (Object)0);
        props.put("default.key.serde", Serdes.String().getClass());
        props.put("default.value.serde", Serdes.Integer().getClass());
        KStreamBuilder builder = new KStreamBuilder();
        KStream data = builder.stream(new String[]{"data"});
        data.to("echo");
        data.process(SmokeTestUtil.printProcessorSupplier("data"), new String[0]);
        KGroupedStream groupedData = data.groupByKey();
        // Per-key minimum: start at Integer.MAX_VALUE and keep the smaller value.
        groupedData.aggregate((Initializer)new Initializer<Integer>(){

            public Integer apply() {
                return Integer.MAX_VALUE;
            }
        }, (Aggregator)new Aggregator<String, Integer, Integer>(){

            public Integer apply(String aggKey, Integer value, Integer aggregate) {
                return value < aggregate ? value : aggregate;
            }
        }, intSerde, "min").to(stringSerde, intSerde, "min");
        // Per-key sum, accumulated as a long.
        groupedData.aggregate((Initializer)new Initializer<Long>(){

            public Long apply() {
                return 0L;
            }
        }, (Aggregator)new Aggregator<String, Integer, Long>(){

            public Long apply(String aggKey, Integer value, Long aggregate) {
                return (long)value.intValue() + aggregate;
            }
        }, longSerde, "sum").to(stringSerde, longSerde, "sum");
        if (this.withRepartitioning) {
            KStream repartitionedData = data.through("repartition");
            repartitionedData.process(SmokeTestUtil.printProcessorSupplier("repartition"), new String[0]);
            KGroupedStream groupedDataAfterRepartitioning = repartitionedData.groupByKey();
            // Per-key maximum: start at Integer.MIN_VALUE and keep the larger value.
            groupedDataAfterRepartitioning.aggregate((Initializer)new Initializer<Integer>(){

                public Integer apply() {
                    return Integer.MIN_VALUE;
                }
            }, (Aggregator)new Aggregator<String, Integer, Integer>(){

                public Integer apply(String aggKey, Integer value, Integer aggregate) {
                    return value > aggregate ? value : aggregate;
                }
            }, intSerde, "max").to(stringSerde, intSerde, "max");
            // Per-key record count.
            groupedDataAfterRepartitioning.count("cnt").to(stringSerde, longSerde, "cnt");
        }
        return new KafkaStreams((TopologyBuilder)builder, props);
    }
}

