/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.runtime;

import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.samza.SamzaExecutionContext;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.runtime.DoFnRunnerWithKeyedInternals;
import org.apache.beam.runners.samza.runtime.KeyedTimerData;
import org.apache.beam.runners.samza.runtime.Op;
import org.apache.beam.runners.samza.runtime.OpEmitter;
import org.apache.beam.runners.samza.runtime.OutputManagerFactory;
import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals;
import org.apache.beam.runners.samza.runtime.SamzaTimerInternalsFactory;
import org.apache.beam.runners.samza.runtime.TimerKey;
import org.apache.beam.runners.samza.util.Base64Serializer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.samza.config.Config;
import org.apache.samza.operators.TimerRegistry;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.task.TaskContext;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Samza {@link Op} that executes a Beam {@link DoFn}.
 *
 * <p>Main-input elements are pushed through a {@link PushbackSideInputDoFnRunner}. Elements the
 * runner hands back are buffered in {@code pushbackValues} and are retried every time a side-input
 * value arrives; once the side-input watermark reaches {@link BoundedWindow#TIMESTAMP_MAX_VALUE}
 * they are flushed straight into the underlying {@link DoFnRunner}. While elements are buffered,
 * the watermark used for timers and emitted downstream is capped at the earliest buffered
 * timestamp ({@code pushbackWatermarkHold}).
 *
 * <p>All {@code transient} fields are populated in {@link #open}; no other method may be called
 * before it.
 *
 * @param <InT> element type consumed by the {@link DoFn}
 * @param <FnOutT> main output type produced by the {@link DoFn}
 * @param <OutT> element type handed to the {@link OpEmitter}
 */
public class DoFnOp<InT, FnOutT, OutT>
implements Op<InT, OutT, Void> {
    private static final Logger LOG = LoggerFactory.getLogger(DoFnOp.class);
    private final TupleTag<FnOutT> mainOutputTag;
    private final DoFn<InT, FnOutT> doFn;
    private final Coder<?> keyCoder;
    private final Collection<PCollectionView<?>> sideInputs;
    private final List<TupleTag<?>> sideOutputTags;
    private final WindowingStrategy windowingStrategy;
    private final OutputManagerFactory<OutT> outputManagerFactory;
    // Deliberately declared as the concrete HashMap rather than Map.
    // NOTE(review): presumably to guarantee a Serializable field type, since this op keeps
    // its runtime state in transient fields - confirm before widening to Map.
    private final HashMap<String, PCollectionView<?>> idToViewMap;
    private final String stepName;
    private transient SamzaTimerInternalsFactory<?> timerInternalsFactory;
    private transient DoFnRunner<InT, FnOutT> fnRunner;
    private transient PushbackSideInputDoFnRunner<InT, FnOutT> pushbackFnRunner;
    private transient SideInputHandler sideInputHandler;
    private transient DoFnInvoker<InT, FnOutT> doFnInvoker;
    // This is the FindBugs annotation imported at the top of the file, which shadows
    // java.lang.SuppressWarnings; compiler warnings below are therefore suppressed with the
    // fully-qualified java.lang.SuppressWarnings.
    @SuppressWarnings(justification="No bug", value={"SE_TRANSIENT_FIELD_NOT_RESTORED"})
    private transient Instant pushbackWatermarkHold;
    private transient Instant inputWatermark;
    private transient Instant sideInputWatermark;
    private transient List<WindowedValue<InT>> pushbackValues;
    private transient DoFnSignature signature;
    private transient TaskContext context;
    private transient SamzaPipelineOptions pipelineOptions;

    /**
     * Stores the translation-time configuration of the op. Runtime state is created in
     * {@link #open}.
     *
     * @param mainOutputTag tag of the DoFn's main output; its id is also passed to the state
     *     internals factory
     * @param doFn the user function to execute
     * @param keyCoder coder for the element key, or {@code null} when the state is not keyed
     * @param sideInputs views the DoFn reads as side inputs
     * @param sideOutputTags tags of the additional outputs
     * @param windowingStrategy windowing strategy handed to the DoFn runner
     * @param idToViewMap side-input stream id to view mapping; copied defensively
     * @param outputManagerFactory creates the output manager bound to the emitter
     * @param stepName name of the step, handed to the DoFn runner
     */
    public DoFnOp(TupleTag<FnOutT> mainOutputTag, DoFn<InT, FnOutT> doFn, Coder<?> keyCoder, Collection<PCollectionView<?>> sideInputs, List<TupleTag<?>> sideOutputTags, WindowingStrategy windowingStrategy, Map<String, PCollectionView<?>> idToViewMap, OutputManagerFactory<OutT> outputManagerFactory, String stepName) {
        this.mainOutputTag = mainOutputTag;
        this.doFn = doFn;
        this.sideInputs = sideInputs;
        this.sideOutputTags = sideOutputTags;
        this.windowingStrategy = windowingStrategy;
        this.idToViewMap = new HashMap<>(idToViewMap);
        this.outputManagerFactory = outputManagerFactory;
        this.stepName = stepName;
        this.keyCoder = keyCoder;
    }

    /**
     * Initializes all runtime state: pipeline options, watermarks, timer and state factories,
     * the side-input handler, the DoFn runners, and finally invokes the DoFn's setup method.
     *
     * @param config Samza config; must contain the serialized options under
     *     {@code beamPipelineOptions}
     * @param context Samza task context; its user context must be a
     *     {@link SamzaExecutionContext}
     * @param timerRegistry registry handed to the timer internals factory
     * @param emitter sink for the elements produced by the DoFn
     */
    @Override
    public void open(Config config, TaskContext context, TimerRegistry<TimerKey<Void>> timerRegistry, OpEmitter<OutT> emitter) {
        final String serializedOptions = config.get("beamPipelineOptions");
        this.pipelineOptions = Base64Serializer.deserializeUnchecked(serializedOptions, SerializablePipelineOptions.class).get().as(SamzaPipelineOptions.class);
        this.inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
        this.sideInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
        // MAX_VALUE means "nothing is held": the hold only ever moves down to a buffered timestamp.
        this.pushbackWatermarkHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
        this.timerInternalsFactory = this.createTimerInternalsFactory(this.keyCoder, timerRegistry);
        // context, signature and pipelineOptions must be set before createStateInternalFactory runs.
        this.context = context;
        this.signature = DoFnSignatures.getSignature(this.doFn.getClass());
        // Side-input contents are kept in non-keyed state.
        final SamzaStoreStateInternals.Factory nonKeyedStateInternalsFactory = this.createStateInternalFactory(null);
        this.sideInputHandler = new SideInputHandler(this.sideInputs, nonKeyedStateInternalsFactory.stateInternalsForKey(null));
        final SamzaExecutionContext executionContext = (SamzaExecutionContext)context.getUserContext();
        final SamzaStoreStateInternals.Factory stateInternalsFactory = this.createStateInternalFactory(this.keyCoder);
        this.fnRunner = DoFnRunnerWithKeyedInternals.of(this.pipelineOptions, this.doFn, this.sideInputHandler, this.outputManagerFactory.create(emitter), this.mainOutputTag, this.sideOutputTags, stateInternalsFactory, this.timerInternalsFactory, this.windowingStrategy, executionContext.getMetricsContainer(), this.stepName);
        this.pushbackFnRunner = SimplePushbackSideInputDoFnRunner.create(this.fnRunner, this.sideInputs, this.sideInputHandler);
        this.pushbackValues = new ArrayList<>();
        this.doFnInvoker = DoFnInvokers.invokerFor(this.doFn);
        this.doFnInvoker.invokeSetup();
    }

    /**
     * Processes one main-input element in its own bundle. Values the pushback runner returns are
     * buffered for a later retry and lower the watermark hold to their timestamp if it is earlier.
     *
     * @param inputElement the element to process
     * @param emitter unused here; output goes through the emitter bound in {@link #open}
     */
    @Override
    public void processElement(WindowedValue<InT> inputElement, OpEmitter<OutT> emitter) {
        this.pushbackFnRunner.startBundle();
        final Iterable<WindowedValue<InT>> rejectedValues = this.pushbackFnRunner.processElementInReadyWindows(inputElement);
        for (WindowedValue<InT> rejectedValue : rejectedValues) {
            if (rejectedValue.getTimestamp().compareTo(this.pushbackWatermarkHold) < 0) {
                this.pushbackWatermarkHold = rejectedValue.getTimestamp();
            }
            this.pushbackValues.add(rejectedValue);
        }
        this.pushbackFnRunner.finishBundle();
    }

    /**
     * Advances the input watermark, fires the timers that became ready, and emits a new output
     * watermark if it moved forward.
     *
     * <p>The effective watermark is the earlier of {@code watermark} and the pushback hold, so
     * buffered elements are never overtaken.
     *
     * @param watermark new main-input watermark
     * @param emitter receives the output watermark when it advances
     */
    @Override
    public void processWatermark(Instant watermark, OpEmitter<OutT> emitter) {
        this.inputWatermark = watermark;
        // Side inputs are complete: whatever is still buffered can be processed now.
        if (this.sideInputWatermark.isEqual(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
            this.emitAllPushbackValues();
        }
        final Instant actualInputWatermark = this.pushbackWatermarkHold.isBefore(this.inputWatermark) ? this.pushbackWatermarkHold : this.inputWatermark;
        this.timerInternalsFactory.setInputWatermark(actualInputWatermark);
        this.pushbackFnRunner.startBundle();
        for (KeyedTimerData<?> keyedTimerData : this.timerInternalsFactory.removeReadyTimers()) {
            this.fireTimer(keyedTimerData);
        }
        this.pushbackFnRunner.finishBundle();
        // Only ever move the output watermark forward.
        if (this.timerInternalsFactory.getOutputWatermark() == null || this.timerInternalsFactory.getOutputWatermark().isBefore(actualInputWatermark)) {
            this.timerInternalsFactory.setOutputWatermark(actualInputWatermark);
            emitter.emitWatermark(this.timerInternalsFactory.getOutputWatermark());
        }
    }

    /**
     * Adds a side-input value, then retries every buffered element and re-evaluates the
     * watermark, since the hold may have been released.
     *
     * @param id id of the side input; must be a key of the id-to-view map
     * @param elements the side-input contents
     * @param emitter passed on to the retried elements and the watermark update
     * @throws IllegalArgumentException if {@code id} has no registered view
     */
    @Override
    public void processSideInput(String id, WindowedValue<? extends Iterable<?>> elements, OpEmitter<OutT> emitter) {
        // SideInputHandler#addSideInputValue takes the invariant WindowedValue<Iterable<?>>.
        @java.lang.SuppressWarnings("unchecked")
        final WindowedValue<Iterable<?>> retypedElements = (WindowedValue<Iterable<?>>)elements;
        final PCollectionView<?> view = this.idToViewMap.get(id);
        if (view == null) {
            throw new IllegalArgumentException("No mapping of id " + id + " to view.");
        }
        this.sideInputHandler.addSideInputValue(view, retypedElements);
        // Snapshot and reset the buffer first: processElement re-adds whatever is still not ready
        // and rebuilds the hold from scratch.
        final List<WindowedValue<InT>> previousPushbackValues = new ArrayList<>(this.pushbackValues);
        this.pushbackWatermarkHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
        this.pushbackValues.clear();
        for (WindowedValue<InT> previousValue : previousPushbackValues) {
            this.processElement(previousValue, emitter);
        }
        this.processWatermark(this.inputWatermark, emitter);
    }

    /**
     * Records the side-input watermark. When it reaches
     * {@link BoundedWindow#TIMESTAMP_MAX_VALUE} the main watermark is re-processed, which flushes
     * the buffered elements.
     *
     * @param watermark new side-input watermark
     * @param emitter passed on to the watermark update
     */
    @Override
    public void processSideInputWatermark(Instant watermark, OpEmitter<OutT> emitter) {
        this.sideInputWatermark = watermark;
        if (this.sideInputWatermark.isEqual(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
            this.processWatermark(this.inputWatermark, emitter);
        }
    }

    /**
     * Fires a single timer inside its own bundle.
     *
     * @param keyedTimerData the timer to fire
     */
    @Override
    public void processTimer(KeyedTimerData<Void> keyedTimerData) {
        this.pushbackFnRunner.startBundle();
        this.fireTimer(keyedTimerData);
        this.pushbackFnRunner.finishBundle();
    }

    /** Invokes the DoFn's teardown method. */
    @Override
    public void close() {
        this.doFnInvoker.invokeTeardown();
    }

    /**
     * Builds a state internals factory over the Beam store of the task context.
     *
     * <p>For keyed state ({@code keyCoder != null}) one additional store per state declaration of
     * the DoFn is looked up by state id; otherwise keys are coded with {@link VoidCoder}.
     *
     * @param keyCoder coder of the state key, or {@code null} for non-keyed state
     * @return the factory, identified by the main output tag id
     */
    private SamzaStoreStateInternals.Factory createStateInternalFactory(Coder<?> keyCoder) {
        final int batchGetSize = this.pipelineOptions.getStoreBatchGetSize();
        final Map<String, KeyValueStore<byte[], byte[]>> stores = new HashMap<String, KeyValueStore<byte[], byte[]>>(SamzaStoreStateInternals.getBeamStore(this.context));
        final Coder<?> stateKeyCoder;
        if (keyCoder != null) {
            this.signature.stateDeclarations().keySet().forEach(stateId -> stores.put(stateId, (KeyValueStore<byte[], byte[]>)((KeyValueStore)this.context.getStore(stateId))));
            stateKeyCoder = keyCoder;
        } else {
            stateKeyCoder = VoidCoder.of();
        }
        return new SamzaStoreStateInternals.Factory(this.mainOutputTag.getId(), stores, stateKeyCoder, batchGetSize);
    }

    /**
     * Creates the timer internals factory for this op.
     *
     * <p>Raw types are kept on purpose: the key type is only known as a wildcard here.
     */
    private SamzaTimerInternalsFactory createTimerInternalsFactory(Coder<?> keyCoder, TimerRegistry timerRegistry) {
        return new SamzaTimerInternalsFactory(keyCoder, timerRegistry);
    }

    /**
     * Dispatches a timer to the DoFn runner. The caller is responsible for bundle start/finish.
     *
     * @param keyedTimerData the timer to fire
     */
    private void fireTimer(KeyedTimerData<?> keyedTimerData) {
        final TimerInternals.TimerData timer = keyedTimerData.getTimerData();
        LOG.debug("Firing timer {}", timer);
        final StateNamespace namespace = timer.getNamespace();
        // NOTE(review): assumes every timer lives in a window namespace; any other namespace
        // kind fails here with a ClassCastException - confirm this holds for all timer sources.
        final BoundedWindow window = ((StateNamespaces.WindowNamespace)namespace).getWindow();
        if (this.fnRunner instanceof DoFnRunnerWithKeyedInternals) {
            // The keyed runner takes the whole keyed timer rather than only the timer fields.
            ((DoFnRunnerWithKeyedInternals)this.fnRunner).onTimer(keyedTimerData, window);
        } else {
            this.pushbackFnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
        }
    }

    /**
     * Processes every buffered element in one bundle and releases the watermark hold.
     *
     * <p>Elements go directly to {@code fnRunner}, bypassing the side-input readiness check of the
     * pushback runner; the only caller does this once the side-input watermark is at
     * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
     */
    private void emitAllPushbackValues() {
        if (this.pushbackValues.isEmpty()) {
            return;
        }
        this.pushbackFnRunner.startBundle();
        final List<WindowedValue<InT>> previousPushbackValues = new ArrayList<>(this.pushbackValues);
        this.pushbackWatermarkHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
        this.pushbackValues.clear();
        for (WindowedValue<InT> previousValue : previousPushbackValues) {
            this.fnRunner.processElement(previousValue);
        }
        this.pushbackFnRunner.finishBundle();
    }

    /**
     * Output manager factory for DoFns with several outputs: every value is wrapped in a
     * {@link RawUnionValue} carrying the integer id mapped to its tag.
     */
    public static class MultiOutputManagerFactory
    implements OutputManagerFactory<RawUnionValue> {
        private final Map<TupleTag<?>, Integer> tagToIdMap;

        /**
         * @param tagToIdMap union id for each output tag; stored by reference, not copied
         */
        public MultiOutputManagerFactory(Map<TupleTag<?>, Integer> tagToIdMap) {
            this.tagToIdMap = tagToIdMap;
        }

        /**
         * @param emitter sink for the wrapped values
         * @return an output manager that emits each value as a {@link RawUnionValue}
         */
        @Override
        public DoFnRunners.OutputManager create(final OpEmitter<RawUnionValue> emitter) {
            return new DoFnRunners.OutputManager(){

                /**
                 * @throws IllegalArgumentException if {@code tupleTag} has no union id
                 */
                @Override
                public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
                    final Integer unionId = tagToIdMap.get(tupleTag);
                    if (unionId == null) {
                        // Fail with context instead of an anonymous NullPointerException on unboxing.
                        throw new IllegalArgumentException("No mapping of tag " + tupleTag + " to union id.");
                    }
                    final RawUnionValue rawUnionValue = new RawUnionValue(unionId, windowedValue.getValue());
                    // withValue keeps timestamp, windows and pane of the original value.
                    emitter.emitElement(windowedValue.withValue(rawUnionValue));
                }
            };
        }
    }

    /**
     * Output manager factory for DoFns with a single output: values are forwarded to the emitter
     * unchanged, whatever their tag.
     *
     * @param <OutT> element type of the emitter
     */
    public static class SingleOutputManagerFactory<OutT>
    implements OutputManagerFactory<OutT> {
        /**
         * @param emitter sink for the values
         * @return an output manager that forwards every value to {@code emitter}
         */
        @Override
        public DoFnRunners.OutputManager create(final OpEmitter<OutT> emitter) {
            return new DoFnRunners.OutputManager(){

                @Override
                public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
                    // The tag is ignored, so T is not checked against OutT here.
                    // NOTE(review): relies on the caller only wiring this factory to ops whose
                    // outputs really are OutT - confirm at the translation site.
                    @java.lang.SuppressWarnings("unchecked")
                    final WindowedValue<OutT> retypedWindowedValue = (WindowedValue<OutT>)windowedValue;
                    emitter.emitElement(retypedWindowedValue);
                }
            };
        }
    }
}

