/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.tests;

import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.tests.SmokeTestUtil;
import org.apache.kafka.test.TestUtils;

public class SmokeTestClient
extends SmokeTestUtil {
    private final String name;
    private Thread thread;
    private KafkaStreams streams;
    private boolean uncaughtException = false;
    private boolean started;
    private boolean closed;

    public SmokeTestClient(String name) {
        this.name = name;
    }

    public boolean started() {
        return this.started;
    }

    public boolean closed() {
        return this.closed;
    }

    public void start(Properties streamsProperties) {
        this.streams = this.createKafkaStreams(streamsProperties);
        this.streams.setUncaughtExceptionHandler((t, e) -> {
            System.out.println(this.name + ": SMOKE-TEST-CLIENT-EXCEPTION");
            this.uncaughtException = true;
            e.printStackTrace();
        });
        Exit.addShutdownHook((String)"streams-shutdown-hook", () -> this.close());
        this.thread = new Thread(() -> this.streams.start());
        this.thread.start();
    }

    public void closeAsync() {
        this.streams.close(Duration.ZERO);
    }

    public void close() {
        this.streams.close(Duration.ofSeconds(5L));
        if (!this.uncaughtException) {
            System.out.println(this.name + ": SMOKE-TEST-CLIENT-CLOSED");
        }
        try {
            this.thread.join();
        }
        catch (Exception ex) {
            System.out.println(this.name + ": SMOKE-TEST-CLIENT-EXCEPTION");
        }
    }

    private Properties getStreamsConfig(Properties props) {
        Properties fullProps = new Properties(props);
        fullProps.put("application.id", "SmokeTest");
        fullProps.put("client.id", "SmokeTest-" + this.name);
        fullProps.put("num.stream.threads", (Object)3);
        fullProps.put("num.standby.replicas", (Object)2);
        fullProps.put("buffered.records.per.partition", (Object)100);
        fullProps.put("commit.interval.ms", (Object)1000);
        fullProps.put("replication.factor", (Object)3);
        fullProps.put("auto.offset.reset", "earliest");
        fullProps.put("acks", "all");
        fullProps.put("state.dir", TestUtils.tempDirectory().getAbsolutePath());
        fullProps.putAll((Map<?, ?>)props);
        return fullProps;
    }

    private KafkaStreams createKafkaStreams(Properties props) {
        Topology build = this.getTopology();
        KafkaStreams streamsClient = new KafkaStreams(build, this.getStreamsConfig(props));
        streamsClient.setStateListener((newState, oldState) -> {
            System.out.printf("%s %s: %s -> %s%n", this.name, Instant.now(), oldState, newState);
            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
                this.started = true;
            }
            if (newState == KafkaStreams.State.NOT_RUNNING) {
                this.closed = true;
            }
        });
        streamsClient.setUncaughtExceptionHandler((t, e) -> {
            System.out.println(this.name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
            streamsClient.close(Duration.ofSeconds(30L));
        });
        return streamsClient;
    }

    public Topology getTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        Consumed stringIntConsumed = Consumed.with((Serde)stringSerde, (Serde)intSerde);
        KStream source = builder.stream("data", stringIntConsumed);
        source.filterNot((k, v) -> k.equals("flush")).to("echo", Produced.with((Serde)stringSerde, (Serde)intSerde));
        KStream data = source.filter((key, value) -> value == null || value != Integer.MAX_VALUE);
        data.process(SmokeTestUtil.printProcessorSupplier("data", this.name), new String[0]);
        KGroupedStream groupedData = data.groupByKey(Grouped.with((Serde)stringSerde, (Serde)intSerde));
        KTable minAggregation = groupedData.windowedBy((Windows)TimeWindows.of((Duration)Duration.ofDays(1L)).grace(Duration.ofMinutes(1L))).aggregate(() -> Integer.MAX_VALUE, (aggKey, value, aggregate) -> value < aggregate ? value : aggregate, Materialized.as((String)"uwin-min").withValueSerde(intSerde).withRetention(Duration.ofHours(25L)));
        SmokeTestClient.streamify((KTable<Windowed<String>, Integer>)minAggregation, "min-raw");
        SmokeTestClient.streamify((KTable<Windowed<String>, Integer>)minAggregation.suppress(Suppressed.untilWindowCloses((Suppressed.StrictBufferConfig)Suppressed.BufferConfig.unbounded())), "min-suppressed");
        minAggregation.toStream(new SmokeTestUtil.Unwindow()).filterNot((k, v) -> k.equals("flush")).to("min", Produced.with((Serde)stringSerde, (Serde)intSerde));
        KTable smallWindowSum = groupedData.windowedBy((Windows)TimeWindows.of((Duration)Duration.ofSeconds(2L)).advanceBy(Duration.ofSeconds(1L)).grace(Duration.ofSeconds(30L))).reduce((l, r) -> l + r);
        SmokeTestClient.streamify((KTable<Windowed<String>, Integer>)smallWindowSum, "sws-raw");
        SmokeTestClient.streamify((KTable<Windowed<String>, Integer>)smallWindowSum.suppress(Suppressed.untilWindowCloses((Suppressed.StrictBufferConfig)Suppressed.BufferConfig.unbounded())), "sws-suppressed");
        KTable minTable = builder.table("min", Consumed.with((Serde)stringSerde, (Serde)intSerde), Materialized.as((String)"minStoreName"));
        minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min", this.name), new String[0]);
        groupedData.windowedBy((Windows)TimeWindows.of((Duration)Duration.ofDays(2L))).aggregate(() -> Integer.MIN_VALUE, (aggKey, value, aggregate) -> value > aggregate ? value : aggregate, Materialized.as((String)"uwin-max").withValueSerde(intSerde)).toStream(new SmokeTestUtil.Unwindow()).filterNot((k, v) -> k.equals("flush")).to("max", Produced.with((Serde)stringSerde, (Serde)intSerde));
        KTable maxTable = builder.table("max", Consumed.with((Serde)stringSerde, (Serde)intSerde), Materialized.as((String)"maxStoreName"));
        maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max", this.name), new String[0]);
        groupedData.windowedBy((Windows)TimeWindows.of((Duration)Duration.ofDays(2L))).aggregate(() -> 0L, (aggKey, value, aggregate) -> (long)value.intValue() + aggregate, Materialized.as((String)"win-sum").withValueSerde(longSerde)).toStream(new SmokeTestUtil.Unwindow()).filterNot((k, v) -> k.equals("flush")).to("sum", Produced.with((Serde)stringSerde, (Serde)longSerde));
        Consumed stringLongConsumed = Consumed.with((Serde)stringSerde, (Serde)longSerde);
        KTable sumTable = builder.table("sum", stringLongConsumed);
        sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum", this.name), new String[0]);
        groupedData.windowedBy((Windows)TimeWindows.of((Duration)Duration.ofDays(2L))).count(Materialized.as((String)"uwin-cnt")).toStream(new SmokeTestUtil.Unwindow()).filterNot((k, v) -> k.equals("flush")).to("cnt", Produced.with((Serde)stringSerde, (Serde)longSerde));
        KTable cntTable = builder.table("cnt", Consumed.with((Serde)stringSerde, (Serde)longSerde), Materialized.as((String)"cntStoreName"));
        cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt", this.name), new String[0]);
        maxTable.join(minTable, (value1, value2) -> value1 - value2).toStream().filterNot((k, v) -> k.equals("flush")).to("dif", Produced.with((Serde)stringSerde, (Serde)intSerde));
        sumTable.join(cntTable, (value1, value2) -> (double)value1.longValue() / (double)value2.longValue()).toStream().filterNot((k, v) -> k.equals("flush")).to("avg", Produced.with((Serde)stringSerde, (Serde)doubleSerde));
        SmokeTestUtil.Agg agg = new SmokeTestUtil.Agg();
        cntTable.groupBy(agg.selector(), Grouped.with((Serde)stringSerde, (Serde)longSerde)).aggregate(agg.init(), agg.adder(), agg.remover(), Materialized.as((KeyValueBytesStoreSupplier)Stores.inMemoryKeyValueStore((String)"cntByCnt")).withKeySerde(Serdes.String()).withValueSerde(Serdes.Long())).toStream().to("tagg", Produced.with((Serde)stringSerde, (Serde)longSerde));
        return builder.build();
    }

    private static void streamify(KTable<Windowed<String>, Integer> windowedTable, String topic) {
        windowedTable.toStream().filterNot((k, v) -> ((String)k.key()).equals("flush")).map((key, value) -> new KeyValue((Object)key.toString(), value)).to(topic, Produced.with((Serde)stringSerde, (Serde)intSerde));
    }
}

