/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.lambda.common;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBuffer;
import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.LambdaCommonConfig;
import org.opensearch.dataprepper.plugins.lambda.common.util.ThresholdCheck;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.lambda.LambdaAsyncClient;
import software.amazon.awssdk.services.lambda.model.InvokeRequest;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;

/**
 * Static helpers shared by the Lambda plugins: grouping records into
 * threshold-bounded buffers, invoking the Lambda function asynchronously for
 * each buffer, and inspecting / awaiting the resulting invocations.
 *
 * <p>This class only exposes static methods and cannot be instantiated.
 */
public class LambdaCommonHandler {
    private static final Logger LOG = LoggerFactory.getLogger(LambdaCommonHandler.class);

    /** Not instantiable: static utility holder. */
    private LambdaCommonHandler() {
    }

    /**
     * Tells whether a Lambda invocation response carries a successful status code.
     *
     * @param response the invocation response; may be {@code null}
     * @return {@code true} when {@code response} is non-null and its status code
     *         is in the range {@code [200, 300)}; {@code false} otherwise
     */
    public static boolean isSuccess(InvokeResponse response) {
        if (response == null) {
            return false;
        }
        final int statusCode = response.statusCode();
        return statusCode >= 200 && statusCode < 300;
    }

    /**
     * Blocks the calling thread until every future in the collection has completed.
     * An empty collection returns immediately.
     *
     * @param futureList the pending invocation futures to wait for
     * @throws java.util.concurrent.CompletionException if any of the futures
     *         completed exceptionally (propagated by {@link CompletableFuture#join()})
     */
    public static void waitForFutures(Collection<CompletableFuture<InvokeResponse>> futureList) {
        if (!futureList.isEmpty()) {
            CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
        }
    }

    /**
     * Splits {@code records} into buffers, in iteration order, closing a buffer
     * whenever one of the configured thresholds is hit.
     *
     * <p>For every record:
     * <ol>
     *   <li>if the current buffer is non-empty and the size threshold check
     *       (as decided by {@code ThresholdCheck.checkSizeThresholdExceed}) reports
     *       {@code true} for this record, the current buffer is closed and a new
     *       one is started before the record is added;</li>
     *   <li>the record is added to the current buffer;</li>
     *   <li>if the event-count threshold check then reports {@code true}, the
     *       buffer is closed and a new one is started.</li>
     * </ol>
     * Any non-empty trailing buffer is appended at the end, so no empty buffer is
     * ever returned.
     *
     * @param records            the records to batch
     * @param keys               keys forwarded to each new {@link InMemoryBuffer}
     * @param batchOptions       supplies the batch key name and the thresholds
     * @param outputCodecContext codec context forwarded to each new buffer
     * @return the buffers created, in the order they were filled
     */
    private static List<Buffer> createBufferBatches(Collection<Record<Event>> records, List<String> keys, BatchOptions batchOptions, OutputCodecContext outputCodecContext) {
        final int maxEvents = batchOptions.getThresholdOptions().getEventCount();
        final ByteCount maxBytes = batchOptions.getThresholdOptions().getMaximumSize();
        final String keyName = batchOptions.getKeyName();

        final List<Buffer> batchedBuffers = new ArrayList<>();
        InMemoryBuffer currentBufferPerBatch = new InMemoryBuffer(keyName, outputCodecContext, keys);

        LOG.debug("Batch size received to lambda processor: {}", records.size());
        for (Record<Event> record : records) {
            // Size is checked BEFORE adding, against the incoming record, and only
            // when the buffer already holds something: a lone record is always
            // accepted into an empty buffer.
            if (currentBufferPerBatch.getEventCount() > 0
                    && ThresholdCheck.checkSizeThresholdExceed(currentBufferPerBatch, maxBytes, record)) {
                batchedBuffers.add(currentBufferPerBatch);
                currentBufferPerBatch = new InMemoryBuffer(keyName, outputCodecContext, keys);
            }

            currentBufferPerBatch.addRecord(record);

            // Event count is checked AFTER adding the record.
            if (currentBufferPerBatch.getEventCount() > 0
                    && ThresholdCheck.checkEventCountThresholdExceeded(currentBufferPerBatch, maxEvents)) {
                batchedBuffers.add(currentBufferPerBatch);
                currentBufferPerBatch = new InMemoryBuffer(keyName, outputCodecContext, keys);
            }
        }

        // Flush whatever is left over in the last, partially filled buffer.
        if (currentBufferPerBatch.getEventCount() > 0) {
            batchedBuffers.add(currentBufferPerBatch);
        }
        return batchedBuffers;
    }

    /**
     * Batches {@code records} according to the configured thresholds and starts
     * one asynchronous Lambda invocation per batch.
     *
     * @param records            the records to send
     * @param config             supplies keys, batch options, function name and invocation type
     * @param lambdaAsyncClient  client used to invoke the function
     * @param outputCodecContext codec context forwarded to each buffer
     * @return a map from each buffer that was sent to the future of its invocation
     */
    public static Map<Buffer, CompletableFuture<InvokeResponse>> sendRecords(Collection<Record<Event>> records, LambdaCommonConfig config, LambdaAsyncClient lambdaAsyncClient, OutputCodecContext outputCodecContext) {
        final List<Buffer> batchedBuffers = LambdaCommonHandler.createBufferBatches(records, config.getKeys(), config.getBatchOptions(), outputCodecContext);
        return LambdaCommonHandler.invokeLambdaAndGetFutureMap(config, lambdaAsyncClient, batchedBuffers);
    }

    /**
     * Starts an asynchronous Lambda invocation for every buffer that yields a
     * request payload. Buffers whose payload is {@code null} are skipped with a
     * warning and do not appear in the returned map.
     *
     * @param config            supplies the function name and invocation type
     * @param lambdaAsyncClient client used to invoke the function
     * @param batchedBuffers    the buffers to send
     * @return a map from each invoked buffer to the future of its invocation;
     *         this method does not wait for the futures to complete
     */
    public static Map<Buffer, CompletableFuture<InvokeResponse>> invokeLambdaAndGetFutureMap(LambdaCommonConfig config, LambdaAsyncClient lambdaAsyncClient, List<Buffer> batchedBuffers) {
        final Map<Buffer, CompletableFuture<InvokeResponse>> bufferToFutureMap = new HashMap<>();
        LOG.debug("Batch Chunks created after threshold check: {}", batchedBuffers.size());
        for (Buffer buffer : batchedBuffers) {
            final InvokeRequest requestPayload = buffer.getRequestPayload(config.getFunctionName(), config.getInvocationType().getAwsLambdaValue());
            if (requestPayload == null) {
                LOG.warn("Request Payload is null, skipping lambda invocation");
                continue;
            }
            final CompletableFuture<InvokeResponse> future = lambdaAsyncClient.invoke(requestPayload);
            bufferToFutureMap.put(buffer, future);
        }
        return bufferToFutureMap;
    }
}

