/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.beam.runners.direct.CommittedBundle;
import org.apache.beam.runners.direct.CommittedResult;
import org.apache.beam.runners.direct.CompletionCallback;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.ModelEnforcement;
import org.apache.beam.runners.direct.ModelEnforcementFactory;
import org.apache.beam.runners.direct.TransformEvaluator;
import org.apache.beam.runners.direct.TransformEvaluatorFactory;
import org.apache.beam.runners.direct.TransformExecutorService;
import org.apache.beam.runners.direct.TransformResult;
import org.apache.beam.runners.direct.repackaged.runners.core.metrics.MetricUpdates;
import org.apache.beam.runners.direct.repackaged.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.util.WindowedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link Runnable} that evaluates a single {@link AppliedPTransform} over a single input
 * {@link CommittedBundle} and reports the outcome to a {@link CompletionCallback}.
 *
 * <p>While running, metrics are collected in a fresh {@link MetricsContainerImpl} scoped to the
 * executing thread via {@link MetricsEnvironment#scopedMetricsContainer}. Every configured
 * {@link ModelEnforcement} is invoked around each element and after the bundle is finished.
 *
 * @param <T> the element type of the input bundle
 */
class TransformExecutor<T>
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(TransformExecutor.class);
    private final TransformEvaluatorFactory evaluatorFactory;
    private final Iterable<? extends ModelEnforcementFactory> modelEnforcements;
    private final AppliedPTransform<?, ?, ?> transform;
    private final CommittedBundle<T> inputBundle;
    private final CompletionCallback onComplete;
    private final TransformExecutorService transformEvaluationState;
    private final EvaluationContext context;

    /**
     * Creates a new executor for the given transform and input bundle.
     *
     * @param context evaluation context whose metrics receive the physical updates
     * @param factory produces the {@link TransformEvaluator} for the transform
     * @param modelEnforcements factories for the enforcements applied around evaluation
     * @param inputBundle the bundle to process; {@link #processElements} tolerates {@code null}
     * @param transform the transform to evaluate
     * @param completionCallback receives the result, an empty notification, or a failure
     * @param transformEvaluationState service notified via {@code complete(this)} when
     *     {@link #run()} ends
     * @return a new, not yet started executor
     */
    public static <T> TransformExecutor<T> create(EvaluationContext context, TransformEvaluatorFactory factory, Iterable<? extends ModelEnforcementFactory> modelEnforcements, CommittedBundle<T> inputBundle, AppliedPTransform<?, ?, ?> transform, CompletionCallback completionCallback, TransformExecutorService transformEvaluationState) {
        return new TransformExecutor<T>(context, factory, modelEnforcements, inputBundle, transform, completionCallback, transformEvaluationState);
    }

    private TransformExecutor(EvaluationContext context, TransformEvaluatorFactory factory, Iterable<? extends ModelEnforcementFactory> modelEnforcements, CommittedBundle<T> inputBundle, AppliedPTransform<?, ?, ?> transform, CompletionCallback completionCallback, TransformExecutorService transformEvaluationState) {
        this.evaluatorFactory = factory;
        this.modelEnforcements = modelEnforcements;
        this.inputBundle = inputBundle;
        this.transform = transform;
        this.onComplete = completionCallback;
        this.transformEvaluationState = transformEvaluationState;
        this.context = context;
    }

    /**
     * Evaluates the transform over the input bundle.
     *
     * <p>If the factory yields no evaluator, {@link CompletionCallback#handleEmpty} is called and
     * nothing else is evaluated. Any {@link Exception} is reported through
     * {@code handleException} and rethrown (wrapped in a {@link RuntimeException} if it is
     * checked); any {@link Error} is logged, reported through {@code handleError} and rethrown.
     *
     * <p>Regardless of outcome, the cumulative metrics are committed as physical metrics and the
     * {@link TransformExecutorService} is told that this executor has completed.
     */
    @Override
    public void run() {
        MetricsContainerImpl metricsContainer = new MetricsContainerImpl(transform.getFullName());
        // The scope is held only for its close(); the variable itself is never read.
        try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) {
            Collection<ModelEnforcement<T>> enforcements = new ArrayList<ModelEnforcement<T>>();
            for (ModelEnforcementFactory enforcementFactory : modelEnforcements) {
                ModelEnforcement<T> enforcement = enforcementFactory.forBundle(inputBundle, transform);
                enforcements.add(enforcement);
            }
            TransformEvaluator<T> evaluator = evaluatorFactory.forApplication(transform, inputBundle);
            if (evaluator == null) {
                // Nothing to evaluate for this transform/bundle pair.
                onComplete.handleEmpty(transform);
                return;
            }
            processElements(evaluator, metricsContainer, enforcements);
            finishBundle(evaluator, metricsContainer, enforcements);
        }
        catch (Exception e) {
            onComplete.handleException(inputBundle, e);
            if (e instanceof RuntimeException) {
                throw (RuntimeException)e;
            }
            throw new RuntimeException(e);
        }
        catch (Error err) {
            LOG.error("Error occurred within {}", this, err);
            onComplete.handleError(err);
            throw err;
        }
        finally {
            try {
                // Report the physical metrics accumulated by this execution.
                context.getMetrics().commitPhysical(inputBundle, metricsContainer.getCumulative());
            }
            finally {
                // Must run even if committing metrics fails; otherwise the executor service
                // would never be told that this executor has finished.
                transformEvaluationState.complete(this);
            }
        }
    }

    /**
     * Feeds every element of the input bundle to the evaluator, surrounding each call with the
     * {@code beforeElement}/{@code afterElement} hooks of all enforcements and publishing any
     * pending metric deltas as physical updates after the element is processed.
     *
     * <p>Does nothing when the input bundle is {@code null}.
     *
     * @param evaluator the evaluator that processes each element
     * @param metricsContainer container from which per-element metric deltas are read
     * @param enforcements enforcements to invoke around each element
     * @throws Exception if the evaluator or an enforcement fails
     */
    private void processElements(TransformEvaluator<T> evaluator, MetricsContainerImpl metricsContainer, Collection<ModelEnforcement<T>> enforcements) throws Exception {
        if (inputBundle == null) {
            return;
        }
        for (WindowedValue<T> value : inputBundle.getElements()) {
            for (ModelEnforcement<T> enforcement : enforcements) {
                enforcement.beforeElement(value);
            }
            evaluator.processElement(value);
            MetricUpdates deltas = metricsContainer.getUpdates();
            if (deltas != null) {
                context.getMetrics().updatePhysical(inputBundle, deltas);
                // Mark the deltas as reported so they are not published again.
                metricsContainer.commitUpdates();
            }
            for (ModelEnforcement<T> enforcement : enforcements) {
                enforcement.afterElement(value);
            }
        }
    }

    /**
     * Finishes the bundle, attaches the cumulative metrics as logical updates, hands the result to
     * the completion callback and then runs the {@code afterFinish} hook of every enforcement
     * with the committed outputs.
     *
     * @param evaluator the evaluator to finish
     * @param metricsContainer container supplying the cumulative metric updates
     * @param enforcements enforcements to invoke after the result has been handled
     * @return the result produced by the evaluator, including the logical metric updates
     * @throws Exception if the evaluator, the callback or an enforcement fails
     */
    private TransformResult<T> finishBundle(TransformEvaluator<T> evaluator, MetricsContainerImpl metricsContainer, Collection<ModelEnforcement<T>> enforcements) throws Exception {
        TransformResult<T> result = evaluator.finishBundle().withLogicalMetricUpdates(metricsContainer.getCumulative());
        CommittedResult outputs = onComplete.handleResult(inputBundle, result);
        for (ModelEnforcement<T> enforcement : enforcements) {
            enforcement.afterFinish(inputBundle, result, outputs.getOutputs());
        }
        return result;
    }
}

