/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.rest.scheduler;

import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfigExt;
import org.apache.kylin.common.exception.KylinRuntimeException;
import org.apache.kylin.common.logging.SetLogCategory;
import org.apache.kylin.common.response.RestResponse;
import org.apache.kylin.common.util.ExecutorServiceUtil;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.NamedThreadFactory;
import org.apache.kylin.guava30.shaded.common.base.Preconditions;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.guava30.shaded.common.collect.Maps;
import org.apache.kylin.job.snapshot.SnapshotJobUtils;
import org.apache.kylin.metadata.model.NTableMetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.rest.handler.KapNoOpResponseErrorHandler;
import org.apache.kylin.rest.scheduler.BuildSnapshotRunnable;
import org.apache.kylin.rest.scheduler.CheckSourceTableResult;
import org.apache.kylin.rest.scheduler.CheckSourceTableRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.ResponseErrorHandler;
import org.springframework.web.client.RestTemplate;

public class AutoRefreshSnapshotRunner
implements Runnable {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(AutoRefreshSnapshotRunner.class);
    private static final String SNAPSHOT_VIEW_MAPPING_ERROR_MESSAGE = "Project[%s] Save View Mapping Failed";
    private static final Map<String, AutoRefreshSnapshotRunner> INSTANCE_MAP = Maps.newConcurrentMap();
    private ExecutorService jobPool;
    private KylinConfig projectConfig;
    private Queue<CheckSourceTableResult> checkSourceTableQueue = new LinkedBlockingQueue<CheckSourceTableResult>();
    private Map<Future<String>, Long> checkSourceTableFutures = Maps.newConcurrentMap();
    private final String project;
    private RestTemplate restTemplate;
    private Map<String, List<TableDesc>> sourceTableSnapshotMapping = Maps.newHashMap();
    private Map<String, AtomicInteger> buildSnapshotCount = Maps.newConcurrentMap();

    public static synchronized AutoRefreshSnapshotRunner getInstanceByProject(String project) {
        return INSTANCE_MAP.get(project);
    }

    public static synchronized AutoRefreshSnapshotRunner getInstance(String project) {
        return INSTANCE_MAP.computeIfAbsent(project, key -> {
            KylinConfigExt projectConfig = NProjectManager.getInstance((KylinConfig)KylinConfig.readSystemKylinConfig()).getProject(project).getConfig();
            return new AutoRefreshSnapshotRunner((KylinConfig)projectConfig, project);
        });
    }

    private AutoRefreshSnapshotRunner(KylinConfig projectConfig, String project) {
        Preconditions.checkNotNull((Object)project);
        if (INSTANCE_MAP.containsKey(project)) {
            throw new IllegalStateException("DefaultScheduler for project " + project + " has been initiated. Use getInstance() instead.");
        }
        this.project = project;
        this.init(projectConfig, project);
        log.debug("New AutoRefreshSnapshotRunner created by project '{}': {}", (Object)project, (Object)System.identityHashCode(this));
    }

    public void init(KylinConfig projectConfig, String project) {
        int corePoolSize = projectConfig.getSnapshotAutoRefreshMaxConcurrentJobLimit();
        this.jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), (ThreadFactory)new NamedThreadFactory("AutoRefreshSnapshotWorker(project:" + project + ")"));
        this.projectConfig = projectConfig;
        this.restTemplate = new RestTemplate();
        this.restTemplate.setErrorHandler((ResponseErrorHandler)new KapNoOpResponseErrorHandler());
        log.info("AutoRefreshSnapshotRunner init project[{}] job pool size: {}", (Object)project, (Object)corePoolSize);
    }

    public void doRun() {
        try (SetLogCategory ignored = new SetLogCategory("schedule");){
            log.info("Project[{}] start check and refresh snapshot", (Object)this.project);
            if (log.isDebugEnabled()) {
                ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor)this.jobPool;
                log.debug("job pool params: PoolSize[{}] CorePoolSize[{}] ActiveCount[{}] MaximumPoolSize[{}]", new Object[]{poolExecutor.getPoolSize(), poolExecutor.getCorePoolSize(), poolExecutor.getActiveCount(), poolExecutor.getMaximumPoolSize()});
            }
            this.saveSnapshotViewMapping(this.project, this.restTemplate);
            List tables = SnapshotJobUtils.getSnapshotTables((KylinConfig)this.projectConfig, (String)this.project);
            Map<String, Set<String>> viewTableMapping = this.readViewTableMapping();
            this.sourceTableSnapshotMapping = this.getSourceTableSnapshotMapping(tables, viewTableMapping);
            Set<String> allSourceTable = this.sourceTableSnapshotMapping.keySet();
            this.checkSourceTable(allSourceTable);
            this.waitCheckSourceTableTaskDone();
            log.info("Project[{}] stop check and refresh snapshot", (Object)this.project);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new KylinRuntimeException((Throwable)ie);
        }
        catch (Exception e) {
            throw new KylinRuntimeException((Throwable)e);
        }
        finally {
            this.checkSourceTableQueue = new LinkedBlockingQueue<CheckSourceTableResult>();
            this.cancelFuture(this.checkSourceTableFutures);
            this.checkSourceTableFutures = Maps.newConcurrentMap();
            this.sourceTableSnapshotMapping = Maps.newHashMap();
            this.buildSnapshotCount = Maps.newConcurrentMap();
        }
    }

    private void checkJobPool() {
        this.projectConfig = NProjectManager.getInstance((KylinConfig)KylinConfig.readSystemKylinConfig()).getProject(this.project).getConfig();
        ThreadPoolExecutor pool = (ThreadPoolExecutor)this.getJobPool();
        int corePoolSize = pool.getCorePoolSize();
        int poolSizeFromConfig = this.projectConfig.getSnapshotAutoRefreshMaxConcurrentJobLimit();
        if (poolSizeFromConfig != corePoolSize) {
            pool.setCorePoolSize(poolSizeFromConfig);
            pool.setMaximumPoolSize(poolSizeFromConfig);
            log.info("update AutoRefreshSnapshotRunner job pool size : {} old pool size : {}", (Object)poolSizeFromConfig, (Object)corePoolSize);
        }
    }

    public void cancelFuture(Map<Future<String>, Long> taskFutures) {
        taskFutures.keySet().forEach(future -> {
            if (!future.isDone()) {
                future.cancel(true);
            }
        });
    }

    public Map<String, Set<String>> readViewTableMapping() {
        HashMap result;
        block14: {
            FileSystem fileSystem = HadoopUtil.getWorkingFileSystem();
            Path viewMappingPath = new Path(this.projectConfig.getSnapshotAutoRefreshDir(this.project) + "view_mapping");
            result = Maps.newHashMap();
            try {
                if (!fileSystem.exists(viewMappingPath)) break block14;
                try (FSDataInputStream inputStream = fileSystem.open(viewMappingPath);){
                    result.putAll((Map)JsonUtil.readValue((InputStream)inputStream, (TypeReference)new TypeReference<Map<String, Set<String>>>(){}));
                }
            }
            catch (IOException e) {
                log.error("read viewMapping path[{}] has error", (Object)viewMappingPath, (Object)e);
            }
        }
        return result;
    }

    public Map<String, List<TableDesc>> getSourceTableSnapshotMapping(List<TableDesc> tables, Map<String, Set<String>> viewTableMapping) {
        HashMap result = Maps.newHashMap();
        NTableMetadataManager tableManager = NTableMetadataManager.getInstance((KylinConfig)this.projectConfig, (String)this.project);
        for (Map.Entry<String, Set<String>> entry : viewTableMapping.entrySet()) {
            String snapshotTableIdentity = entry.getKey();
            for (String table : entry.getValue()) {
                TableDesc tableDesc = tableManager.getTableDesc(snapshotTableIdentity);
                List snapshots = result.getOrDefault(table, Lists.newArrayList());
                if (tableDesc != null) {
                    snapshots.add(tableDesc);
                }
                result.put(table, snapshots.stream().distinct().collect(Collectors.toList()));
            }
        }
        for (TableDesc tableDesc : tables) {
            if (tableDesc.isView()) continue;
            String source = tableDesc.getIdentity().toLowerCase(Locale.ROOT);
            List snapshots = result.getOrDefault(source, Lists.newArrayList());
            snapshots.add(tableDesc);
            result.put(source, snapshots.stream().distinct().collect(Collectors.toList()));
        }
        return result;
    }

    public void saveSnapshotViewMapping(String project, RestTemplate restTemplate) {
        try {
            String url = String.format(Locale.ROOT, "http://%s/kylin/api/snapshots/view_mapping", this.projectConfig.getServerAddress());
            HashMap req = Maps.newHashMap();
            req.put("project", project);
            log.debug("checkTableNeedRefresh request: {}", (Object)req);
            HttpHeaders httpHeaders = new HttpHeaders();
            httpHeaders.add("Content-Type", "application/vnd.apache.kylin-v4-public+json");
            httpHeaders.add("Timeout", "");
            ResponseEntity exchange = restTemplate.exchange(url, HttpMethod.POST, new HttpEntity((Object)JsonUtil.writeValueAsBytes((Object)req), (MultiValueMap)httpHeaders), String.class, new Object[0]);
            int responseStatus = exchange.getStatusCodeValue();
            if (responseStatus != 200) {
                throw new KylinRuntimeException(String.format(Locale.ROOT, SNAPSHOT_VIEW_MAPPING_ERROR_MESSAGE, project));
            }
            String responseBody = (String)Optional.ofNullable(exchange.getBody()).orElse("");
            RestResponse response = (RestResponse)JsonUtil.readValue((String)responseBody, (TypeReference)new TypeReference<RestResponse<Boolean>>(){});
            if (!StringUtils.equals((CharSequence)response.getCode(), (CharSequence)"000")) {
                throw new KylinRuntimeException(String.format(Locale.ROOT, SNAPSHOT_VIEW_MAPPING_ERROR_MESSAGE, project));
            }
            if (Boolean.FALSE.equals(response.getData())) {
                throw new KylinRuntimeException(String.format(Locale.ROOT, SNAPSHOT_VIEW_MAPPING_ERROR_MESSAGE, project));
            }
        }
        catch (IOException e) {
            log.error(e.getMessage(), (Throwable)e);
            throw new KylinRuntimeException(String.format(Locale.ROOT, SNAPSHOT_VIEW_MAPPING_ERROR_MESSAGE, project), (Throwable)e);
        }
    }

    public void checkSourceTable(Set<String> allSourceTable) {
        for (String table : allSourceTable) {
            CheckSourceTableRunnable runnable = new CheckSourceTableRunnable();
            runnable.setProject(this.project);
            runnable.setConfig(this.projectConfig);
            runnable.setTableIdentity(table);
            runnable.setRestTemplate(this.restTemplate);
            runnable.setCheckSourceTableQueue(this.checkSourceTableQueue);
            this.sourceTableSnapshotMapping.get(table).stream().filter(tableDesc -> StringUtils.equalsIgnoreCase((CharSequence)table, (CharSequence)tableDesc.getIdentity())).findFirst().ifPresent(tableDesc -> runnable.setPartitionColumn(tableDesc.getSelectedSnapshotPartitionCol()));
            Future<String> submit = this.jobPool.submit(runnable, "success");
            this.checkSourceTableFutures.put(submit, System.currentTimeMillis());
        }
    }

    public void waitCheckSourceTableTaskDone() throws InterruptedException {
        while (true) {
            if (this.checkSourceTableQueue.peek() != null) {
                CheckSourceTableResult checkResult = this.checkSourceTableQueue.poll();
                if (!checkResult.getNeedRefresh().booleanValue()) continue;
                this.buildSnapshot(checkResult);
                continue;
            }
            long doneCount = this.checkSourceTableFutures.keySet().stream().filter(Future::isDone).count();
            if ((long)this.checkSourceTableFutures.size() == doneCount) break;
            this.cancelTimeoutFuture(this.checkSourceTableFutures);
            TimeUnit.SECONDS.sleep(10L);
        }
    }

    public void cancelTimeoutFuture(Map<Future<String>, Long> futures) {
        for (Map.Entry<Future<String>, Long> entry : futures.entrySet()) {
            long runningTime;
            Future<String> future = entry.getKey();
            if (future.isDone() || (runningTime = System.currentTimeMillis() - entry.getValue()) <= this.projectConfig.getSnapshotAutoRefreshTaskTimeout()) continue;
            log.debug("cancel timeout future with timeout setting[{}]", (Object)this.projectConfig.getSnapshotAutoRefreshTaskTimeout());
            future.cancel(true);
        }
    }

    public void buildSnapshot(CheckSourceTableResult result) {
        List<TableDesc> needBuildSnapshots = this.sourceTableSnapshotMapping.get(result.getTableIdentity());
        for (TableDesc tableDesc : needBuildSnapshots) {
            AtomicInteger sourceTableCount = this.buildSnapshotCount.getOrDefault(tableDesc.getIdentity(), new AtomicInteger(0));
            log.info("buildSnapshotCount is [{}], tableIdentity is [{}]", (Object)sourceTableCount, (Object)tableDesc.getIdentity());
            if (sourceTableCount.getAndIncrement() == 0) {
                BuildSnapshotRunnable runnable = new BuildSnapshotRunnable();
                runnable.setProject(this.project);
                runnable.setConfig(this.projectConfig);
                runnable.setRestTemplate(this.restTemplate);
                runnable.setNeedRefresh(result.getNeedRefresh());
                runnable.setNeedRefreshPartitionsValue(result.getNeedRefreshPartitionsValue());
                runnable.setTableIdentity(tableDesc.getIdentity());
                runnable.setPartitionColumn(tableDesc.getSelectedSnapshotPartitionCol());
                runnable.run();
            }
            this.buildSnapshotCount.put(tableDesc.getIdentity(), sourceTableCount);
        }
    }

    public static synchronized void shutdown(String project) {
        AutoRefreshSnapshotRunner refreshSnapshotRunner = AutoRefreshSnapshotRunner.getInstanceByProject(project);
        if (null != refreshSnapshotRunner) {
            refreshSnapshotRunner.innerShutdown();
            log.info("update snapshot automatic refresh fetch pool size success");
        }
    }

    public void innerShutdown() {
        if (Thread.currentThread().isInterrupted()) {
            log.warn("shutdown->current thread is interrupted,{}", (Object)Thread.currentThread().getName());
            throw new InterruptedException();
        }
        log.info("Shutting down AutoRefreshSnapshotRunner for project {} ....", (Object)this.project);
        if (null != this.jobPool) {
            ExecutorServiceUtil.shutdownGracefully((ExecutorService)this.jobPool, (int)60);
        }
        INSTANCE_MAP.remove(this.project);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        try (SetLogCategory ignored = new SetLogCategory("schedule");){
            this.checkJobPool();
            this.saveMarkFile();
            this.doRun();
        }
        catch (Exception e) {
            log.error(e.getMessage(), (Throwable)e);
        }
        finally {
            this.deleteMarkFile();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void runWhenSchedulerInit() {
        try (SetLogCategory ignored = new SetLogCategory("schedule");){
            this.doRun();
        }
        catch (Exception e) {
            log.error(e.getMessage(), (Throwable)e);
        }
        finally {
            this.deleteMarkFile();
        }
    }

    public void saveMarkFile() {
        Path markFilePath = new Path(this.projectConfig.getSnapshotAutoRefreshDir(this.project) + "_mark");
        FileSystem fileSystem = HadoopUtil.getWorkingFileSystem();
        try (FSDataOutputStream out = fileSystem.create(markFilePath, true);){
            out.write(new byte[0]);
        }
        catch (IOException e) {
            log.error("overwrite mark file [{}] failed!", (Object)markFilePath, (Object)e);
        }
    }

    public void deleteMarkFile() {
        Path markFilePath = new Path(this.projectConfig.getSnapshotAutoRefreshDir(this.project) + "_mark");
        FileSystem fileSystem = HadoopUtil.getWorkingFileSystem();
        try {
            if (fileSystem.exists(markFilePath)) {
                fileSystem.delete(markFilePath, true);
            }
        }
        catch (IOException e) {
            log.error("delete mark file [{}] failed!", (Object)markFilePath, (Object)e);
        }
    }

    @Generated
    public void setJobPool(ExecutorService jobPool) {
        this.jobPool = jobPool;
    }

    @Generated
    public ExecutorService getJobPool() {
        return this.jobPool;
    }

    @Generated
    public void setProjectConfig(KylinConfig projectConfig) {
        this.projectConfig = projectConfig;
    }

    @Generated
    public KylinConfig getProjectConfig() {
        return this.projectConfig;
    }

    @Generated
    public Queue<CheckSourceTableResult> getCheckSourceTableQueue() {
        return this.checkSourceTableQueue;
    }

    @Generated
    public Map<Future<String>, Long> getCheckSourceTableFutures() {
        return this.checkSourceTableFutures;
    }

    @Generated
    public String getProject() {
        return this.project;
    }

    @Generated
    public Map<String, List<TableDesc>> getSourceTableSnapshotMapping() {
        return this.sourceTableSnapshotMapping;
    }

    @Generated
    public Map<String, AtomicInteger> getBuildSnapshotCount() {
        return this.buildSnapshotCount;
    }
}

