/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.failures.DlqObject;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.dlq.DlqPushHandler;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.Buffer;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsDispatcher;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils.CloudWatchLogsLimits;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils.CloudWatchLogsSinkUtils;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils.SinkStopWatch;
import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;

/**
 * Accumulates log events in a {@link Buffer} and hands the buffered data to the
 * {@link CloudWatchLogsDispatcher} whenever one of the configured
 * {@link CloudWatchLogsLimits} is reached.
 *
 * <p>Buffer reads and writes performed while adding or flushing events are done
 * while holding {@code processLock}.
 */
public class CloudWatchLogsService {
    /**
     * Value added to the event size reported in the DLQ failure message.
     * NOTE(review): presumably the per-event overhead CloudWatch Logs charges on
     * top of the UTF-8 message size - confirm against CloudWatchLogsLimits.
     */
    private static final int REPORTED_EVENT_OVERHEAD_BYTES = 26;

    private final CloudWatchLogsDispatcher cloudWatchLogsDispatcher;
    private final Buffer buffer;
    private final CloudWatchLogsLimits cloudWatchLogsLimits;
    private final SinkStopWatch sinkStopWatch;
    private final ReentrantLock processLock;
    private final DlqPushHandler dlqPushHandler;

    /**
     * Creates the service with its own lock and stop watch.
     *
     * @param buffer                   buffer that holds events until they are dispatched
     * @param cloudWatchLogsLimits     limit checks (event size, request size, event count, time)
     * @param cloudWatchLogsDispatcher dispatcher that receives the buffered events
     * @param dlqPushHandler           handler passed to the DLQ utilities for rejected events
     */
    public CloudWatchLogsService(Buffer buffer, CloudWatchLogsLimits cloudWatchLogsLimits, CloudWatchLogsDispatcher cloudWatchLogsDispatcher, DlqPushHandler dlqPushHandler) {
        this.buffer = buffer;
        this.cloudWatchLogsLimits = cloudWatchLogsLimits;
        this.processLock = new ReentrantLock();
        this.sinkStopWatch = new SinkStopWatch();
        this.cloudWatchLogsDispatcher = cloudWatchLogsDispatcher;
        this.dlqPushHandler = dlqPushHandler;
    }

    /**
     * Buffers the given records and dispatches the buffer when a limit is reached.
     *
     * <p>When {@code logs} is empty and the buffer already holds events, the only
     * action is a time-limit check that may flush the buffer. Otherwise each record
     * is serialized to JSON; records whose UTF-8 size is rejected by
     * {@code isGreaterThanMaxEventSize} are turned into DLQ objects instead of being
     * buffered, and the collected DLQ objects are handed off once at the end.
     *
     * @param logs records to process; must not be {@code null}
     */
    public void processLogEvents(Collection<Record<Event>> logs) {
        this.sinkStopWatch.startIfNotRunning();
        if (logs.isEmpty() && this.buffer.getEventCount() > 0) {
            this.flushIfTimeLimitReached();
            return;
        }
        List<DlqObject> dlqObjects = new ArrayList<>();
        for (Record<Event> log : logs) {
            Event event = log.getData();
            String logString = event.toJsonString();
            // Measure the encoded payload, not String.length(): the buffer stores
            // UTF-8 bytes, so char counts under-report multi-byte content.
            byte[] logBytes = logString.getBytes(StandardCharsets.UTF_8);
            int logLength = logBytes.length;
            if (this.cloudWatchLogsLimits.isGreaterThanMaxEventSize(logLength)) {
                String failureMessage = String.format("Event blocked due to Max Size restriction! Event Size : %s", logLength + REPORTED_EVENT_OVERHEAD_BYTES);
                DlqObject dlqObject = CloudWatchLogsSinkUtils.createDlqObject(0, event.getEventHandle(), logString, failureMessage, this.dlqPushHandler);
                if (dlqObject != null) {
                    dlqObjects.add(dlqObject);
                }
                continue;
            }
            this.bufferLogEvent(event, logBytes);
        }
        CloudWatchLogsSinkUtils.handleDlqObjects(dlqObjects, this.dlqPushHandler);
    }

    /** Flushes the buffer, under the process lock, if the elapsed time limit has been reached. */
    private void flushIfTimeLimitReached() {
        this.processLock.lock();
        try {
            if (this.cloudWatchLogsLimits.isTimeLimitReached(this.sinkStopWatch.getElapsedTimeInSeconds())) {
                this.stageLogEvents();
            }
        } finally {
            this.processLock.unlock();
        }
    }

    /**
     * Adds one encoded event to the buffer under the process lock, flushing before the
     * write if the request limit would be exceeded and after it if the event count
     * limit is reached.
     */
    private void bufferLogEvent(Event event, byte[] logBytes) {
        this.processLock.lock();
        try {
            int bufferSize = this.buffer.getBufferSize();
            int bufferEventCount = this.buffer.getEventCount();
            // Flush first when adding this event would push the request over its limit.
            if (this.cloudWatchLogsLimits.maxRequestSizeLimitExceeds(logBytes.length + bufferSize, bufferEventCount + 1)) {
                this.stageLogEvents();
            }
            this.addToBuffer(event.getEventHandle(), logBytes);
            if (this.cloudWatchLogsLimits.isMaxEventCountLimitReached(this.buffer.getEventCount())) {
                this.stageLogEvents();
            }
        } finally {
            this.processLock.unlock();
        }
    }

    /**
     * Resets the stop watch, dispatches everything currently buffered together with
     * the matching event handles, then resets the buffer. Callers hold the process lock.
     */
    private void stageLogEvents() {
        this.sinkStopWatch.stopAndReset();
        List<InputLogEvent> inputLogEvents = this.cloudWatchLogsDispatcher.prepareInputLogEvents(this.buffer.getBufferedData());
        this.cloudWatchLogsDispatcher.dispatchLogs(inputLogEvents, this.buffer.getEventHandles());
        this.buffer.resetBuffer();
    }

    /** Writes an already UTF-8 encoded event and its handle to the buffer. */
    private void addToBuffer(EventHandle logEventHandle, byte[] logBytes) {
        this.buffer.writeEvent(logEventHandle, logBytes);
    }
}

