/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime.job_exec;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.ExecutionList;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.broker.SharedResourcesBrokerImpl;
import org.apache.gobblin.broker.SimpleScope;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.ScopeInstance;
import org.apache.gobblin.broker.iface.ScopeType;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.JobContext;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.runtime.JobLauncherFactory;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.api.Configurable;
import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
import org.apache.gobblin.runtime.api.JobExecution;
import org.apache.gobblin.runtime.api.JobExecutionDriver;
import org.apache.gobblin.runtime.api.JobExecutionLauncher;
import org.apache.gobblin.runtime.api.JobExecutionMonitor;
import org.apache.gobblin.runtime.api.JobExecutionResult;
import org.apache.gobblin.runtime.api.JobExecutionState;
import org.apache.gobblin.runtime.api.JobExecutionStateListener;
import org.apache.gobblin.runtime.api.JobExecutionStatus;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobTemplate;
import org.apache.gobblin.runtime.api.MonitoredObject;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
import org.apache.gobblin.runtime.listeners.AbstractJobListener;
import org.apache.gobblin.runtime.std.DefaultConfigurableImpl;
import org.apache.gobblin.runtime.std.JobExecutionStateListeners;
import org.apache.gobblin.runtime.std.JobExecutionUpdatable;
import org.apache.gobblin.util.ExecutorsUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobLauncherExecutionDriver
extends FutureTask<JobExecutionResult>
implements JobExecutionDriver {
    // Logger used for lifecycle messages (start, shutdown, close failures).
    private final Logger _log;
    // Spec of the job being driven; used when reporting a failed cancellation.
    private final JobSpec _jobSpec;
    // Execution record obtained from the runnable; exposed through getJobExecution().
    private final JobExecutionUpdatable _jobExec;
    // State object obtained from the runnable; exposed as both status and state.
    private final JobExecutionState _jobState;
    // Dispatcher that state listeners are registered with; also registered with _closer.
    private final JobExecutionStateListeners _callbackDispatcher;
    // Listeners added through addListener(); executed from done().
    private final ExecutionList _executionList;
    // The callable handed to FutureTask; also the source of the underlying JobLauncher.
    private final DriverRunnable _runnable;
    // Collects the closeables (launcher, callback dispatcher) that shutDown() closes.
    private final Closer _closer = Closer.create();
    // NOTE(review): no assignment to this field is visible in this file, so the
    // null check in shutDown() appears to always skip its branch — confirm.
    private JobContext _jobContext;

    /**
     * Builds a driver for the given job spec by creating the launcher, the execution
     * state objects and the listener bridge, and wrapping them in a runnable.
     * The returned driver has not been started.
     *
     * @param sysConfig system configuration whose properties are passed to the launcher factory
     * @param jobSpec the job to execute
     * @param jobLauncherType explicit launcher type; when absent the factory overload
     *        that takes no type is used
     * @param log logger to use; when absent a logger for this class is created
     * @param instrumentationEnabled whether the listener bridge updates launcher metrics
     * @param launcherMetrics metrics object handed to the listener bridge
     * @param instanceBroker broker passed to the launcher factory
     * @return a new, not yet started driver
     */
    public static JobLauncherExecutionDriver create(Configurable sysConfig, JobSpec jobSpec, Optional<JobLauncherFactory.JobLauncherType> jobLauncherType, Optional<Logger> log, boolean instrumentationEnabled, JobExecutionLauncher.StandardMetrics launcherMetrics, SharedResourcesBroker<GobblinScopeTypes> instanceBroker) {
        Logger actualLog = log.isPresent() ? log.get() : LoggerFactory.getLogger(JobLauncherExecutionDriver.class);
        JobExecutionStateListeners callbackDispatcher = new JobExecutionStateListeners(actualLog);
        JobExecutionUpdatable jobExec = JobExecutionUpdatable.createFromJobSpec(jobSpec);
        // Explicit type witness: the dispatcher is passed as the generic listener type.
        JobExecutionState jobState = new JobExecutionState(jobSpec, jobExec, Optional.<JobExecutionStateListener>of(callbackDispatcher));
        // The launcher factory is keyed by the type's string form.
        Optional<String> launcherTypeName = jobLauncherType.isPresent()
                ? Optional.of(jobLauncherType.get().toString())
                : Optional.<String>absent();
        JobLauncher jobLauncher = createLauncher(sysConfig, jobSpec, actualLog, launcherTypeName, instanceBroker);
        JobListenerToJobStateBridge bridge = new JobListenerToJobStateBridge(actualLog, jobState, instrumentationEnabled, launcherMetrics);
        DriverRunnable runnable = new DriverRunnable(jobLauncher, bridge, jobState, callbackDispatcher, jobExec);
        return new JobLauncherExecutionDriver(jobSpec, actualLog, runnable);
    }

    /**
     * Wraps the runnable in the underlying future and captures its collaborators.
     * The job launcher and the callback dispatcher are registered with the closer
     * (in that order) so that {@code shutDown()} closes them.
     *
     * @param jobSpec the job being driven
     * @param log logger for lifecycle messages
     * @param runnable the callable that launches the job and produces the result
     */
    protected JobLauncherExecutionDriver(JobSpec jobSpec, Logger log, DriverRunnable runnable) {
        super(runnable);
        // Registration order is kept: launcher first, dispatcher second.
        _closer.register(runnable.getJobLauncher());
        _log = log;
        _jobSpec = jobSpec;
        _jobExec = runnable.getJobExec();
        _callbackDispatcher = _closer.register(runnable.getCallbackDispatcher());
        _jobState = runnable.getJobState();
        _executionList = new ExecutionList();
        _runnable = runnable;
    }

    /**
     * Creates the job launcher through {@link JobLauncherFactory}.
     * With an explicit type the typed factory overload is called directly; without
     * one, the untyped overload is used and any exception is rethrown as a
     * {@link RuntimeException}.
     *
     * @param _sysConfig system configuration supplying the system properties
     * @param _jobSpec job spec supplying the job properties
     * @param _log logger for the informational message on the untyped path
     * @param jobLauncherType optional launcher type name
     * @param instanceBroker broker handed to the factory
     * @return the created launcher
     */
    private static JobLauncher createLauncher(Configurable _sysConfig, JobSpec _jobSpec, Logger _log, Optional<String> jobLauncherType, SharedResourcesBroker<GobblinScopeTypes> instanceBroker) {
        if (!jobLauncherType.isPresent()) {
            _log.info("Creating auto jobLauncher for " + _jobSpec);
            try {
                return JobLauncherFactory.newJobLauncher(_sysConfig.getConfigAsProperties(), _jobSpec.getConfigAsProperties(), instanceBroker);
            } catch (Exception failure) {
                throw new RuntimeException("JobLauncher creation failed: " + failure, failure);
            }
        }
        String typeName = jobLauncherType.get();
        return JobLauncherFactory.newJobLauncher(_sysConfig.getConfigAsProperties(), _jobSpec.getConfigAsProperties(), typeName, instanceBroker);
    }

    /** Returns the execution record that was taken from the runnable at construction. */
    @Override
    public JobExecution getJobExecution() {
        return _jobExec;
    }

    /** Returns the job state object viewed through the read-only status interface. */
    @Override
    public JobExecutionStatus getJobExecutionStatus() {
        return _jobState;
    }

    /**
     * Starts this driver on a new thread obtained from a thread factory built by
     * {@link ExecutorsUtils} with this driver's logger and a fixed name.
     *
     * @throws JobException declared for callers; not thrown by the visible code
     */
    protected void startAsync() throws JobException {
        _log.info("Starting " + getClass().getSimpleName());
        Thread driverThread = ExecutorsUtils
                .newThreadFactory(Optional.of(_log), Optional.of("job-launcher-execution-driver"))
                .newThread(this);
        driverThread.start();
    }

    /**
     * Completion hook of the underlying {@link FutureTask}: runs the listeners added
     * through {@code addListener} and then shuts the driver down.
     * A failure to close resources is logged, together with its cause, and not rethrown.
     */
    @Override
    protected void done() {
        this._executionList.execute();
        try {
            this.shutDown();
        }
        catch (IOException ioe) {
            // Pass the exception so the cause and stack trace are not lost.
            this._log.error("Failed to close job launcher.", ioe);
        }
    }

    /**
     * Cancels the job if the job context reports a PENDING, SUCCESSFUL or RUNNING
     * state, then closes everything registered with the closer.
     *
     * @throws IOException if closing the registered resources fails
     */
    private void shutDown() throws IOException {
        _log.info("Shutting down " + getClass().getSimpleName());
        if (_jobContext != null) {
            JobState.RunningState contextState = _jobContext.getJobState().getState();
            switch (contextState) {
                case PENDING:
                case SUCCESSFUL:
                case RUNNING:
                    cancel(false);
                    break;
                default:
                    // Every other state needs no cancellation.
                    break;
            }
        }
        _closer.close();
    }

    /**
     * Adds a listener to the execution list that {@code done()} executes.
     *
     * @param listener the callback to run
     * @param executor the executor the callback is run on
     */
    public void addListener(Runnable listener, Executor executor) {
        _executionList.add(listener, executor);
    }

    /** Returns the job launcher held by the driver's runnable. */
    @VisibleForTesting
    JobLauncher getLegacyLauncher() {
        return _runnable.getJobLauncher();
    }

    /**
     * Registers a state listener with the callback dispatcher.
     *
     * @param listener the listener to add
     */
    @Override
    public void registerStateListener(JobExecutionStateListener listener) {
        _callbackDispatcher.registerStateListener(listener);
    }

    /**
     * Removes a state listener from the callback dispatcher.
     *
     * @param listener the listener to remove
     */
    @Override
    public void unregisterStateListener(JobExecutionStateListener listener) {
        _callbackDispatcher.unregisterStateListener(listener);
    }

    /** Returns the job state object that this driver was constructed with. */
    @Override
    public JobExecutionState getJobExecutionState() {
        return _jobState;
    }

    /**
     * Forwards the listener to the callback dispatcher's weak registration method.
     *
     * @param listener the listener to add
     */
    @Override
    public void registerWeakStateListener(JobExecutionStateListener listener) {
        _callbackDispatcher.registerWeakStateListener(listener);
    }

    /**
     * Reports completion based on the job's running state rather than the future's
     * own state. Returns {@code false} while no running state is available.
     */
    @Override
    public boolean isDone() {
        JobState.RunningState current = fetchRunningState();
        return current != null && current.isDone();
    }

    /**
     * Reads the running state from the job execution status.
     *
     * @return the running state, or {@code null} if the status reports none
     * @throws UnsupportedOperationException if the monitored object is not a
     *         {@link JobState.RunningState}
     */
    private JobState.RunningState fetchRunningState() {
        MonitoredObject current = getJobExecutionStatus().getRunningState();
        if (current instanceof JobState.RunningState) {
            return (JobState.RunningState) current;
        }
        if (current != null) {
            throw new UnsupportedOperationException("Cannot process monitored object other than " + JobState.RunningState.class.getName());
        }
        return null;
    }

    /**
     * Cancels the job through the underlying launcher and then cancels the future.
     * <p>
     * If the job already reports a cancelled state this returns {@code true}; if it
     * reports any other done state this returns {@code false}. When no running state
     * is available yet (the state fetch returns {@code null}) the job is treated as
     * neither cancelled nor done and cancellation proceeds, instead of failing with a
     * {@link NullPointerException}.
     *
     * @param mayInterruptIfRunning forwarded to {@link FutureTask#cancel(boolean)}
     * @return {@code true} if the job was already cancelled, otherwise the result of
     *         cancelling the future
     * @throws RuntimeException if the launcher fails to cancel the job
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        JobState.RunningState runState = this.fetchRunningState();
        // A null state means nothing has been reported yet; fall through to cancellation.
        if (runState != null) {
            if (runState.isCancelled()) {
                return true;
            }
            if (runState.isDone()) {
                return false;
            }
        }
        // The state check and the cancel request are not atomic: the job may still
        // finish in between.
        try {
            this.getLegacyLauncher().cancelJob(new AbstractJobListener(){});
        }
        catch (JobException e) {
            throw new RuntimeException("Unable to cancel job " + this._jobSpec + ": " + e, e);
        }
        return super.cancel(mayInterruptIfRunning);
    }

    /**
     * Reports whether the job's running state is cancelled.
     * Returns {@code false} while no running state is available, mirroring the null
     * handling in {@code isDone()} instead of throwing a {@link NullPointerException}.
     */
    @Override
    public boolean isCancelled() {
        JobState.RunningState runState = this.fetchRunningState();
        return runState != null && runState.isCancelled();
    }

    /**
     * Waits for the job result. A failure inside the task is not rethrown; it is
     * converted into a failure result built from the exception's cause.
     *
     * @return the job result, or a failure result if the task threw
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Override
    public JobExecutionResult get() throws InterruptedException {
        JobExecutionResult outcome;
        try {
            outcome = super.get();
        } catch (ExecutionException failure) {
            outcome = JobExecutionResult.createFailureResult(failure.getCause());
        }
        return outcome;
    }

    /**
     * Waits up to the given timeout for the job result. A failure inside the task is
     * not rethrown; it is converted into a failure result built from the exception's
     * cause.
     *
     * @param timeout maximum time to wait
     * @param unit unit of {@code timeout}
     * @return the job result, or a failure result if the task threw
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws TimeoutException if the wait timed out
     */
    @Override
    public JobExecutionResult get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        JobExecutionResult outcome;
        try {
            outcome = super.get(timeout, unit);
        } catch (ExecutionException failure) {
            outcome = JobExecutionResult.createFailureResult(failure.getCause());
        }
        return outcome;
    }

    /**
     * {@link JobExecutionMonitor} that delegates every call to a wrapped
     * {@link JobLauncherExecutionDriver} and also exposes that driver.
     */
    public static class JobExecutionMonitorAndDriver
    implements JobExecutionMonitor {
        JobLauncherExecutionDriver driver;

        /**
         * @param driver the driver every monitor call is forwarded to
         */
        public JobExecutionMonitorAndDriver(JobLauncherExecutionDriver driver) {
            this.driver = driver;
        }

        /** Returns the wrapped driver. */
        public JobLauncherExecutionDriver getDriver() {
            return driver;
        }

        /** Returns the running state held by the driver's job state. */
        @Override
        public MonitoredObject getRunningState() {
            return driver._jobState.getRunningState();
        }

        /** Forwards to the driver's {@code cancel}. */
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return driver.cancel(mayInterruptIfRunning);
        }

        /** Forwards to the driver's {@code isCancelled}. */
        @Override
        public boolean isCancelled() {
            return driver.isCancelled();
        }

        /** Forwards to the driver's {@code isDone}. */
        @Override
        public boolean isDone() {
            return driver.isDone();
        }

        /** Forwards to the driver's untimed {@code get}. */
        @Override
        public JobExecutionResult get() throws InterruptedException, ExecutionException {
            return driver.get();
        }

        /** Forwards to the driver's timed {@code get}. */
        @Override
        public JobExecutionResult get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return driver.get(timeout, unit);
        }
    }

    /**
     * Builder-style {@link JobExecutionLauncher} that creates
     * {@link JobLauncherExecutionDriver} instances.
     * <p>
     * Each setting (system config, logger, metric context, instrumentation flag,
     * instance broker) can be set explicitly through a {@code with...} method. A
     * setting that was not set is computed on first access from the configured
     * {@link GobblinInstanceEnvironment} if there is one, or from a local default
     * otherwise, and then cached. The lazy initialization is not synchronized.
     */
    public static class Launcher
    implements JobExecutionLauncher,
    GobblinInstanceEnvironment {
        private Optional<JobLauncherFactory.JobLauncherType> _jobLauncherType = Optional.absent();
        private Optional<Configurable> _sysConfig = Optional.absent();
        private Optional<GobblinInstanceEnvironment> _gobblinEnv = Optional.absent();
        private Optional<Logger> _log = Optional.absent();
        private Optional<MetricContext> _metricContext = Optional.absent();
        private Optional<Boolean> _instrumentationEnabled = Optional.absent();
        private JobExecutionLauncher.StandardMetrics _metrics;
        private Optional<SharedResourcesBroker<GobblinScopeTypes>> _instanceBroker = Optional.absent();

        /**
         * Sets the launcher type used for jobs created by this launcher.
         *
         * @param jobLauncherType the launcher type; must not be null
         * @return this launcher
         */
        public Launcher withJobLauncherType(JobLauncherFactory.JobLauncherType jobLauncherType) {
            Preconditions.checkNotNull(jobLauncherType);
            this._jobLauncherType = Optional.of(jobLauncherType);
            return this;
        }

        /** Returns the explicitly configured launcher type, if any. */
        public Optional<JobLauncherFactory.JobLauncherType> getJobLauncherType() {
            return this._jobLauncherType;
        }

        /** Returns the environment's system config, or an empty config when no environment is set. */
        public Configurable getDefaultSysConfig() {
            return this._gobblinEnv.isPresent() ? this._gobblinEnv.get().getSysConfig() : DefaultConfigurableImpl.createFromConfig(ConfigFactory.empty());
        }

        /** Returns the system config, computing and caching the default on first use. */
        @Override
        public Configurable getSysConfig() {
            if (!this._sysConfig.isPresent()) {
                this._sysConfig = Optional.of(this.getDefaultSysConfig());
            }
            return this._sysConfig.get();
        }

        /**
         * Sets the system config explicitly.
         *
         * @param sysConfig the system config; must not be null
         * @return this launcher
         */
        public Launcher withSysConfig(Configurable sysConfig) {
            this._sysConfig = Optional.of(sysConfig);
            return this;
        }

        /**
         * Sets the instance environment that defaults are derived from.
         *
         * @param gobblinInstance the environment; must not be null
         * @return this launcher
         */
        public Launcher withGobblinInstanceEnvironment(GobblinInstanceEnvironment gobblinInstance) {
            this._gobblinEnv = Optional.of(gobblinInstance);
            return this;
        }

        /** Returns the configured instance environment, if any. */
        public Optional<GobblinInstanceEnvironment> getGobblinInstanceEnvironment() {
            return this._gobblinEnv;
        }

        /** Returns a logger for the job, named after this launcher's logger and the job. */
        public Logger getLog(JobSpec jobSpec) {
            return Launcher.getJobLogger(this.getLog(), jobSpec);
        }

        /**
         * Sets the instrumentation flag explicitly.
         *
         * @param enabled whether instrumentation is enabled
         * @return this launcher
         */
        public Launcher withInstrumentationEnabled(boolean enabled) {
            this._instrumentationEnabled = Optional.of(enabled);
            return this;
        }

        /** Returns the environment's instrumentation flag, or the one derived from the system config. */
        public boolean getDefaultInstrumentationEnabled() {
            return this._gobblinEnv.isPresent() ? this._gobblinEnv.get().isInstrumentationEnabled() : GobblinMetrics.isEnabled(this.getSysConfig().getConfig());
        }

        /** Returns the instrumentation flag, computing and caching the default on first use. */
        public boolean isInstrumentationEnabled() {
            if (!this._instrumentationEnabled.isPresent()) {
                this._instrumentationEnabled = Optional.of(this.getDefaultInstrumentationEnabled());
            }
            return this._instrumentationEnabled.get();
        }

        /** Builds the logger name "parentName.shortJobName" and returns the logger for it. */
        private static Logger getJobLogger(Logger parentLog, JobSpec jobSpec) {
            return LoggerFactory.getLogger(parentLog.getName() + "." + jobSpec.toShortString());
        }

        /**
         * Sets the metric context explicitly.
         *
         * @param instanceMetricContext the metric context; must not be null
         * @return this launcher
         */
        public Launcher withMetricContext(MetricContext instanceMetricContext) {
            this._metricContext = Optional.of(instanceMetricContext);
            return this;
        }

        /** Returns the metric context, computing and caching the default on first use. */
        public MetricContext getMetricContext() {
            if (!this._metricContext.isPresent()) {
                this._metricContext = Optional.of(this.getDefaultMetricContext());
            }
            return this._metricContext.get();
        }

        /**
         * Returns a child of the environment's metric context when an environment is
         * set; otherwise builds a context from the system config properties with no tags.
         */
        public MetricContext getDefaultMetricContext() {
            if (this._gobblinEnv.isPresent()) {
                return this._gobblinEnv.get().getMetricContext().childBuilder(JobExecutionLauncher.class.getSimpleName()).build();
            }
            // No environment: derive a context from a throwaway State built from the system properties.
            State fakeState = new State(this.getSysConfig().getConfigAsProperties());
            List<Tag<?>> tags = new ArrayList<>();
            return Instrumented.getMetricContext(fakeState, Launcher.class, tags);
        }

        /**
         * Creates a driver for the job and returns a monitor wrapping it. A spec that
         * is not already a {@link ResolvedJobSpec} is resolved first.
         *
         * @param jobSpec the job to launch; must not be null
         * @return a monitor that also exposes the created driver
         * @throws RuntimeException if the spec cannot be resolved
         */
        @Override
        public JobExecutionMonitor launchJob(JobSpec jobSpec) {
            Preconditions.checkNotNull(jobSpec);
            if (!(jobSpec instanceof ResolvedJobSpec)) {
                try {
                    jobSpec = new ResolvedJobSpec(jobSpec);
                }
                catch (JobTemplate.TemplateException | SpecNotFoundException exc) {
                    throw new RuntimeException("Can't launch job " + jobSpec.getUri(), exc);
                }
            }
            JobLauncherExecutionDriver driver = JobLauncherExecutionDriver.create(this.getSysConfig(), jobSpec, this._jobLauncherType, Optional.of(this.getLog(jobSpec)), this.isInstrumentationEnabled(), this.getMetrics(), this.getInstanceBroker());
            return new JobExecutionMonitorAndDriver(driver);
        }

        /** Returns an empty tag list; this launcher adds no tags of its own. */
        public List<Tag<?>> generateTags(State state) {
            return Collections.emptyList();
        }

        /** Not supported; always throws {@link UnsupportedOperationException}. */
        public void switchMetricContext(List<Tag<?>> tags) {
            throw new UnsupportedOperationException();
        }

        /** Not supported; always throws {@link UnsupportedOperationException}. */
        public void switchMetricContext(MetricContext context) {
            throw new UnsupportedOperationException();
        }

        /** Returns the environment's instance name, or this class's name when no environment is set. */
        @Override
        public String getInstanceName() {
            return this._gobblinEnv.isPresent() ? this._gobblinEnv.get().getInstanceName() : this.getClass().getName();
        }

        /** Returns the environment's logger, or a logger for this class when no environment is set. */
        public Logger getDefaultLog() {
            return this._gobblinEnv.isPresent() ? this._gobblinEnv.get().getLog() : LoggerFactory.getLogger(this.getClass());
        }

        /** Returns the logger, computing and caching the default on first use. */
        @Override
        public Logger getLog() {
            if (!this._log.isPresent()) {
                this._log = Optional.of(this.getDefaultLog());
            }
            return this._log.get();
        }

        /**
         * Sets the logger explicitly.
         *
         * @param log the logger; must not be null
         * @return this launcher
         */
        public Launcher withLog(Logger log) {
            this._log = Optional.of(log);
            return this;
        }

        /** Returns the standard metrics object, creating it on first use. */
        @Override
        public JobExecutionLauncher.StandardMetrics getMetrics() {
            if (this._metrics == null) {
                this._metrics = new JobExecutionLauncher.StandardMetrics(this);
            }
            return this._metrics;
        }

        /**
         * Sets the instance broker explicitly.
         *
         * @param broker the broker; must not be null
         * @return this launcher
         */
        public Launcher withInstanceBroker(SharedResourcesBroker<GobblinScopeTypes> broker) {
            this._instanceBroker = Optional.of(broker);
            return this;
        }

        /**
         * Returns the instance broker. On first use it is taken from the environment
         * when one is set, otherwise a default broker is created; the result is cached.
         */
        @Override
        public SharedResourcesBroker<GobblinScopeTypes> getInstanceBroker() {
            if (!this._instanceBroker.isPresent()) {
                this._instanceBroker = Optional.of(this._gobblinEnv.isPresent() ? this._gobblinEnv.get().getInstanceBroker() : this.getDefaultInstanceBroker());
            }
            return this._instanceBroker.get();
        }

        /**
         * Creates a new top-level broker from the system config and returns an
         * instance-scoped sub-broker named after this launcher's instance name.
         * Logs a warning because such a broker is created per launcher.
         */
        public SharedResourcesBroker<GobblinScopeTypes> getDefaultInstanceBroker() {
            this.getLog().warn("Creating a default instance broker for job launcher. Objects may not be shared across all jobs in this instance.");
            SharedResourcesBrokerImpl<GobblinScopeTypes> globalBroker = SharedResourcesBrokerFactory.createDefaultTopLevelBroker(this.getSysConfig().getConfig(), GobblinScopeTypes.GLOBAL.defaultScopeInstance());
            return globalBroker.newSubscopedBuilder(new SimpleScope<>(GobblinScopeTypes.INSTANCE, this.getInstanceName())).build();
        }
    }

    /**
     * Job listener that translates launcher callbacks into transitions on a
     * {@link JobExecutionState} and, when instrumentation is enabled and metrics are
     * available, marks the corresponding launcher meters.
     */
    static class JobListenerToJobStateBridge
    extends AbstractJobListener {
        private final JobExecutionState _jobState;
        private final boolean _instrumentationEnabled;
        private final JobExecutionLauncher.StandardMetrics _launcherMetrics;
        // Last context seen in onJobPrepare; only written in this class.
        private JobContext _jobContext;

        /**
         * @param log logger passed to the base listener
         * @param jobState state object to drive
         * @param instrumentationEnabled whether meters should be marked
         * @param launcherMetrics meters to mark; may be null, in which case none are marked
         */
        public JobListenerToJobStateBridge(Logger log, JobExecutionState jobState, boolean instrumentationEnabled, JobExecutionLauncher.StandardMetrics launcherMetrics) {
            super(Optional.of(log));
            this._jobState = jobState;
            this._instrumentationEnabled = instrumentationEnabled;
            this._launcherMetrics = launcherMetrics;
        }

        /** Returns true when instrumentation is on and a metrics object was supplied. */
        private boolean shouldMarkMetrics() {
            return this._instrumentationEnabled && null != this._launcherMetrics;
        }

        /**
         * Records the context, moves the state to pending if it has no running state
         * yet, then to running, and marks the "launched" meter.
         */
        @Override
        public void onJobPrepare(JobContext jobContext) throws Exception {
            super.onJobPrepare(jobContext);
            this._jobContext = jobContext;
            if (this._jobState.getRunningState() == null) {
                this._jobState.switchToPending();
            }
            this._jobState.switchToRunning();
            if (this.shouldMarkMetrics()) {
                this._launcherMetrics.getNumJobsLaunched().mark();
            }
        }

        /** Delegates to the base listener only. */
        @Override
        public void onJobStart(JobContext jobContext) throws Exception {
            super.onJobStart(jobContext);
        }

        /**
         * Requires the context's job state to be SUCCESSFUL, COMMITTED or FAILED.
         * Marks the "completed" meter, then either switches to failed (marking the
         * "failed" meter first) or switches to successful and committed (marking the
         * "committed" meter afterwards).
         *
         * @throws IllegalArgumentException if the context is in any other state
         */
        @Override
        public void onJobCompletion(JobContext jobContext) throws Exception {
            JobState.RunningState reported = jobContext.getJobState().getState();
            boolean terminal = reported == JobState.RunningState.SUCCESSFUL
                    || reported == JobState.RunningState.COMMITTED
                    || reported == JobState.RunningState.FAILED;
            Preconditions.checkArgument(terminal, "Unexpected state: " + reported + " in " + jobContext);
            super.onJobCompletion(jobContext);
            if (this.shouldMarkMetrics()) {
                this._launcherMetrics.getNumJobsCompleted().mark();
            }
            // Re-read the state after the base callback, as the original code does.
            if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) {
                if (this.shouldMarkMetrics()) {
                    this._launcherMetrics.getNumJobsFailed().mark();
                }
                this._jobState.switchToFailed();
            } else {
                this._jobState.switchToSuccessful();
                this._jobState.switchToCommitted();
                if (this.shouldMarkMetrics()) {
                    this._launcherMetrics.getNumJobsCommitted().mark();
                }
            }
        }

        /** Switches the state to cancelled and marks the "cancelled" meter. */
        @Override
        public void onJobCancellation(JobContext jobContext) throws Exception {
            super.onJobCancellation(jobContext);
            this._jobState.switchToCancelled();
            if (this.shouldMarkMetrics()) {
                this._launcherMetrics.getNumJobsCancelled().mark();
            }
        }
    }

    /**
     * The task body executed by the driver: launches the job through the launcher,
     * waits for the job state to report completion and converts that state into a
     * result. Also serves as the holder of the driver's collaborators.
     */
    private static class DriverRunnable
    implements Callable<JobExecutionResult> {
        private final JobLauncher jobLauncher;
        private final JobListenerToJobStateBridge bridge;
        private final JobExecutionState jobState;
        private final JobExecutionStateListeners callbackDispatcher;
        private final JobExecutionUpdatable jobExec;

        /**
         * @param jobLauncher launcher used to run the job
         * @param bridge listener passed to the launcher
         * @param jobState state object awaited for completion
         * @param callbackDispatcher dispatcher for state listeners
         * @param jobExec execution record of the job
         */
        public DriverRunnable(JobLauncher jobLauncher, JobListenerToJobStateBridge bridge, JobExecutionState jobState, JobExecutionStateListeners callbackDispatcher, JobExecutionUpdatable jobExec) {
            this.jobLauncher = jobLauncher;
            this.bridge = bridge;
            this.jobState = jobState;
            this.callbackDispatcher = callbackDispatcher;
            this.jobExec = jobExec;
        }

        /**
         * Launches the job with the bridge as listener, waits on the job state with
         * a timeout of {@code Long.MAX_VALUE}, and builds the result from that state.
         */
        @Override
        public JobExecutionResult call() throws JobException, InterruptedException, TimeoutException {
            jobLauncher.launchJob(bridge);
            jobState.awaitForDone(Long.MAX_VALUE);
            JobExecutionResult outcome = JobExecutionResult.createFromState(jobState);
            return outcome;
        }

        /** Returns the launcher used to run the job. */
        public JobLauncher getJobLauncher() {
            return jobLauncher;
        }

        /** Returns the listener bridge passed to the launcher. */
        public JobListenerToJobStateBridge getBridge() {
            return bridge;
        }

        /** Returns the job state object. */
        public JobExecutionState getJobState() {
            return jobState;
        }

        /** Returns the state-listener dispatcher. */
        public JobExecutionStateListeners getCallbackDispatcher() {
            return callbackDispatcher;
        }

        /** Returns the execution record of the job. */
        public JobExecutionUpdatable getJobExec() {
            return jobExec;
        }
    }
}

