/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.streaming.jobs.scheduler;

import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.Singletons;
import org.apache.kylin.common.exception.ErrorCodeSupplier;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.ServerErrorCode;
import org.apache.kylin.common.msg.MsgPicker;
import org.apache.kylin.common.persistence.transaction.UnitOfWork;
import org.apache.kylin.common.util.ExecutorServiceUtil;
import org.apache.kylin.guava30.shaded.common.base.Predicate;
import org.apache.kylin.guava30.shaded.common.collect.Maps;
import org.apache.kylin.guava30.shaded.common.collect.Sets;
import org.apache.kylin.job.JobContext;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.core.lock.JdbcJobLock;
import org.apache.kylin.job.core.lock.LockAcquireListener;
import org.apache.kylin.job.core.lock.LockException;
import org.apache.kylin.job.domain.JobLock;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.job.util.JobContextUtil;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.metadata.model.NDataModelManager;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import org.apache.kylin.metadata.resourcegroup.ResourceGroupManager;
import org.apache.kylin.streaming.jobs.scheduler.StreamingJobStatusWatcher;
import org.apache.kylin.streaming.jobs.thread.StreamingJobRunner;
import org.apache.kylin.streaming.manager.StreamingJobManager;
import org.apache.kylin.streaming.metadata.StreamingJobMeta;
import org.apache.kylin.streaming.util.JobKiller;
import org.apache.kylin.streaming.util.MetaInfoUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

public class StreamingScheduler {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(StreamingScheduler.class);
    private AtomicBoolean initialized = new AtomicBoolean(false);
    private AtomicBoolean hasStarted = new AtomicBoolean(false);
    private JobContext jobContext = JobContextUtil.getJobContext((KylinConfig)KylinConfig.getInstanceFromEnv());
    private final Map<String, JdbcJobLock> streamingJobLockCache = new ConcurrentHashMap<String, JdbcJobLock>();
    private ExecutorService jobPool;
    private Map<String, StreamingJobRunner> runnerMap = Maps.newHashMap();
    private Map<String, AbstractMap.SimpleEntry<AtomicInteger, AtomicInteger>> retryMap = Maps.newHashMap();
    private static final Map<String, StreamingScheduler> INSTANCE_MAP = Maps.newConcurrentMap();
    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
    private static final List<JobStatusEnum> STARTABLE_STATUS_LIST = Arrays.asList(JobStatusEnum.ERROR, JobStatusEnum.STOPPED, JobStatusEnum.NEW, JobStatusEnum.LAUNCHING_ERROR);
    private static StreamingJobStatusWatcher jobStatusUpdater = new StreamingJobStatusWatcher();

    public StreamingScheduler() {
        this.init();
    }

    public static synchronized StreamingScheduler getInstance() {
        return (StreamingScheduler)Singletons.getInstance(StreamingScheduler.class);
    }

    public synchronized void init() {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        if (!config.isJobNode() && !config.isDataLoadingNode()) {
            log.info("server mode: {}, no need to run job scheduler", (Object)config.getServerMode());
            return;
        }
        if (!UnitOfWork.isAlreadyInTransaction()) {
            log.info("Initializing Job Engine ....");
        }
        if (!this.initialized.compareAndSet(false, true)) {
            return;
        }
        if (config.isStreamingEnabled()) {
            int maxPoolSize = config.getNodeMaxStreamingConcurrentJobLimit();
            BasicThreadFactory executorThreadFactory = new BasicThreadFactory.Builder().namingPattern("StreamingJobWorker-%d").uncaughtExceptionHandler((t, e) -> {
                log.error("Something wrong happened when building threadFactory of streaming.", e);
                throw new IllegalStateException(e);
            }).build();
            this.jobPool = new ThreadPoolExecutor(maxPoolSize, maxPoolSize * 2, Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>(), (ThreadFactory)executorThreadFactory);
            log.debug("New StreamingScheduler created : {}", (Object)System.identityHashCode(this));
            this.scheduledExecutorService.scheduleWithFixedDelay(this::retryJob, 5L, 1L, TimeUnit.MINUTES);
            jobStatusUpdater.schedule();
        }
        this.resumeJobs(config);
        this.hasStarted.set(true);
    }

    public synchronized void submitJob(String project, String modelId, JobTypeEnum jobType) {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        if (!config.isStreamingEnabled()) {
            return;
        }
        String jobId = StreamingUtils.getJobId((String)modelId, (String)jobType.name());
        JdbcJobLock jobLock = this.tryJobLock(jobId, project);
        if (jobLock == null) {
            throw new IllegalStateException(String.format(Locale.ROOT, "Job %s is locked by other node.", jobId));
        }
        try {
            StreamingJobMeta jobMeta = StreamingJobManager.getInstance(config, project).getStreamingJobByUuid(jobId);
            this.checkJobStartStatus(jobMeta, jobId);
            JobKiller.killProcess(jobMeta);
            this.killYarnApplication(project, jobId, modelId);
            Predicate predicate = item -> (item.getStatus() == SegmentStatusEnum.NEW || item.getStorageBytesSize() == 0L) && item.getAdditionalInfo() != null;
            if (JobTypeEnum.STREAMING_BUILD == jobType) {
                this.deleteBrokenSegment(project, modelId, (Predicate<NDataSegment>)((Predicate)item -> predicate.apply(item) && !item.getAdditionalInfo().containsKey("file_layer")));
            } else if (JobTypeEnum.STREAMING_MERGE == jobType) {
                this.deleteBrokenSegment(project, modelId, (Predicate<NDataSegment>)((Predicate)item -> predicate.apply(item) && item.getAdditionalInfo().containsKey("file_layer")));
            }
            MetaInfoUpdater.updateJobState(project, jobId, JobStatusEnum.STARTING);
            StreamingJobRunner jobRunner = new StreamingJobRunner(project, modelId, jobType);
            this.runnerMap.put(jobId, jobRunner);
            this.jobPool.execute(jobRunner);
            if (!StreamingUtils.isJobOnCluster((KylinConfig)config)) {
                MetaInfoUpdater.updateJobState(project, jobId, Sets.newHashSet((Object[])new JobStatusEnum[]{JobStatusEnum.RUNNING, JobStatusEnum.ERROR}), JobStatusEnum.RUNNING);
            }
        }
        catch (Exception e) {
            log.error("Submit streaming job failed, jobId: {}", (Object)jobId, (Object)e);
            throw e;
        }
        finally {
            try {
                jobLock.tryRelease();
            }
            catch (LockException ex) {
                log.error("Release streaming job lock failed.", (Throwable)ex);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private JdbcJobLock tryJobLock(String jobId, String project) {
        JdbcJobLock jobLock = this.streamingJobLockCache.computeIfAbsent(jobId, id -> new JdbcJobLock(jobId, this.jobContext.getServerNode(), this.jobContext.getKylinConfig().getJobSchedulerJobRenewalSec(), this.jobContext.getKylinConfig().getJobSchedulerJobRenewalRatio(), this.jobContext.getLockClient(), (LockAcquireListener)new StreamingJobLockListener(jobId, this.streamingJobLockCache)));
        if (!jobLock.isLocked()) {
            if (this.jobContext.getJobLockMapper().selectByJobId(jobId) == null) {
                this.jobContext.getJobLockMapper().insertSelective(new JobLock(jobId, project, 3, JobLock.JobTypeEnum.STREAMING));
            }
            JdbcJobLock jdbcJobLock = jobLock;
            synchronized (jdbcJobLock) {
                try {
                    if (!jobLock.isLocked() && !jobLock.tryAcquire()) {
                        log.info("Acquire job lock failed, jobId: {}", (Object)jobId);
                        return null;
                    }
                }
                catch (LockException e) {
                    log.info("Acquire job lock failed, jobId: {}, error: {}", (Object)jobId, (Object)e);
                    return null;
                }
            }
        }
        return jobLock;
    }

    public synchronized void stopJob(String project, String modelId, JobTypeEnum jobType) {
        String jobId = StreamingUtils.getJobId((String)modelId, (String)jobType.name());
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        boolean existed = this.applicationExisted(jobId);
        if (existed) {
            StreamingJobManager jobMgr = StreamingJobManager.getInstance(config, project);
            JobStatusEnum status = jobMgr.getStreamingJobByUuid(jobId).getCurrentStatus();
            if (JobStatusEnum.ERROR == status || JobStatusEnum.STOPPED == status) {
                return;
            }
            MetaInfoUpdater.updateJobState(project, jobId, JobStatusEnum.STOPPING);
            this.doStop(project, modelId, jobType);
        } else {
            this.doStop(project, modelId, jobType);
            if (StreamingUtils.isJobOnCluster((KylinConfig)config)) {
                MetaInfoUpdater.updateJobState(project, jobId, Sets.newHashSet((Object[])new JobStatusEnum[]{JobStatusEnum.STOPPED, JobStatusEnum.ERROR}), JobStatusEnum.ERROR);
            } else {
                MetaInfoUpdater.updateJobState(project, jobId, Sets.newHashSet((Object[])new JobStatusEnum[]{JobStatusEnum.STOPPED, JobStatusEnum.ERROR}), JobStatusEnum.STOPPED);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doStop(String project, String modelId, JobTypeEnum jobType) {
        String jobId = StreamingUtils.getJobId((String)modelId, (String)jobType.name());
        StreamingJobRunner runner = this.runnerMap.get(jobId);
        Map<String, StreamingJobRunner> map = this.runnerMap;
        synchronized (map) {
            if (Objects.isNull(runner)) {
                runner = new StreamingJobRunner(project, modelId, jobType);
                runner.init();
                this.runnerMap.put(jobId, runner);
            }
        }
        runner.stop();
    }

    public void forceShutdown() {
        log.info("Shutting down DefaultScheduler ....");
        this.releaseResources();
        ExecutorServiceUtil.forceShutdown((ExecutorService)this.scheduledExecutorService);
        ExecutorServiceUtil.forceShutdown((ExecutorService)this.jobPool);
    }

    private void releaseResources() {
        this.initialized.set(false);
        this.hasStarted.set(false);
        INSTANCE_MAP.clear();
    }

    private void checkJobStartStatus(StreamingJobMeta jobMeta, String jobId) {
        if (!STARTABLE_STATUS_LIST.contains(jobMeta.getCurrentStatus())) {
            throw new KylinException((ErrorCodeSupplier)ServerErrorCode.JOB_START_FAILURE, jobId);
        }
    }

    public void retryJob() {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        List prjList = ResourceGroupManager.getInstance((KylinConfig)config).listProjectWithPermission();
        prjList.forEach(project -> {
            StreamingJobManager mgr = StreamingJobManager.getInstance(config, project);
            List<StreamingJobMeta> jobMetaList = mgr.listAllStreamingJobMeta();
            List<StreamingJobMeta> retryJobMetaList = jobMetaList.stream().filter(meta -> "true".equals(meta.getParams().getOrDefault("kylin.streaming.job-retry-enabled", config.getStreamingJobRetryEnabled()))).collect(Collectors.toList());
            retryJobMetaList.forEach(meta -> {
                JobStatusEnum status = meta.getCurrentStatus();
                String modelId = meta.getModelId();
                String jobId = StreamingUtils.getJobId((String)modelId, (String)meta.getJobType().name());
                if (this.retryMap.containsKey(jobId) || status == JobStatusEnum.ERROR) {
                    boolean canRestart;
                    boolean bl = canRestart = !this.applicationExisted(jobId);
                    if (canRestart) {
                        if (!this.retryMap.containsKey(jobId)) {
                            if (status == JobStatusEnum.ERROR) {
                                this.retryMap.put(jobId, new AbstractMap.SimpleEntry<AtomicInteger, AtomicInteger>(new AtomicInteger(config.getStreamingJobRetryInterval()), new AtomicInteger(1)));
                            }
                        } else {
                            int targetCnt = this.retryMap.get(jobId).getKey().get();
                            int currCnt = this.retryMap.get(jobId).getValue().get();
                            log.debug("targetCnt=" + targetCnt + ",currCnt=" + currCnt + " jobId=" + jobId);
                            if (targetCnt <= config.getStreamingJobMaxRetryInterval()) {
                                this.retryMap.get(jobId).getValue().incrementAndGet();
                                if (targetCnt == currCnt && status == JobStatusEnum.ERROR) {
                                    log.info("begin to restart job:" + modelId + "_" + meta.getJobType());
                                    this.restartJob(config, (StreamingJobMeta)((Object)((Object)meta)), jobId, targetCnt);
                                }
                            }
                        }
                    } else if (status == JobStatusEnum.RUNNING && this.retryMap.containsKey(jobId)) {
                        log.debug("remove jobId=" + jobId);
                        this.retryMap.remove(jobId);
                    }
                }
            });
        });
    }

    private void restartJob(KylinConfig config, StreamingJobMeta meta, String jobId, int targetCnt) {
        try {
            this.submitJob(meta.getProject(), meta.getModelId(), meta.getJobType());
            if (targetCnt < config.getStreamingJobMaxRetryInterval()) {
                this.retryMap.get(jobId).getKey().addAndGet(config.getStreamingJobRetryInterval());
            }
            this.retryMap.get(jobId).getValue().set(0);
        }
        catch (Exception e) {
            log.error(e.getMessage(), (Throwable)e);
        }
    }

    private void deleteBrokenSegment(String project, String dataflowId, Predicate<NDataSegment> predicate) {
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            NDataflowManager dfMgr = NDataflowManager.getInstance((KylinConfig)KylinConfig.getInstanceFromEnv(), (String)project);
            NDataflow df = dfMgr.getDataflow(dataflowId);
            Segments segments = df.getSegments();
            List<NDataSegment> toRemoveSegs = segments.stream().filter(item -> predicate.apply(item)).collect(Collectors.toList());
            if (!toRemoveSegs.isEmpty()) {
                NDataflowUpdate dfUpdate = new NDataflowUpdate(dataflowId);
                dfUpdate.setToRemoveSegs(toRemoveSegs.toArray(new NDataSegment[0]));
                dfMgr.updateDataflow(dfUpdate);
            }
            return 0;
        }, (String)project);
    }

    public boolean applicationExisted(String jobId) {
        return JobKiller.applicationExisted(jobId);
    }

    public void killYarnApplication(String project, String jobId, String modelId) {
        boolean isExists = this.applicationExisted(jobId);
        if (isExists) {
            JobKiller.killApplication(jobId);
            isExists = this.applicationExisted(jobId);
            if (isExists) {
                String model = NDataModelManager.getInstance((KylinConfig)KylinConfig.getInstanceFromEnv(), (String)project).getDataModelDesc(modelId).getAlias();
                throw new KylinException((ErrorCodeSupplier)ServerErrorCode.REPEATED_START_ERROR, String.format(Locale.ROOT, MsgPicker.getMsg().getJobStartFailure(), model));
            }
        }
    }

    private void killJob(String project, String modelId, JobTypeEnum jobTypeEnum) {
        this.killJob(project, modelId, jobTypeEnum, JobStatusEnum.ERROR);
    }

    public void killJob(String project, String modelId, JobTypeEnum jobTypeEnum, JobStatusEnum status) {
        this.killJob(project, StreamingUtils.getJobId((String)modelId, (String)jobTypeEnum.name()), status);
    }

    private void killJob(String project, String jobId, JobStatusEnum status) {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        StreamingJobMeta jobMeta = StreamingJobManager.getInstance(config, project).getStreamingJobByUuid(jobId);
        JobKiller.killProcess(jobMeta);
        JobKiller.killApplication(jobId);
        MetaInfoUpdater.updateJobState(project, jobId, status);
    }

    private void resumeJobs(KylinConfig config) {
        List prjList = ResourceGroupManager.getInstance((KylinConfig)config).listProjectWithPermission();
        prjList.forEach(project -> {
            StreamingJobManager mgr = StreamingJobManager.getInstance(config, project);
            List<StreamingJobMeta> jobMetaList = mgr.listAllStreamingJobMeta();
            if (CollectionUtils.isEmpty(jobMetaList)) {
                return;
            }
            List<StreamingJobMeta> retryJobMetaList = jobMetaList.stream().filter(meta -> JobStatusEnum.STARTING == meta.getCurrentStatus() || JobStatusEnum.STOPPING == meta.getCurrentStatus() || JobStatusEnum.RUNNING == meta.getCurrentStatus() || JobStatusEnum.ERROR == meta.getCurrentStatus()).collect(Collectors.toList());
            retryJobMetaList.forEach(meta -> {
                JobTypeEnum jobType;
                String modelId = meta.getModelId();
                String jobId = StreamingUtils.getJobId((String)modelId, (String)(jobType = meta.getJobType()).name());
                JdbcJobLock jobLock = this.tryJobLock(jobId, (String)project);
                if (jobLock == null) {
                    log.info("Skip to resume job : {}, because it is locked by other node.", (Object)jobId);
                    return;
                }
                try {
                    if (meta.isSkipListener()) {
                        this.skipJobListener((String)project, StreamingUtils.getJobId((String)modelId, (String)jobType.name()), false);
                    }
                    if (JobStatusEnum.RUNNING == meta.getCurrentStatus() || JobStatusEnum.STARTING == meta.getCurrentStatus()) {
                        this.killJob((String)project, meta.getModelId(), meta.getJobType(), JobStatusEnum.STOPPED);
                        this.submitJob((String)project, modelId, jobType);
                    } else {
                        this.killJob((String)project, meta.getModelId(), meta.getJobType());
                        jobLock.tryRelease();
                    }
                }
                catch (Exception e) {
                    log.error("Error when resume job : {}", (Object)jobId, (Object)e);
                }
                finally {
                    try {
                        jobLock.tryRelease();
                    }
                    catch (LockException ex) {
                        log.error("Release streaming job lock failed", (Throwable)ex);
                    }
                }
            });
        });
    }

    public void skipJobListener(String project, String uuid, boolean skip) {
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            KylinConfig config = KylinConfig.getInstanceFromEnv();
            StreamingJobManager mgr = StreamingJobManager.getInstance(config, project);
            mgr.updateStreamingJob(uuid, copyForWrite -> {
                if (copyForWrite != null) {
                    copyForWrite.setSkipListener(skip);
                }
            });
            return null;
        }, (String)project);
    }

    @Generated
    public AtomicBoolean getInitialized() {
        return this.initialized;
    }

    @Generated
    public AtomicBoolean getHasStarted() {
        return this.hasStarted;
    }

    private static class StreamingJobLockListener
    implements LockAcquireListener {
        private String jobId;
        private Map<String, JdbcJobLock> streamingJobLockCache;

        public StreamingJobLockListener(String jobId, Map<String, JdbcJobLock> streamingJobLockCache) {
            this.jobId = jobId;
            this.streamingJobLockCache = streamingJobLockCache;
        }

        public void onSucceed() {
            this.streamingJobLockCache.get(this.jobId).setLocked(true);
        }

        public void onFailed() {
            this.streamingJobLockCache.get(this.jobId).setLocked(false);
        }
    }
}

