/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.s3.analyticsaccelerator.io.physical.reader;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.s3.analyticsaccelerator.common.Metrics;
import software.amazon.s3.analyticsaccelerator.common.Preconditions;
import software.amazon.s3.analyticsaccelerator.common.telemetry.Operation;
import software.amazon.s3.analyticsaccelerator.common.telemetry.Telemetry;
import software.amazon.s3.analyticsaccelerator.io.physical.PhysicalIOConfiguration;
import software.amazon.s3.analyticsaccelerator.io.physical.data.Block;
import software.amazon.s3.analyticsaccelerator.request.GetRequest;
import software.amazon.s3.analyticsaccelerator.request.ObjectClient;
import software.amazon.s3.analyticsaccelerator.request.ObjectContent;
import software.amazon.s3.analyticsaccelerator.request.Range;
import software.amazon.s3.analyticsaccelerator.request.ReadMode;
import software.amazon.s3.analyticsaccelerator.request.Referrer;
import software.amazon.s3.analyticsaccelerator.util.MetricKey;
import software.amazon.s3.analyticsaccelerator.util.ObjectKey;
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
import software.amazon.s3.analyticsaccelerator.util.StreamAttributes;
import software.amazon.s3.analyticsaccelerator.util.retry.DefaultRetryStrategyImpl;
import software.amazon.s3.analyticsaccelerator.util.retry.RetryStrategy;

public class StreamReader
implements Closeable {
    private final ObjectClient objectClient;
    private final ObjectKey objectKey;
    private final ExecutorService threadPool;
    private final Consumer<List<Block>> removeBlocksFunc;
    private final Metrics aggregatingMetrics;
    private final OpenStreamInformation openStreamInformation;
    private final Telemetry telemetry;
    private final PhysicalIOConfiguration physicalIOConfiguration;
    private final RetryStrategy retryStrategy;
    private static final String OPERATION_GET_OBJECT = "s3.stream.get";
    private static final String OPERATION_STREAM_READ = "s3.stream.read";
    private static final Logger LOG = LoggerFactory.getLogger(StreamReader.class);

    public StreamReader(@NonNull ObjectClient objectClient, @NonNull ObjectKey objectKey, @NonNull ExecutorService threadPool, @NonNull Consumer<List<Block>> removeBlocksFunc, @NonNull Metrics aggregatingMetrics, @NonNull OpenStreamInformation openStreamInformation, @NonNull Telemetry telemetry, @NonNull PhysicalIOConfiguration physicalIOConfiguration) {
        if (objectClient == null) {
            throw new NullPointerException("objectClient is marked non-null but is null");
        }
        if (objectKey == null) {
            throw new NullPointerException("objectKey is marked non-null but is null");
        }
        if (threadPool == null) {
            throw new NullPointerException("threadPool is marked non-null but is null");
        }
        if (removeBlocksFunc == null) {
            throw new NullPointerException("removeBlocksFunc is marked non-null but is null");
        }
        if (aggregatingMetrics == null) {
            throw new NullPointerException("aggregatingMetrics is marked non-null but is null");
        }
        if (openStreamInformation == null) {
            throw new NullPointerException("openStreamInformation is marked non-null but is null");
        }
        if (telemetry == null) {
            throw new NullPointerException("telemetry is marked non-null but is null");
        }
        if (physicalIOConfiguration == null) {
            throw new NullPointerException("physicalIOConfiguration is marked non-null but is null");
        }
        this.objectClient = objectClient;
        this.objectKey = objectKey;
        this.threadPool = threadPool;
        this.removeBlocksFunc = removeBlocksFunc;
        this.aggregatingMetrics = aggregatingMetrics;
        this.openStreamInformation = openStreamInformation;
        this.telemetry = telemetry;
        this.physicalIOConfiguration = physicalIOConfiguration;
        this.retryStrategy = this.createRetryStrategy();
    }

    private RetryStrategy createRetryStrategy() {
        RetryStrategy provided = this.openStreamInformation.getRetryStrategy();
        if (provided == null) {
            provided = new DefaultRetryStrategyImpl();
        }
        if (this.physicalIOConfiguration.getBlockReadTimeout() > 0L && !provided.isTimeoutSet()) {
            provided.setTimeoutPolicy(this.physicalIOConfiguration.getBlockReadTimeout(), this.physicalIOConfiguration.getBlockReadRetryCount());
        }
        return provided;
    }

    @SuppressFBWarnings(value={"RV_RETURN_VALUE_IGNORED"}, justification="Intentional fire-and-forget task")
    public void read(@NonNull List<Block> blocks, ReadMode readMode) {
        if (blocks == null) {
            throw new NullPointerException("blocks is marked non-null but is null");
        }
        Preconditions.checkArgument(!blocks.isEmpty(), "`blocks` list must not be empty");
        this.threadPool.submit(this.processReadTask(blocks, readMode));
    }

    private Runnable processReadTask(List<Block> blocks, ReadMode readMode) {
        return () -> this.telemetry.measureCritical(() -> (Operation)((Operation.OperationBuilder)((Operation.OperationBuilder)((Operation.OperationBuilder)((Operation.OperationBuilder)Operation.builder().name(OPERATION_STREAM_READ)).attribute(StreamAttributes.uri(this.objectKey.getS3URI()))).attribute(StreamAttributes.etag(this.objectKey.getEtag()))).attribute(StreamAttributes.effectiveRange(((Block)blocks.get(0)).getBlockKey().getRange().getStart(), ((Block)blocks.get(blocks.size() - 1)).getBlockKey().getRange().getEnd()))).build(), () -> {
            try {
                this.retryStrategy.execute(() -> {
                    try (InputStream inputStream = null;){
                        List<Block> nonFilledBlocks = blocks.stream().filter(block -> !block.isDataReady()).collect(Collectors.toList());
                        Range requestRange = this.computeRange(nonFilledBlocks);
                        GetRequest getRequest = GetRequest.builder().s3Uri(this.objectKey.getS3URI()).range(requestRange).etag(this.objectKey.getEtag()).referrer(new Referrer(requestRange.toHttpString(), readMode)).build();
                        ObjectContent objectContent = this.fetchObjectContent(getRequest);
                        this.openStreamInformation.getRequestCallback().onGetRequest();
                        if (objectContent == null) {
                            this.removeNonFilledBlocksFromStore(nonFilledBlocks);
                            return;
                        }
                        inputStream = objectContent.getStream();
                        boolean success = this.readBlocksFromStream(inputStream, nonFilledBlocks, requestRange.getStart());
                        if (!success) {
                            this.removeNonFilledBlocksFromStore(nonFilledBlocks);
                        }
                    }
                });
            }
            catch (Exception e) {
                LOG.error("Unexpected exception while reading blocks", (Throwable)e);
                if (e instanceof IOException) {
                    this.setErrorOnBlocksAndRemove(blocks, (IOException)e);
                }
                IOException ioException = new IOException("Unexpected error during block reading", e);
                this.setErrorOnBlocksAndRemove(blocks, ioException);
            }
        });
    }

    private boolean readBlocksFromStream(InputStream inputStream, List<Block> blocks, long initialOffset) throws IOException {
        long currentOffset = initialOffset;
        for (Block block : blocks) {
            boolean success = this.readBlock(inputStream, block, currentOffset);
            if (!success) {
                return false;
            }
            currentOffset += (long)block.getLength();
        }
        return true;
    }

    private Range computeRange(List<Block> blocks) {
        long rangeStart = blocks.get(0).getBlockKey().getRange().getStart();
        long rangeEnd = blocks.get(blocks.size() - 1).getBlockKey().getRange().getEnd();
        return new Range(rangeStart, rangeEnd);
    }

    private ObjectContent fetchObjectContent(GetRequest getRequest) throws IOException {
        this.aggregatingMetrics.add(MetricKey.GET_REQUEST_COUNT, 1L);
        return this.objectClient.getObject(getRequest, this.openStreamInformation);
    }

    private boolean readBlock(InputStream inputStream, Block block, long currentPos) throws IOException {
        long blockStart = block.getBlockKey().getRange().getStart();
        int blockSize = block.getLength();
        if (!this.skipToBlockStart(inputStream, blockStart, currentPos)) {
            return false;
        }
        byte[] blockData = this.readExactBytes(inputStream, blockSize);
        block.setData(blockData);
        return true;
    }

    private boolean skipToBlockStart(InputStream inputStream, long blockStart, long currentPos) throws IOException {
        long skipped;
        long skipBytes = blockStart - currentPos;
        if (skipBytes <= 0L) {
            return true;
        }
        for (long totalSkipped = 0L; totalSkipped < skipBytes; totalSkipped += skipped) {
            skipped = inputStream.skip(skipBytes - totalSkipped);
            if (skipped > 0L) continue;
            return false;
        }
        return true;
    }

    private byte[] readExactBytes(InputStream inputStream, int size) throws IOException {
        int bytesRead;
        byte[] buffer = new byte[size];
        for (int totalRead = 0; totalRead < size; totalRead += bytesRead) {
            bytesRead = inputStream.read(buffer, totalRead, size - totalRead);
            if (bytesRead != -1) continue;
            throw new EOFException("Premature EOF: expected " + size + " bytes, but got " + totalRead);
        }
        return buffer;
    }

    private void setErrorOnBlocksAndRemove(List<Block> blocks, IOException error) {
        List<Block> nonReadyBlocks = blocks.stream().filter(block -> !block.isDataReady()).collect(Collectors.toList());
        nonReadyBlocks.forEach(block -> block.setError(error));
        this.removeBlocksFunc.accept(nonReadyBlocks);
    }

    private void removeNonFilledBlocksFromStore(List<Block> blocks) {
        this.removeBlocksFunc.accept(blocks.stream().filter(block -> !block.isDataReady()).collect(Collectors.toList()));
    }

    @Override
    public void close() {
    }
}

