package org.apache.beam.runners.dataflow;

import com.google.api.services.dataflow.model.JobMessage;
import com.google.api.services.dataflow.model.JobMetrics;
import com.google.api.services.dataflow.model.MetricUpdate;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
import java.util.concurrent.Callable;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.runners.dataflow.util.PropertyNames;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.hamcrest.MatcherAssert;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/dataflow/TestDataflowRunner.class */
public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
    private static final String TENTATIVE_COUNTER = "tentative";
    private static final Logger LOG = LoggerFactory.getLogger(TestDataflowRunner.class);
    private final TestDataflowPipelineOptions options;
    private final DataflowClient dataflowClient;
    private final DataflowRunner runner;
    private int expectedNumberOfAssertions = 0;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/dataflow/TestDataflowRunner$CancelOnError.class */
    public static class CancelOnError implements Callable<Void> {
        private final DataflowPipelineJob job;
        private final ErrorMonitorMessagesHandler messageHandler;

        public CancelOnError(DataflowPipelineJob dataflowPipelineJob, ErrorMonitorMessagesHandler errorMonitorMessagesHandler) {
            this.job = dataflowPipelineJob;
            this.messageHandler = errorMonitorMessagesHandler;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() throws Exception {
            while (true) {
                PipelineResult.State state = this.job.getState();
                if (this.messageHandler.hasSeenError() && !this.job.getState().isTerminal()) {
                    this.job.cancel();
                    TestDataflowRunner.LOG.info("Cancelling Dataflow job {}", this.job.getJobId());
                    return null;
                }
                if (state.isTerminal()) {
                    return null;
                }
                Thread.sleep(3000L);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/dataflow/TestDataflowRunner$ErrorMonitorMessagesHandler.class */
    public static class ErrorMonitorMessagesHandler implements MonitoringUtil.JobMessagesHandler {
        private final DataflowPipelineJob job;
        private final MonitoringUtil.JobMessagesHandler messageHandler;
        private final StringBuilder errorMessage;
        private volatile boolean hasSeenError;

        private ErrorMonitorMessagesHandler(DataflowPipelineJob dataflowPipelineJob, MonitoringUtil.JobMessagesHandler jobMessagesHandler) {
            this.job = dataflowPipelineJob;
            this.messageHandler = jobMessagesHandler;
            this.errorMessage = new StringBuilder();
            this.hasSeenError = false;
        }

        @Override // org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler
        public void process(List<JobMessage> list) {
            this.messageHandler.process(list);
            for (JobMessage jobMessage : list) {
                if ("JOB_MESSAGE_ERROR".equals(jobMessage.getMessageImportance())) {
                    TestDataflowRunner.LOG.info("Dataflow job {} threw exception. Failure message was: {}", this.job.getJobId(), jobMessage.getMessageText());
                    this.errorMessage.append(jobMessage.getMessageText());
                    this.hasSeenError = true;
                }
            }
        }

        boolean hasSeenError() {
            return this.hasSeenError;
        }

        String getErrorMessage() {
            return this.errorMessage.toString();
        }
    }

    TestDataflowRunner(TestDataflowPipelineOptions testDataflowPipelineOptions, DataflowClient dataflowClient) {
        this.options = testDataflowPipelineOptions;
        this.dataflowClient = dataflowClient;
        this.runner = DataflowRunner.fromOptions(testDataflowPipelineOptions);
    }

    public static TestDataflowRunner fromOptions(PipelineOptions pipelineOptions) {
        TestDataflowPipelineOptions testDataflowPipelineOptions = (TestDataflowPipelineOptions) pipelineOptions.as(TestDataflowPipelineOptions.class);
        testDataflowPipelineOptions.setTempLocation(Joiner.on("/").join(testDataflowPipelineOptions.getTempRoot(), testDataflowPipelineOptions.getJobName(), new Object[]{PropertyNames.OUTPUT, "results"}));
        return new TestDataflowRunner(testDataflowPipelineOptions, DataflowClient.create((DataflowPipelineOptions) pipelineOptions.as(DataflowPipelineOptions.class)));
    }

    @VisibleForTesting
    static TestDataflowRunner fromOptionsAndClient(TestDataflowPipelineOptions testDataflowPipelineOptions, DataflowClient dataflowClient) {
        return new TestDataflowRunner(testDataflowPipelineOptions, dataflowClient);
    }

    /* renamed from: run, reason: merged with bridge method [inline-methods] */
    public DataflowPipelineJob m20run(Pipeline pipeline) {
        return run(pipeline, this.runner);
    }

    DataflowPipelineJob run(Pipeline pipeline, DataflowRunner dataflowRunner) {
        Boolean valueOf;
        Optional<Boolean> checkForPAssertSuccess;
        updatePAssertCount(pipeline);
        TestPipelineOptions as = this.options.as(TestPipelineOptions.class);
        DataflowPipelineJob m10run = dataflowRunner.m10run(pipeline);
        LOG.info("Running Dataflow job {} with {} expected assertions.", m10run.getJobId(), Integer.valueOf(this.expectedNumberOfAssertions));
        MatcherAssert.assertThat(m10run, as.getOnCreateMatcher());
        ErrorMonitorMessagesHandler errorMonitorMessagesHandler = new ErrorMonitorMessagesHandler(m10run, new MonitoringUtil.LoggingHandler());
        if (this.options.isStreaming()) {
            valueOf = Boolean.valueOf(waitForStreamingJobTermination(m10run, errorMonitorMessagesHandler));
            checkForPAssertSuccess = Optional.absent();
        } else {
            valueOf = Boolean.valueOf(waitForBatchJobTermination(m10run, errorMonitorMessagesHandler));
            checkForPAssertSuccess = checkForPAssertSuccess(m10run);
        }
        if (!checkForPAssertSuccess.isPresent()) {
            LOG.warn("Dataflow job {} did not output a success or failure metric.", m10run.getJobId());
        } else if (!((Boolean) checkForPAssertSuccess.get()).booleanValue()) {
            throw new AssertionError(errorMessage(m10run, errorMonitorMessagesHandler));
        }
        if (!valueOf.booleanValue()) {
            throw new RuntimeException(errorMessage(m10run, errorMonitorMessagesHandler));
        }
        MatcherAssert.assertThat(m10run, as.getOnSuccessMatcher());
        return m10run;
    }

    @SuppressFBWarnings({"RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"})
    private boolean waitForStreamingJobTermination(DataflowPipelineJob dataflowPipelineJob, ErrorMonitorMessagesHandler errorMonitorMessagesHandler) {
        this.options.getExecutorService().submit(new CancelOnError(dataflowPipelineJob, errorMonitorMessagesHandler));
        try {
            PipelineResult.State waitUntilFinish = dataflowPipelineJob.waitUntilFinish(Duration.standardSeconds(this.options.getTestTimeoutSeconds().longValue()), errorMonitorMessagesHandler);
            if (waitUntilFinish != null && waitUntilFinish.isTerminal()) {
                return waitUntilFinish == PipelineResult.State.DONE && !errorMonitorMessagesHandler.hasSeenError();
            }
            LOG.info("Dataflow job {} took longer than {} seconds to complete, cancelling.", dataflowPipelineJob.getJobId(), this.options.getTestTimeoutSeconds());
            try {
                dataflowPipelineJob.cancel();
                return false;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        } catch (IOException e2) {
            throw new RuntimeException(e2);
        } catch (InterruptedException e3) {
            Thread.interrupted();
            return false;
        }
    }

    private boolean waitForBatchJobTermination(DataflowPipelineJob dataflowPipelineJob, ErrorMonitorMessagesHandler errorMonitorMessagesHandler) {
        try {
            dataflowPipelineJob.waitUntilFinish(Duration.standardSeconds(-1L), errorMonitorMessagesHandler);
            return dataflowPipelineJob.getState() == PipelineResult.State.DONE;
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e2) {
            Thread.interrupted();
            return false;
        }
    }

    private static String errorMessage(DataflowPipelineJob dataflowPipelineJob, ErrorMonitorMessagesHandler errorMonitorMessagesHandler) {
        return Strings.isNullOrEmpty(errorMonitorMessagesHandler.getErrorMessage()) ? String.format("Dataflow job %s terminated in state %s but did not return a failure reason.", dataflowPipelineJob.getJobId(), dataflowPipelineJob.getState()) : errorMonitorMessagesHandler.getErrorMessage();
    }

    @VisibleForTesting
    void updatePAssertCount(Pipeline pipeline) {
        this.expectedNumberOfAssertions = PAssert.countAsserts(pipeline);
    }

    @VisibleForTesting
    Optional<Boolean> checkForPAssertSuccess(DataflowPipelineJob dataflowPipelineJob) {
        JobMetrics jobMetrics = getJobMetrics(dataflowPipelineJob);
        if (jobMetrics == null || jobMetrics.getMetrics() == null) {
            LOG.warn("Metrics not present for Dataflow job {}.", dataflowPipelineJob.getJobId());
            return Optional.absent();
        }
        int i = 0;
        int i2 = 0;
        for (MetricUpdate metricUpdate : jobMetrics.getMetrics()) {
            if (metricUpdate.getName() != null && metricUpdate.getName().getContext() != null && metricUpdate.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
                if ("PAssertSuccess".equals(metricUpdate.getName().getName())) {
                    i += ((BigDecimal) metricUpdate.getScalar()).intValue();
                } else if ("PAssertFailure".equals(metricUpdate.getName().getName())) {
                    i2 += ((BigDecimal) metricUpdate.getScalar()).intValue();
                }
            }
        }
        if (i2 > 0) {
            LOG.info("Failure result for Dataflow job {}. Found {} success, {} failures out of {} expected assertions.", new Object[]{dataflowPipelineJob.getJobId(), Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(this.expectedNumberOfAssertions)});
            return Optional.of(false);
        }
        if (i >= this.expectedNumberOfAssertions) {
            LOG.info("Success result for Dataflow job {}. Found {} success, {} failures out of {} expected assertions.", new Object[]{dataflowPipelineJob.getJobId(), Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(this.expectedNumberOfAssertions)});
            return Optional.of(true);
        }
        PipelineResult.State state = dataflowPipelineJob.getState();
        if (state == PipelineResult.State.FAILED || state == PipelineResult.State.CANCELLED) {
            LOG.info("Dataflow job {} terminated in failure state {} without reporting a failed assertion", dataflowPipelineJob.getJobId(), state);
            return Optional.absent();
        }
        LOG.info("Inconclusive results for Dataflow job {}. Found {} success, {} failures out of {} expected assertions.", new Object[]{dataflowPipelineJob.getJobId(), Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(this.expectedNumberOfAssertions)});
        return Optional.absent();
    }

    @VisibleForTesting
    JobMetrics getJobMetrics(DataflowPipelineJob dataflowPipelineJob) {
        JobMetrics jobMetrics = null;
        try {
            jobMetrics = this.dataflowClient.getJobMetrics(dataflowPipelineJob.getJobId());
        } catch (IOException e) {
            LOG.warn("Failed to get job metrics: ", e);
        }
        return jobMetrics;
    }

    public String toString() {
        return "TestDataflowRunner#" + this.options.getAppName();
    }
}
