/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.spark.SparkPipelineResult;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.TestSparkPipelineOptions;
import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.stateful.SparkTimerInternals;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.io.FileUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class TestSparkRunner
extends PipelineRunner<SparkPipelineResult> {
    private static final Logger LOG = LoggerFactory.getLogger(TestSparkRunner.class);
    private final PipelineOptions options;
    private final SparkRunner delegate;

    private TestSparkRunner(PipelineOptions options) {
        this.delegate = SparkRunner.fromOptions(options);
        this.options = options;
    }

    public static TestSparkRunner fromOptions(PipelineOptions options) {
        return new TestSparkRunner(options);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public SparkPipelineResult run(Pipeline pipeline) {
        TestSparkPipelineOptions testSparkOptions = (TestSparkPipelineOptions)PipelineOptionsValidator.validate(TestSparkPipelineOptions.class, (PipelineOptions)this.options);
        boolean isForceStreaming = testSparkOptions.isForceStreaming();
        SparkPipelineResult result = null;
        AggregatorsAccumulator.clear();
        MetricsAccumulator.clear();
        GlobalWatermarkHolder.clear();
        LOG.info("About to run test pipeline " + this.options.getJobName());
        if (isForceStreaming) {
            try {
                result = this.delegate.run(pipeline);
                TestSparkRunner.awaitWatermarksOrTimeout(testSparkOptions, result);
                result.stop();
                PipelineResult.State finishState = result.getState();
                MatcherAssert.assertThat((String)String.format("Finish state %s is not allowed.", finishState), (Object)finishState, (Matcher)Matchers.isOneOf((Object[])new PipelineResult.State[]{PipelineResult.State.STOPPED, PipelineResult.State.DONE}));
            }
            finally {
                try {
                    FileUtils.deleteDirectory((File)new File(testSparkOptions.getCheckpointDir()));
                }
                catch (IOException e) {
                    throw new RuntimeException("Failed to clear checkpoint tmp dir.", e);
                }
            }
        }
        result = this.delegate.run(pipeline);
        result.waitUntilFinish();
        result.stop();
        PipelineResult.State finishState = result.getState();
        MatcherAssert.assertThat((String)String.format("Finish state %s is not allowed.", finishState), (Object)finishState, (Matcher)Matchers.is((Object)PipelineResult.State.DONE));
        MatcherAssert.assertThat((Object)result, (Matcher)testSparkOptions.getOnCreateMatcher());
        MatcherAssert.assertThat((Object)result, (Matcher)testSparkOptions.getOnSuccessMatcher());
        return result;
    }

    private static void awaitWatermarksOrTimeout(TestSparkPipelineOptions testSparkPipelineOptions, SparkPipelineResult result) {
        Instant globalWatermark;
        Long timeoutMillis = Duration.standardSeconds((long)((Long)Preconditions.checkNotNull((Object)testSparkPipelineOptions.getTestTimeoutSeconds()))).getMillis();
        Long batchDurationMillis = testSparkPipelineOptions.getBatchIntervalMillis();
        Instant stopPipelineWatermark = new Instant((Object)testSparkPipelineOptions.getStopPipelineWatermark());
        result.waitUntilFinish(Duration.millis((long)batchDurationMillis));
        do {
            SparkTimerInternals sparkTimerInternals = SparkTimerInternals.global(GlobalWatermarkHolder.get(batchDurationMillis));
            sparkTimerInternals.advanceWatermark();
            globalWatermark = sparkTimerInternals.currentInputWatermarkTime();
            Uninterruptibles.sleepUninterruptibly((long)batchDurationMillis, (TimeUnit)TimeUnit.MILLISECONDS);
        } while ((timeoutMillis = Long.valueOf(timeoutMillis - batchDurationMillis)) > 0L && globalWatermark.isBefore((ReadableInstant)stopPipelineWatermark));
    }
}

