/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.tests;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.StreamJoined;

/**
 * Command-line driver that builds a Kafka Streams topology with a count, an aggregate, a reduce
 * and a stream-stream join hanging off one re-keyed stream, and reports how many repartition
 * sink topics appear in the topology description.
 *
 * <p>The single command-line argument is the path of a properties file. That file must contain
 * {@code input.topic}, {@code aggregation.topic}, {@code reduce.topic} and {@code join.topic};
 * these four entries are removed and every remaining entry is copied into the Streams
 * configuration, overriding the defaults set in {@link #main(String[])}.
 *
 * <p>Progress is reported through marker lines on standard output ({@code AGGREGATED},
 * {@code REDUCED}, {@code JOINED}, {@code REPARTITION TOPIC found}, ...). NOTE(review): these are
 * presumably parsed by an external test harness, so their text is kept unchanged.
 */
public class StreamsOptimizedTest {
    /** Matches the sink-node lines of a topology description that write to a repartition topic. */
    private static final Pattern REPARTITION_TOPIC_PATTERN = Pattern.compile("Sink: .*-repartition");

    /**
     * Loads the properties file, builds and starts the topology, and registers a shutdown hook
     * that closes the Streams instance.
     *
     * @param args {@code args[0]} is the path of the properties file
     * @throws Exception if the properties file cannot be loaded or the application fails to start
     * @throws NullPointerException if one of the required topic properties is missing
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("StreamsOptimizedTest requires one argument (properties-file) but no provided: ");
            Exit.exit(1);
            // Guard against an exit procedure that does not terminate the JVM: never fall
            // through to args[0] with an empty argument array.
            return;
        }
        String propFileName = args[0];
        Properties streamsProperties = Utils.loadProps(propFileName);

        System.out.println("StreamsTest instance started StreamsOptimizedTest");
        System.out.println("props=" + streamsProperties);

        // The topic names are test parameters, not Streams settings: take them out of the
        // properties before the remainder is merged into the Streams configuration below.
        String inputTopic = removeRequiredProperty(streamsProperties, "input.topic");
        String aggregationTopic = removeRequiredProperty(streamsProperties, "aggregation.topic");
        String reduceTopic = removeRequiredProperty(streamsProperties, "reduce.topic");
        String joinTopic = removeRequiredProperty(streamsProperties, "join.topic");

        Initializer<Integer> initializer = () -> 0;
        // Running sum of the lengths of all values seen for a key.
        Aggregator<String, String, Integer> aggregator = (k, v, agg) -> agg + v.length();
        // Values are decimal integers encoded as strings; the reduction is their sum.
        Reducer<String> reducer = (v1, v2) -> Integer.toString(Integer.parseInt(v1) + Integer.parseInt(v2));
        // New key is the numeric value modulo 9, rendered as a string.
        Function<String, String> keyFunction = s -> Integer.toString(Integer.parseInt(s) % 9);

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> sourceStream =
            builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

        // Every downstream operation below groups or joins on this re-keyed stream.
        KStream<String, String> mappedStream = sourceStream.selectKey((k, v) -> keyFunction.apply(v));

        KStream<String, Long> countStream = mappedStream
            .groupByKey()
            .count(Materialized.with(Serdes.String(), Serdes.Long()))
            .toStream();

        mappedStream
            .groupByKey()
            .aggregate(initializer, aggregator, Materialized.with(Serdes.String(), Serdes.Integer()))
            .toStream()
            .peek((k, v) -> System.out.println(String.format("AGGREGATED key=%s value=%s", k, v)))
            .to(aggregationTopic, Produced.with(Serdes.String(), Serdes.Integer()));

        mappedStream
            .groupByKey()
            .reduce(reducer, Materialized.with(Serdes.String(), Serdes.String()))
            .toStream()
            .peek((k, v) -> System.out.println(String.format("REDUCED key=%s value=%s", k, v)))
            .to(reduceTopic, Produced.with(Serdes.String(), Serdes.String()));

        mappedStream
            .join(
                countStream,
                (v1, v2) -> v1 + ":" + v2.toString(),
                JoinWindows.of(Duration.ofMillis(500L)),
                StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.Long()))
            .peek((k, v) -> System.out.println(String.format("JOINED key=%s value=%s", k, v)))
            .to(joinTopic, Produced.with(Serdes.String(), Serdes.String()));

        Properties config = new Properties();
        config.setProperty("application.id", "StreamsOptimizedTest");
        config.setProperty("cache.max.bytes.buffering", "0");
        config.setProperty("default.key.serde", Serdes.String().getClass().getName());
        config.setProperty("default.value.serde", Serdes.String().getClass().getName());
        config.setProperty(StreamsConfig.adminClientPrefix("retries"), "100");
        // Applied last so that entries from the properties file override the defaults above.
        config.putAll(streamsProperties);

        // NOTE(review): the config is handed to build(...) presumably so that topology-level
        // settings from the properties file (e.g. optimization) take effect - confirm.
        Topology topology = builder.build(config);
        KafkaStreams streams = new KafkaStreams(topology, config);

        streams.setStateListener((newState, oldState) -> {
            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
                int repartitionTopicCount = getCountOfRepartitionTopicsFound(
                    topology.describe().toString(), REPARTITION_TOPIC_PATTERN);
                System.out.println(
                    String.format("REBALANCING -> RUNNING with REPARTITION TOPIC COUNT=%d", repartitionTopicCount));
                System.out.flush();
            }
        });

        streams.cleanUp();
        streams.start();

        Exit.addShutdownHook("streams-shutdown-hook", () -> {
            System.out.println("closing Kafka Streams instance");
            System.out.flush();
            streams.close(Duration.ofMillis(5000L));
            System.out.println("OPTIMIZE_TEST Streams Stopped");
            System.out.flush();
        });
    }

    /**
     * Removes {@code key} from {@code props} and returns its value.
     *
     * @param props properties to remove the entry from (mutated)
     * @param key name of the required property
     * @return the value that was stored under {@code key}
     * @throws NullPointerException if {@code props} has no entry for {@code key}
     */
    private static String removeRequiredProperty(Properties props, String key) {
        Object value = props.remove(key);
        return (String) Objects.requireNonNull(value, "Missing required property: " + key);
    }

    /**
     * Counts the matches of {@code repartitionTopicPattern} in {@code topologyString}, printing
     * each match to standard output.
     *
     * @param topologyString textual topology description to scan
     * @param repartitionTopicPattern pattern identifying a repartition-topic sink
     * @return number of matches found
     */
    private static int getCountOfRepartitionTopicsFound(String topologyString, Pattern repartitionTopicPattern) {
        Matcher matcher = repartitionTopicPattern.matcher(topologyString);
        int repartitionTopicCount = 0;
        while (matcher.find()) {
            System.out.println(String.format("REPARTITION TOPIC found -> %s", matcher.group()));
            repartitionTopicCount++;
        }
        return repartitionTopicCount;
    }
}

