/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.engine.imap.storage.file.wal.writer;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.curator.shaded.com.google.common.io.ByteStreams;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException;
import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData;
import org.apache.seatunnel.engine.imap.storage.file.common.WALDataUtils;
import org.apache.seatunnel.engine.imap.storage.file.wal.writer.IFileWriter;
import org.apache.seatunnel.engine.serializer.api.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * WAL writer that keeps every record of the current WAL file in an in-memory buffer and
 * rewrites the complete file (create-with-overwrite) on each {@link #write(IMapFileData)}.
 *
 * <p>Once the buffered content of the current file grows beyond the configured block size, the
 * writer switches to a new file named {@code <index>_wal.txt} under the parent path and starts
 * with an empty buffer.
 *
 * <p>NOTE(review): presumably intended for object stores without append support — confirm
 * against the concrete subclasses. The class performs no synchronization of its own.
 */
public abstract class CloudWriter implements IFileWriter<IMapFileData> {
    private static final Logger log = LoggerFactory.getLogger(CloudWriter.class);

    private FileSystem fs;
    private Path parentPath;
    private Path path;
    private Serializer serializer;

    /** Holds the full content of the current WAL file; its reader index always stays at 0. */
    private ByteBuf bf = Unpooled.buffer(1024);

    /** Rotation threshold in bytes for a single WAL file (1 MiB unless overridden). */
    private long blockSize = 1024 * 1024;

    /** Sequence number used to build WAL file names; incremented for every new path. */
    private final AtomicLong index = new AtomicLong(0L);

    /**
     * Binds the writer to a file system and directory and selects the first WAL file.
     *
     * <p>If the selected file already exists, its content is loaded into the buffer so that later
     * writes, which overwrite the whole file, keep the existing records.
     *
     * @param fs file system the WAL files are written to
     * @param parentPath directory that contains the WAL files
     * @param serializer serializer used to encode {@link IMapFileData}
     * @throws IOException if the existence check or reading the existing file fails
     */
    @Override
    public void initialize(FileSystem fs, Path parentPath, Serializer serializer) throws IOException {
        this.fs = fs;
        this.serializer = serializer;
        this.parentPath = parentPath;
        this.path = this.createNewPath();
        if (fs.exists(this.path)) {
            try (FSDataInputStream fsDataInputStream = fs.open(this.path)) {
                this.bf.writeBytes(ByteStreams.toByteArray(fsDataInputStream));
            }
        }
    }

    /**
     * Overrides the rotation threshold.
     *
     * @param blockSize new threshold in bytes; ignored when {@code null} or not greater than
     *     {@code DEFAULT_BLOCK_SIZE}
     */
    @Override
    public void setBlockSize(Long blockSize) {
        if (blockSize != null && blockSize > DEFAULT_BLOCK_SIZE) {
            this.blockSize = blockSize;
        }
    }

    /**
     * Serializes {@code data} and persists it synchronously as part of the current WAL file.
     *
     * @param data record to append
     * @throws IOException if serialization fails with an I/O error
     * @throws IMapStorageException if wrapping or writing the record fails
     */
    @Override
    public void write(IMapFileData data) throws IOException {
        byte[] bytes = this.serializer.serialize(data);
        this.write(bytes);
    }

    /**
     * Appends the wrapped record to the buffer and rewrites the current WAL file with the whole
     * buffer content.
     *
     * <p>If anything fails before the file has been written and closed, the record is removed
     * from the buffer again, so a failed call neither drops earlier records nor leaves a
     * half-committed record behind for the next write.
     *
     * @param bytes serialized record
     * @throws IMapStorageException wrapping any failure
     */
    private void write(byte[] bytes) {
        final int committedWriterIndex = this.bf.writerIndex();
        boolean persisted = false;
        try {
            this.bf.writeBytes(WALDataUtils.wrapperBytes(bytes));

            // Absolute read: copies the buffer content without moving the reader index, so the
            // buffer stays intact regardless of whether the file system write succeeds.
            byte[] allBytes = new byte[this.bf.readableBytes()];
            this.bf.getBytes(this.bf.readerIndex(), allBytes);

            // Close the stream before rotating: a failure on close() must not happen after the
            // buffer has already been cleared for the next file.
            try (FSDataOutputStream out = this.fs.create(this.path, true)) {
                out.write(allBytes);
            }
            persisted = true;

            this.checkAndSetNextScheduleRotation(allBytes.length);
        } catch (Exception ex) {
            if (!persisted) {
                // Drop the record that could not be persisted; earlier records stay buffered.
                this.bf.writerIndex(committedWriterIndex);
            }
            throw new IMapStorageException(ex);
        }
    }

    /**
     * Switches to a new WAL file with an empty buffer once the current file exceeds the block
     * size.
     *
     * @param allBytes size in bytes of the content just written to the current file
     */
    private void checkAndSetNextScheduleRotation(long allBytes) {
        if (allBytes > this.blockSize) {
            this.path = this.createNewPath();
            this.bf.clear();
        }
        // Otherwise nothing to do: the buffer is snapshotted with an absolute read, so the
        // reader index never moves and does not need to be reset.
    }

    /**
     * Builds the path of the next WAL file, advancing the file sequence number.
     *
     * @return {@code <parentPath>/<index>_wal.txt} for the incremented index
     */
    public Path createNewPath() {
        return new Path(this.parentPath, this.index.incrementAndGet() + "_" + "wal.txt");
    }

    /**
     * Discards the in-memory buffer. Calling this method more than once is a no-op.
     *
     * @throws Exception declared by the {@code close} contract; not thrown for a repeated call
     */
    @Override
    public void close() throws Exception {
        ByteBuf buffer = this.bf;
        if (buffer == null) {
            // Already closed.
            return;
        }
        this.bf = null;
        buffer.release();
    }
}

