/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.tests;

import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.tests.SmokeTestUtil;

public class SmokeTestClient
extends SmokeTestUtil {
    private final Properties streamsProperties;
    private Thread thread;
    private KafkaStreams streams;
    private boolean uncaughtException = false;

    public SmokeTestClient(Properties streamsProperties) {
        this.streamsProperties = streamsProperties;
    }

    public void start() {
        this.streams = SmokeTestClient.createKafkaStreams(this.streamsProperties);
        this.streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

            @Override
            public void uncaughtException(Thread t, Throwable e) {
                System.out.println("SMOKE-TEST-CLIENT-EXCEPTION");
                SmokeTestClient.this.uncaughtException = true;
                e.printStackTrace();
            }
        });
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable(){

            @Override
            public void run() {
                SmokeTestClient.this.close();
            }
        }));
        this.thread = new Thread(){

            @Override
            public void run() {
                SmokeTestClient.this.streams.start();
            }
        };
        this.thread.start();
    }

    public void close() {
        this.streams.close(Duration.ofSeconds(5L));
        if (!this.uncaughtException) {
            System.out.println("SMOKE-TEST-CLIENT-CLOSED");
        }
        try {
            this.thread.join();
        }
        catch (Exception ex) {
            System.out.println("SMOKE-TEST-CLIENT-EXCEPTION");
        }
    }

    private static Properties getStreamsConfig(Properties props) {
        Properties fullProps = new Properties(props);
        fullProps.put("application.id", "SmokeTest");
        fullProps.put("num.stream.threads", (Object)3);
        fullProps.put("num.standby.replicas", (Object)2);
        fullProps.put("buffered.records.per.partition", (Object)100);
        fullProps.put("commit.interval.ms", (Object)1000);
        fullProps.put("replication.factor", (Object)3);
        fullProps.put("auto.offset.reset", "earliest");
        fullProps.put("acks", "all");
        fullProps.putAll((Map<?, ?>)props);
        return fullProps;
    }

    private static KafkaStreams createKafkaStreams(Properties props) {
        StreamsBuilder builder = new StreamsBuilder();
        Consumed stringIntConsumed = Consumed.with((Serde)stringSerde, (Serde)intSerde);
        KStream source = builder.stream("data", stringIntConsumed);
        source.to("echo", Produced.with((Serde)stringSerde, (Serde)intSerde));
        KStream data = source.filter((Predicate)new Predicate<String, Integer>(){

            public boolean test(String key, Integer value) {
                return value == null || value != Integer.MAX_VALUE;
            }
        });
        data.process(SmokeTestUtil.printProcessorSupplier("data"), new String[0]);
        KGroupedStream groupedData = data.groupByKey(Serialized.with((Serde)stringSerde, (Serde)intSerde));
        groupedData.windowedBy((Windows)TimeWindows.of((Duration)Duration.ofDays(1L))).aggregate((Initializer)new Initializer<Integer>(){

            public Integer apply() {
                return Integer.MAX_VALUE;
            }
        }, (Aggregator)new Aggregator<String, Integer, Integer>(){

            public Integer apply(String aggKey, Integer value, Integer aggregate) {
                return value < aggregate ? value : aggregate;
            }
        }, Materialized.as((String)"uwin-min").withValueSerde(intSerde)).toStream(new SmokeTestUtil.Unwindow()).to("min", Produced.with((Serde)stringSerde, (Serde)intSerde));
        KTable minTable = builder.table("min", Consumed.with((Serde)stringSerde, (Serde)intSerde), Materialized.as((String)"minStoreName"));
        minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min"), new String[0]);
        groupedData.windowedBy((Windows)TimeWindows.of((Duration)Duration.ofDays(2L))).aggregate((Initializer)new Initializer<Integer>(){

            public Integer apply() {
                return Integer.MIN_VALUE;
            }
        }, (Aggregator)new Aggregator<String, Integer, Integer>(){

            public Integer apply(String aggKey, Integer value, Integer aggregate) {
                return value > aggregate ? value : aggregate;
            }
        }, Materialized.as((String)"uwin-max").withValueSerde(intSerde)).toStream(new SmokeTestUtil.Unwindow()).to("max", Produced.with((Serde)stringSerde, (Serde)intSerde));
        KTable maxTable = builder.table("max", Consumed.with((Serde)stringSerde, (Serde)intSerde), Materialized.as((String)"maxStoreName"));
        maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max"), new String[0]);
        groupedData.windowedBy((Windows)TimeWindows.of((Duration)Duration.ofDays(2L))).aggregate((Initializer)new Initializer<Long>(){

            public Long apply() {
                return 0L;
            }
        }, (Aggregator)new Aggregator<String, Integer, Long>(){

            public Long apply(String aggKey, Integer value, Long aggregate) {
                return (long)value.intValue() + aggregate;
            }
        }, Materialized.as((String)"win-sum").withValueSerde(longSerde)).toStream(new SmokeTestUtil.Unwindow()).to("sum", Produced.with((Serde)stringSerde, (Serde)longSerde));
        Consumed stringLongConsumed = Consumed.with((Serde)stringSerde, (Serde)longSerde);
        KTable sumTable = builder.table("sum", stringLongConsumed);
        sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum"), new String[0]);
        groupedData.windowedBy((Windows)TimeWindows.of((Duration)Duration.ofDays(2L))).count(Materialized.as((String)"uwin-cnt")).toStream(new SmokeTestUtil.Unwindow()).to("cnt", Produced.with((Serde)stringSerde, (Serde)longSerde));
        KTable cntTable = builder.table("cnt", Consumed.with((Serde)stringSerde, (Serde)longSerde), Materialized.as((String)"cntStoreName"));
        cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt"), new String[0]);
        maxTable.join(minTable, (ValueJoiner)new ValueJoiner<Integer, Integer, Integer>(){

            public Integer apply(Integer value1, Integer value2) {
                return value1 - value2;
            }
        }).toStream().to("dif", Produced.with((Serde)stringSerde, (Serde)intSerde));
        sumTable.join(cntTable, (ValueJoiner)new ValueJoiner<Long, Long, Double>(){

            public Double apply(Long value1, Long value2) {
                return (double)value1.longValue() / (double)value2.longValue();
            }
        }).toStream().to("avg", Produced.with((Serde)stringSerde, (Serde)doubleSerde));
        SmokeTestUtil.Agg agg = new SmokeTestUtil.Agg();
        cntTable.groupBy(agg.selector(), Serialized.with((Serde)stringSerde, (Serde)longSerde)).aggregate(agg.init(), agg.adder(), agg.remover(), Materialized.as((KeyValueBytesStoreSupplier)Stores.inMemoryKeyValueStore((String)"cntByCnt")).withKeySerde(Serdes.String()).withValueSerde(Serdes.Long())).toStream().to("tagg", Produced.with((Serde)stringSerde, (Serde)longSerde));
        final KafkaStreams streamsClient = new KafkaStreams(builder.build(), SmokeTestClient.getStreamsConfig(props));
        streamsClient.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

            @Override
            public void uncaughtException(Thread t, Throwable e) {
                System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
                streamsClient.close(Duration.ofSeconds(30L));
            }
        });
        return streamsClient;
    }
}

