/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.translation;

import java.io.Serializable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
import org.apache.beam.runners.samza.runtime.DoFnOp;
import org.apache.beam.runners.samza.runtime.GroupByKeyOp;
import org.apache.beam.runners.samza.runtime.KvToKeyedWorkItemOp;
import org.apache.beam.runners.samza.runtime.OpAdapter;
import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.runners.samza.transforms.GroupWithoutRepartition;
import org.apache.beam.runners.samza.translation.PortableTranslationContext;
import org.apache.beam.runners.samza.translation.TransformTranslator;
import org.apache.beam.runners.samza.translation.TranslationContext;
import org.apache.beam.runners.samza.util.SamzaCoders;
import org.apache.beam.runners.samza.util.SamzaPipelineTranslatorUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.serializers.KVSerde;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

/**
 * Translates a keyed grouping transform into Samza {@link MessageStream} operators.
 *
 * <p>Two entry points exist: {@link #translate} for the regular path and
 * {@link #translatePortable} for the portable path. Both gather the coders, windowing
 * strategy and reduce function they need and then hand off to {@code doTranslateGBK},
 * which builds the actual operator chain.
 *
 * <p>On the regular path only {@link GroupByKey} and {@link Combine.PerKey} transforms are
 * accepted (see {@code getSystemReduceFn}); the portable path always uses a buffering
 * reduce function.
 */
class GroupByKeyTranslator<@UnknownKeyFor K, @UnknownKeyFor InputT, @UnknownKeyFor OutputT>
implements TransformTranslator<PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>> {
    /** Package-private constructor; the translator keeps no state. */
    GroupByKeyTranslator() {
    }

    /**
     * Regular-path entry point; delegates to the static {@code doTranslate} helper.
     *
     * @param transform the grouping transform being translated
     * @param node the hierarchy node of the transform, used to decide on repartitioning
     * @param ctx the translation context that supplies inputs and receives the output stream
     */
    @Override
    public void translate(
            @UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, InputT>>, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, OutputT>>> transform,
            // Decompiler note: outer class could not be loaded - annotation placement on the nested type may be incorrect
            @UnknownKeyFor @NonNull @Initialized TransformHierarchy. @UnknownKeyFor @NonNull @Initialized Node node,
            @UnknownKeyFor @NonNull @Initialized TranslationContext ctx) {
        doTranslate(transform, node, ctx);
    }

    /**
     * Collects everything the grouping needs from {@code ctx} and the input collection,
     * builds the grouped stream and registers it under the transform's output.
     *
     * @param transform the grouping transform being translated
     * @param node the hierarchy node of the transform, passed on to {@code needRepartition}
     * @param ctx the translation context
     */
    private static <K, InputT, OutputT> void doTranslate(
            @UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, InputT>>, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, OutputT>>> transform,
            // Decompiler note: outer class could not be loaded - annotation placement on the nested type may be incorrect
            @UnknownKeyFor @NonNull @Initialized TransformHierarchy. @UnknownKeyFor @NonNull @Initialized Node node,
            @UnknownKeyFor @NonNull @Initialized TranslationContext ctx) {
        PCollection<KV<K, InputT>> keyedInput = ctx.getInput(transform);
        PCollection<KV<K, OutputT>> groupedOutput = ctx.getOutput(transform);
        TupleTag<KV<K, OutputT>> outputTag = ctx.getOutputTag(transform);

        // Everything below is derived from the input collection.
        WindowingStrategy windowingStrategy = keyedInput.getWindowingStrategy();
        MessageStream inputStream = ctx.getMessageStream((PValue)keyedInput);
        KvCoder kvInputCoder = (KvCoder)keyedInput.getCoder();
        Coder<WindowedValue<KV<K, InputT>>> elementCoder = SamzaCoders.of(keyedInput);
        SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn = getSystemReduceFn(transform, keyedInput.getPipeline(), kvInputCoder);

        boolean repartition = needRepartition(node, ctx);
        String transformFullName = ctx.getTransformFullName();
        String transformId = ctx.getTransformId();
        PCollection.IsBounded isBounded = keyedInput.isBounded();

        MessageStream<OpMessage<KV<K, OutputT>>> groupedStream = doTranslateGBK(inputStream, repartition, reduceFn, windowingStrategy, kvInputCoder, elementCoder, transformFullName, transformId, outputTag, isBounded);
        ctx.registerMessageStream((PValue)groupedOutput, groupedStream);
    }

    /**
     * Portable-path entry point; delegates to the static {@code doTranslatePortable} helper.
     *
     * @param transform the portable transform node being translated
     * @param pipeline the pipeline whose components describe the transform's input
     * @param ctx the portable translation context
     */
    @Override
    public void translatePortable(
            // Decompiler note: outer class could not be loaded - annotation placement on the nested type may be incorrect
            @UnknownKeyFor @NonNull @Initialized PipelineNode.PTransformNode transform,
            @UnknownKeyFor @NonNull @Initialized QueryablePipeline pipeline,
            @UnknownKeyFor @NonNull @Initialized PortableTranslationContext ctx) {
        doTranslatePortable(transform, pipeline, ctx);
    }

    /**
     * Portable counterpart of {@code doTranslate}.
     *
     * <p>Repartitioning is requested whenever the configured max source parallelism is
     * greater than one, and the reduce function is always the buffering one. The output
     * tag is built from the single key of the transform's outputs map.
     *
     * @param transform the portable transform node being translated
     * @param pipeline the pipeline whose components are used to instantiate the input coder
     * @param ctx the portable translation context
     */
    private static <K, InputT, OutputT> void doTranslatePortable(
            // Decompiler note: outer class could not be loaded - annotation placement on the nested type may be incorrect
            @UnknownKeyFor @NonNull @Initialized PipelineNode.PTransformNode transform,
            @UnknownKeyFor @NonNull @Initialized QueryablePipeline pipeline,
            @UnknownKeyFor @NonNull @Initialized PortableTranslationContext ctx) {
        MessageStream inputStream = ctx.getOneInputMessageStream(transform);
        boolean repartition = ctx.getSamzaPipelineOptions().getMaxSourceParallelism() > 1;

        WindowingStrategy<?, BoundedWindow> windowingStrategy = ctx.getPortableWindowStrategy(transform, pipeline);
        Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();

        // The input's windowed coder wraps the KV coder that the grouping works on.
        String inputId = ctx.getInputId(transform);
        WindowedValue.WindowedValueCoder windowedInputCoder = ctx.instantiateCoder(inputId, pipeline.getComponents());
        KvCoder kvInputCoder = (KvCoder)windowedInputCoder.getValueCoder();
        WindowedValue.FullWindowedValueCoder elementCoder = WindowedValue.FullWindowedValueCoder.of((Coder)kvInputCoder, (Coder)windowCoder);

        String outputName = (String)Iterables.getOnlyElement(transform.getTransform().getOutputsMap().keySet());
        TupleTag outputTag = new TupleTag(outputName);
        SystemReduceFn reduceFn = SystemReduceFn.buffering((Coder)kvInputCoder.getValueCoder());

        RunnerApi.PCollection inputProto = pipeline.getComponents().getPcollectionsOrThrow(inputId);
        PCollection.IsBounded isBounded = SamzaPipelineTranslatorUtils.isBounded(inputProto);

        String transformFullName = ctx.getTransformFullName();
        String transformId = ctx.getTransformId();

        MessageStream<OpMessage<KV<K, OutputT>>> groupedStream = doTranslateGBK(inputStream, repartition, reduceFn, windowingStrategy, kvInputCoder, elementCoder, transformFullName, transformId, outputTag, isBounded);
        ctx.registerMessageStream(ctx.getOutputId(transform), groupedStream);
    }

    /**
     * Builds the operator chain shared by both translation paths.
     *
     * <p>The chain is: keep only messages of type {@code ELEMENT}; if requested, repartition
     * by the element's key under the stream id {@code "gbk-" + escape(transformId)} and
     * re-wrap the values as element messages; then flat-map through
     * {@link KvToKeyedWorkItemOp} followed by {@link GroupByKeyOp}.
     *
     * @param inputStream the stream of keyed input messages
     * @param needRepartition whether to insert the {@code partitionBy} stage
     * @param reduceFn the reduce function handed to {@link GroupByKeyOp}
     * @param windowingStrategy the windowing strategy; also supplies the window coder
     * @param kvInputCoder the coder of the keyed input; supplies key and value coders
     * @param elementCoder the coder of windowed input elements, used for the repartition serde
     * @param transformFullName the transform's full name, passed to {@link GroupByKeyOp}
     * @param transformId the transform id, used for the partition stream id and the op
     * @param outputTag the tag of the grouped output
     * @param isBounded boundedness of the input, passed to {@link GroupByKeyOp}
     * @return the stream produced by the final {@code flatMap}
     */
    private static <K, InputT, OutputT> @UnknownKeyFor @NonNull @Initialized MessageStream<@UnknownKeyFor @NonNull @Initialized OpMessage<@UnknownKeyFor @NonNull @Initialized KV<K, OutputT>>> doTranslateGBK(
            @UnknownKeyFor @NonNull @Initialized MessageStream<@UnknownKeyFor @NonNull @Initialized OpMessage<@UnknownKeyFor @NonNull @Initialized KV<K, InputT>>> inputStream,
            @UnknownKeyFor @NonNull @Initialized boolean needRepartition,
            // Decompiler note: issues handling annotations - annotations may be inaccurate
            @UnknownKeyFor @NonNull @Initialized SystemReduceFn<K, InputT, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, OutputT, @UnknownKeyFor @NonNull @Initialized BoundedWindow> reduceFn,
            // Decompiler note: issues handling annotations - annotations may be inaccurate
            @UnknownKeyFor @NonNull @Initialized WindowingStrategy<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @NonNull @Initialized BoundedWindow> windowingStrategy,
            @UnknownKeyFor @NonNull @Initialized KvCoder<K, InputT> kvInputCoder,
            @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KV<K, InputT>>> elementCoder,
            @UnknownKeyFor @NonNull @Initialized String transformFullName,
            @UnknownKeyFor @NonNull @Initialized String transformId,
            @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized KV<K, OutputT>> outputTag,
            // Decompiler note: outer class could not be loaded - annotation placement on the nested type may be incorrect
            @UnknownKeyFor @NonNull @Initialized PCollection.IsBounded isBounded) {
        // Only ELEMENT messages take part in the grouping.
        MessageStream elementsOnly = inputStream.filter((FilterFunction & Serializable)msg -> msg.getType() == OpMessage.Type.ELEMENT);

        MessageStream partitioned;
        if (needRepartition) {
            KVSerde keyedSerde = KVSerde.of(SamzaCoders.toSerde(kvInputCoder.getKeyCoder()), SamzaCoders.toSerde(elementCoder));
            String partitionStreamId = "gbk-" + SamzaPipelineTranslatorUtils.escape(transformId);
            // Key by the KV's key, ship the windowed element, then wrap it back into an OpMessage.
            partitioned = filteredByKey(elementsOnly, keyedSerde, partitionStreamId);
        } else {
            partitioned = elementsOnly;
        }

        KeyedWorkItemCoder keyedWorkItemCoder = KeyedWorkItemCoder.of((Coder)kvInputCoder.getKeyCoder(), (Coder)kvInputCoder.getValueCoder(), (Coder)windowingStrategy.getWindowFn().windowCoder());

        return partitioned
                .flatMap(OpAdapter.adapt(new KvToKeyedWorkItemOp()))
                .flatMap(OpAdapter.adapt(new GroupByKeyOp<K, InputT, OutputT>(outputTag, keyedWorkItemCoder, reduceFn, windowingStrategy, new DoFnOp.SingleOutputManagerFactory(), transformFullName, transformId, isBounded)));
    }

    /**
     * Repartitions {@code elements} by each element's key and re-wraps the shipped values
     * as element messages.
     *
     * @param elements the already-filtered stream of element messages
     * @param keyedSerde the key/value serde used for the repartition
     * @param partitionStreamId the id given to the repartition stage
     * @return the repartitioned stream of element messages
     */
    private static MessageStream filteredByKey(MessageStream elements, KVSerde keyedSerde, String partitionStreamId) {
        return elements.partitionBy((MapFunction & Serializable)msg -> ((KV)msg.getElement().getValue()).getKey(), (MapFunction & Serializable)msg -> msg.getElement(), keyedSerde, partitionStreamId).map((MapFunction & Serializable)kv -> OpMessage.ofElement((WindowedValue)kv.getValue()));
    }

    /**
     * Chooses the reduce function for a regular-path transform.
     *
     * @param transform the transform being translated
     * @param pipeline the pipeline whose coder registry is used for the combining case
     * @param kvInputCoder the coder of the keyed input
     * @return a buffering reduce function for {@link GroupByKey}, or a combining one built
     *     from the transform's combine fn for {@link Combine.PerKey}
     * @throws RuntimeException if {@code transform} is neither of those two types
     */
    private static <K, InputT, OutputT>
    // Decompiler note: issues handling annotations - annotations may be inaccurate
    @UnknownKeyFor @NonNull @Initialized SystemReduceFn<K, InputT, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, OutputT, @UnknownKeyFor @NonNull @Initialized BoundedWindow> getSystemReduceFn(
            @UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, InputT>>, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, OutputT>>> transform,
            @UnknownKeyFor @NonNull @Initialized Pipeline pipeline,
            @UnknownKeyFor @NonNull @Initialized KvCoder<K, InputT> kvInputCoder) {
        if (transform instanceof GroupByKey) {
            return SystemReduceFn.buffering((Coder)kvInputCoder.getValueCoder());
        }
        if (!(transform instanceof Combine.PerKey)) {
            throw new RuntimeException("Transform " + transform + " cannot be translated as GroupByKey.");
        }
        CombineFnBase.GlobalCombineFn combineFn = ((Combine.PerKey)transform).getFn();
        return SystemReduceFn.combining((Coder)kvInputCoder.getKeyCoder(), (AppliedCombineFn)AppliedCombineFn.withInputCoder((CombineFnBase.GlobalCombineFn)combineFn, (CoderRegistry)pipeline.getCoderRegistry(), kvInputCoder));
    }

    /**
     * Decides whether the input must be repartitioned by key before grouping.
     *
     * <p>Returns {@code false} when the max source parallelism option equals one, or when
     * {@code node} or any of its enclosing nodes has a {@link GroupWithoutRepartition}
     * transform. Returns {@code true} once the chain of enclosing nodes runs out
     * ({@code node == null}) without finding one.
     *
     * @param node the hierarchy node to start from; may be {@code null}
     * @param ctx the translation context supplying the pipeline options
     * @return whether a repartition stage is needed
     */
    private static @UnknownKeyFor @NonNull @Initialized boolean needRepartition(
            // Decompiler note: outer class could not be loaded - annotation placement on the nested type may be incorrect
            @UnknownKeyFor @NonNull @Initialized TransformHierarchy. @UnknownKeyFor @NonNull @Initialized Node node,
            @UnknownKeyFor @NonNull @Initialized TranslationContext ctx) {
        if (ctx.getPipelineOptions().getMaxSourceParallelism() == 1) {
            return false;
        }
        if (node == null) {
            return true;
        }
        boolean optedOut = node.getTransform() instanceof GroupWithoutRepartition;
        // Short-circuits to false on opt-out; otherwise keeps climbing the hierarchy.
        return !optedOut && needRepartition(node.getEnclosingNode(), ctx);
    }
}

