/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.AbstractStream;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl;
import org.apache.kafka.streams.kstream.internals.KStreamAggProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.KStreamAggregate;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate;
import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.NamedInternal;
import org.apache.kafka.streams.kstream.internals.PassThrough;
import org.apache.kafka.streams.kstream.internals.graph.BaseRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.state.StoreBuilder;

/**
 * Builds the graph nodes for a cogrouped aggregation and wraps the result in a {@link KTable}.
 *
 * <p>For every grouped stream this class registers a parent node (a freshly built repartition node
 * when the stream is flagged {@code repartitionRequired}, otherwise the stream's own graph node),
 * attaches one stateful aggregate processor per stream, and finally attaches a single pass-through
 * node downstream of all aggregate processors.
 *
 * @param <K>    key type of the grouped input streams
 * @param <VOut> type of the aggregate value
 */
class CogroupedStreamAggregateBuilder<K, VOut> {
    private final InternalStreamsBuilder builder;
    // Insertion-ordered: the first key is the stream on which ensureCopartitionWith(...) is invoked.
    private final Map<KGroupedStreamImpl<K, ?>, StreamsGraphNode> parentNodes = new LinkedHashMap<>();

    CogroupedStreamAggregateBuilder(InternalStreamsBuilder builder) {
        this.builder = builder;
    }

    /**
     * Wires all grouped streams in {@code groupPatterns} into one aggregation and returns the
     * resulting table.
     *
     * <p>The window arguments select the aggregate processor: all {@code null} gives a plain
     * aggregate, only {@code windows} gives a windowed aggregate, and {@code sessionWindows}
     * together with {@code sessionMerger} (with {@code windows == null}) gives a session-windowed
     * aggregate.
     *
     * @param groupPatterns  grouped streams and the aggregator applied to each; expected to be
     *                       non-empty, since the first registered stream is fetched unconditionally
     * @param initializer    supplies the initial aggregate value
     * @param named          source of the processor names (suffixes {@code -cogroup-agg-<n>} and
     *                       {@code -cogroup-merge})
     * @param storeBuilder   state store shared by all aggregate processors; its name is also the
     *                       repartition name prefix when a stream has no user-provided one
     * @param keySerde       key serde handed to the resulting table
     * @param valSerde       value serde handed to the resulting table
     * @param queryableName  queryable store name handed to the resulting table
     * @param windows        window definition, or {@code null}
     * @param sessionWindows session window definition, or {@code null}
     * @param sessionMerger  session merger, required with {@code sessionWindows}
     * @return the table backed by the pass-through merge node
     * @throws IllegalArgumentException if the window arguments form an unsupported combination
     */
    <KR, VIn, W extends Window> KTable<KR, VOut> build(
            Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
            Initializer<VOut> initializer,
            NamedInternal named,
            StoreBuilder<?> storeBuilder,
            Serde<KR> keySerde,
            Serde<VOut> valSerde,
            String queryableName,
            Windows<W> windows,
            SessionWindows sessionWindows,
            Merger<? super K, VOut> sessionMerger) {
        // Step 1: decide which graph node each grouped stream's aggregate processor hangs off.
        for (KGroupedStreamImpl<K, ?> groupedStream : groupPatterns.keySet()) {
            if (!groupedStream.repartitionRequired) {
                this.parentNodes.put(groupedStream, groupedStream.streamsGraphNode);
                continue;
            }
            OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, ?> repartitionNodeBuilder =
                    OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
            String repartitionNamePrefix;
            if (groupedStream.userProvidedRepartitionTopicName != null) {
                repartitionNamePrefix = groupedStream.userProvidedRepartitionTopicName;
            } else {
                repartitionNamePrefix = storeBuilder.name();
            }
            // Deliberately invoked before the containsKey check, exactly as before.
            // NOTE(review): presumably this touches shared builder state - confirm before reordering.
            this.createRepartitionSource(repartitionNamePrefix, repartitionNodeBuilder, groupedStream.keySerde, groupedStream.valSerde);
            if (!this.parentNodes.containsKey(groupedStream)) {
                StreamsGraphNode repartitionNode = repartitionNodeBuilder.build();
                this.builder.addGraphNode(groupedStream.streamsGraphNode, repartitionNode);
                this.parentNodes.put(groupedStream, repartitionNode);
            }
        }

        // Step 2: request co-partitioning between the first registered stream and all the others.
        ArrayList<KGroupedStreamImpl<K, ?>> otherStreams = new ArrayList<>(this.parentNodes.keySet());
        KGroupedStreamImpl<K, ?> firstStream = otherStreams.iterator().next();
        otherStreams.remove(firstStream);
        firstStream.ensureCopartitionWith(otherStreams);

        // Step 3: one aggregate processor per grouped stream. Only the first node is constructed
        // with the store builder itself; the following ones receive just the store name.
        ArrayList<StreamsGraphNode> aggregateNodes = new ArrayList<>();
        boolean stateCreated = false;
        int counter = 0;
        for (Map.Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> pattern : groupPatterns.entrySet()) {
            String aggregateName = named.suffixWithOrElseGet("-cogroup-agg-" + counter++, this.builder, "COGROUPKSTREAM-AGGREGATE-");
            StatefulProcessorNode<K, ?> aggregateNode = this.getStatefulProcessorNode(
                    pattern.getValue(), initializer, aggregateName, stateCreated, storeBuilder, windows, sessionWindows, sessionMerger);
            stateCreated = true;
            aggregateNodes.add(aggregateNode);
            this.builder.addGraphNode(this.parentNodes.get(pattern.getKey()), aggregateNode);
        }

        // Step 4: a pass-through node downstream of every aggregate processor backs the table.
        String mergeProcessorName = named.suffixWithOrElseGet("-cogroup-merge", this.builder, "COGROUPKSTREAM-MERGE-");
        PassThrough<K, VOut> passThrough = new PassThrough<K, VOut>();
        ProcessorParameters<K, VOut> mergeParameters = new ProcessorParameters<K, VOut>(passThrough, mergeProcessorName);
        ProcessorGraphNode<K, VOut> mergeNode = new ProcessorGraphNode<K, VOut>(mergeProcessorName, mergeParameters);
        this.builder.addGraphNode(aggregateNodes, mergeNode);
        return new KTableImpl<KR, Object, VOut>(
                mergeProcessorName,
                keySerde,
                valSerde,
                Collections.singleton(mergeNode.nodeName()),
                queryableName,
                passThrough,
                mergeNode,
                this.builder);
    }

    /**
     * Creates the stateful node for one grouped stream, choosing the aggregate processor supplier
     * from the window arguments.
     *
     * @param aggregator     aggregator applied to this stream's records
     * @param initializer    supplies the initial aggregate value
     * @param processorName  name given to the node and its processor parameters
     * @param stateCreated   {@code false} for the first node, which is constructed with
     *                       {@code storeBuilder}; {@code true} for later nodes, which are
     *                       constructed with the store name only
     * @param storeBuilder   the shared state store
     * @param windows        window definition, or {@code null}
     * @param sessionWindows session window definition, or {@code null}
     * @param sessionMerger  session merger, or {@code null}
     * @return the stateful processor node for this stream
     * @throws IllegalArgumentException if the window arguments form an unsupported combination
     */
    private <W extends Window> StatefulProcessorNode<K, ?> getStatefulProcessorNode(
            Aggregator<? super K, Object, VOut> aggregator,
            Initializer<VOut> initializer,
            String processorName,
            boolean stateCreated,
            StoreBuilder<?> storeBuilder,
            Windows<W> windows,
            SessionWindows sessionWindows,
            Merger<? super K, VOut> sessionMerger) {
        KStreamAggProcessorSupplier<K, ?, Object, VOut> aggregateSupplier;
        if (windows == null && sessionWindows == null) {
            aggregateSupplier = new KStreamAggregate<K, Object, VOut>(storeBuilder.name(), initializer, aggregator);
        } else if (windows != null && sessionWindows == null) {
            aggregateSupplier = new KStreamWindowAggregate<K, Object, VOut, W>(windows, storeBuilder.name(), initializer, aggregator);
        } else if (windows == null && sessionMerger != null) {
            aggregateSupplier = new KStreamSessionWindowAggregate<K, Object, VOut>(sessionWindows, storeBuilder.name(), initializer, aggregator, sessionMerger);
        } else {
            throw new IllegalArgumentException("must include windows OR sessionWindows + sessionMerger OR all must be null");
        }

        ProcessorParameters<K, Object> parameters = new ProcessorParameters<K, Object>(aggregateSupplier, processorName);
        if (stateCreated) {
            // An earlier node was already constructed with the store builder; pass the name only.
            return new StatefulProcessorNode<K, Object>(processorName, parameters, new String[]{storeBuilder.name()});
        }
        return new StatefulProcessorNode<K, Object>(processorName, parameters, storeBuilder);
    }

    /**
     * Delegates to {@link KStreamImpl#createRepartitionedSource} with a {@code null} fifth argument.
     *
     * <p>The value serde and the node builder arrive with unrelated wildcards; both are cast to
     * the same {@code VIn} so the generic helper sees one consistent value type. The casts are
     * unchecked and have no effect at runtime.
     *
     * @param repartitionTopicNamePrefix        prefix forwarded to the helper
     * @param optimizableRepartitionNodeBuilder node builder forwarded to the helper
     * @param keySerde                          key serde of the grouped stream
     * @param valueSerde                        value serde of the grouped stream
     */
    @SuppressWarnings("unchecked")
    private <VIn> void createRepartitionSource(String repartitionTopicNamePrefix, OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, ?> optimizableRepartitionNodeBuilder, Serde<K> keySerde, Serde<?> valueSerde) {
        KStreamImpl.createRepartitionedSource(
                this.builder,
                keySerde,
                (Serde<VIn>) valueSerde,
                repartitionTopicNamePrefix,
                null,
                (OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, VIn>) optimizableRepartitionNodeBuilder);
    }
}

