/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.storage.s3.output;

import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.storage.s3.S3Utils;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
import org.apache.druid.storage.s3.output.S3ExportConfig;
import org.apache.druid.storage.s3.output.S3OutputConfig;
import org.apache.druid.utils.RuntimeInfo;

/**
 * Owns a single executor on which chunk files are uploaded as parts of S3 multipart uploads.
 *
 * <p>Callers hand over a chunk file through {@link #queueChunkForUpload}; the manager uploads it
 * (with retries), deletes the local file after a successful upload and emits queue-size,
 * queued-time and upload-time metrics through the injected {@link ServiceEmitter}.
 */
@ManageLifecycle
public class S3UploadManager {
    private static final Logger log = new Logger(S3UploadManager.class);

    private static final String METRIC_PREFIX = "s3/upload/part/";
    private static final String METRIC_PART_QUEUED_TIME = METRIC_PREFIX + "queuedTime";
    private static final String METRIC_QUEUE_SIZE = METRIC_PREFIX + "queueSize";
    private static final String METRIC_PART_UPLOAD_TIME = METRIC_PREFIX + "time";

    /** 5 MiB; the smallest chunk size assumed when sizing the work queue. */
    private static final long MIN_CHUNK_SIZE_BYTES = 5L * 1024 * 1024;

    /**
     * 5 GiB; divided by the effective chunk size to obtain the work queue capacity.
     * NOTE(review): this equals the S3 maximum part size and is presumably meant as the cap on
     * total chunk bytes waiting on disk -- confirm against S3OutputConfig.
     */
    private static final long CHUNK_BUDGET_BYTES = 5L * 1024 * 1024 * 1024;

    private final ExecutorService uploadExecutor;
    private final ServiceEmitter emitter;

    /** Number of chunks submitted to {@link #uploadExecutor} whose task has not started yet. */
    private final AtomicInteger executorQueueSize = new AtomicInteger(0);

    /**
     * Creates the upload executor.
     *
     * <p>The pool size is the number of available processors, but never fewer than 4. The work
     * queue capacity comes from {@link #computeMaxNumChunksOnDisk}.
     *
     * @param s3OutputConfig output configuration consulted for its chunk size; may be null
     * @param s3ExportConfig export configuration consulted for its chunk size; may be null
     * @param runtimeInfo    source of the available processor count
     * @param emitter        destination for the upload metrics
     */
    @Inject
    public S3UploadManager(S3OutputConfig s3OutputConfig, S3ExportConfig s3ExportConfig, RuntimeInfo runtimeInfo, ServiceEmitter emitter) {
        final int poolSize = Math.max(4, runtimeInfo.getAvailableProcessors());
        final int maxNumChunksOnDisk = computeMaxNumChunksOnDisk(s3OutputConfig, s3ExportConfig);
        this.uploadExecutor = createExecutorService(poolSize, maxNumChunksOnDisk);
        log.info("Initialized executor service for S3 multipart upload with pool size [%d] and work queue capacity [%d]", poolSize, maxNumChunksOnDisk);
        this.emitter = emitter;
    }

    /**
     * Computes how many chunks may be queued at once: 5 GiB divided by the largest of 5 MiB, the
     * output config's chunk size and the export config's chunk size.
     *
     * @param s3OutputConfig output configuration; ignored when null or when its chunk size is null
     * @param s3ExportConfig export configuration; ignored when null or when its chunk size is null
     * @return the quotient truncated to an int (1024 when neither config supplies a larger chunk size)
     */
    public static int computeMaxNumChunksOnDisk(S3OutputConfig s3OutputConfig, S3ExportConfig s3ExportConfig) {
        long maxChunkSize = MIN_CHUNK_SIZE_BYTES;
        if (s3OutputConfig != null && s3OutputConfig.getChunkSize() != null) {
            maxChunkSize = Math.max(maxChunkSize, s3OutputConfig.getChunkSize());
        }
        if (s3ExportConfig != null && s3ExportConfig.getChunkSize() != null) {
            maxChunkSize = Math.max(maxChunkSize, s3ExportConfig.getChunkSize().getBytes());
        }
        return (int) (CHUNK_BUDGET_BYTES / maxChunkSize);
    }

    /**
     * Submits a chunk file for upload as one part of a multipart upload.
     *
     * <p>If the executor refuses the task, the pending-chunk counter is rolled back before the
     * exception propagates, so the reported queue size does not drift upwards.
     *
     * @param s3Client    client used to perform the part upload
     * @param key         object key of the multipart upload
     * @param chunkNumber part number of this chunk
     * @param chunkFile   local file holding the chunk; deleted after a successful upload
     * @param uploadId    id of the multipart upload this part belongs to
     * @param config      supplies the bucket name and the maximum number of tries
     * @return a future that yields the part's upload result
     */
    public Future<UploadPartResult> queueChunkForUpload(ServerSideEncryptingAmazonS3 s3Client, String key, int chunkNumber, File chunkFile, String uploadId, S3OutputConfig config) {
        final Stopwatch stopwatch = Stopwatch.createStarted();
        this.executorQueueSize.incrementAndGet();
        boolean accepted = false;
        try {
            final Future<UploadPartResult> pendingUpload = this.uploadExecutor.submit(
                () -> runQueuedUpload(s3Client, key, chunkNumber, chunkFile, uploadId, config, stopwatch)
            );
            accepted = true;
            return pendingUpload;
        } finally {
            if (!accepted) {
                // submit() threw, so the task will never run and never decrement the counter itself.
                this.executorQueueSize.decrementAndGet();
            }
        }
    }

    /**
     * Task body run on the upload executor: reports queue metrics, then uploads the chunk with retries.
     *
     * @param stopwatch started when the chunk was queued; restarted here to time the upload
     */
    private UploadPartResult runQueuedUpload(ServerSideEncryptingAmazonS3 s3Client, String key, int chunkNumber, File chunkFile, String uploadId, S3OutputConfig config, Stopwatch stopwatch) throws Exception {
        final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
        // The queue-size metric is emitted before the dimensions are set on the builder.
        emitMetric(metricBuilder.setMetric(METRIC_QUEUE_SIZE, this.executorQueueSize.decrementAndGet()));
        metricBuilder.setDimension("uploadId", uploadId).setDimension("partNumber", chunkNumber);
        emitMetric(metricBuilder.setMetric(METRIC_PART_QUEUED_TIME, stopwatch.millisElapsed()));
        stopwatch.restart();

        return RetryUtils.retry(() -> {
            log.debug("Uploading chunk[%d] for uploadId[%s].", chunkNumber, uploadId);
            final UploadPartResult uploadPartResult = uploadPartIfPossible(s3Client, uploadId, config.getBucket(), key, chunkNumber, chunkFile);
            // Reached only when the upload call returned normally; a failed delete is logged, not fatal.
            if (!chunkFile.delete()) {
                log.warn("Failed to delete chunk [%s]", chunkFile.getAbsolutePath());
            }
            emitMetric(metricBuilder.setMetric(METRIC_PART_UPLOAD_TIME, stopwatch.millisElapsed()));
            return uploadPartResult;
        }, S3Utils.S3RETRY, config.getMaxRetry());
    }

    /**
     * Builds the part-upload request for the chunk file and sends it through the client.
     *
     * @return the result returned by {@code s3Client.uploadPart}
     */
    @VisibleForTesting
    UploadPartResult uploadPartIfPossible(ServerSideEncryptingAmazonS3 s3Client, String uploadId, String bucket, String key, int chunkNumber, File chunkFile) {
        final UploadPartRequest uploadPartRequest = new UploadPartRequest()
            .withUploadId(uploadId)
            .withBucketName(bucket)
            .withKey(key)
            .withFile(chunkFile)
            .withPartNumber(chunkNumber)
            .withPartSize(chunkFile.length());
        if (log.isDebugEnabled()) {
            log.debug("Pushing chunk[%s] to bucket[%s] and key[%s].", chunkNumber, bucket, key);
        }
        return s3Client.uploadPart(uploadPartRequest);
    }

    /**
     * Creates the upload thread pool.
     * NOTE(review): Execs.newBlockingThreaded presumably blocks submitters while the work queue is
     * full -- confirm against its documentation.
     */
    private ExecutorService createExecutorService(int poolSize, int maxNumConcurrentChunks) {
        return Execs.newBlockingThreaded("S3UploadThreadPool-%d", poolSize, maxNumConcurrentChunks);
    }

    /** Lifecycle start hook; nothing to do because the executor is created in the constructor. */
    @LifecycleStart
    public void start() {
    }

    /** Lifecycle stop hook; requests an orderly shutdown of the upload executor. */
    @LifecycleStop
    public void stop() {
        this.uploadExecutor.shutdown();
    }

    /** Emits the metric described by {@code builder}; protected so subclasses can intercept it. */
    protected void emitMetric(ServiceMetricEvent.Builder builder) {
        this.emitter.emit(builder);
    }
}

