/*
 * Decompiled with CFR 0.152.
 */
package com.oracle.coherence.grpc;

import com.oracle.coherence.grpc.ErrorsHelper;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.net.SocketException;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;

public class SafeStreamObserver<T>
implements StreamObserver<T> {
    private final StreamObserver<? super T> delegate;
    private volatile boolean done;
    private final CompletableFuture<Void> doneFuture = new CompletableFuture();
    private static final Logger LOGGER = Logger.getLogger(SafeStreamObserver.class.getName());

    private SafeStreamObserver(StreamObserver<? super T> streamObserver) {
        this.delegate = streamObserver;
    }

    public void onNext(T t) {
        if (this.done) {
            return;
        }
        if (t == null) {
            this.onError((Throwable)Status.INVALID_ARGUMENT.withDescription("onNext called with null. Null values are generally not allowed.").asRuntimeException());
        } else {
            try {
                this.delegate.onNext(t);
            }
            catch (Throwable thrown) {
                SafeStreamObserver.throwIfFatal(thrown);
                this.onError(thrown);
            }
        }
    }

    public void onError(Throwable thrown) {
        block4: {
            try {
                if (this.done) {
                    LOGGER.log(Level.SEVERE, this.checkNotNull(thrown), () -> "OnError called after StreamObserver was closed");
                } else {
                    this.setDone(thrown);
                    this.delegate.onError((Throwable)ErrorsHelper.ensureStatusRuntimeException(this.checkNotNull(thrown)));
                }
            }
            catch (Throwable t) {
                SafeStreamObserver.throwIfFatal(t);
                if (this.isSocketClosedError(thrown)) break block4;
                LOGGER.log(Level.SEVERE, t, () -> "Caught exception handling onError");
            }
        }
    }

    public void onCompleted() {
        block4: {
            if (this.done) {
                LOGGER.log(Level.WARNING, "onComplete called after StreamObserver was closed");
            } else {
                try {
                    this.setDone(null);
                    this.delegate.onCompleted();
                }
                catch (Throwable thrown) {
                    SafeStreamObserver.throwIfFatal(thrown);
                    if (this.isSocketClosedError(thrown)) break block4;
                    LOGGER.log(Level.SEVERE, thrown, () -> "Caught exception handling onComplete");
                }
            }
        }
    }

    public StreamObserver<? super T> delegate() {
        return this.delegate;
    }

    public boolean isDone() {
        return this.done;
    }

    public CompletableFuture<Void> whenDone() {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.doneFuture.whenComplete((v, err) -> {
            if (err == null) {
                future.complete(null);
            } else {
                future.completeExceptionally((Throwable)err);
            }
        });
        return future;
    }

    private void setDone(Throwable t) {
        this.done = true;
        if (t == null) {
            this.doneFuture.complete(null);
        } else {
            this.doneFuture.completeExceptionally(t);
        }
    }

    private Throwable checkNotNull(Throwable thrown) {
        if (thrown == null) {
            thrown = Status.INVALID_ARGUMENT.withDescription("onError called with null Throwable. Null exceptions are generally not allowed.").asRuntimeException();
        }
        return thrown;
    }

    private static void throwIfFatal(Throwable thrown) {
        if (thrown instanceof VirtualMachineError) {
            throw (VirtualMachineError)thrown;
        }
        if (thrown instanceof LinkageError) {
            throw (LinkageError)thrown;
        }
    }

    private boolean isSocketClosedError(Throwable throwable) {
        for (Throwable cause = throwable; cause != null; cause = cause.getCause()) {
            if (!(cause instanceof SocketException) || !"Socket closed".equals(cause.getMessage())) continue;
            return true;
        }
        return false;
    }

    public static <T> StreamObserver<T> ensureSafeObserver(StreamObserver<T> observer) {
        if (observer instanceof SafeStreamObserver) {
            return observer;
        }
        return new SafeStreamObserver<T>(observer);
    }
}

