/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.dynamodb.export;

import java.time.Duration;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.DataFilePartition;
import org.opensearch.dataprepper.plugins.source.dynamodb.export.DataFileCheckpointer;
import org.opensearch.dataprepper.plugins.source.dynamodb.export.DataFileLoader;
import org.opensearch.dataprepper.plugins.source.dynamodb.export.S3ObjectReader;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
import software.amazon.awssdk.services.s3.S3Client;

/**
 * Factory that assembles {@link DataFileLoader} runnables for individual
 * {@link DataFilePartition}s.
 *
 * <p>All loaders produced by one factory share the coordinator, plugin metrics, buffer and
 * {@link S3ObjectReader} that were set up in the constructor.
 */
public class DataFileLoaderFactory {
    /** Amount passed to {@code increaseExpiry} each time the progress check fires. */
    static final Duration ACKNOWLEDGMENT_EXPIRY_INCREASE_TIME = Duration.ofMinutes(10L);

    /** Interval passed to {@code addProgressCheck} when the expiry-extending callback is registered. */
    private static final Duration ACKNOWLEDGMENT_PROGRESS_CHECK_INTERVAL = Duration.ofMinutes(2L);

    private final EnhancedSourceCoordinator coordinator;
    private final PluginMetrics pluginMetrics;
    private final Buffer<Record<Event>> buffer;
    private final S3ObjectReader objectReader;

    /**
     * Stores the shared collaborators and wraps the given S3 client in an {@link S3ObjectReader}.
     *
     * @param coordinator   coordinator handed to every {@link DataFileCheckpointer} this factory creates
     * @param s3Client      S3 client used to construct the shared {@link S3ObjectReader}
     * @param pluginMetrics metrics object forwarded to each loader builder
     * @param buffer        buffer forwarded to each loader builder
     */
    public DataFileLoaderFactory(EnhancedSourceCoordinator coordinator,
                                 S3Client s3Client,
                                 PluginMetrics pluginMetrics,
                                 Buffer<Record<Event>> buffer) {
        this.coordinator = coordinator;
        this.pluginMetrics = pluginMetrics;
        this.buffer = buffer;
        objectReader = new S3ObjectReader(s3Client);
    }

    /**
     * Builds a loader for the given data file partition.
     *
     * <p>When {@code acknowledgementSet} is non-null, an expiry-extending progress check is
     * registered on it before the loader is built, and the loader starts at line 0. When it is
     * null, the loader starts at the {@code loaded} value held in the partition's progress state.
     *
     * @param dataFilePartition     partition supplying the bucket, key and progress state
     * @param tableInfo             table information forwarded to the loader builder
     * @param acknowledgementSet    acknowledgement set forwarded to the loader builder; may be null
     * @param acknowledgmentTimeout timeout forwarded to the loader builder
     * @return the object produced by the {@link DataFileLoader} builder
     */
    public Runnable createDataFileLoader(DataFilePartition dataFilePartition,
                                         TableInfo tableInfo,
                                         AcknowledgementSet acknowledgementSet,
                                         Duration acknowledgmentTimeout) {
        DataFileCheckpointer fileCheckpointer = new DataFileCheckpointer(coordinator, dataFilePartition);
        registerExpiryExtension(acknowledgementSet);

        return DataFileLoader.builder(objectReader, pluginMetrics, buffer)
                .bucketName(dataFilePartition.getBucket())
                .key(dataFilePartition.getKey())
                .tableInfo(tableInfo)
                .exportStartTime(dataFilePartition.getProgressState().get().getStartTime())
                .checkpointer(fileCheckpointer)
                .acknowledgmentSet(acknowledgementSet)
                .acknowledgmentSetTimeout(acknowledgmentTimeout)
                .startLine(resolveStartLine(dataFilePartition, acknowledgementSet))
                .build();
    }

    /**
     * Picks the line the loader should begin at.
     *
     * <p>NOTE(review): starting from 0 whenever an acknowledgement set is present presumably
     * means saved progress is not relied upon in that mode - confirm with the loader/checkpointer.
     */
    private static int resolveStartLine(DataFilePartition dataFilePartition, AcknowledgementSet acknowledgementSet) {
        if (acknowledgementSet != null) {
            return 0;
        }
        return dataFilePartition.getProgressState().get().getLoaded();
    }

    /**
     * Registers a progress check on the acknowledgement set whose callback increases the set's
     * expiry by {@link #ACKNOWLEDGMENT_EXPIRY_INCREASE_TIME}; does nothing when the set is null.
     */
    private void registerExpiryExtension(AcknowledgementSet acknowledgementSet) {
        if (acknowledgementSet == null) {
            return;
        }
        acknowledgementSet.addProgressCheck(
                progress -> acknowledgementSet.increaseExpiry(ACKNOWLEDGMENT_EXPIRY_INCREASE_TIME),
                ACKNOWLEDGMENT_PROGRESS_CHECK_INTERVAL);
    }
}

