/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.dynamodb.export;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter;
import org.opensearch.dataprepper.plugins.source.dynamodb.export.DataFileCheckpointer;
import org.opensearch.dataprepper.plugins.source.dynamodb.export.S3ObjectReader;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Loads a single gzip-compressed data file from S3 and pushes its lines to the
 * pipeline buffer.
 *
 * <p>The file is read line by line. Lines up to and including {@code startLine}
 * are skipped; the remaining lines are handed to an {@link ExportRecordConverter}
 * in batches of {@link #DEFAULT_BATCH_SIZE}. Progress is reported through the
 * {@link DataFileCheckpointer} at most every
 * {@link #DEFAULT_CHECKPOINT_INTERVAL_MILLS} milliseconds, after the final
 * partial batch, and on failure.
 *
 * <p>Instances are created through {@link #builder(S3ObjectReader, PluginMetrics, Buffer)}.
 * A call to {@link #stopAll()} sets a JVM-wide flag that makes every running
 * loader abort at its next line.
 */
public class DataFileLoader implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(DataFileLoader.class);

    /** Shared by all loaders; once set by {@link #stopAll()} it is never reset by this class. */
    private static volatile boolean shouldStop = false;

    /** Number of lines collected before they are handed to the record converter. */
    private static final int DEFAULT_BATCH_SIZE = 1000;

    /** Minimum time, in milliseconds, between two regular checkpoints. */
    private static final int DEFAULT_CHECKPOINT_INTERVAL_MILLS = 120000;

    /** Timeout passed to the {@link BufferAccumulator}. */
    static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60L);

    /** Batch size passed to the {@link BufferAccumulator}. */
    static final int DEFAULT_BUFFER_BATCH_SIZE = 1000;

    private final String bucketName;
    private final String key;
    private final ExportRecordConverter recordConverter;
    private final S3ObjectReader objectReader;
    private final DataFileCheckpointer checkpointer;
    private final int startLine;
    private final AcknowledgementSet acknowledgementSet;
    private final Duration dataFileAcknowledgmentTimeout;

    private DataFileLoader(Builder builder) {
        this.objectReader = builder.objectReader;
        this.bucketName = builder.bucketName;
        this.key = builder.key;
        this.checkpointer = builder.checkpointer;
        this.startLine = builder.startLine;
        final BufferAccumulator<Record<Event>> bufferAccumulator =
                BufferAccumulator.create(builder.buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
        this.recordConverter = new ExportRecordConverter(
                bufferAccumulator, builder.tableInfo, builder.pluginMetrics, builder.exportStartTime);
        this.acknowledgementSet = builder.acknowledgementSet;
        this.dataFileAcknowledgmentTimeout = builder.dataFileAcknowledgmentTimeout;
    }

    /**
     * Creates a builder pre-populated with the mandatory collaborators.
     *
     * @param s3ObjectReader reader used to open the data file
     * @param pluginMetrics  metrics handed to the record converter
     * @param buffer         destination buffer for the converted records
     * @return a new {@link Builder}
     */
    public static Builder builder(S3ObjectReader s3ObjectReader, PluginMetrics pluginMetrics, Buffer<Record<Event>> buffer) {
        return new Builder(s3ObjectReader, pluginMetrics, buffer);
    }

    /**
     * Reads the data file and writes its lines to the buffer in batches.
     *
     * <p>On any failure (including a shutdown requested via {@link #stopAll()})
     * the acknowledgement set, if present, is cancelled and the checkpoint is
     * set to the last line that was actually written to the buffer, so a later
     * retry neither skips unwritten lines nor restarts from the beginning.
     *
     * @throws RuntimeException if loading fails or is interrupted; the original
     *                          exception is attached as the cause
     */
    @Override
    public void run() {
        LOG.info("Start loading s3://{}/{} with start line {}", this.bucketName, this.key, this.startLine);
        long lastCheckpointTime = System.currentTimeMillis();
        final List<String> lines = new ArrayList<>();
        // Counts every line read, including the skipped ones.
        int lineCount = 0;
        // Everything up to startLine is skipped by this run, so that is the
        // correct baseline for checkpoints taken before the first batch is flushed.
        int lastLineProcessed = this.startLine;
        try (InputStream inputStream = this.objectReader.readFile(this.bucketName, this.key);
             GZIPInputStream gzipInputStream = new GZIPInputStream(inputStream);
             // Decode explicitly as UTF-8 instead of relying on the platform default charset.
             BufferedReader reader = new BufferedReader(new InputStreamReader(gzipInputStream, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (shouldStop) {
                    LOG.warn("Loading data file s3://{}/{} was interrupted by a shutdown signal, giving up ownership of data file", this.bucketName, this.key);
                    // The catch block below checkpoints lastLineProcessed.
                    throw new RuntimeException("Loading data file interrupted");
                }
                lineCount++;
                if (lineCount <= this.startLine) {
                    continue;
                }
                lines.add(line);
                if ((lineCount - this.startLine) % DEFAULT_BATCH_SIZE == 0) {
                    this.recordConverter.writeToBuffer(this.acknowledgementSet, lines);
                    lines.clear();
                    lastLineProcessed = lineCount;
                }
                if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) {
                    performRegularCheckpoint(lastLineProcessed);
                    lastCheckpointTime = System.currentTimeMillis();
                }
            }
            if (!lines.isEmpty()) {
                // Flush the final partial batch and record the end position.
                this.recordConverter.writeToBuffer(this.acknowledgementSet, lines);
                this.checkpointer.checkpoint(lineCount);
                lastLineProcessed = lineCount;
                lines.clear();
            }
            // Report lines loaded by this run; zero if the file is shorter than startLine.
            LOG.info("Completed loading {} lines from s3://{}/{} to buffer", Math.max(0, lineCount - this.startLine), this.bucketName, this.key);
            if (this.acknowledgementSet != null) {
                this.checkpointer.updateDatafileForAcknowledgmentWait(this.dataFileAcknowledgmentTimeout);
                this.acknowledgementSet.complete();
            }
        } catch (Exception e) {
            if (this.acknowledgementSet != null) {
                this.acknowledgementSet.cancel();
            }
            // Checkpoint only what was written to the buffer; lines still held in
            // the local batch must be re-read on retry.
            this.checkpointer.checkpoint(lastLineProcessed);
            String errorMessage = String.format("Loading of s3://%s/%s completed with Exception: %s", this.bucketName, this.key, e.getMessage());
            throw new RuntimeException(errorMessage, e);
        }
    }

    /**
     * Performs the periodic progress update: extends the acknowledgement wait
     * when an acknowledgement set is in use, otherwise checkpoints the line number.
     */
    private void performRegularCheckpoint(int lastLineProcessed) {
        LOG.debug("Perform regular checkpointing for Data File Loader");
        if (this.acknowledgementSet != null) {
            this.checkpointer.updateDatafileForAcknowledgmentWait(this.dataFileAcknowledgmentTimeout);
        } else {
            this.checkpointer.checkpoint(lastLineProcessed);
        }
    }

    /**
     * Signals every {@code DataFileLoader} in this JVM to stop. Each running
     * loader aborts with a {@link RuntimeException} when it reads its next line.
     */
    public static void stopAll() {
        shouldStop = true;
    }

    /**
     * Fluent builder for {@link DataFileLoader}. The reader, metrics and buffer
     * are fixed at construction; the remaining values are optional setters and
     * keep their Java default ({@code null} / {@code 0}) when not set.
     */
    static class Builder {
        private final S3ObjectReader objectReader;
        private final PluginMetrics pluginMetrics;
        private final Buffer<Record<Event>> buffer;
        private TableInfo tableInfo;
        private DataFileCheckpointer checkpointer;
        private String bucketName;
        private String key;
        private AcknowledgementSet acknowledgementSet;
        private Duration dataFileAcknowledgmentTimeout;
        private int startLine;
        private long exportStartTime;

        public Builder(S3ObjectReader objectReader, PluginMetrics pluginMetrics, Buffer<Record<Event>> buffer) {
            this.objectReader = objectReader;
            this.pluginMetrics = pluginMetrics;
            this.buffer = buffer;
        }

        /** Sets the table information passed to the record converter. */
        public Builder tableInfo(TableInfo tableInfo) {
            this.tableInfo = tableInfo;
            return this;
        }

        /** Sets the checkpointer used to report loading progress. */
        public Builder checkpointer(DataFileCheckpointer checkpointer) {
            this.checkpointer = checkpointer;
            return this;
        }

        /** Sets the S3 bucket that holds the data file. */
        public Builder bucketName(String bucketName) {
            this.bucketName = bucketName;
            return this;
        }

        /** Sets the S3 object key of the data file. */
        public Builder key(String key) {
            this.key = key;
            return this;
        }

        /** Sets the number of leading lines to skip before loading starts. */
        public Builder startLine(int startLine) {
            this.startLine = startLine;
            return this;
        }

        /** Sets the export start time passed to the record converter. */
        public Builder exportStartTime(long exportStartTime) {
            this.exportStartTime = exportStartTime;
            return this;
        }

        /** Sets the acknowledgement set; {@code null} means acknowledgements are not used. */
        public Builder acknowledgmentSet(AcknowledgementSet acknowledgementSet) {
            this.acknowledgementSet = acknowledgementSet;
            return this;
        }

        /** Sets the timeout passed to the checkpointer when waiting for acknowledgements. */
        public Builder acknowledgmentSetTimeout(Duration dataFileAcknowledgmentTimeout) {
            this.dataFileAcknowledgmentTimeout = dataFileAcknowledgmentTimeout;
            return this;
        }

        /** Builds the loader from the values collected so far. */
        public DataFileLoader build() {
            return new DataFileLoader(this);
        }
    }
}

