/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
import org.apache.beam.runners.core.ProcessFnRunner;
import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.direct.CommittedBundle;
import org.apache.beam.runners.direct.DirectExecutionContext;
import org.apache.beam.runners.direct.DoFnLifecycleManager;
import org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.ParDoEvaluator;
import org.apache.beam.runners.direct.ParDoEvaluatorFactory;
import org.apache.beam.runners.direct.TransformEvaluator;
import org.apache.beam.runners.direct.TransformEvaluatorFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;

class SplittableProcessElementsEvaluatorFactory<@UnknownKeyFor InputT, @UnknownKeyFor OutputT, @UnknownKeyFor RestrictionT, @UnknownKeyFor PositionT, @UnknownKeyFor WatermarkEstimatorStateT>
implements TransformEvaluatorFactory {
    private final @UnknownKeyFor @NonNull @Initialized ParDoEvaluatorFactory<@UnknownKeyFor @NonNull @Initialized KeyedWorkItem<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized KV<InputT, RestrictionT>>, OutputT> delegateFactory;
    private final @UnknownKeyFor @NonNull @Initialized ScheduledExecutorService ses;
    private final @UnknownKeyFor @NonNull @Initialized EvaluationContext evaluationContext;
    private final @UnknownKeyFor @NonNull @Initialized PipelineOptions options;

    SplittableProcessElementsEvaluatorFactory(@UnknownKeyFor @NonNull @Initialized EvaluationContext evaluationContext, final @UnknownKeyFor @NonNull @Initialized PipelineOptions options) {
        this.evaluationContext = evaluationContext;
        this.options = options;
        this.delegateFactory = new ParDoEvaluatorFactory<KeyedWorkItem<byte[], KV<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, RestrictionT>>, OutputT>(evaluationContext, SplittableProcessElementsEvaluatorFactory.processFnRunnerFactory(), new CacheLoader<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager>(){

            public @UnknownKeyFor @NonNull @Initialized DoFnLifecycleManager load(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            @UnknownKeyFor @NonNull @Initialized AppliedPTransform<@UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> application) {
                Preconditions.checkArgument((boolean)SplittableParDoViaKeyedWorkItems.ProcessElements.class.isInstance(application.getTransform()), (Object)("No know extraction of the fn from " + application));
                SplittableParDoViaKeyedWorkItems.ProcessElements transform = (SplittableParDoViaKeyedWorkItems.ProcessElements)application.getTransform();
                return DoFnLifecycleManager.of(transform.newProcessFn(transform.getFn()), options);
            }
        }, options);
        this.ses = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setThreadFactory(MoreExecutors.platformThreadFactory()).setNameFormat("direct-splittable-process-element-checkpoint-executor_" + this.hashCode()).build());
    }

    public <T> @UnknownKeyFor @NonNull @Initialized TransformEvaluator<T> forApplication(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized AppliedPTransform<@UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> application, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized CommittedBundle<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> inputBundle) throws @UnknownKeyFor @NonNull @Initialized Exception {
        TransformEvaluator<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> evaluator = this.createEvaluator(application, inputBundle);
        return evaluator;
    }

    @Override
    public void cleanup() throws @UnknownKeyFor @NonNull @Initialized Exception {
        this.ses.shutdownNow();
        this.delegateFactory.cleanup();
    }

    private @UnknownKeyFor @NonNull @Initialized TransformEvaluator<@UnknownKeyFor @NonNull @Initialized KeyedWorkItem<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized KV<InputT, RestrictionT>>> createEvaluator(@UnknownKeyFor @NonNull @Initialized AppliedPTransform<@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KeyedWorkItem<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized KV<InputT, RestrictionT>>>, @UnknownKeyFor @NonNull @Initialized PCollectionTuple,  @UnknownKeyFor @NonNull @Initialized SplittableParDoViaKeyedWorkItems.ProcessElements<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT>> application, @UnknownKeyFor @NonNull @Initialized CommittedBundle<InputT> inputBundle) throws @UnknownKeyFor @NonNull @Initialized Exception {
        SplittableParDoViaKeyedWorkItems.ProcessElements<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT> transform = application.getTransform();
        DoFnLifecycleManagerRemovingTransformEvaluator<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> evaluator = this.delegateFactory.createEvaluator((AppliedPTransform<PCollection<KeyedWorkItem<byte[], KV<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, RestrictionT>>>, PCollectionTuple, ?>)application, (PCollection<KeyedWorkItem<byte[], KV<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, RestrictionT>>>)inputBundle.getPCollection(), inputBundle.getKey(), application.getTransform().getSideInputs(), application.getTransform().getMainOutputTag(), application.getTransform().getAdditionalOutputTags().getAll(), DoFnSchemaInformation.create(), application.getTransform().getSideInputMapping());
        ParDoEvaluator<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> pde = evaluator.getParDoEvaluator();
        SplittableParDoViaKeyedWorkItems.ProcessFn processFn = (SplittableParDoViaKeyedWorkItems.ProcessFn)((ProcessFnRunner)ProcessFnRunner.class.cast(pde.getFnRunner())).getFn();
        DirectExecutionContext.DirectStepContext stepContext = pde.getStepContext();
        processFn.setStateInternalsFactory(key -> stepContext.stateInternals());
        processFn.setTimerInternalsFactory(key -> stepContext.timerInternals());
        ReadyCheckingSideInputReader sideInputReader = this.evaluationContext.createSideInputReader(transform.getSideInputs());
        processFn.setSideInputReader(sideInputReader);
        processFn.setProcessElementInvoker(new OutputAndTimeBoundedSplittableProcessElementInvoker(transform.getFn(), this.options, pde.getOutputManager(), transform.getMainOutputTag(), sideInputReader, this.ses, 100, Duration.standardSeconds((long)1L), stepContext::bundleFinalizer));
        return evaluator;
    }

    private static <InputT, OutputT, RestrictionT> @UnknownKeyFor @NonNull @Initialized ParDoEvaluator.DoFnRunnerFactory<@UnknownKeyFor @NonNull @Initialized KeyedWorkItem<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized KV<InputT, RestrictionT>>, OutputT> processFnRunnerFactory() {
        return (options, fn, sideInputs, sideInputReader, outputManager, mainOutputTag, additionalOutputTags, stepContext, inputCoder, outputCoders, windowingStrategy, doFnSchemaInformation, sideInputMapping) -> {
            SplittableParDoViaKeyedWorkItems.ProcessFn processFn = (SplittableParDoViaKeyedWorkItems.ProcessFn)fn;
            return DoFnRunners.newProcessFnRunner(processFn, options, sideInputs, sideInputReader, outputManager, mainOutputTag, additionalOutputTags, stepContext, inputCoder, outputCoders, windowingStrategy, doFnSchemaInformation, sideInputMapping);
        };
    }
}

