/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.runtime;

import java.util.Collections;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.core.NullSideInputReader;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.samza.SamzaExecutionContext;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics;
import org.apache.beam.runners.samza.runtime.KeyedInternals;
import org.apache.beam.runners.samza.runtime.KeyedTimerData;
import org.apache.beam.runners.samza.runtime.Op;
import org.apache.beam.runners.samza.runtime.OpEmitter;
import org.apache.beam.runners.samza.runtime.OutputManagerFactory;
import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals;
import org.apache.beam.runners.samza.runtime.SamzaTimerInternalsFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.samza.config.Config;
import org.apache.samza.context.Context;
import org.apache.samza.operators.Scheduler;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Samza {@link Op} that feeds {@link KeyedWorkItem}s and fired timers into a
 * {@link GroupAlsoByWindowViaWindowSetNewDoFn} and emits the resulting {@code KV<K, OutputT>}
 * values.
 *
 * <p>The runtime collaborators (state/timer factories and the {@link DoFnRunner}) are
 * {@code transient} and are only created in {@link #open}; the other callbacks dereference them
 * directly, so {@link #open} has to run first.
 *
 * @param <K> key type, encoded with the key coder taken from the input {@link KeyedWorkItemCoder}
 * @param <InputT> element type carried by the incoming keyed work items
 * @param <OutputT> value type of the emitted {@link KV}s, as produced by the {@link SystemReduceFn}
 */
public class GroupByKeyOp<K, InputT, OutputT>
        implements Op<KeyedWorkItem<K, InputT>, KV<K, OutputT>, K> {
    private static final Logger LOG = LoggerFactory.getLogger(GroupByKeyOp.class);
    /** State id handed to the timer internals factory when it is created in {@link #open}. */
    private static final String TIMER_STATE_ID = "timer";

    private final TupleTag<KV<K, OutputT>> mainOutputTag;
    private final KeyedWorkItemCoder<K, InputT> inputCoder;
    private final WindowingStrategy<?, BoundedWindow> windowingStrategy;
    private final OutputManagerFactory<KV<K, OutputT>> outputManagerFactory;
    private final Coder<K> keyCoder;
    private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn;
    private final String stepName;
    private final String stepId;
    private final PCollection.IsBounded isBounded;

    // Runtime-only collaborators, populated by open().
    private transient StateInternalsFactory<K> stateInternalsFactory;
    private transient SamzaTimerInternalsFactory<K> timerInternalsFactory;
    private transient DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> fnRunner;
    private transient SamzaPipelineOptions pipelineOptions;

    /**
     * Creates the operator.
     *
     * @param mainOutputTag tag under which grouped results are output
     * @param inputCoder coder of the incoming work items; must be a {@link KeyedWorkItemCoder}
     *     because the key coder is extracted from it
     * @param reduceFn reduce function passed to the group-also-by-window {@link DoFn}
     * @param windowingStrategy windowing strategy used by the {@link DoFn}, the runner and the
     *     timer internals factory
     * @param outputManagerFactory creates the output manager bound to the emitter given to
     *     {@link #open}
     * @param stepName name under which the runner is wrapped for metrics
     * @param stepId id used when creating the state internals factories
     * @param isBounded boundedness forwarded to the timer internals factory
     * @throws IllegalArgumentException if {@code inputCoder} is not a {@link KeyedWorkItemCoder}
     */
    public GroupByKeyOp(TupleTag<KV<K, OutputT>> mainOutputTag, Coder<KeyedWorkItem<K, InputT>> inputCoder, SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn, WindowingStrategy<?, BoundedWindow> windowingStrategy, OutputManagerFactory<KV<K, OutputT>> outputManagerFactory, String stepName, String stepId, PCollection.IsBounded isBounded) {
        this.mainOutputTag = mainOutputTag;
        this.windowingStrategy = windowingStrategy;
        this.outputManagerFactory = outputManagerFactory;
        this.stepName = stepName;
        this.stepId = stepId;
        this.isBounded = isBounded;
        if (!(inputCoder instanceof KeyedWorkItemCoder)) {
            throw new IllegalArgumentException(String.format("GroupByKeyOp requires input to use KeyedWorkItemCoder. Got: %s", inputCoder.getClass()));
        }
        // Parameterized cast instead of a raw one, so the assignment needs no unchecked conversion.
        this.inputCoder = (KeyedWorkItemCoder<K, InputT>) inputCoder;
        this.keyCoder = this.inputCoder.getKeyCoder();
        this.reduceFn = reduceFn;
    }

    /**
     * Builds the runtime collaborators: pipeline options, keyed state internals, timer internals
     * and the metrics-wrapped {@link DoFnRunner} used by the other callbacks.
     *
     * @param config Samza config; the {@code "beamPipelineOptions"} entry is deserialized into
     *     the pipeline options
     * @param context Samza context supplying the task context and the application container
     *     context (cast to {@link SamzaExecutionContext})
     * @param timerRegistry scheduler handed to the timer internals factory
     * @param emitter emitter the output manager is created from
     */
    @Override
    public void open(Config config, Context context, Scheduler<KeyedTimerData<K>> timerRegistry, OpEmitter<KV<K, OutputT>> emitter) {
        this.pipelineOptions = Base64Serializer.deserializeUnchecked(config.get("beamPipelineOptions"), SerializablePipelineOptions.class)
                .get()
                .as(SamzaPipelineOptions.class);

        // Created with a null key coder; only used as the backing state of the timer internals factory.
        final SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory = SamzaStoreStateInternals.createStateInternalFactory(this.stepId, null, context.getTaskContext(), this.pipelineOptions, null);
        final DoFnRunners.OutputManager outputManager = this.outputManagerFactory.create(emitter);

        this.stateInternalsFactory = new SamzaStoreStateInternals.Factory<K>(this.stepId, Collections.singletonMap("beamStore", SamzaStoreStateInternals.getBeamStore(context.getTaskContext())), this.keyCoder, this.pipelineOptions.getStoreBatchGetSize());
        this.timerInternalsFactory = SamzaTimerInternalsFactory.createTimerInternalFactory(this.keyCoder, timerRegistry, TIMER_STATE_ID, nonKeyedStateInternalsFactory, this.windowingStrategy, this.isBounded, this.pipelineOptions);

        // No side inputs are supplied; the same empty reader serves both the DoFn and the runner.
        final SideInputReader noSideInputs = NullSideInputReader.of(Collections.emptyList());

        final DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFn = GroupAlsoByWindowViaWindowSetNewDoFn.create(this.windowingStrategy, this.stateInternalsFactory, this.timerInternalsFactory, noSideInputs, this.reduceFn, outputManager, this.mainOutputTag);

        final KeyedInternals<K> keyedInternals = new KeyedInternals<K>(this.stateInternalsFactory, this.timerInternalsFactory);
        // The step context simply delegates to the keyed internals built above.
        final StepContext stepContext = new StepContext() {
            @Override
            public StateInternals stateInternals() {
                return keyedInternals.stateInternals();
            }

            @Override
            public TimerInternals timerInternals() {
                return keyedInternals.timerInternals();
            }
        };

        // Note: the runner receives freshly created default PipelineOptions, not this.pipelineOptions.
        final DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner = DoFnRunners.simpleRunner(PipelineOptionsFactory.create(), doFn, noSideInputs, outputManager, this.mainOutputTag, Collections.emptyList(), stepContext, null, Collections.emptyMap(), this.windowingStrategy, DoFnSchemaInformation.create());

        final SamzaExecutionContext executionContext = (SamzaExecutionContext) context.getApplicationContainerContext();
        this.fnRunner = DoFnRunnerWithMetrics.wrap(doFnRunner, executionContext.getMetricsContainer(), this.stepName);
    }

    /**
     * Processes one keyed work item in its own bundle.
     *
     * @param inputElement the windowed work item to process
     * @param emitter not used here; output goes through the output manager created in
     *     {@link #open}
     */
    @Override
    public void processElement(WindowedValue<KeyedWorkItem<K, InputT>> inputElement, OpEmitter<KV<K, OutputT>> emitter) {
        this.fnRunner.startBundle();
        this.fnRunner.processElement(inputElement);
        this.fnRunner.finishBundle();
    }

    /**
     * Advances the input watermark, fires every timer that became ready within a single bundle,
     * and then moves the output watermark forward and emits it if it was unset or earlier than
     * {@code watermark}.
     *
     * @param watermark the new input watermark
     * @param ctx emitter used to propagate the output watermark
     */
    @Override
    public void processWatermark(Instant watermark, OpEmitter<KV<K, OutputT>> ctx) {
        this.timerInternalsFactory.setInputWatermark(watermark);

        this.fnRunner.startBundle();
        for (KeyedTimerData<K> keyedTimerData : this.timerInternalsFactory.removeReadyTimers()) {
            this.fireTimer(keyedTimerData.getKey(), keyedTimerData.getTimerData());
        }
        this.fnRunner.finishBundle();

        final Instant currentOutputWatermark = this.timerInternalsFactory.getOutputWatermark();
        if (currentOutputWatermark == null || currentOutputWatermark.isBefore(watermark)) {
            this.timerInternalsFactory.setOutputWatermark(watermark);
            // Emit what the factory reports after the update rather than the raw argument.
            ctx.emitWatermark(this.timerInternalsFactory.getOutputWatermark());
        }
    }

    /**
     * Fires a single timer in its own bundle and afterwards removes it from the factory's
     * processing timers.
     *
     * @param keyedTimerData the key and timer data of the timer to fire
     */
    @Override
    public void processTimer(KeyedTimerData<K> keyedTimerData) {
        this.fnRunner.startBundle();
        this.fireTimer(keyedTimerData.getKey(), keyedTimerData.getTimerData());
        this.fnRunner.finishBundle();
        this.timerInternalsFactory.removeProcessingTimer(keyedTimerData);
    }

    /**
     * Delivers {@code timer} to the runner as a timers-only work item for {@code key}, wrapped in
     * the global window. The caller is responsible for the surrounding bundle.
     */
    private void fireTimer(K key, TimerInternals.TimerData timer) {
        LOG.debug("Firing timer {} for key {}", timer, key);
        // No (Object) cast on the work item: the element type must be inferred as
        // KeyedWorkItem<K, InputT> to match fnRunner's input type.
        this.fnRunner.processElement(WindowedValue.valueInGlobalWindow(KeyedWorkItems.timersWorkItem(key, Collections.singletonList(timer))));
    }
}

