/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.engine.checkpoint.storage.hdfs;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
import org.apache.seatunnel.engine.checkpoint.storage.api.AbstractCheckpointStorage;
import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
import org.apache.seatunnel.engine.checkpoint.storage.hdfs.common.AbstractConfiguration;
import org.apache.seatunnel.engine.checkpoint.storage.hdfs.common.FileConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HdfsStorage
extends AbstractCheckpointStorage {
    private static final Logger log = LoggerFactory.getLogger(HdfsStorage.class);
    public FileSystem fs;
    private static final String STORAGE_TMP_SUFFIX = "tmp";
    private static final String STORAGE_TYPE_KEY = "storage.type";

    public HdfsStorage(Map<String, String> configuration) throws CheckpointStorageException {
        this.initStorage(configuration);
    }

    public void initStorage(Map<String, String> configuration) throws CheckpointStorageException {
        if (StringUtils.isNotBlank((CharSequence)configuration.get("namespace"))) {
            this.setStorageNameSpace(configuration.get("namespace"));
            configuration.remove("namespace");
        }
        Configuration hadoopConf = this.getConfiguration(configuration);
        try {
            this.fs = FileSystem.get((Configuration)hadoopConf);
        }
        catch (IOException e) {
            throw new CheckpointStorageException("Failed to get file system", (Throwable)e);
        }
    }

    private Configuration getConfiguration(Map<String, String> config) throws CheckpointStorageException {
        String storageType = config.getOrDefault(STORAGE_TYPE_KEY, FileConfiguration.LOCAL.toString());
        config.remove(STORAGE_TYPE_KEY);
        AbstractConfiguration configuration = FileConfiguration.valueOf(storageType.toUpperCase()).getConfiguration(storageType);
        return configuration.buildConfiguration(config);
    }

    public String storeCheckPoint(PipelineState state) throws CheckpointStorageException {
        byte[] datas;
        try {
            datas = this.serializeCheckPointData(state);
        }
        catch (IOException e) {
            throw new CheckpointStorageException(String.format("Failed to serialize checkpoint data, state: %s", state), (Throwable)e);
        }
        Path filePath = new Path(this.getStorageParentDirectory() + state.getJobId() + "/" + this.getCheckPointName(state));
        Path tmpFilePath = new Path(this.getStorageParentDirectory() + state.getJobId() + "/" + this.getCheckPointName(state) + STORAGE_TMP_SUFFIX);
        try (FSDataOutputStream out = this.fs.create(tmpFilePath, false);){
            out.write(datas);
        }
        catch (IOException e) {
            throw new CheckpointStorageException(String.format("Failed to write checkpoint data, file: %s, state: %s", tmpFilePath, state), (Throwable)e);
        }
        try {
            boolean success = this.fs.rename(tmpFilePath, filePath);
            if (!success) {
                throw new CheckpointStorageException("Failed to rename tmp file to final file");
            }
        }
        catch (IOException e) {
            throw new CheckpointStorageException("Failed to rename tmp file to final file");
        }
        finally {
            try {
                if (this.fs.exists(tmpFilePath)) {
                    this.fs.delete(tmpFilePath, false);
                }
            }
            catch (IOException ioe) {
                log.error("Failed to delete tmp file", (Throwable)ioe);
            }
        }
        return filePath.getName();
    }

    public List<PipelineState> getAllCheckpoints(String jobId) throws CheckpointStorageException {
        String path = this.getStorageParentDirectory() + jobId;
        List<String> fileNames = this.getFileNames(path);
        if (fileNames.isEmpty()) {
            log.info("No checkpoint found for this job, the job id is: " + jobId);
            return new ArrayList<PipelineState>();
        }
        ArrayList<PipelineState> states = new ArrayList<PipelineState>();
        fileNames.forEach(file -> {
            try {
                states.add(this.readPipelineState((String)file, jobId));
            }
            catch (CheckpointStorageException e) {
                log.error("Failed to read checkpoint data from file: " + file, (Throwable)e);
            }
        });
        if (states.isEmpty()) {
            throw new CheckpointStorageException("No checkpoint found for job, job id is: " + jobId);
        }
        return states;
    }

    public List<PipelineState> getLatestCheckpoint(String jobId) throws CheckpointStorageException {
        String path = this.getStorageParentDirectory() + jobId;
        List<String> fileNames = this.getFileNames(path);
        if (fileNames.isEmpty()) {
            log.info("No checkpoint found for this  job, the job id is: " + jobId);
            return new ArrayList<PipelineState>();
        }
        Set latestPipelineNames = this.getLatestPipelineNames(fileNames);
        ArrayList<PipelineState> latestPipelineStates = new ArrayList<PipelineState>();
        latestPipelineNames.forEach(fileName -> {
            try {
                latestPipelineStates.add(this.readPipelineState((String)fileName, jobId));
            }
            catch (CheckpointStorageException e) {
                log.error("Failed to read pipeline state for file: {}", fileName, (Object)e);
            }
        });
        if (latestPipelineStates.isEmpty()) {
            log.info("No checkpoint found for this job,  the job id:{} " + jobId);
        }
        return latestPipelineStates;
    }

    public PipelineState getLatestCheckpointByJobIdAndPipelineId(String jobId, String pipelineId) throws CheckpointStorageException {
        String path = this.getStorageParentDirectory() + jobId;
        List<String> fileNames = this.getFileNames(path);
        if (fileNames.isEmpty()) {
            log.info("No checkpoint found for job, job id is: " + jobId);
            return null;
        }
        String latestFileName = this.getLatestCheckpointFileNameByJobIdAndPipelineId(fileNames, pipelineId);
        if (latestFileName == null) {
            log.info("No checkpoint found for this job, the job id is: " + jobId + ", pipeline id is: " + pipelineId);
            return null;
        }
        return this.readPipelineState(latestFileName, jobId);
    }

    public List<PipelineState> getCheckpointsByJobIdAndPipelineId(String jobId, String pipelineId) throws CheckpointStorageException {
        String path = this.getStorageParentDirectory() + jobId;
        List<String> fileNames = this.getFileNames(path);
        if (fileNames.isEmpty()) {
            log.info("No checkpoint found for this job, the job id is: " + jobId);
            return new ArrayList<PipelineState>();
        }
        ArrayList<PipelineState> pipelineStates = new ArrayList<PipelineState>();
        fileNames.forEach(file -> {
            String filePipelineId = this.getPipelineIdByFileName((String)file);
            if (pipelineId.equals(filePipelineId)) {
                try {
                    pipelineStates.add(this.readPipelineState((String)file, jobId));
                }
                catch (Exception e) {
                    log.error("Failed to read checkpoint data from file " + file, (Throwable)e);
                }
            }
        });
        return pipelineStates;
    }

    public void deleteCheckpoint(String jobId) {
        String jobPath = this.getStorageParentDirectory() + jobId;
        try {
            this.fs.delete(new Path(jobPath), true);
        }
        catch (IOException e) {
            log.warn("Failed to delete checkpoint for job {}", (Object)jobId, (Object)e);
        }
    }

    public PipelineState getCheckpoint(String jobId, String pipelineId, String checkpointId) throws CheckpointStorageException {
        String path = this.getStorageParentDirectory() + jobId;
        List<String> fileNames = this.getFileNames(path);
        if (fileNames.isEmpty()) {
            log.info("No checkpoint found for this job,  the job id is: " + jobId);
            return null;
        }
        for (String fileName : fileNames) {
            if (!pipelineId.equals(this.getPipelineIdByFileName(fileName)) || !checkpointId.equals(this.getCheckpointIdByFileName(fileName))) continue;
            try {
                return this.readPipelineState(fileName, jobId);
            }
            catch (Exception e) {
                log.error("Failed to get checkpoint {} for job {}, pipeline {}", new Object[]{checkpointId, jobId, pipelineId, e});
            }
        }
        throw new CheckpointStorageException(String.format("No checkpoint found, job(%s), pipeline(%s), checkpoint(%s)", jobId, pipelineId, checkpointId));
    }

    public synchronized void deleteCheckpoint(String jobId, String pipelineId, String checkpointId) throws CheckpointStorageException {
        String path = this.getStorageParentDirectory() + jobId;
        List<String> fileNames = this.getFileNames(path);
        if (fileNames.isEmpty()) {
            throw new CheckpointStorageException("No checkpoint found for job, job id is: " + jobId);
        }
        fileNames.forEach(fileName -> {
            if (pipelineId.equals(this.getPipelineIdByFileName((String)fileName)) && checkpointId.equals(this.getCheckpointIdByFileName((String)fileName))) {
                try {
                    this.fs.delete(new Path(path + "/" + fileName), false);
                }
                catch (Exception e) {
                    log.error("Failed to delete checkpoint {} for job {}, pipeline {}", new Object[]{checkpointId, jobId, pipelineId, e});
                }
            }
        });
    }

    public void deleteCheckpoint(String jobId, String pipelineId, List<String> checkpointIdList) throws CheckpointStorageException {
        String path = this.getStorageParentDirectory() + jobId;
        List<String> fileNames = this.getFileNames(path);
        if (fileNames.isEmpty()) {
            throw new CheckpointStorageException("No checkpoint found for job, job id is: " + jobId);
        }
        fileNames.forEach(fileName -> {
            String checkpointIdByFileName = this.getCheckpointIdByFileName((String)fileName);
            if (pipelineId.equals(this.getPipelineIdByFileName((String)fileName)) && checkpointIdList.contains(checkpointIdByFileName)) {
                try {
                    this.fs.delete(new Path(path + "/" + fileName), false);
                }
                catch (Exception e) {
                    log.error("Failed to delete checkpoint {} for job {}, pipeline {}", new Object[]{checkpointIdByFileName, jobId, pipelineId, e});
                }
            }
        });
    }

    public List<String> getFileNames(String path) throws CheckpointStorageException {
        try {
            Path parentPath = new Path(path);
            if (!this.fs.exists(parentPath)) {
                log.info("Path " + path + " is not a directory");
                return new ArrayList<String>();
            }
            FileStatus[] fileStatus = this.fs.listStatus(parentPath, path1 -> path1.getName().endsWith("ser"));
            ArrayList<String> fileNames = new ArrayList<String>();
            for (FileStatus status : fileStatus) {
                fileNames.add(status.getPath().getName());
            }
            return fileNames;
        }
        catch (IOException e) {
            throw new CheckpointStorageException("Failed to list files from names" + path, (Throwable)e);
        }
    }

    /*
     * Enabled aggressive exception aggregation
     */
    private PipelineState readPipelineState(String fileName, String jobId) throws CheckpointStorageException {
        fileName = this.getStorageParentDirectory() + jobId + "/" + fileName;
        try (FSDataInputStream in = this.fs.open(new Path(fileName));){
            PipelineState pipelineState;
            try (ByteArrayOutputStream stream = new ByteArrayOutputStream();){
                IOUtils.copyBytes((InputStream)in, (OutputStream)stream, (int)1024);
                byte[] bytes = stream.toByteArray();
                pipelineState = this.deserializeCheckPointData(bytes);
            }
            return pipelineState;
        }
        catch (IOException e) {
            throw new CheckpointStorageException(String.format("Failed to read checkpoint data, file name is %s,job id is %s", fileName, jobId), (Throwable)e);
        }
    }
}

