/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Collections;
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.internals.GroupedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.KStreamAggProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.StoreBuilder;

class GroupedStreamAggregateBuilder<K, V> {
    private final InternalStreamsBuilder builder;
    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;
    private final boolean repartitionRequired;
    private final String userName;
    private final Set<String> sourceNodes;
    private final String name;
    private final StreamsGraphNode streamsGraphNode;
    final Initializer<Long> countInitializer = () -> 0L;
    final Aggregator<K, V, Long> countAggregator = (aggKey, value, aggregate) -> aggregate + 1L;
    final Initializer<V> reduceInitializer = () -> null;

    GroupedStreamAggregateBuilder(InternalStreamsBuilder builder, GroupedInternal<K, V> groupedInternal, boolean repartitionRequired, Set<String> sourceNodes, String name, StreamsGraphNode streamsGraphNode) {
        this.builder = builder;
        this.keySerde = groupedInternal.keySerde();
        this.valueSerde = groupedInternal.valueSerde();
        this.repartitionRequired = repartitionRequired;
        this.sourceNodes = sourceNodes;
        this.name = name;
        this.streamsGraphNode = streamsGraphNode;
        this.userName = groupedInternal.name();
    }

    <KR, VR> KTable<KR, VR> build(String functionName, StoreBuilder<? extends StateStore> storeBuilder, KStreamAggProcessorSupplier<K, KR, V, VR> aggregateSupplier, boolean isQueryable, Serde<KR> keySerde, Serde<VR> valSerde) {
        String aggFunctionName = this.builder.newProcessorName(functionName);
        OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder repartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
        String sourceName = this.repartitionIfRequired(this.userName != null ? this.userName : storeBuilder.name(), repartitionNodeBuilder);
        OptimizableRepartitionNode parentNode = this.streamsGraphNode;
        if (!sourceName.equals(this.name)) {
            OptimizableRepartitionNode repartitionNode = repartitionNodeBuilder.build();
            this.builder.addGraphNode(parentNode, repartitionNode);
            parentNode = repartitionNode;
        }
        StatefulProcessorNode statefulProcessorNode = new StatefulProcessorNode(aggFunctionName, new ProcessorParameters(aggregateSupplier, aggFunctionName), storeBuilder, this.repartitionRequired);
        this.builder.addGraphNode(parentNode, statefulProcessorNode);
        return new KTableImpl(aggFunctionName, keySerde, valSerde, sourceName.equals(this.name) ? this.sourceNodes : Collections.singleton(sourceName), storeBuilder.name(), isQueryable, aggregateSupplier, statefulProcessorNode, this.builder);
    }

    private String repartitionIfRequired(String repartitionTopicNamePrefix, OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder) {
        if (!this.repartitionRequired) {
            return this.name;
        }
        return KStreamImpl.createRepartitionedSource(this.builder, this.keySerde, this.valueSerde, repartitionTopicNamePrefix, optimizableRepartitionNodeBuilder);
    }
}

