/*
 * Decompiled with CFR 0.152.
 */
package alluxio.client.file;

import alluxio.AlluxioURI;
import alluxio.client.ReadType;
import alluxio.client.block.AlluxioBlockStore;
import alluxio.client.block.stream.BlockInStream;
import alluxio.client.block.stream.BlockWorkerClient;
import alluxio.client.file.FileInStream;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.URIStatus;
import alluxio.client.file.options.InStreamOptions;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.PreconditionMessage;
import alluxio.exception.status.DeadlineExceededException;
import alluxio.exception.status.UnavailableException;
import alluxio.grpc.AsyncCacheRequest;
import alluxio.resource.CloseableResource;
import alluxio.retry.CountingRetry;
import alluxio.util.CommonUtils;
import alluxio.wire.BlockInfo;
import alluxio.wire.BlockLocation;
import alluxio.wire.FileBlockInfo;
import alluxio.wire.WorkerNetAddress;
import com.google.common.base.Preconditions;
import com.google.common.io.Closer;
import java.io.Closeable;
import java.io.IOException;
import java.net.ConnectException;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
public class AlluxioFileInStream
extends FileInStream {
    private static final Logger LOG = LoggerFactory.getLogger(AlluxioFileInStream.class);
    private final int mBlockWorkerClientReadRetry;
    private final URIStatus mStatus;
    private final InStreamOptions mOptions;
    private final AlluxioBlockStore mBlockStore;
    private final FileSystemContext mContext;
    private final boolean mPassiveCachingEnabled;
    private final long mLength;
    private final long mBlockSize;
    private long mPosition;
    private BlockInStream mBlockInStream;
    private BlockInStream mCachedPositionedReadStream;
    private long mLastBlockIdCached;
    private Map<WorkerNetAddress, Long> mFailedWorkers = new HashMap<WorkerNetAddress, Long>();
    private Closer mCloser = Closer.create();

    protected AlluxioFileInStream(URIStatus status, InStreamOptions options, FileSystemContext context) {
        this.mContext = context;
        this.mCloser.register((Closeable)this.mContext.blockReinit());
        try {
            AlluxioConfiguration conf = this.mContext.getPathConf(new AlluxioURI(status.getPath()));
            this.mPassiveCachingEnabled = conf.getBoolean(PropertyKey.USER_FILE_PASSIVE_CACHE_ENABLED);
            this.mBlockWorkerClientReadRetry = conf.getInt(PropertyKey.USER_BLOCK_WORKER_CLIENT_READ_RETRY);
            this.mStatus = status;
            this.mOptions = options;
            this.mBlockStore = AlluxioBlockStore.create(this.mContext);
            this.mLength = this.mStatus.getLength();
            this.mBlockSize = this.mStatus.getBlockSizeBytes();
            this.mPosition = 0L;
            this.mBlockInStream = null;
            this.mCachedPositionedReadStream = null;
            this.mLastBlockIdCached = 0L;
        }
        catch (Throwable t) {
            throw CommonUtils.closeAndRethrowRuntimeException((Closer)this.mCloser, (Throwable)t);
        }
    }

    @Override
    public int read() throws IOException {
        if (this.mPosition == this.mLength) {
            return -1;
        }
        CountingRetry retry = new CountingRetry(this.mBlockWorkerClientReadRetry);
        Throwable lastException = null;
        while (retry.attempt()) {
            try {
                this.updateStream();
                int result = this.mBlockInStream.read();
                if (result != -1) {
                    ++this.mPosition;
                }
                return result;
            }
            catch (DeadlineExceededException | UnavailableException | ConnectException e) {
                lastException = e;
                if (this.mBlockInStream == null) continue;
                this.handleRetryableException(this.mBlockInStream, (IOException)e);
                this.mBlockInStream = null;
            }
        }
        throw lastException;
    }

    @Override
    public int read(byte[] b) throws IOException {
        return this.read(b, 0, b.length);
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        Preconditions.checkArgument((b != null ? 1 : 0) != 0, (Object)PreconditionMessage.ERR_READ_BUFFER_NULL);
        Preconditions.checkArgument((off >= 0 && len >= 0 && len + off <= b.length ? 1 : 0) != 0, (String)PreconditionMessage.ERR_BUFFER_STATE.toString(), (Object)b.length, (Object)off, (Object)len);
        if (len == 0) {
            return 0;
        }
        if (this.mPosition == this.mLength) {
            return -1;
        }
        int bytesLeft = len;
        int currentOffset = off;
        CountingRetry retry = new CountingRetry(this.mBlockWorkerClientReadRetry);
        Throwable lastException = null;
        while (bytesLeft > 0 && this.mPosition != this.mLength && retry.attempt()) {
            try {
                this.updateStream();
                int bytesRead = this.mBlockInStream.read(b, currentOffset, bytesLeft);
                if (bytesRead > 0) {
                    bytesLeft -= bytesRead;
                    currentOffset += bytesRead;
                    this.mPosition += (long)bytesRead;
                }
                retry.reset();
                lastException = null;
            }
            catch (DeadlineExceededException | UnavailableException | ConnectException e) {
                lastException = e;
                if (this.mBlockInStream == null) continue;
                this.handleRetryableException(this.mBlockInStream, (IOException)e);
                this.mBlockInStream = null;
            }
        }
        if (lastException != null) {
            throw lastException;
        }
        return len - bytesLeft;
    }

    @Override
    public long skip(long n) throws IOException {
        if (n <= 0L) {
            return 0L;
        }
        long toSkip = Math.min(n, this.mLength - this.mPosition);
        this.seek(this.mPosition + toSkip);
        return toSkip;
    }

    @Override
    public void close() throws IOException {
        this.closeBlockInStream(this.mBlockInStream);
        this.closeBlockInStream(this.mCachedPositionedReadStream);
        this.mCloser.close();
    }

    @Override
    public long remaining() {
        return this.mLength - this.mPosition;
    }

    @Override
    public int positionedRead(long pos, byte[] b, int off, int len) throws IOException {
        return this.positionedReadInternal(pos, b, off, len);
    }

    private int positionedReadInternal(long pos, byte[] b, int off, int len) throws IOException {
        if (pos < 0L || pos >= this.mLength) {
            return -1;
        }
        if ((long)len < this.mContext.getPathConf(new AlluxioURI(this.mStatus.getPath())).getBytes(PropertyKey.USER_FILE_SEQUENTIAL_PREAD_THRESHOLD)) {
            this.mOptions.setPositionShort(true);
        }
        int lenCopy = len;
        CountingRetry retry = new CountingRetry(this.mBlockWorkerClientReadRetry);
        Throwable lastException = null;
        while (len > 0 && retry.attempt() && pos < this.mLength) {
            long blockId = (Long)this.mStatus.getBlockIds().get(Math.toIntExact(pos / this.mBlockSize));
            try {
                if (this.mCachedPositionedReadStream == null) {
                    this.mCachedPositionedReadStream = this.mBlockStore.getInStream(blockId, this.mOptions, this.mFailedWorkers);
                } else if (this.mCachedPositionedReadStream.getId() != blockId) {
                    this.closeBlockInStream(this.mCachedPositionedReadStream);
                    this.mCachedPositionedReadStream = this.mBlockStore.getInStream(blockId, this.mOptions, this.mFailedWorkers);
                }
                long offset = pos % this.mBlockSize;
                int bytesRead = this.mCachedPositionedReadStream.positionedRead(offset, b, off, (int)Math.min(this.mBlockSize - offset, (long)len));
                Preconditions.checkState((bytesRead > 0 ? 1 : 0) != 0, (Object)"No data is read before EOF");
                pos += (long)bytesRead;
                off += bytesRead;
                len -= bytesRead;
                retry.reset();
                lastException = null;
                this.triggerAsyncCaching(this.mCachedPositionedReadStream);
            }
            catch (DeadlineExceededException | UnavailableException | ConnectException e) {
                lastException = e;
                if (this.mCachedPositionedReadStream == null) continue;
                this.handleRetryableException(this.mCachedPositionedReadStream, (IOException)e);
                this.mCachedPositionedReadStream = null;
            }
        }
        if (lastException != null) {
            throw lastException;
        }
        return lenCopy - len;
    }

    public long getPos() {
        return this.mPosition;
    }

    public void seek(long pos) throws IOException {
        if (this.mPosition == pos) {
            return;
        }
        Preconditions.checkArgument((pos >= 0L ? 1 : 0) != 0, (String)PreconditionMessage.ERR_SEEK_NEGATIVE.toString(), (long)pos);
        Preconditions.checkArgument((pos <= this.mLength ? 1 : 0) != 0, (String)PreconditionMessage.ERR_SEEK_PAST_END_OF_FILE.toString(), (long)pos);
        if (this.mBlockInStream == null) {
            this.mPosition = pos;
            return;
        }
        long delta = pos - this.mPosition;
        if (delta <= this.mBlockInStream.remaining() && delta >= -this.mBlockInStream.getPos()) {
            this.mBlockInStream.seek(this.mBlockInStream.getPos() + delta);
        } else {
            this.closeBlockInStream(this.mBlockInStream);
        }
        this.mPosition += delta;
    }

    private void updateStream() throws IOException {
        long blockId;
        BlockInfo blockInfo;
        if (this.mBlockInStream != null && this.mBlockInStream.remaining() > 0L) {
            return;
        }
        if (this.mBlockInStream != null && this.mBlockInStream.remaining() == 0L) {
            this.closeBlockInStream(this.mBlockInStream);
        }
        if ((blockInfo = this.mStatus.getBlockInfo(blockId = ((Long)this.mStatus.getBlockIds().get(Math.toIntExact(this.mPosition / this.mBlockSize))).longValue())) == null) {
            throw new IOException("No BlockInfo for block(id=" + blockId + ") of file(id=" + this.mStatus.getFileId() + ", path=" + this.mStatus.getPath() + ")");
        }
        boolean isBlockInfoOutdated = true;
        for (BlockLocation location : blockInfo.getLocations()) {
            if (this.mFailedWorkers.containsKey(location.getWorkerAddress())) continue;
            isBlockInfoOutdated = false;
            break;
        }
        this.mBlockInStream = isBlockInfoOutdated ? this.mBlockStore.getInStream(blockId, this.mOptions, this.mFailedWorkers) : this.mBlockStore.getInStream(blockInfo, this.mOptions, this.mFailedWorkers);
        long offset = this.mPosition % this.mBlockSize;
        this.mBlockInStream.seek(offset);
    }

    private void closeBlockInStream(BlockInStream stream) throws IOException {
        if (stream != null) {
            BlockInStream.BlockInStreamSource blockSource = stream.getSource();
            stream.close();
            if (stream == this.mBlockInStream) {
                this.mBlockInStream = null;
            }
            if (blockSource == BlockInStream.BlockInStreamSource.LOCAL) {
                return;
            }
            this.triggerAsyncCaching(stream);
        }
    }

    private void triggerAsyncCaching(BlockInStream stream) throws IOException {
        boolean cache = ReadType.fromProto(this.mOptions.getOptions().getReadType()).isCache();
        boolean overReplicated = this.mStatus.getReplicationMax() > 0 && ((FileBlockInfo)this.mStatus.getFileBlockInfos().get((int)(this.getPos() / this.mBlockSize))).getBlockInfo().getLocations().size() >= this.mStatus.getReplicationMax();
        cache = cache && !overReplicated;
        WorkerNetAddress dataSource = stream.getAddress();
        long blockId = stream.getId();
        if (cache && this.mLastBlockIdCached != blockId) {
            WorkerNetAddress worker = this.mPassiveCachingEnabled && this.mContext.hasLocalWorker() ? this.mContext.getLocalWorker() : dataSource;
            try {
                long blockLength = this.mOptions.getBlockInfo(blockId).getLength();
                AsyncCacheRequest request = AsyncCacheRequest.newBuilder().setBlockId(blockId).setLength(blockLength).setOpenUfsBlockOptions(this.mOptions.getOpenUfsBlockOptions(blockId)).setSourceHost(dataSource.getHost()).setSourcePort(dataSource.getDataPort()).build();
                try (CloseableResource<BlockWorkerClient> blockWorker = this.mContext.acquireBlockWorkerClient(worker);){
                    ((BlockWorkerClient)blockWorker.get()).asyncCache(request);
                    this.mLastBlockIdCached = blockId;
                }
            }
            catch (Exception e) {
                LOG.warn("Failed to complete async cache request for block {} at worker {}: {}", new Object[]{blockId, worker, e.getMessage()});
            }
        }
    }

    private void handleRetryableException(BlockInStream stream, IOException e) {
        WorkerNetAddress workerAddress = stream.getAddress();
        LOG.warn("Failed to read block {} from worker {}, will retry: {}", new Object[]{stream.getId(), workerAddress, e.getMessage()});
        try {
            stream.close();
        }
        catch (Exception ex) {
            LOG.warn("Failed to close input stream for block {}: {}", (Object)stream.getId(), (Object)ex.getMessage());
        }
        this.mFailedWorkers.put(workerAddress, System.currentTimeMillis());
    }
}

