/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.runtime;

import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.fnexecution.control.ExecutableStageContext;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.samza.SamzaExecutionContext;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.runtime.BundleManager;
import org.apache.beam.runners.samza.runtime.ClassicBundleManager;
import org.apache.beam.runners.samza.runtime.FutureCollector;
import org.apache.beam.runners.samza.runtime.FutureCollectorImpl;
import org.apache.beam.runners.samza.runtime.KeyedTimerData;
import org.apache.beam.runners.samza.runtime.Op;
import org.apache.beam.runners.samza.runtime.OpEmitter;
import org.apache.beam.runners.samza.runtime.OutputManagerFactory;
import org.apache.beam.runners.samza.runtime.SamzaDoFnInvokerRegistrar;
import org.apache.beam.runners.samza.runtime.SamzaDoFnRunners;
import org.apache.beam.runners.samza.runtime.SamzaExecutableStageContextFactory;
import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals;
import org.apache.beam.runners.samza.runtime.SamzaTimerInternalsFactory;
import org.apache.beam.runners.samza.util.DoFnUtils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.apache.samza.config.Config;
import org.apache.samza.context.Context;
import org.apache.samza.operators.Scheduler;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.KeyForBottom;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DoFnOp<@UnknownKeyFor InT, @UnknownKeyFor FnOutT, @UnknownKeyFor OutT>
implements Op<InT, OutT, Void> {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(DoFnOp.class);
    private final @UnknownKeyFor @NonNull @Initialized TupleTag<FnOutT> mainOutputTag;
    private final @UnknownKeyFor @NonNull @Initialized DoFn<InT, FnOutT> doFn;
    private final /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> keyCoder;
    private final /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Collection<@UnknownKeyFor @NonNull @Initialized PCollectionView<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> sideInputs;
    private final /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> sideOutputTags;
    private final @UnknownKeyFor @NonNull @Initialized WindowingStrategy windowingStrategy;
    private final @UnknownKeyFor @NonNull @Initialized OutputManagerFactory<OutT> outputManagerFactory;
    private final /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized HashMap<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized PCollectionView<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> idToViewMap;
    private final @UnknownKeyFor @NonNull @Initialized String transformFullName;
    private final @UnknownKeyFor @NonNull @Initialized String transformId;
    private final @UnknownKeyFor @NonNull @Initialized Coder<InT> inputCoder;
    private final @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized WindowedValue<InT>> windowedValueCoder;
    private final /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized HashMap<@UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>, @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> outputCoders;
    private final // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized PCollection.IsBounded isBounded;
    private final @UnknownKeyFor @NonNull @Initialized String bundleCheckTimerId;
    private final @UnknownKeyFor @NonNull @Initialized String bundleStateId;
    private final @UnknownKeyFor @NonNull @Initialized boolean isPortable;
    private final // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized RunnerApi.ExecutableStagePayload stagePayload;
    private final @UnknownKeyFor @NonNull @Initialized JobInfo jobInfo;
    private final /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized HashMap<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> idToTupleTagMap;
    private transient /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized SamzaTimerInternalsFactory<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> timerInternalsFactory;
    private transient @UnknownKeyFor @NonNull @Initialized DoFnRunner<InT, FnOutT> fnRunner;
    private transient @UnknownKeyFor @NonNull @Initialized PushbackSideInputDoFnRunner<InT, FnOutT> pushbackFnRunner;
    private transient @UnknownKeyFor @NonNull @Initialized SideInputHandler sideInputHandler;
    private transient @UnknownKeyFor @NonNull @Initialized DoFnInvoker<InT, FnOutT> doFnInvoker;
    private transient @UnknownKeyFor @NonNull @Initialized SamzaPipelineOptions samzaPipelineOptions;
    @SuppressWarnings(justification="No bug", value={"SE_TRANSIENT_FIELD_NOT_RESTORED"})
    private transient @UnknownKeyFor @NonNull @Initialized Instant pushbackWatermarkHold;
    private transient @UnknownKeyFor @NonNull @Initialized Instant inputWatermark;
    private transient @UnknownKeyFor @NonNull @Initialized BundleManager<OutT> bundleManager;
    private transient @UnknownKeyFor @NonNull @Initialized Instant sideInputWatermark;
    private transient @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized WindowedValue<InT>> pushbackValues;
    private transient @UnknownKeyFor @NonNull @Initialized ExecutableStageContext stageContext;
    private transient @UnknownKeyFor @NonNull @Initialized StageBundleFactory stageBundleFactory;
    private transient @UnknownKeyFor @NonNull @Initialized boolean bundleDisabled;
    private final @UnknownKeyFor @NonNull @Initialized DoFnSchemaInformation doFnSchemaInformation;
    private final /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @KeyForBottom @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @NonNull @Initialized PCollectionView<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> sideInputMapping;
    private final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String> stateIdToStoreMapping;

    /**
     * Captures the configuration of this operator; no runtime state is created here
     * (the transient fields are populated by {@code open}).
     *
     * <p>{@code outputCoders}, {@code idToViewMap} and {@code idToTupleTagMap} are copied into
     * new {@link HashMap} instances, so later changes to the caller's maps are not observed.
     * Every other argument, including {@code sideInputMapping} and
     * {@code stateIdToStoreMapping}, is stored by reference.
     *
     * <p>{@code transformId} is additionally used to derive the bundle-check timer id
     * ({@code "_samza_bundle_check_" + transformId}) and the bundle state id
     * ({@code "_samza_bundle_" + transformId}).
     */
    public DoFnOp(@UnknownKeyFor @NonNull @Initialized TupleTag<FnOutT> mainOutputTag, @UnknownKeyFor @NonNull @Initialized DoFn<InT, FnOutT> doFn, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> keyCoder, @UnknownKeyFor @NonNull @Initialized Coder<InT> inputCoder, @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized WindowedValue<InT>> windowedValueCoder, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>, @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> outputCoders, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Collection<@UnknownKeyFor @NonNull @Initialized PCollectionView<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> sideInputs, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> sideOutputTags, @UnknownKeyFor @NonNull @Initialized WindowingStrategy windowingStrategy, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized PCollectionView<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> idToViewMap, @UnknownKeyFor @NonNull @Initialized OutputManagerFactory<OutT> outputManagerFactory, @UnknownKeyFor @NonNull @Initialized String transformFullName, @UnknownKeyFor @NonNull @Initialized String transformId, // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized PCollection.IsBounded isBounded, @UnknownKeyFor @NonNull @Initialized boolean isPortable, // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized RunnerApi.ExecutableStagePayload stagePayload, @UnknownKeyFor @NonNull @Initialized JobInfo jobInfo, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> idToTupleTagMap, @UnknownKeyFor @NonNull @Initialized DoFnSchemaInformation doFnSchemaInformation, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @KeyForBottom @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @NonNull @Initialized PCollectionView<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> sideInputMapping, @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String> stateIdToStoreMapping) {
        this.mainOutputTag = mainOutputTag;
        this.doFn = doFn;
        this.sideInputs = sideInputs;
        this.sideOutputTags = sideOutputTags;
        this.inputCoder = inputCoder;
        this.windowedValueCoder = windowedValueCoder;
        // Copy into a concrete HashMap (the declared field type); the diamond keeps the copy type-checked.
        this.outputCoders = new HashMap<>(outputCoders);
        this.windowingStrategy = windowingStrategy;
        this.idToViewMap = new HashMap<>(idToViewMap);
        this.outputManagerFactory = outputManagerFactory;
        this.transformFullName = transformFullName;
        this.transformId = transformId;
        this.keyCoder = keyCoder;
        this.isBounded = isBounded;
        this.isPortable = isPortable;
        this.stagePayload = stagePayload;
        this.jobInfo = jobInfo;
        this.idToTupleTagMap = new HashMap<>(idToTupleTagMap);
        // Both ids are namespaced by the transform id.
        this.bundleCheckTimerId = "_samza_bundle_check_" + transformId;
        this.bundleStateId = "_samza_bundle_" + transformId;
        this.doFnSchemaInformation = doFnSchemaInformation;
        this.sideInputMapping = sideInputMapping;
        this.stateIdToStoreMapping = stateIdToStoreMapping;
    }

    /**
     * Initializes the transient runtime state of this operator: watermarks, pipeline options,
     * the bundle manager, timer internals, the side-input handler, the DoFn runner (portable or
     * classic, depending on {@code isPortable}), the pushback runner and the DoFn invoker.
     *
     * @param config the Samza config (not read by this method)
     * @param context the Samza context; supplies the {@link SamzaExecutionContext} and the task context
     * @param timerRegistry scheduler handed to the bundle manager and the timer internals factory
     * @param emitter emitter handed to the output manager factory (and to the classic runner)
     */
    @Override
    public void open(@UnknownKeyFor @NonNull @Initialized Config config, @UnknownKeyFor @NonNull @Initialized Context context, @UnknownKeyFor @NonNull @Initialized Scheduler<@UnknownKeyFor @NonNull @Initialized KeyedTimerData<@UnknownKeyFor @Nullable @Initialized Void>> timerRegistry, @UnknownKeyFor @NonNull @Initialized OpEmitter<OutT> emitter) {
        // Start with no watermark progress and no pushback hold.
        this.inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
        this.sideInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
        this.pushbackWatermarkHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
        DoFnSignature signature = DoFnSignatures.getSignature(this.doFn.getClass());
        SamzaExecutionContext samzaExecutionContext = (SamzaExecutionContext)context.getApplicationContainerContext();
        this.samzaPipelineOptions = samzaExecutionContext.getPipelineOptions();
        // A max bundle size of 1 or less counts as "bundling disabled"; the side-input methods require this flag.
        this.bundleDisabled = this.samzaPipelineOptions.getMaxBundleSize() <= 1L;
        String stateId = "pardo-" + this.transformId;
        SamzaStoreStateInternals.Factory<Object> nonKeyedStateInternalsFactory = SamzaStoreStateInternals.createNonKeyedStateInternalsFactory(stateId, context.getTaskContext(), this.samzaPipelineOptions);
        // The same future collector is shared by the bundle manager and the output manager created below.
        FutureCollector<OutT> outputFutureCollector = this.createFutureCollector();
        this.bundleManager = new ClassicBundleManager<OutT>(this.createBundleProgressListener(), outputFutureCollector, this.samzaPipelineOptions.getMaxBundleSize(), this.samzaPipelineOptions.getMaxBundleTimeMs(), timerRegistry, this.bundleCheckTimerId);
        this.timerInternalsFactory = SamzaTimerInternalsFactory.createTimerInternalFactory(this.keyCoder, timerRegistry, this.getTimerStateId(signature), nonKeyedStateInternalsFactory, this.windowingStrategy, this.isBounded, this.samzaPipelineOptions);
        this.sideInputHandler = new SideInputHandler(this.sideInputs, nonKeyedStateInternalsFactory.stateInternalsForKey(null));
        if (this.isPortable) {
            // Portable path: the runner is built from the executable stage payload; this is the only
            // place stageContext and stageBundleFactory are assigned (close() releases them).
            ExecutableStage executableStage = ExecutableStage.fromPayload((RunnerApi.ExecutableStagePayload)this.stagePayload);
            this.stageContext = SamzaExecutableStageContextFactory.getInstance().get(this.jobInfo);
            this.stageBundleFactory = this.stageContext.getStageBundleFactory(executableStage);
            this.fnRunner = SamzaDoFnRunners.createPortable(this.transformId, DoFnUtils.toStepName(executableStage), this.bundleStateId, this.windowedValueCoder, executableStage, this.sideInputMapping, this.sideInputHandler, nonKeyedStateInternalsFactory, this.timerInternalsFactory, this.samzaPipelineOptions, this.outputManagerFactory.create(emitter, outputFutureCollector), this.stageBundleFactory, samzaExecutionContext, this.mainOutputTag, this.idToTupleTagMap, context, this.transformFullName);
        } else {
            // Classic path: the runner wraps the Java DoFn directly.
            this.fnRunner = SamzaDoFnRunners.create(this.samzaPipelineOptions, this.doFn, this.windowingStrategy, this.transformFullName, stateId, context, this.mainOutputTag, this.sideInputHandler, this.timerInternalsFactory, this.keyCoder, this.outputManagerFactory.create(emitter, outputFutureCollector), this.inputCoder, this.sideOutputTags, this.outputCoders, this.doFnSchemaInformation, this.sideInputMapping, this.stateIdToStoreMapping, emitter, outputFutureCollector);
        }
        this.pushbackFnRunner = SimplePushbackSideInputDoFnRunner.create(this.fnRunner, this.sideInputs, (ReadyCheckingSideInputReader)this.sideInputHandler);
        this.pushbackValues = new ArrayList<WindowedValue<InT>>();
        // Use a SamzaDoFnInvokerRegistrar discovered via ServiceLoader if there is one, otherwise the
        // default invoker setup. NOTE(review): Iterators.getOnlyElement presumably rejects more than
        // one registrar on the classpath - confirm against the Guava documentation.
        Iterator<SamzaDoFnInvokerRegistrar> invokerReg = ServiceLoader.load(SamzaDoFnInvokerRegistrar.class).iterator();
        this.doFnInvoker = !invokerReg.hasNext() ? DoFnInvokers.tryInvokeSetupFor(this.doFn, (PipelineOptions)this.samzaPipelineOptions) : ((SamzaDoFnInvokerRegistrar)Iterators.getOnlyElement(invokerReg)).invokerSetupFor(this.doFn, this.samzaPipelineOptions, context);
    }

    /**
     * Creates the collector that gathers asynchronous (future-valued) outputs; {@code open} calls
     * this once and shares the result between the bundle manager and the output manager.
     *
     * <p>NOTE(review): package-private and non-final, presumably so tests can substitute their own
     * collector - confirm against callers.
     *
     * @return a new, empty {@link FutureCollectorImpl}
     */
    @UnknownKeyFor @NonNull @Initialized FutureCollector<OutT> createFutureCollector() {
        // Diamond instead of the raw type so the element type is checked against OutT.
        return new FutureCollectorImpl<>();
    }

    /**
     * Builds the state id handed to the timer internals factory: the literal {@code "timer"},
     * followed - only when the DoFn uses timers - by every declared timer id, concatenated in the
     * iteration order of the declaration map's key set.
     *
     * @param signature signature of the wrapped DoFn
     * @return the concatenated timer state id
     */
    private @UnknownKeyFor @NonNull @Initialized String getTimerStateId(@UnknownKeyFor @NonNull @Initialized DoFnSignature signature) {
        final StringBuilder stateId = new StringBuilder("timer");
        if (!signature.usesTimers()) {
            // No timers declared: the bare prefix is the whole id.
            return stateId.toString();
        }
        signature.timerDeclarations().keySet().forEach(timerId -> stateId.append(timerId));
        return stateId.toString();
    }

    /**
     * Processes one input element inside the current bundle.
     *
     * <p>The element is offered to the pushback runner; every value it rejects is buffered in
     * {@code pushbackValues}, and {@code pushbackWatermarkHold} is lowered to the earliest
     * timestamp among the buffered values.
     *
     * <p>Any failure is logged, reported to the bundle manager via {@code signalFailure}, and
     * rethrown unchanged.
     *
     * @param inputElement the element to process
     * @param emitter emitter passed to the bundle manager when it tries to finish the bundle
     */
    @Override
    public void processElement(@UnknownKeyFor @NonNull @Initialized WindowedValue<InT> inputElement, @UnknownKeyFor @NonNull @Initialized OpEmitter<OutT> emitter) {
        try {
            this.bundleManager.tryStartBundle();
            // Typed Iterable: iterating a raw Iterable yields Object and cannot be bound to WindowedValue.
            Iterable<WindowedValue<InT>> rejectedValues = this.pushbackFnRunner.processElementInReadyWindows(inputElement);
            for (WindowedValue<InT> rejectedValue : rejectedValues) {
                // Keep the hold at the minimum timestamp of everything still pushed back.
                if (rejectedValue.getTimestamp().isBefore(this.pushbackWatermarkHold)) {
                    this.pushbackWatermarkHold = rejectedValue.getTimestamp();
                }
                this.pushbackValues.add(rejectedValue);
            }
            this.bundleManager.tryFinishBundle(emitter);
        }
        catch (Throwable t) {
            LOG.error("Encountered error during process element", t);
            this.bundleManager.signalFailure(t);
            throw t;
        }
    }

    /**
     * Applies a new input watermark; invoked through the bundle progress listener's
     * {@code onWatermark} callback.
     *
     * <p>Steps, in order: record the watermark; replay all pushed-back values if the side-input
     * watermark has reached {@code TIMESTAMP_MAX_VALUE}; compute the effective watermark as the
     * earlier of the pushback hold and the input watermark; fire every timer that became ready
     * (inside one start/finish bundle pair); and finally advance and emit the output watermark if
     * it is unset or behind the effective watermark.
     *
     * @param watermark the new input watermark
     * @param emitter emitter that receives the output watermark
     */
    private void doProcessWatermark(@UnknownKeyFor @NonNull @Initialized Instant watermark, @UnknownKeyFor @NonNull @Initialized OpEmitter<OutT> emitter) {
        this.inputWatermark = watermark;

        final boolean sideInputsComplete = this.sideInputWatermark.isEqual((ReadableInstant)BoundedWindow.TIMESTAMP_MAX_VALUE);
        if (sideInputsComplete) {
            this.emitAllPushbackValues();
        }

        // Evaluated after the replay above, which resets the pushback hold.
        final Instant effectiveWatermark;
        if (this.pushbackWatermarkHold.isBefore((ReadableInstant)this.inputWatermark)) {
            effectiveWatermark = this.pushbackWatermarkHold;
        } else {
            effectiveWatermark = this.inputWatermark;
        }

        this.timerInternalsFactory.setInputWatermark(effectiveWatermark);
        Collection<KeyedTimerData<?>> dueTimers = this.timerInternalsFactory.removeReadyTimers();
        if (!dueTimers.isEmpty()) {
            this.pushbackFnRunner.startBundle();
            for (KeyedTimerData<?> dueTimer : dueTimers) {
                this.fireTimer(dueTimer);
            }
            this.pushbackFnRunner.finishBundle();
        }

        final boolean outputWatermarkBehind = this.timerInternalsFactory.getOutputWatermark() == null || this.timerInternalsFactory.getOutputWatermark().isBefore((ReadableInstant)effectiveWatermark);
        if (outputWatermarkBehind) {
            this.timerInternalsFactory.setOutputWatermark(effectiveWatermark);
            emitter.emitWatermark(this.timerInternalsFactory.getOutputWatermark());
        }
    }

    /**
     * Hands the watermark to the bundle manager rather than applying it directly; the actual
     * handling happens in {@code doProcessWatermark}, which the bundle progress listener invokes.
     *
     * @param watermark the incoming watermark
     * @param emitter emitter forwarded to the bundle manager
     */
    @Override
    public void processWatermark(@UnknownKeyFor @NonNull @Initialized Instant watermark, @UnknownKeyFor @NonNull @Initialized OpEmitter<OutT> emitter) {
        final BundleManager<OutT> manager = this.bundleManager;
        manager.processWatermark(watermark, emitter);
    }

    /**
     * Adds newly arrived side-input data and re-processes everything that was pushed back while
     * waiting for it.
     *
     * <p>The buffered pushback values are snapshotted, the buffer and the pushback hold are reset,
     * and each snapshotted value goes through {@code processElement} again (values that are still
     * not ready are simply buffered again). Finally the current input watermark is re-processed.
     *
     * @param id identifier of the side-input view, looked up in {@code idToViewMap}
     * @param elements the side-input values to add
     * @param emitter emitter used while re-processing elements and the watermark
     * @throws IllegalStateException if bundling is enabled
     * @throws IllegalArgumentException if {@code id} maps to no view
     */
    @Override
    public void processSideInput(@UnknownKeyFor @NonNull @Initialized String id, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @KeyForBottom @Nullable @Initialized @NonNull @Initialized ?>> elements, @UnknownKeyFor @NonNull @Initialized OpEmitter<OutT> emitter) {
        Preconditions.checkState((boolean)this.bundleDisabled, (Object)"Side input not supported in bundling mode. Please disable bundling.");
        // Unchecked but harmless: only the wildcard is dropped so the value matches the handler's
        // parameter type. Fully qualified because this file imports the FindBugs SuppressWarnings.
        @java.lang.SuppressWarnings("unchecked")
        WindowedValue<Iterable<?>> retypedElements = (WindowedValue<Iterable<?>>)elements;
        PCollectionView<?> view = this.idToViewMap.get(id);
        if (view == null) {
            throw new IllegalArgumentException("No mapping of id " + id + " to view.");
        }
        this.sideInputHandler.addSideInputValue(view, retypedElements);
        // Snapshot first: processElement appends to pushbackValues while we iterate.
        List<WindowedValue<InT>> previousPushbackValues = new ArrayList<>(this.pushbackValues);
        this.pushbackWatermarkHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
        this.pushbackValues.clear();
        for (WindowedValue<InT> pushedBackValue : previousPushbackValues) {
            this.processElement(pushedBackValue, emitter);
        }
        this.processWatermark(this.inputWatermark, emitter);
    }

    /**
     * Records the side-input watermark and, once it has reached {@code TIMESTAMP_MAX_VALUE},
     * re-processes the current input watermark.
     *
     * @param watermark the new side-input watermark
     * @param emitter emitter forwarded to {@code processWatermark}
     * @throws IllegalStateException if bundling is enabled
     */
    @Override
    public void processSideInputWatermark(@UnknownKeyFor @NonNull @Initialized Instant watermark, @UnknownKeyFor @NonNull @Initialized OpEmitter<OutT> emitter) {
        Preconditions.checkState((boolean)this.bundleDisabled, (Object)"Side input not supported in bundling mode. Please disable bundling.");
        this.sideInputWatermark = watermark;
        final boolean sideInputsComplete = this.sideInputWatermark.isEqual((ReadableInstant)BoundedWindow.TIMESTAMP_MAX_VALUE);
        if (!sideInputsComplete) {
            return;
        }
        this.processWatermark(this.inputWatermark, emitter);
    }

    /**
     * Dispatches a fired timer. The bundle-check timer (identified by {@code bundleCheckTimerId})
     * goes to the bundle manager; any other timer is fired inside its own start/finish bundle
     * pair and then removed from the timer internals factory's processing timers.
     *
     * @param keyedTimerData the timer that fired
     * @param emitter emitter forwarded to the bundle manager for the bundle-check timer
     */
    @Override
    public void processTimer(@UnknownKeyFor @NonNull @Initialized KeyedTimerData<@UnknownKeyFor @Nullable @Initialized Void> keyedTimerData, @UnknownKeyFor @NonNull @Initialized OpEmitter<OutT> emitter) {
        final boolean isBundleCheck = this.bundleCheckTimerId.equals(keyedTimerData.getTimerData().getTimerId());
        if (isBundleCheck) {
            this.bundleManager.processTimer(keyedTimerData, emitter);
        } else {
            this.pushbackFnRunner.startBundle();
            this.fireTimer(keyedTimerData);
            this.pushbackFnRunner.finishBundle();
            this.timerInternalsFactory.removeProcessingTimer(keyedTimerData);
        }
    }

    /**
     * Tears down the DoFn and releases the portable-execution resources.
     *
     * <p>The stage context is closed first and the stage bundle factory second. Both are assigned
     * only on the portable path of {@code open}; {@code null} values are skipped. Failures while
     * releasing them are logged and not propagated.
     *
     * <p>The resources are released in a {@code finally} block, so a teardown failure no longer
     * leaks them; the teardown exception itself still propagates to the caller.
     */
    @Override
    public void close() {
        try {
            this.doFnInvoker.invokeTeardown();
        } finally {
            // try-with-resources accepts a null resource and simply skips closing it.
            try (StageBundleFactory factory = this.stageBundleFactory) {
                ExecutableStageContext context = this.stageContext;
                if (context != null) {
                    context.close();
                }
            }
            catch (Exception e) {
                LOG.error("Failed to close stage bundle factory", (Throwable)e);
            }
        }
    }

    /**
     * Delivers one timer to the DoFn runner's {@code onTimer}, passing the timer's id, family id,
     * key, window, timestamp, output timestamp and time domain.
     *
     * <p>The timer's namespace is cast to {@code StateNamespaces.WindowNamespace} without a
     * check, so a timer in any other kind of namespace fails with a {@link ClassCastException}.
     *
     * @param keyedTimerData the timer to fire, together with its key
     */
    private void fireTimer(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized KeyedTimerData<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> keyedTimerData) {
        final TimerInternals.TimerData timerData = keyedTimerData.getTimerData();
        LOG.debug("Firing timer {}", (Object)timerData);
        // The window is taken from the timer's namespace.
        final BoundedWindow timerWindow = ((StateNamespaces.WindowNamespace)timerData.getNamespace()).getWindow();
        this.fnRunner.onTimer(
            timerData.getTimerId(),
            timerData.getTimerFamilyId(),
            keyedTimerData.getKey(),
            timerWindow,
            timerData.getTimestamp(),
            timerData.getOutputTimestamp(),
            timerData.getDomain());
    }

    /**
     * Replays every buffered pushback value through {@code fnRunner} (not the pushback runner,
     * so nothing is rejected again) inside a single start/finish bundle pair, after clearing the
     * buffer and resetting the pushback hold to {@code TIMESTAMP_MAX_VALUE}. Does nothing when
     * the buffer is empty.
     */
    private void emitAllPushbackValues() {
        if (this.pushbackValues.isEmpty()) {
            return;
        }
        this.pushbackFnRunner.startBundle();
        // Work on a snapshot so the live buffer can be cleared before processing.
        final List<WindowedValue<InT>> toReplay = new ArrayList<WindowedValue<InT>>(this.pushbackValues);
        this.pushbackWatermarkHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
        this.pushbackValues.clear();
        for (WindowedValue<InT> bufferedValue : toReplay) {
            this.fnRunner.processElement(bufferedValue);
        }
        this.pushbackFnRunner.finishBundle();
    }

    /**
     * Builds the listener that connects bundle-manager callbacks to this operator: bundle start
     * and finish are forwarded to the pushback runner, and watermarks are forwarded to
     * {@code doProcessWatermark}.
     *
     * @return a new listener bound to this operator instance
     */
    private @UnknownKeyFor @NonNull @Initialized BundleManager.BundleProgressListener<OutT> createBundleProgressListener() {
        final BundleManager.BundleProgressListener<OutT> listener = new BundleManager.BundleProgressListener<OutT>(){

            /** Opens a bundle on the pushback runner. */
            @Override
            public void onBundleStarted() {
                DoFnOp.this.pushbackFnRunner.startBundle();
            }

            /** Closes the bundle on the pushback runner; the emitter is not used here. */
            @Override
            public void onBundleFinished(@UnknownKeyFor @NonNull @Initialized OpEmitter<OutT> emitter) {
                DoFnOp.this.pushbackFnRunner.finishBundle();
            }

            /** Applies the watermark via the operator's watermark handling. */
            @Override
            public void onWatermark(@UnknownKeyFor @NonNull @Initialized Instant watermark, @UnknownKeyFor @NonNull @Initialized OpEmitter<OutT> emitter) {
                DoFnOp.this.doProcessWatermark(watermark, emitter);
            }
        };
        return listener;
    }

    /**
     * Turns a future of a raw value into a future of a windowed output value.
     *
     * <p>When {@code valueFuture} completes, its result is passed through {@code valueMapper} and
     * wrapped in a new {@link WindowedValue} that carries the timestamp, windows and pane of
     * {@code windowedValue}.
     *
     * @param windowedValue supplies the timestamp, windows and pane for the result
     * @param valueFuture future producing the raw value
     * @param valueMapper conversion applied to the completed value
     * @return a stage completing with the mapped, re-windowed value
     */
    static <T, OutT> @UnknownKeyFor @NonNull @Initialized CompletionStage<@UnknownKeyFor @NonNull @Initialized WindowedValue<OutT>> createOutputFuture(@UnknownKeyFor @NonNull @Initialized WindowedValue<T> windowedValue, @UnknownKeyFor @NonNull @Initialized CompletionStage<T> valueFuture, @UnknownKeyFor @NonNull @Initialized Function<T, OutT> valueMapper) {
        return valueFuture.thenApply(completedValue -> {
            // Map first, then read the windowing metadata from the original element.
            final OutT mappedValue = valueMapper.apply(completedValue);
            return WindowedValue.of(mappedValue, (Instant)windowedValue.getTimestamp(), (Collection)windowedValue.getWindows(), (PaneInfo)windowedValue.getPane());
        });
    }

    /**
     * Factory for output managers of multi-output transforms: every output is wrapped in a
     * {@link RawUnionValue} tagged with the index that {@code tagToIndexMap} assigns to its
     * {@link TupleTag}.
     */
    public static class MultiOutputManagerFactory
    implements OutputManagerFactory<RawUnionValue> {
        private final /*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>, @UnknownKeyFor @NonNull @Initialized Integer> tagToIndexMap;

        /**
         * @param tagToIndexMap union index for each output tag; stored by reference, not copied
         */
        public MultiOutputManagerFactory(/*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>, @UnknownKeyFor @NonNull @Initialized Integer> tagToIndexMap) {
            this.tagToIndexMap = tagToIndexMap;
        }

        /** Creates an output manager without a future collector. */
        @Override
        public // Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized DoFnRunners.OutputManager create(@UnknownKeyFor @NonNull @Initialized OpEmitter<@UnknownKeyFor @NonNull @Initialized RawUnionValue> emitter) {
            return this.createOutputManager(emitter, null);
        }

        /** Creates an output manager that routes future-valued outputs to {@code collector}. */
        @Override
        public // Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized DoFnRunners.OutputManager create(@UnknownKeyFor @NonNull @Initialized OpEmitter<@UnknownKeyFor @NonNull @Initialized RawUnionValue> emitter, @UnknownKeyFor @NonNull @Initialized FutureCollector<@UnknownKeyFor @NonNull @Initialized RawUnionValue> collector) {
            return this.createOutputManager(emitter, collector);
        }

        /**
         * Shared implementation behind both {@code create} overloads.
         *
         * @param emitter receives synchronous outputs
         * @param collector receives future-valued outputs; may be {@code null}
         */
        private // Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized DoFnRunners.OutputManager createOutputManager(final @UnknownKeyFor @NonNull @Initialized OpEmitter<@UnknownKeyFor @NonNull @Initialized RawUnionValue> emitter, final @UnknownKeyFor @NonNull @Initialized FutureCollector<@UnknownKeyFor @NonNull @Initialized RawUnionValue> collector) {
            return new DoFnRunners.OutputManager(){

                /**
                 * Emits {@code windowedValue} as a {@link RawUnionValue} under the index registered
                 * for {@code tupleTag}.
                 *
                 * @throws IllegalArgumentException if {@code tupleTag} has no registered index
                 */
                @Override
                @java.lang.SuppressWarnings("unchecked") // fully qualified: this file imports the FindBugs SuppressWarnings
                public <T> void output(@UnknownKeyFor @NonNull @Initialized TupleTag<T> tupleTag, @UnknownKeyFor @NonNull @Initialized WindowedValue<T> windowedValue) {
                    // Look the index up as a boxed value first, so an unknown tag fails with a
                    // descriptive error instead of a bare NullPointerException from auto-unboxing.
                    final Integer registeredIndex = tagToIndexMap.get(tupleTag);
                    if (registeredIndex == null) {
                        throw new IllegalArgumentException("No output index registered for tag " + tupleTag);
                    }
                    final int index = registeredIndex;
                    final T rawValue = windowedValue.getValue();
                    if (rawValue instanceof CompletionStage) {
                        // Asynchronous output: the value is a future of the real element.
                        CompletionStage<T> valueFuture = (CompletionStage<T>)rawValue;
                        // NOTE(review): with no collector (single-argument create), a future-valued
                        // output is dropped without any signal - confirm this is intended.
                        if (collector != null) {
                            collector.add(DoFnOp.createOutputFuture(windowedValue, valueFuture, res -> new RawUnionValue(index, res)));
                        }
                    } else {
                        RawUnionValue rawUnionValue = new RawUnionValue(index, rawValue);
                        emitter.emitElement(windowedValue.withValue(rawUnionValue));
                    }
                }
            };
        }
    }

    /**
     * Factory for output managers of single-output transforms: outputs are forwarded as-is,
     * re-typed to the operator's output type {@code OutT}; the tuple tag is ignored.
     */
    public static class SingleOutputManagerFactory<@UnknownKeyFor OutT>
    implements OutputManagerFactory<OutT> {
        /** Creates an output manager without a future collector. */
        @Override
        public // Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized DoFnRunners.OutputManager create(@UnknownKeyFor @NonNull @Initialized OpEmitter<OutT> emitter) {
            return this.createOutputManager(emitter, null);
        }

        /** Creates an output manager that routes future-valued outputs to {@code collector}. */
        @Override
        public // Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized DoFnRunners.OutputManager create(@UnknownKeyFor @NonNull @Initialized OpEmitter<OutT> emitter, @UnknownKeyFor @NonNull @Initialized FutureCollector<OutT> collector) {
            return this.createOutputManager(emitter, collector);
        }

        /**
         * Shared implementation behind both {@code create} overloads.
         *
         * @param emitter receives synchronous outputs
         * @param collector receives future-valued outputs; may be {@code null}
         */
        private // Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized DoFnRunners.OutputManager createOutputManager(final @UnknownKeyFor @NonNull @Initialized OpEmitter<OutT> emitter, final @UnknownKeyFor @NonNull @Initialized FutureCollector<OutT> collector) {
            return new DoFnRunners.OutputManager(){

                /**
                 * Forwards {@code windowedValue} to the emitter, or - when its value is a
                 * {@link CompletionStage} - to the future collector.
                 *
                 * <p>The casts to {@code OutT} are unchecked: the method-level type {@code T} is
                 * unrelated to {@code OutT} at compile time. NOTE(review): this relies on callers
                 * only emitting values of the operator's output type - confirm.
                 */
                @Override
                @java.lang.SuppressWarnings("unchecked") // fully qualified: this file imports the FindBugs SuppressWarnings
                public <T> void output(@UnknownKeyFor @NonNull @Initialized TupleTag<T> tupleTag, @UnknownKeyFor @NonNull @Initialized WindowedValue<T> windowedValue) {
                    if (windowedValue.getValue() instanceof CompletionStage) {
                        // Asynchronous output: the value is a future of the real element.
                        CompletionStage<T> valueFuture = (CompletionStage<T>)windowedValue.getValue();
                        // NOTE(review): with no collector (single-argument create), a future-valued
                        // output is dropped without any signal - confirm this is intended.
                        if (collector != null) {
                            collector.add(DoFnOp.createOutputFuture(windowedValue, valueFuture, value -> (OutT)value));
                        }
                    } else {
                        // Explicit retyping so the argument matches OpEmitter<OutT>.
                        WindowedValue<OutT> retypedWindowedValue = (WindowedValue<OutT>)windowedValue;
                        emitter.emitElement(retypedWindowedValue);
                    }
                }
            };
        }
    }
}

