/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;

import java.io.Serializable;
import org.apache.beam.runners.core.InMemoryStateInternals;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.spark.structuredstreaming.translation.AbstractTranslationContext;
import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.GroupAlsoByWindowViaOutputBufferFn;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.KeyValueGroupedDataset;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

/**
 * Batch translator for a {@code PCollection<KV<K, V>> -> PCollection<KV<K, Iterable<V>>>}
 * transform on the Spark structured streaming runner.
 *
 * <p>The translation is done in two steps: the input dataset is first grouped by key only, and
 * each key group is then passed through {@link GroupAlsoByWindowViaOutputBufferFn} to produce the
 * output dataset.
 *
 * <p>NOTE(review): the raw types in the method body are carried over from decompiled output.
 * Restoring the generic parameters requires the signatures of the context and helper APIs, which
 * are not visible in this file.
 *
 * @param <K> type of the keys of the input elements
 * @param <V> type of the values of the input elements
 */
class GroupByKeyTranslatorBatch<@UnknownKeyFor K, @UnknownKeyFor V>
implements TransformTranslator<PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>> {
    /** Creates the translator; it holds no state of its own. */
    GroupByKeyTranslatorBatch() {
    }

    /**
     * Translates the transform by reading the current input dataset from {@code context}, grouping
     * it, and registering the resulting dataset as the current output of {@code context}.
     *
     * @param transform the transform being translated; not read by this implementation
     * @param context translation context supplying the input, the serializable options and
     *     receiving the output dataset
     * @throws ClassCastException if the coder of the input collection is not a {@link KvCoder}
     */
    @Override
    public void translateTransform(@UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, V>>, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, @UnknownKeyFor @NonNull @Initialized Iterable<V>>>> transform, @UnknownKeyFor @NonNull @Initialized AbstractTranslationContext context) {
        PCollection source = (PCollection)context.getInput();
        Dataset sourceDataset = context.getDataset((PValue)source);
        WindowingStrategy strategy = source.getWindowingStrategy();

        // The input coder is split into its key and value parts: the key coder drives the
        // key-only grouping, the value coder is used for buffering and for the output coder.
        KvCoder inputKvCoder = (KvCoder)source.getCoder();
        Coder valueCoder = inputKvCoder.getValueCoder();
        Coder keyCoder = inputKvCoder.getKeyCoder();

        // Step 1: group by key only.
        KeyValueGroupedDataset groupedByKey = sourceDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(keyCoder));

        // Step 2: process every key group with GroupAlsoByWindowViaOutputBufferFn. The output
        // elements are encoded as windowed KV<key, Iterable<value>> using the window coder of the
        // input windowing strategy.
        Coder groupedKvCoder = KvCoder.of(keyCoder, IterableCoder.of(valueCoder));
        Coder windowCoder = strategy.getWindowFn().windowCoder();
        WindowedValue.FullWindowedValueCoder outputCoder = WindowedValue.FullWindowedValueCoder.of(groupedKvCoder, windowCoder);
        GroupAlsoByWindowViaOutputBufferFn groupAlsoByWindowFn = new GroupAlsoByWindowViaOutputBufferFn(
            strategy,
            new InMemoryStateInternalsFactory(),
            SystemReduceFn.buffering(valueCoder),
            context.getSerializableOptions());
        Dataset groupedDataset = groupedByKey.flatMapGroups(groupAlsoByWindowFn, EncoderHelpers.fromBeamCoder(outputCoder));

        context.putDataset(context.getOutput(), groupedDataset);
    }

    /**
     * Serializable {@link StateInternalsFactory} that hands out {@link InMemoryStateInternals}
     * for each requested key.
     *
     * @param <K> type of the keys state is requested for
     */
    static class InMemoryStateInternalsFactory<@UnknownKeyFor K>
    implements StateInternalsFactory<K>,
    Serializable {
        /** Creates the factory; it holds no state of its own. */
        InMemoryStateInternalsFactory() {
        }

        /**
         * Returns the state internals obtained from {@link InMemoryStateInternals#forKey} for the
         * given key.
         *
         * @param key the key to obtain state internals for
         * @return the state internals for {@code key}
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized StateInternals stateInternalsForKey(K key) {
            return InMemoryStateInternals.forKey(key);
        }
    }
}

