/*
 * Decompiled with CFR 0.152.
 */
package alluxio.client.block.stream;

import alluxio.exception.status.AlluxioStatusException;
import alluxio.exception.status.CancelledException;
import alluxio.exception.status.DeadlineExceededException;
import alluxio.exception.status.UnavailableException;
import alluxio.resource.LockResource;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
public class GrpcBlockingStream<ReqT, ResT> {
    private static final Logger LOG = LoggerFactory.getLogger(GrpcBlockingStream.class);

    /**
     * Observer passed to the RPC for incoming responses. The same instance is also enqueued into
     * {@link #mResponses} as the end-of-stream marker and compared by identity in {@code receive}.
     */
    private final StreamObserver<ResT> mResponseObserver;
    /** Observer used to send requests, complete, or cancel the call. */
    private final ClientCallStreamObserver<ReqT> mRequestObserver;
    /**
     * Bounded buffer of incoming items: responses, the end-of-stream marker, and (best effort) the
     * first error recorded for the stream.
     */
    private final BlockingQueue<Object> mResponses;
    /** Text appended to log and error messages to identify this stream. */
    private final String mDescription;
    /** Set once {@code receive} has consumed the end-of-stream marker. */
    private boolean mCompleted = false;
    /** Set when {@code close()} has completed the request side. */
    private boolean mClosed = false;
    /** Set when {@code cancel()} was called or an error was surfaced by {@code checkError()}. */
    private boolean mCanceled = false;
    /** Guards {@link #mError} and backs {@link #mReadyOrFailed}. Must be declared before the condition. */
    private final ReentrantLock mLock = new ReentrantLock();
    /** First error recorded for the stream; later errors are attached as suppressed exceptions. */
    @GuardedBy(value="mLock")
    private Throwable mError;
    /** Signalled when the request observer reports ready or when an error is recorded. */
    private final Condition mReadyOrFailed = this.mLock.newCondition();
    /**
     * Set by the response observer's {@code onCompleted} callback. It is written on the callback
     * thread and read without holding {@link #mLock} by the caller's thread, so it is volatile to
     * make the write visible.
     */
    private volatile boolean mClosedFromRemote = false;
    /**
     * Opens the stream by invoking the given RPC with this stream's response observer.
     *
     * @param rpcFunc the RPC to start; it receives the response observer and returns the request
     *        observer, which must be a {@link ClientCallStreamObserver}
     * @param bufferSize capacity of the queue that buffers incoming responses
     * @param description text appended to log and error messages to identify this stream
     */
    public GrpcBlockingStream(Function<StreamObserver<ResT>, StreamObserver<ReqT>> rpcFunc, int bufferSize, String description) {
        LOG.debug("Opening stream ({})", description);
        // Assign everything the response observer reads BEFORE starting the RPC: once rpcFunc is
        // applied, callbacks may run and they log mDescription and write to mResponses.
        this.mDescription = description;
        this.mResponses = new ArrayBlockingQueue<>(bufferSize);
        this.mResponseObserver = new ResponseStreamObserver();
        this.mRequestObserver = (ClientCallStreamObserver<ReqT>) rpcFunc.apply(this.mResponseObserver);
    }

    /**
     * Sends a request, waiting up to {@code timeoutMs} for the request observer to become ready.
     *
     * <p>NOTE(review): the decompiler failed on this method and emitted a stub that always threw
     * {@code IllegalStateException}. This body is reconstructed from the collaborators visible in
     * this class (the {@code mReadyOrFailed} condition signalled by the on-ready handler and by
     * {@code onError}, {@code checkError()}, and the request observer's readiness flag). Confirm the
     * exact messages and the closed-stream behavior against the original bytecode.
     *
     * @param request the request to send
     * @param timeoutMs maximum time to wait for the stream to become ready, in milliseconds
     * @throws CancelledException if the stream is already closed or canceled, or the calling thread
     *         is interrupted while waiting
     * @throws DeadlineExceededException if the stream does not become ready within the timeout
     * @throws IOException if an error has been recorded for the stream
     */
    public void send(ReqT request, long timeoutMs) throws IOException {
        if (this.mClosed || this.mCanceled || this.mClosedFromRemote) {
            throw new CancelledException(this.formatErrorMessage(
                "Failed to send request %s: stream is already closed or cancelled.", request));
        }
        // Track the remaining budget so spurious or unrelated wake-ups do not extend the deadline.
        long remainingNs = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        try (LockResource lr = new LockResource(this.mLock)) {
            while (true) {
                // Surface a recorded failure before (re)checking readiness.
                this.checkError();
                if (this.mRequestObserver.isReady()) {
                    break;
                }
                if (remainingNs <= 0L) {
                    throw new DeadlineExceededException(this.formatErrorMessage(
                        "Timeout sending request %s after %dms.", request, timeoutMs));
                }
                try {
                    remainingNs = this.mReadyOrFailed.awaitNanos(remainingNs);
                }
                catch (InterruptedException e) {
                    // Preserve the interrupt status for callers further up the stack.
                    Thread.currentThread().interrupt();
                    throw new CancelledException(this.formatErrorMessage(
                        "Failed to send request %s: interrupted while waiting for server.", request), e);
                }
            }
        }
        this.mRequestObserver.onNext(request);
    }

    /**
     * Sends a request without waiting for the stream to become ready. If the stream has already
     * been closed, canceled, or completed by the remote side, the request is dropped and only a
     * debug message is logged.
     *
     * @param request the request to send
     * @throws IOException if an error has been recorded for the stream
     */
    public void send(ReqT request) throws IOException {
        final boolean inactive = mClosed || mCanceled || mClosedFromRemote;
        if (inactive) {
            LOG.debug("Failed to send request {}: stream is already closed or canceled. ({})", request, mDescription);
            return;
        }
        // Fail fast on a previously recorded error before handing the request to gRPC.
        try (LockResource guard = new LockResource(mLock)) {
            checkError();
        }
        mRequestObserver.onNext(request);
    }

    /**
     * Takes the next response from the buffer, waiting up to {@code timeoutMs} for one to arrive.
     *
     * @param timeoutMs maximum time to wait for a response, in milliseconds
     * @return the next response, or {@code null} once the stream has completed
     * @throws CancelledException if the stream was canceled or the wait was interrupted
     * @throws DeadlineExceededException if nothing arrived within the timeout
     * @throws IOException if an error has been recorded for the stream
     */
    public ResT receive(long timeoutMs) throws IOException {
        if (mCompleted) {
            return null;
        }
        if (mCanceled) {
            throw new CancelledException(formatErrorMessage("Stream is already canceled."));
        }
        final Object polled;
        try {
            polled = mResponses.poll(timeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            // Restore the interrupt status before converting to a stream-level exception.
            Thread.currentThread().interrupt();
            throw new CancelledException(formatErrorMessage("Interrupted while waiting for response."), (Throwable)e);
        }
        if (polled == null) {
            throw new DeadlineExceededException(formatErrorMessage("Timeout waiting for response after %dms.", timeoutMs));
        }
        // The response observer itself is the end-of-stream marker.
        if (polled == mResponseObserver) {
            mCompleted = true;
            return null;
        }
        // The polled item may be the recorded error; checkError() throws in that case.
        checkError();
        return (ResT)polled;
    }

    /**
     * Completes the request side of the stream. Does nothing when the stream is no longer open
     * (already closed, canceled, or failed).
     */
    public void close() {
        if (!isOpen()) {
            return;
        }
        LOG.debug("Closing stream ({})", (Object)mDescription);
        mClosed = true;
        mRequestObserver.onCompleted();
    }

    /**
     * Cancels the call through the request observer. Does nothing when the stream is no longer
     * open (already closed, canceled, or failed).
     */
    public void cancel() {
        if (!isOpen()) {
            return;
        }
        LOG.debug("Cancelling stream ({})", (Object)mDescription);
        mCanceled = true;
        mRequestObserver.cancel("Request is cancelled by user.", null);
    }

    /**
     * Drains responses until the stream completes. Returns immediately if the stream is already
     * completed or canceled.
     *
     * @param timeoutMs timeout passed to each individual {@code receive} call, in milliseconds
     * @throws IOException if any {@code receive} call fails
     */
    public void waitForComplete(long timeoutMs) throws IOException {
        if (!mCompleted && !mCanceled) {
            ResT next;
            do {
                // Responses are discarded; only the terminating null matters here.
                next = receive(timeoutMs);
            } while (next != null);
        }
    }

    /**
     * @return whether the response observer has received the completed event
     */
    public boolean isClosedFromRemote() {
        return mClosedFromRemote;
    }

    /**
     * @return {@code true} when the stream has not been closed or canceled and no error has been
     *         recorded
     */
    public boolean isOpen() {
        try (LockResource guard = new LockResource((Lock)mLock)) {
            return !(mClosed || mCanceled || mError != null);
        }
    }

    /**
     * @return whether {@code close()} has completed the request side of this stream
     */
    public boolean isClosed() {
        return mClosed;
    }

    /**
     * @return whether the stream has been marked canceled, either by {@code cancel()} or because
     *         a recorded error was surfaced
     */
    public boolean isCanceled() {
        return mCanceled;
    }

    /**
     * Throws the recorded stream error, if any, converted to an Alluxio status exception, and
     * marks the stream as canceled when doing so.
     *
     * @throws IOException if an error has been recorded for the stream
     */
    private void checkError() throws IOException {
        try (LockResource guard = new LockResource((Lock)mLock)) {
            final Throwable failure = mError;
            if (failure == null) {
                return;
            }
            mCanceled = true;
            throw toAlluxioStatusException(failure);
        }
    }

    /**
     * Converts a stream error into an {@link AlluxioStatusException} whose description carries
     * this stream's description.
     *
     * <p>A {@link StatusRuntimeException} with status code {@code CANCELLED} is reported as an
     * {@link UnavailableException}.
     *
     * @param t the error to convert
     * @return the converted exception
     */
    private AlluxioStatusException toAlluxioStatusException(Throwable t) {
        // Declared as AlluxioStatusException: getStatusCode()/getStatus() are not Throwable methods.
        AlluxioStatusException ex;
        if (t instanceof StatusRuntimeException) {
            ex = AlluxioStatusException.fromStatusRuntimeException((StatusRuntimeException)t);
            if (ex.getStatusCode() == Status.Code.CANCELLED) {
                ex = new UnavailableException(this.formatErrorMessage("Stream is canceled by server."), ex);
            }
        } else {
            // Convert the argument itself rather than re-reading the mError field.
            ex = AlluxioStatusException.fromThrowable(t);
        }
        // Pass the message as an argument, never as the format string: exception messages may
        // contain '%' characters that would make String.format throw.
        String message = ex.getMessage();
        String description = this.formatErrorMessage("%s", message == null ? "Unknown error" : message);
        return AlluxioStatusException.from(ex.getStatus().withDescription(description));
    }

    /**
     * Builds an error message followed by this stream's description in parentheses.
     *
     * <p>The format string is only run through {@link String#format} when arguments are supplied.
     * With no arguments the text is used verbatim, so a message that happens to contain a
     * {@code '%'} (for example one taken from another exception) cannot trigger a format error.
     *
     * @param format the message or format string; {@code null} yields {@code "Unknown error"}
     * @param args the arguments for the format string, if any
     * @return the message with {@code " (<description>)"} appended
     */
    private String formatErrorMessage(String format, Object ... args) {
        final String message;
        if (format == null) {
            message = "Unknown error";
        } else if (args == null || args.length == 0) {
            message = format;
        } else {
            message = String.format(format, args);
        }
        return message + String.format(" (%s)", this.mDescription);
    }

    private final class ResponseStreamObserver
    implements ClientResponseObserver<ReqT, ResT> {
        private ResponseStreamObserver() {
        }

        public void onNext(ResT response) {
            try {
                GrpcBlockingStream.this.mResponses.put(response);
            }
            catch (InterruptedException e) {
                this.handleInterruptedException(e);
            }
        }

        public void onError(Throwable t) {
            try (LockResource lr = new LockResource((Lock)GrpcBlockingStream.this.mLock);){
                LOG.warn("Received error {} for stream ({})", (Object)t, (Object)GrpcBlockingStream.this.mDescription);
                this.updateException(t);
                GrpcBlockingStream.this.mReadyOrFailed.signal();
            }
        }

        public void onCompleted() {
            try {
                LOG.debug("Received completed event for stream ({})", (Object)GrpcBlockingStream.this.mDescription);
                GrpcBlockingStream.this.mResponses.put(this);
                GrpcBlockingStream.this.mClosedFromRemote = true;
            }
            catch (InterruptedException e) {
                this.handleInterruptedException(e);
            }
        }

        public void beforeStart(ClientCallStreamObserver<ReqT> requestStream) {
            requestStream.setOnReadyHandler(() -> {
                try (LockResource lr = new LockResource((Lock)GrpcBlockingStream.this.mLock);){
                    GrpcBlockingStream.this.mReadyOrFailed.signal();
                }
            });
        }

        private void handleInterruptedException(InterruptedException e) {
            Thread.currentThread().interrupt();
            try (LockResource lr = new LockResource((Lock)GrpcBlockingStream.this.mLock);){
                this.updateException(e);
            }
            throw new RuntimeException(e);
        }

        @GuardedBy(value="mLock")
        private void updateException(Throwable e) {
            if (GrpcBlockingStream.this.mError == null || GrpcBlockingStream.this.mError == e) {
                GrpcBlockingStream.this.mError = e;
                GrpcBlockingStream.this.mResponses.offer(e);
            } else {
                GrpcBlockingStream.this.mError.addSuppressed(e);
            }
        }
    }
}

