/*
 * Decompiled with CFR 0.152.
 */
package io.milvus.bulkwriter;

import com.google.common.collect.Lists;
import com.google.gson.JsonObject;
import io.milvus.bulkwriter.BulkWriter;
import io.milvus.bulkwriter.LocalBulkWriterParam;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
import io.milvus.bulkwriter.writer.FormatFileWriter;
import io.milvus.v2.service.collection.request.CreateCollectionReq;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LocalBulkWriter
extends BulkWriter {
    private static final Logger logger = LoggerFactory.getLogger(LocalBulkWriter.class);
    private Map<String, Thread> workingThread;
    private ReentrantLock workingThreadLock = new ReentrantLock();
    private List<List<String>> localFiles;

    public LocalBulkWriter(LocalBulkWriterParam bulkWriterParam) throws IOException {
        super(bulkWriterParam.getCollectionSchema(), bulkWriterParam.getChunkSize(), bulkWriterParam.getFileType(), bulkWriterParam.getLocalPath(), bulkWriterParam.getConfig());
        this.workingThread = new HashMap<String, Thread>();
        this.localFiles = Lists.newArrayList();
    }

    protected LocalBulkWriter(CreateCollectionReq.CollectionSchema collectionSchema, long chunkSize, BulkFileType fileType, String localPath, Map<String, Object> config) throws IOException {
        super(collectionSchema, chunkSize, fileType, localPath, config);
        this.workingThread = new HashMap<String, Thread>();
        this.localFiles = Lists.newArrayList();
    }

    @Override
    public void appendRow(JsonObject rowData) throws IOException, InterruptedException {
        super.appendRow(rowData);
    }

    @Override
    protected void callBackIfCommitReady(List<String> filePaths) throws InterruptedException {
        this.workingThreadLock.lock();
        this.callBack(true, filePaths);
        this.workingThreadLock.unlock();
    }

    public void commit(boolean async) throws InterruptedException {
        List<String> filePath = this.commitIfFileReady(false);
        this.callBack(async, filePath);
    }

    @Override
    protected List<String> commitIfFileReady(boolean createNewFile) {
        if (super.getTotalRowCount() <= 0L) {
            String msg = "current_file_total_row_count less than 0, no need to generator a file";
            logger.info(msg);
            return null;
        }
        String filePath = super.getFileWriter().getFilePath();
        String msg = String.format("Prepare to commit file:%s, current_file_total_row_count: %s, current_file_total_size:%s, create_new_file:%s", filePath, super.getTotalRowCount(), super.getTotalSize(), createNewFile);
        logger.info(msg);
        ArrayList fileList = Lists.newArrayList((Object[])new String[]{filePath});
        try {
            FormatFileWriter oldFileWriter = createNewFile ? this.newFileWriter() : super.getFileWriter();
            oldFileWriter.close();
            this.localFiles.add(fileList);
            super.commit();
        }
        catch (IOException e) {
            logger.error(e.getMessage());
        }
        return fileList;
    }

    private void callBack(boolean async, List<String> fileList) throws InterruptedException {
        String msg;
        if (CollectionUtils.isEmpty(fileList)) {
            return;
        }
        while (!this.workingThread.isEmpty()) {
            msg = String.format("Previous callBack action is not finished, %s is waiting...", Thread.currentThread().getName());
            logger.info(msg);
            TimeUnit.SECONDS.sleep(5L);
        }
        msg = String.format("Prepare to callBack, async:%s, fileList:%s", async, fileList);
        logger.info(msg);
        Runnable runnable = () -> this.commitIfFileReady(fileList);
        Thread thread = new Thread(runnable);
        logger.info("CallBack thread begin, name: {}", (Object)thread.getName());
        this.workingThread.put(thread.getName(), thread);
        thread.start();
        if (!async) {
            logger.info("Wait callBack to finish");
            thread.join();
        }
        logger.info("CallBack done with async={}", (Object)async);
    }

    private void commitIfFileReady(List<String> fileList) {
        if (CollectionUtils.isNotEmpty(fileList)) {
            this.callBack(fileList);
        }
        this.workingThread.remove(Thread.currentThread().getName());
        String msg = String.format("Flush thread done, name: %s", Thread.currentThread().getName());
        logger.info(msg);
    }

    protected void callBack(List<String> fileList) {
    }

    @Override
    protected String getDataPath() {
        return this.localPath;
    }

    public List<List<String>> getBatchFiles() {
        return this.localFiles;
    }

    protected void exit() throws InterruptedException {
        this.workingThreadLock.lock();
        List<String> filePath = this.commitIfFileReady(false);
        this.callBack(true, filePath);
        this.workingThreadLock.unlock();
        if (!this.workingThread.isEmpty()) {
            for (String key : this.workingThread.keySet()) {
                logger.info("Wait flush thread '{}' to finish", (Object)key);
                Thread thread = this.workingThread.get(key);
                if (thread == null) continue;
                thread.join();
            }
        }
        this.rmDir();
    }

    private void rmDir() {
        try {
            Path path = Paths.get(this.localPath, new String[0]);
            if (Files.exists(path, new LinkOption[0]) && this.isDirectoryEmpty(path)) {
                Files.delete(path);
                logger.info("Delete local directory {}", (Object)this.localPath);
            }
        }
        catch (IOException e) {
            logger.error("Error while deleting directory: " + e.getMessage());
        }
    }

    private boolean isDirectoryEmpty(Path path) throws IOException {
        try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(path);){
            boolean bl = !dirStream.iterator().hasNext();
            return bl;
        }
    }

    protected String getUUID() {
        return this.uuid;
    }

    @Override
    public void close() throws Exception {
        logger.info("execute remaining actions to prevent loss of memory data or residual empty directories.");
        this.exit();
        logger.info(String.format("LocalBulkWriter done! output local files: %s", this.getBatchFiles()));
    }
}

