/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.tests;

import java.io.File;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.tests.SmokeTestUtil;

public class SmokeTestClient
extends SmokeTestUtil {
    private final String kafka;
    private final File stateDir;
    private KafkaStreams streams;
    private Thread thread;

    public SmokeTestClient(File stateDir, String kafka) {
        this.stateDir = stateDir;
        this.kafka = kafka;
    }

    public void start() {
        this.streams = SmokeTestClient.createKafkaStreams(this.stateDir, this.kafka);
        this.streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

            @Override
            public void uncaughtException(Thread t, Throwable e) {
                e.printStackTrace();
            }
        });
        this.thread = new Thread(){

            @Override
            public void run() {
                SmokeTestClient.this.streams.start();
            }
        };
        this.thread.start();
    }

    public void close() {
        this.streams.close();
        try {
            this.thread.join();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    private static KafkaStreams createKafkaStreams(File stateDir, String kafka) {
        Properties props = new Properties();
        props.put("application.id", "SmokeTest");
        props.put("state.dir", stateDir.toString());
        props.put("bootstrap.servers", kafka);
        props.put("num.stream.threads", (Object)3);
        props.put("num.standby.replicas", (Object)2);
        props.put("buffered.records.per.partition", (Object)100);
        props.put("replication.factor", (Object)2);
        props.put("commit.interval.ms", (Object)1000);
        props.put("auto.offset.reset", "earliest");
        KStreamBuilder builder = new KStreamBuilder();
        KStream source = builder.stream(stringSerde, intSerde, new String[]{"data"});
        source.to(stringSerde, intSerde, "echo");
        KStream data = source.filter((Predicate)new Predicate<String, Integer>(){

            public boolean test(String key, Integer value) {
                return value == null || value != Integer.MAX_VALUE;
            }
        });
        data.process(SmokeTestUtil.printProcessorSupplier("data"), new String[0]);
        KGroupedStream groupedData = data.groupByKey(stringSerde, intSerde);
        groupedData.aggregate((Initializer)new Initializer<Integer>(){

            public Integer apply() {
                return Integer.MAX_VALUE;
            }
        }, (Aggregator)new Aggregator<String, Integer, Integer>(){

            public Integer apply(String aggKey, Integer value, Integer aggregate) {
                return value < aggregate ? value : aggregate;
            }
        }, (Windows)TimeWindows.of((long)TimeUnit.DAYS.toMillis(1L)), intSerde, "uwin-min").toStream().map(new SmokeTestUtil.Unwindow()).to(stringSerde, intSerde, "min");
        KTable minTable = builder.table(stringSerde, intSerde, "min", "minStoreName");
        minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min"), new String[0]);
        groupedData.aggregate((Initializer)new Initializer<Integer>(){

            public Integer apply() {
                return Integer.MIN_VALUE;
            }
        }, (Aggregator)new Aggregator<String, Integer, Integer>(){

            public Integer apply(String aggKey, Integer value, Integer aggregate) {
                return value > aggregate ? value : aggregate;
            }
        }, (Windows)TimeWindows.of((long)TimeUnit.DAYS.toMillis(2L)), intSerde, "uwin-max").toStream().map(new SmokeTestUtil.Unwindow()).to(stringSerde, intSerde, "max");
        KTable maxTable = builder.table(stringSerde, intSerde, "max", "maxStoreName");
        maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max"), new String[0]);
        groupedData.aggregate((Initializer)new Initializer<Long>(){

            public Long apply() {
                return 0L;
            }
        }, (Aggregator)new Aggregator<String, Integer, Long>(){

            public Long apply(String aggKey, Integer value, Long aggregate) {
                return (long)value.intValue() + aggregate;
            }
        }, (Windows)TimeWindows.of((long)TimeUnit.DAYS.toMillis(2L)), longSerde, "win-sum").toStream().map(new SmokeTestUtil.Unwindow()).to(stringSerde, longSerde, "sum");
        KTable sumTable = builder.table(stringSerde, longSerde, "sum", "sumStoreName");
        sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum"), new String[0]);
        groupedData.count((Windows)TimeWindows.of((long)TimeUnit.DAYS.toMillis(2L)), "uwin-cnt").toStream().map(new SmokeTestUtil.Unwindow()).to(stringSerde, longSerde, "cnt");
        KTable cntTable = builder.table(stringSerde, longSerde, "cnt", "cntStoreName");
        cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt"), new String[0]);
        maxTable.join(minTable, (ValueJoiner)new ValueJoiner<Integer, Integer, Integer>(){

            public Integer apply(Integer value1, Integer value2) {
                return value1 - value2;
            }
        }).to(stringSerde, intSerde, "dif");
        sumTable.join(cntTable, (ValueJoiner)new ValueJoiner<Long, Long, Double>(){

            public Double apply(Long value1, Long value2) {
                return (double)value1.longValue() / (double)value2.longValue();
            }
        }).to(stringSerde, doubleSerde, "avg");
        SmokeTestUtil.Agg agg = new SmokeTestUtil.Agg();
        cntTable.groupBy(agg.selector(), stringSerde, longSerde).aggregate(agg.init(), agg.adder(), agg.remover(), longSerde, "cntByCnt").to(stringSerde, longSerde, "tagg");
        return new KafkaStreams((TopologyBuilder)builder, props);
    }
}

