/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime.instance;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.api.Configurable;
import org.apache.gobblin.runtime.api.GobblinInstanceDriver;
import org.apache.gobblin.runtime.api.GobblinInstanceLauncher;
import org.apache.gobblin.runtime.api.JobCatalog;
import org.apache.gobblin.runtime.api.JobExecutionLauncher;
import org.apache.gobblin.runtime.api.JobExecutionMonitor;
import org.apache.gobblin.runtime.api.JobExecutionState;
import org.apache.gobblin.runtime.api.JobLifecycleListener;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobSpecScheduler;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
import org.apache.gobblin.runtime.job_exec.JobLauncherExecutionDriver;
import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
import org.apache.gobblin.runtime.std.DefaultJobCatalogListenerImpl;
import org.apache.gobblin.runtime.std.DefaultJobExecutionStateListenerImpl;
import org.apache.gobblin.runtime.std.JobLifecycleListenersList;
import org.apache.gobblin.util.ExecutorsUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default {@link GobblinInstanceDriver} implementation, built on Guava's {@link AbstractIdleService}.
 *
 * <p>On start-up the driver registers a {@link JobSpecListener} with the job catalog. That listener
 * schedules every added or updated {@link JobSpec} with the {@link JobSpecScheduler} and unschedules
 * deleted ones. Each scheduled job is wrapped in a {@link JobSpecRunnable}, which launches the job
 * through the {@link JobExecutionLauncher} and forwards execution-state callbacks to the registered
 * {@link JobLifecycleListener}s.
 */
public class DefaultGobblinInstanceDriverImpl
extends AbstractIdleService
implements GobblinInstanceDriver {
    /** Logger used by the driver; also the base for the names of the inner listeners' loggers. */
    protected final Logger _log;
    /** Name of this instance as supplied to the constructor (not validated). */
    protected final String _instanceName;
    /** System-level configuration of the instance. */
    protected final Configurable _sysConfig;
    /** Catalog whose add/update/delete notifications drive job scheduling. */
    protected final JobCatalog _jobCatalog;
    /** Scheduler that receives the runnables created for catalog job specs. */
    protected final JobSpecScheduler _jobScheduler;
    /** Launcher used by {@link JobSpecRunnable} to start job executions. */
    protected final JobExecutionLauncher _jobLauncher;
    /** Instance configuration accessor derived from the system configuration. */
    protected final GobblinInstanceLauncher.ConfigAccessor _instanceCfg;
    /** Fans job lifecycle callbacks out to the registered listeners. */
    protected final JobLifecycleListenersList _callbacksDispatcher;
    /** True only when a metric context exists and metrics are enabled in the system configuration. */
    private final boolean _instrumentationEnabled;
    /** Metric context of the driver; {@code null} when none was supplied and metrics are disabled. */
    protected final MetricContext _metricCtx;
    /** Catalog listener created in {@link #startUp()}; {@code null} until the service has been started. */
    protected JobSpecListener _jobSpecListener;
    private final GobblinInstanceDriver.StandardMetrics _metrics;
    private final SharedResourcesBroker<GobblinScopeTypes> _instanceBroker;

    /**
     * Creates a driver wired to the given catalog, scheduler and launcher.
     *
     * @param instanceName      name reported by {@link #getInstanceName()}
     * @param sysConfig         system configuration; must not be {@code null}
     * @param jobCatalog        catalog to listen to; must not be {@code null}
     * @param jobScheduler      scheduler for job runnables; must not be {@code null}
     * @param jobLauncher       launcher for job executions; must not be {@code null}
     * @param baseMetricContext metric context to use; when absent one is constructed from
     *                          {@code sysConfig} if metrics are enabled there
     * @param log               logger to use; when absent a logger named after the runtime class is used
     * @param instanceBroker    broker returned by {@link #getInstanceBroker()}
     * @throws NullPointerException if any of the required arguments is {@code null}
     */
    public DefaultGobblinInstanceDriverImpl(String instanceName, Configurable sysConfig, JobCatalog jobCatalog, JobSpecScheduler jobScheduler, JobExecutionLauncher jobLauncher, Optional<MetricContext> baseMetricContext, Optional<Logger> log, SharedResourcesBroker<GobblinScopeTypes> instanceBroker) {
        Preconditions.checkNotNull(jobCatalog);
        Preconditions.checkNotNull(jobScheduler);
        Preconditions.checkNotNull(jobLauncher);
        Preconditions.checkNotNull(sysConfig);
        this._instanceName = instanceName;
        this._log = log.or(LoggerFactory.getLogger(this.getClass()));
        // Guava's Optional.or(T) rejects a null default, and constructMetricContext() returns null
        // when metrics are disabled. Branch explicitly so that a disabled-metrics configuration does
        // not blow up here, and so that no context is built when one was already supplied.
        this._metricCtx = baseMetricContext.isPresent()
                ? baseMetricContext.get()
                : this.constructMetricContext(sysConfig, this._log);
        this._instrumentationEnabled = null != this._metricCtx && GobblinMetrics.isEnabled(sysConfig.getConfig());
        this._jobCatalog = jobCatalog;
        this._jobScheduler = jobScheduler;
        this._jobLauncher = jobLauncher;
        this._sysConfig = sysConfig;
        this._instanceCfg = GobblinInstanceLauncher.ConfigAccessor.createFromGlobalConfig(this._sysConfig.getConfig());
        this._callbacksDispatcher = new JobLifecycleListenersList(this._jobCatalog, this._jobScheduler, this._log);
        this._instanceBroker = instanceBroker;
        // NOTE(review): StandardMetrics receives this driver; whether it tolerates a null metric
        // context (metrics disabled, no base context) is not visible from this file - confirm.
        this._metrics = new GobblinInstanceDriver.StandardMetrics(this);
    }

    /**
     * Builds a metric context for this class from the system configuration.
     *
     * @param sysConfig system configuration consulted for the metrics-enabled flag and properties
     * @param log       currently unused
     * @return a new metric context, or {@code null} when metrics are disabled in {@code sysConfig}
     */
    private MetricContext constructMetricContext(Configurable sysConfig, Logger log) {
        State tmpState = new State(sysConfig.getConfigAsProperties());
        return GobblinMetrics.isEnabled(sysConfig.getConfig()) ? Instrumented.getMetricContext(tmpState, this.getClass()) : null;
    }

    /** Returns the job catalog this driver listens to. */
    @Override
    public JobCatalog getJobCatalog() {
        return this._jobCatalog;
    }

    /**
     * Returns the job catalog viewed as a {@link MutableJobCatalog}.
     *
     * @throws ClassCastException if the catalog passed to the constructor is not a {@link MutableJobCatalog}
     */
    @Override
    public MutableJobCatalog getMutableJobCatalog() {
        return (MutableJobCatalog)this._jobCatalog;
    }

    /** Returns the scheduler that receives job runnables. */
    @Override
    public JobSpecScheduler getJobScheduler() {
        return this._jobScheduler;
    }

    /** Returns the launcher used to start job executions. */
    @Override
    public JobExecutionLauncher getJobLauncher() {
        return this._jobLauncher;
    }

    /** Returns the system configuration supplied at construction. */
    @Override
    public Configurable getSysConfig() {
        return this._sysConfig;
    }

    /** Returns the instance broker supplied at construction. */
    @Override
    public SharedResourcesBroker<GobblinScopeTypes> getInstanceBroker() {
        return this._instanceBroker;
    }

    /** Returns the logger used by this driver. */
    @Override
    public Logger getLog() {
        return this._log;
    }

    /** Creates the catalog listener and registers it with the job catalog. */
    @Override
    protected void startUp() throws Exception {
        this.getLog().info("Default driver: starting ...");
        this._jobSpecListener = new JobSpecListener();
        this._jobCatalog.addListener(this._jobSpecListener);
        this.getLog().info("Default driver: started.");
    }

    /**
     * Unregisters the catalog listener (if one was created) and closes the callbacks dispatcher.
     * The dispatcher is closed even when unregistering the listener fails.
     */
    @Override
    protected void shutDown() throws Exception {
        this.getLog().info("Default driver: shutting down ...");
        try {
            if (null != this._jobSpecListener) {
                this._jobCatalog.removeListener(this._jobSpecListener);
            }
        } finally {
            // Always release the dispatcher, otherwise a failing removeListener() would leak it.
            this._callbacksDispatcher.close();
        }
        this.getLog().info("Default driver: shut down.");
    }

    /** Creates the runnable that launches the given job spec with this driver as its instance driver. */
    @VisibleForTesting
    JobSpecRunnable createJobSpecRunnable(JobSpec addedJob) {
        return new JobSpecRunnable(addedJob, this);
    }

    /** Returns the instance configuration accessor derived from the system configuration. */
    GobblinInstanceLauncher.ConfigAccessor getInstanceCfg() {
        return this._instanceCfg;
    }

    /** Registers a job lifecycle listener with the callbacks dispatcher. */
    @Override
    public void registerJobLifecycleListener(JobLifecycleListener listener) {
        this._callbacksDispatcher.registerJobLifecycleListener(listener);
    }

    /** Unregisters a job lifecycle listener from the callbacks dispatcher. */
    @Override
    public void unregisterJobLifecycleListener(JobLifecycleListener listener) {
        this._callbacksDispatcher.unregisterJobLifecycleListener(listener);
    }

    /** Returns the job lifecycle listeners known to the callbacks dispatcher. */
    @Override
    public List<JobLifecycleListener> getJobLifecycleListeners() {
        return this._callbacksDispatcher.getJobLifecycleListeners();
    }

    /** Registers a job lifecycle listener through the dispatcher's weak-registration method. */
    @Override
    public void registerWeakJobLifecycleListener(JobLifecycleListener listener) {
        this._callbacksDispatcher.registerWeakJobLifecycleListener(listener);
    }

    /** Returns the metric context of this driver; may be {@code null} when metrics are disabled. */
    public MetricContext getMetricContext() {
        return this._metricCtx;
    }

    /** Returns whether a metric context exists and metrics are enabled in the system configuration. */
    public boolean isInstrumentationEnabled() {
        return this._instrumentationEnabled;
    }

    /** Returns an empty tag list; this driver contributes no tags of its own. */
    public List<Tag<?>> generateTags(State state) {
        return Collections.emptyList();
    }

    /**
     * Not supported by this driver.
     *
     * @throws UnsupportedOperationException always
     */
    public void switchMetricContext(List<Tag<?>> tags) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this driver.
     *
     * @throws UnsupportedOperationException always
     */
    public void switchMetricContext(MetricContext context) {
        throw new UnsupportedOperationException();
    }

    /** Returns the standard metrics object created for this driver. */
    @Override
    public GobblinInstanceDriver.StandardMetrics getMetrics() {
        return this._metrics;
    }

    /** Returns the instance name supplied at construction. */
    @Override
    public String getInstanceName() {
        return this._instanceName;
    }

    /**
     * Catalog listener that mirrors catalog changes into the job scheduler. Its logger is named
     * after the driver's logger with the suffix {@code _jobSpecListener}.
     */
    protected class JobSpecListener
    extends DefaultJobCatalogListenerImpl {
        public JobSpecListener() {
            super(LoggerFactory.getLogger(DefaultGobblinInstanceDriverImpl.this._log.getName() + "_jobSpecListener"));
        }

        /** Returns the name of this listener's logger. */
        @Override
        public String toString() {
            return this._log.get().getName();
        }

        /** Schedules the newly added job. */
        @Override
        public void onAddJob(JobSpec addedJob) {
            super.onAddJob(addedJob);
            DefaultGobblinInstanceDriverImpl.this._jobScheduler.scheduleJob(addedJob, DefaultGobblinInstanceDriverImpl.this.createJobSpecRunnable(addedJob));
        }

        /** Unschedules the deleted job, identified by its URI. */
        @Override
        public void onDeleteJob(URI deletedJobURI, String deletedJobVersion) {
            super.onDeleteJob(deletedJobURI, deletedJobVersion);
            DefaultGobblinInstanceDriverImpl.this._jobScheduler.unscheduleJob(deletedJobURI);
        }

        /** Schedules the updated job spec with a fresh runnable. */
        @Override
        public void onUpdateJob(JobSpec updatedJob) {
            super.onUpdateJob(updatedJob);
            DefaultGobblinInstanceDriverImpl.this._jobScheduler.scheduleJob(updatedJob, DefaultGobblinInstanceDriverImpl.this.createJobSpecRunnable(updatedJob));
        }
    }

    /**
     * Runnable handed to the scheduler for a single job spec. Running it launches the job and
     * starts the resulting execution driver on a new thread.
     */
    class JobSpecRunnable
    implements Runnable {
        private final JobSpec _jobSpec;
        private final GobblinInstanceDriver _instanceDriver;

        public JobSpecRunnable(JobSpec jobSpec, GobblinInstanceDriver instanceDriver) {
            this._jobSpec = jobSpec;
            this._instanceDriver = instanceDriver;
        }

        /**
         * Resolves and launches the job, notifies lifecycle listeners of the launch, attaches a
         * {@link JobStateTracker} and starts the execution driver on its own thread. Any failure,
         * including an unexpected monitor type, is logged and not rethrown.
         */
        @Override
        public void run() {
            try {
                JobExecutionMonitor monitor = DefaultGobblinInstanceDriverImpl.this._jobLauncher.launchJob(new ResolvedJobSpec(this._jobSpec, this._instanceDriver));
                if (!(monitor instanceof JobLauncherExecutionDriver.JobExecutionMonitorAndDriver)) {
                    throw new UnsupportedOperationException(JobLauncherExecutionDriver.JobExecutionMonitorAndDriver.class.getName() + " is expected.");
                }
                JobLauncherExecutionDriver driver = ((JobLauncherExecutionDriver.JobExecutionMonitorAndDriver)monitor).getDriver();
                DefaultGobblinInstanceDriverImpl.this._callbacksDispatcher.onJobLaunch(driver);
                driver.registerStateListener(new JobStateTracker());
                ExecutorsUtils.newThreadFactory(Optional.of(DefaultGobblinInstanceDriverImpl.this._log), Optional.of("gobblin-instance-driver")).newThread(driver).start();
            }
            catch (Throwable t) {
                // Deliberately broad: a failed launch must not propagate into the scheduler's thread.
                DefaultGobblinInstanceDriverImpl.this._log.error("Job launch failed: " + t, t);
            }
        }
    }

    /**
     * Execution-state listener that relays every callback to the driver's callbacks dispatcher.
     * Its logger is named after the driver's logger with the suffix {@code _jobExecutionListener}.
     */
    class JobStateTracker
    extends DefaultJobExecutionStateListenerImpl {
        public JobStateTracker() {
            super(LoggerFactory.getLogger(DefaultGobblinInstanceDriverImpl.this._log.getName() + "_jobExecutionListener"));
        }

        /** Returns the name of this listener's logger. */
        @Override
        public String toString() {
            return this._log.get().getName();
        }

        @Override
        public void onStatusChange(JobExecutionState state, JobState.RunningState previousStatus, JobState.RunningState newStatus) {
            super.onStatusChange(state, previousStatus, newStatus);
            DefaultGobblinInstanceDriverImpl.this._callbacksDispatcher.onStatusChange(state, previousStatus, newStatus);
        }

        @Override
        public void onStageTransition(JobExecutionState state, String previousStage, String newStage) {
            super.onStageTransition(state, previousStage, newStage);
            DefaultGobblinInstanceDriverImpl.this._callbacksDispatcher.onStageTransition(state, previousStage, newStage);
        }

        @Override
        public void onMetadataChange(JobExecutionState state, String key, Object oldValue, Object newValue) {
            super.onMetadataChange(state, key, oldValue, newValue);
            DefaultGobblinInstanceDriverImpl.this._callbacksDispatcher.onMetadataChange(state, key, oldValue, newValue);
        }
    }
}

