/*
 * Decompiled with CFR 0.152.
 */
package com.oracle.coherence.grpc.client.common.v1;

import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.oracle.coherence.common.base.Blocking;
import com.oracle.coherence.common.base.Exceptions;
import com.oracle.coherence.common.base.Logger;
import com.oracle.coherence.common.base.Predicate;
import com.oracle.coherence.common.base.Timeout;
import com.oracle.coherence.common.collections.ConcurrentHashMap;
import com.oracle.coherence.grpc.BinaryHelper;
import com.oracle.coherence.grpc.ErrorsHelper;
import com.oracle.coherence.grpc.LockingStreamObserver;
import com.oracle.coherence.grpc.SafeStreamObserver;
import com.oracle.coherence.grpc.client.common.BaseGrpcConnection;
import com.oracle.coherence.grpc.client.common.GrpcConnection;
import com.oracle.coherence.grpc.messages.common.v1.ErrorMessage;
import com.oracle.coherence.grpc.messages.common.v1.HeartbeatMessage;
import com.oracle.coherence.grpc.messages.proxy.v1.InitRequest;
import com.oracle.coherence.grpc.messages.proxy.v1.InitResponse;
import com.oracle.coherence.grpc.messages.proxy.v1.ProxyRequest;
import com.oracle.coherence.grpc.messages.proxy.v1.ProxyResponse;
import com.oracle.coherence.grpc.services.proxy.v1.ProxyServiceGrpc;
import com.tangosol.internal.net.grpc.RemoteGrpcServiceDependencies;
import com.tangosol.internal.util.Daemons;
import com.tangosol.io.Serializer;
import com.tangosol.net.RequestIncompleteException;
import com.tangosol.util.SafeClock;
import com.tangosol.util.UUID;
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.net.ConnectException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class GrpcConnectionV1
extends BaseGrpcConnection
implements GrpcConnection,
StreamObserver<ProxyResponse> {
    /** Service version constant published by this connection implementation. */
    public static final int SERVICE_VERSION = 1;
    /** Guards connection establishment in ensureConnected() and teardown in closeInternal(). */
    private final Lock f_lock = new ReentrantLock();
    /** Held for the duration of each inbound callback: onNext, onError and onCompleted. */
    private final Lock f_observerLock = new ReentrantLock();
    /** Protobuf class that the {@code Any} payload of a response is unpacked to. */
    private final Class<? extends Message> f_responseType;
    /** Listeners offered every response that arrives with an id of zero. */
    private final List<GrpcConnection.Listener<?>> m_listeners = new CopyOnWriteArrayList();
    /** Dependencies supplied at construction; read for the version numbers placed in the init request. */
    private final GrpcConnection.Dependencies m_dependencies;
    /** Remote scope name sent in the init request; the empty string when none is configured. */
    private final String f_sScope;
    /** Serializer whose name is sent as the init request format and which deserializes error causes. */
    private final Serializer m_serializer;
    /** Response handlers for in-flight requests, keyed by request id. */
    private final Map<Long, StreamObserver<? extends Message>> m_mapFuture = new ConcurrentHashMap();
    /** Source of request ids; incremented before use, so the first id issued is 2. */
    private final AtomicLong m_nMessageId = new AtomicLong(1L);
    /** Outbound request stream; set once the init handshake succeeds and nulled by closeInternal(). */
    private LockingStreamObserver<ProxyRequest> m_observer;
    /** Future of the init handshake in progress; failed by onError() and reset to null after a successful handshake. */
    private CompletableFuture<ProxyResponse> m_connectFuture;
    /** Protocol name sent in the init request and shown by toString(). */
    private final String m_sProtocol;
    /** Init response received from the proxy; null before the handshake and after closeInternal(). */
    private InitResponse m_initResponse;
    /** UUID built from the bytes carried by the init response; null before the handshake and after closeInternal(). */
    private UUID m_uuid;
    /** Cleared by close(); assertActive() rejects further use once this is false. */
    private volatile boolean m_fActive = true;
    /** Set by closeInternal() once teardown has started; never reset within this class. */
    private volatile boolean m_closed = false;
    /** Channel the proxy service stub is created on. */
    private Channel m_channel;
    /** Request timeout in milliseconds; a value of zero or less makes send() wait without a limit. */
    private final long f_requestTimeout;
    /** Delay between heartbeats; heartbeats are only scheduled when this is greater than zero. */
    private final long f_nHeartbeatInterval;
    /** Value of the ack flag placed in each heartbeat message. */
    private boolean f_fHeartbeatAck;
    /** Number of HEARTBEAT responses received. */
    private final LongAdder f_cHeartbeatAck = new LongAdder();
    /** Number of heartbeats sent by HeartbeatTask. */
    private final LongAdder f_cHeartbeat = new LongAdder();
    /** SafeClock time recorded by HeartbeatTask when it last sent a heartbeat. */
    private long m_nLastHeartbeatTime;

    public GrpcConnectionV1(GrpcConnection.Dependencies dependencies, Class<? extends Message> type) {
        RemoteGrpcServiceDependencies serviceDependencies = dependencies.getServiceDependencies();
        this.f_responseType = Objects.requireNonNull(type);
        this.m_serializer = dependencies.getSerializer();
        this.m_dependencies = dependencies;
        this.f_sScope = Objects.requireNonNullElse(serviceDependencies.getRemoteScopeName(), "");
        this.m_channel = dependencies.getChannel();
        this.m_sProtocol = dependencies.getProtocolName();
        this.f_requestTimeout = serviceDependencies.getRequestTimeoutMillis();
        this.f_nHeartbeatInterval = serviceDependencies.getHeartbeatInterval();
        this.f_fHeartbeatAck = serviceDependencies.isRequireHeartbeatAck();
    }

    /**
     * Open the request stream, perform the init handshake and then announce
     * the connection through {@code dispatchConnected()}.
     *
     * @throws IllegalStateException if a request stream is already open, or
     *                               if {@link #close()} has been called
     */
    @Override
    public void connect() {
        LockingStreamObserver<ProxyRequest> current = m_observer;
        boolean fAlreadyOpen = current != null && !current.isDone();
        if (fAlreadyOpen) {
            throw new IllegalStateException("Already initialized");
        }

        ensureConnected();
        dispatchConnected();
    }

    /**
     * Report whether the request stream exists and has not finished.
     *
     * @return {@code true} if a request stream has been established and is not done
     */
    @Override
    public boolean isConnected() {
        // Read the field once: closeInternal() may null it on another thread
        // between a null check and a second read.
        LockingStreamObserver<ProxyRequest> observer = this.m_observer;
        return observer != null && !observer.isDone();
    }

    /**
     * Permanently close this connection: mark it inactive so that later
     * requests are rejected, then tear down the stream without an error cause.
     */
    @Override
    public void close() {
        m_fActive = false;
        closeInternal(null);
    }

    /**
     * Tear the connection down and fail every outstanding request.
     * <p>
     * Outstanding handlers are failed even when the connection is already
     * closed. The first caller to get past the closed check additionally
     * completes the request stream, dispatches the disconnected event, drops
     * all listeners and clears the handshake state.
     * <p>
     * Each handler is removed from the pending map before it is notified, so
     * it receives {@code onError} at most once from this method, and an
     * exception thrown by one handler is logged instead of aborting the
     * teardown.
     *
     * @param closeWithError  the error to report to outstanding requests, or
     *                        {@code null} to report a
     *                        {@link RequestIncompleteException}
     */
    public void closeInternal(Throwable closeWithError) {
        Throwable error = closeWithError == null
                ? new RequestIncompleteException("Channel was closed")
                : closeWithError;

        // Done regardless of the closed flag so that requests registered after
        // a previous teardown are failed rather than left waiting.
        failPendingRequests(error);

        if (this.m_closed) {
            return;
        }

        this.f_lock.lock();
        try {
            if (this.m_closed) {
                // another thread finished the teardown while this one waited for the lock
                return;
            }
            this.m_closed = true;

            // pick up anything registered between the first drain and taking the lock
            failPendingRequests(error);

            LockingStreamObserver<ProxyRequest> observer = this.m_observer;
            if (observer != null && !observer.isDone()) {
                observer.onCompleted();
            }

            // requests may have been registered while the stream was completing
            if (!this.m_mapFuture.isEmpty()) {
                failPendingRequests(new RequestIncompleteException("channel closed"));
            }

            this.dispatchDisconnected();
            this.m_listeners.clear();
            this.m_observer = null;
            this.m_initResponse = null;
            this.m_uuid = null;
        }
        finally {
            this.f_lock.unlock();
        }
    }

    /**
     * Remove every pending response handler and report {@code error} to it.
     *
     * @param error  the error passed to each handler's {@code onError}
     */
    private void failPendingRequests(Throwable error) {
        // iterate a snapshot of the keys so removal cannot disturb the iteration
        for (Long id : this.m_mapFuture.keySet().toArray(new Long[0])) {
            StreamObserver<? extends Message> handler = this.m_mapFuture.remove(id);
            if (handler == null) {
                // already removed, and so already handled, by another thread
                continue;
            }
            try {
                handler.onError(error);
            }
            catch (RuntimeException e) {
                // one misbehaving handler must not stop the others being notified
                Logger.err((Throwable)e);
            }
        }
    }

    /**
     * Return the channel this connection creates its proxy stub on.
     *
     * @return the channel obtained from the dependencies at construction
     */
    @Override
    public Channel getChannel() {
        return m_channel;
    }

    /**
     * Return the UUID taken from the proxy's init response.
     *
     * @return the UUID built from the init response
     *
     * @throws IllegalStateException if the connection has been closed or the
     *                               init response has not been received
     */
    @Override
    public UUID getUUID() {
        assertInit();
        return m_uuid;
    }

    /**
     * Return the version string reported in the proxy's init response.
     *
     * @return the init response's version
     *
     * @throws IllegalStateException if the connection has been closed or the
     *                               init response has not been received
     */
    @Override
    public String getProxyVersion() {
        assertInit();
        return m_initResponse.getVersion();
    }

    /**
     * Return the encoded version reported in the proxy's init response.
     *
     * @return the init response's encoded version
     *
     * @throws IllegalStateException if the connection has been closed or the
     *                               init response has not been received
     */
    @Override
    public int getProxyVersionEncoded() {
        assertInit();
        return m_initResponse.getEncodedVersion();
    }

    /**
     * Return the protocol version reported in the proxy's init response.
     *
     * @return the init response's protocol version
     *
     * @throws IllegalStateException if the connection has been closed or the
     *                               init response has not been received
     */
    @Override
    public int getProtocolVersion() {
        assertInit();
        return m_initResponse.getProtocolVersion();
    }

    /**
     * Send a request and wait for its response, connecting first if needed.
     *
     * @param message  the request to send
     * @param <T>      the expected response type
     *
     * @return the last value delivered for the request before it completed
     *
     * @throws IllegalStateException      if the connection has been closed
     * @throws RequestIncompleteException if the request fails, times out or the wait is interrupted
     */
    @Override
    public <T extends Message> T send(Message message) {
        LockingStreamObserver<ProxyRequest> sender = ensureConnected();
        return send(message, sender);
    }

    /**
     * Send a request without waiting, connecting first if needed.
     *
     * @param message  the request to send
     * @param <T>      the expected response type
     *
     * @return a future completed when the request completes or fails
     *
     * @throws IllegalStateException if the connection has been closed
     */
    @Override
    public <T extends Message> CompletableFuture<T> poll(Message message) {
        LockingStreamObserver<ProxyRequest> sender = ensureConnected();
        return poll(message, sender);
    }

    /**
     * Send a request whose responses are delivered to the supplied observer,
     * connecting first if needed.
     *
     * @param message   the request to send
     * @param observer  receives the responses, completion or error for the request
     * @param <T>       the response type the observer accepts
     *
     * @throws IllegalStateException if the connection has been closed
     */
    @Override
    public <T extends Message> void poll(Message message, StreamObserver<T> observer) {
        LockingStreamObserver<ProxyRequest> sender = ensureConnected();
        poll(message, observer, sender);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handle one response from the proxy while holding the observer lock.
     * <ul>
     *   <li>A HEARTBEAT response only increments the acknowledgement counter.</li>
     *   <li>A response with id zero is unpacked and passed to every registered
     *       listener whose predicate accepts it; failures are logged.</li>
     *   <li>Any other response goes to the handler registered under its id.
     *       A MESSAGE payload is unpacked and passed to {@code onNext}, leaving
     *       the handler registered. Every other case removes the handler and
     *       then completes it (INIT, COMPLETE) or fails it (ERROR, unexpected).</li>
     * </ul>
     * A response whose id has no registered handler is logged as an error.
     *
     * @param response  the response received from the proxy
     */
    @Override
    public void onNext(ProxyResponse response) {
        f_observerLock.lock();
        try {
            ProxyResponse.ResponseCase kind = response.getResponseCase();
            if (kind == ProxyResponse.ResponseCase.HEARTBEAT) {
                f_cHeartbeatAck.increment();
                return;
            }

            long nId = response.getId();
            if (nId == 0L) {
                // not tied to a request: offer it to the listeners
                try {
                    Message event = response.getMessage().unpack(f_responseType);
                    for (GrpcConnection.Listener<?> listener : m_listeners) {
                        @SuppressWarnings("unchecked")
                        GrpcConnection.Listener<Message> typed = (GrpcConnection.Listener<Message>) listener;
                        if (typed.predicate().evaluate(event)) {
                            typed.observer().onNext(event);
                        }
                    }
                }
                catch (Exception e) {
                    Logger.err(e);
                }
                return;
            }

            @SuppressWarnings("unchecked")
            StreamObserver<Message> handler = (StreamObserver<Message>) m_mapFuture.get(nId);
            if (handler == null) {
                Logger.err("Failed to find handler for response: " + nId);
                return;
            }

            try {
                if (kind == ProxyResponse.ResponseCase.MESSAGE) {
                    // intermediate result: the handler stays registered for more
                    Message payload = response.getMessage().unpack(f_responseType);
                    handler.onNext(payload);
                    return;
                }

                // every remaining case ends the request
                m_mapFuture.remove(nId);
                switch (kind) {
                    case INIT: {
                        m_initResponse = response.getInit();
                        m_uuid = new UUID(m_initResponse.getUuid().toByteArray());
                        handler.onNext(m_initResponse);
                        handler.onCompleted();
                        break;
                    }
                    case ERROR: {
                        ErrorMessage failure = response.getError();
                        Throwable cause = failure.hasError()
                                ? (Throwable) BinaryHelper.fromByteString(failure.getError(), m_serializer)
                                : null;
                        handler.onError(new RequestIncompleteException(failure.getMessage(), cause));
                        break;
                    }
                    case COMPLETE: {
                        handler.onCompleted();
                        break;
                    }
                    default: {
                        handler.onError(new RequestIncompleteException("Unexpected response case: " + String.valueOf(kind)));
                    }
                }
            }
            catch (Exception e) {
                handler.onError(e);
            }
        }
        finally {
            f_observerLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handle a failure of the response stream while holding the observer lock.
     * <p>
     * A handshake in progress is failed first, so the connecting thread stops
     * waiting. If the connection is not yet closed, the error is logged unless
     * it is a cancellation and the connection is torn down with the error as
     * the cause. If it is already closed, the error is logged unless it
     * carries the gRPC {@code UNAVAILABLE} status.
     *
     * @param t  the error reported for the stream
     */
    @Override
    public void onError(Throwable t) {
        this.f_observerLock.lock();
        try {
            // Snapshot: ensureConnected() resets this field to null under a
            // different lock, so a second read could see null.
            CompletableFuture<ProxyResponse> connectFuture = this.m_connectFuture;
            if (connectFuture != null) {
                connectFuture.completeExceptionally(t);
            }

            if (!this.m_closed) {
                ErrorsHelper.logIfNotCancelled((Throwable)t);
                this.closeInternal(t);
            } else {
                Status.Code code = null;
                if (t instanceof StatusRuntimeException) {
                    code = ((StatusRuntimeException)t).getStatus().getCode();
                } else if (t instanceof StatusException) {
                    code = ((StatusException)t).getStatus().getCode();
                }
                // UNAVAILABLE after close is not logged
                if (code != Status.Code.UNAVAILABLE) {
                    Logger.err((String)"onError called after close() has been called", (Throwable)t);
                }
            }
        }
        finally {
            this.f_observerLock.unlock();
        }
    }

    /**
     * Handle completion of the response stream by tearing the connection
     * down, without an error cause, while holding the observer lock.
     */
    @Override
    public void onCompleted() {
        f_observerLock.lock();
        try {
            closeInternal(null);
        }
        finally {
            f_observerLock.unlock();
        }
    }

    /**
     * Register a listener for responses that arrive with an id of zero.
     *
     * @param listener  the listener to add
     * @param <T>       the message type the listener accepts
     */
    @Override
    public <T extends Message> void addResponseObserver(GrpcConnection.Listener<T> listener) {
        m_listeners.add(listener);
    }

    /**
     * Remove a listener previously added with
     * {@link #addResponseObserver(GrpcConnection.Listener)}.
     *
     * @param listener  the listener to remove
     * @param <T>       the message type the listener accepts
     */
    @Override
    public <T extends Message> void removeResponseObserver(GrpcConnection.Listener<T> listener) {
        m_listeners.remove(listener);
    }

    /**
     * Return how many heartbeats have been sent.
     *
     * @return the current sum of the sent-heartbeat counter
     */
    @Override
    public long getHeartbeatsSent() {
        return f_cHeartbeat.sum();
    }

    /**
     * Return the time recorded when the most recent heartbeat was sent.
     *
     * @return the last recorded heartbeat time, or zero if none has been sent
     */
    @Override
    public long getLastHeartbeatTime() {
        return m_nLastHeartbeatTime;
    }

    /**
     * Return how many heartbeat responses have been received.
     *
     * @return the current sum of the heartbeat-acknowledgement counter
     */
    @Override
    public long getHeartbeatsAcked() {
        return f_cHeartbeatAck.sum();
    }

    /**
     * Describe this connection: closed, not yet initialized, or its scope,
     * protocol and negotiated protocol version.
     *
     * @return a human-readable description of the connection state
     */
    @Override
    public String toString() {
        if (this.m_closed) {
            return "GrpcConnectionV1(Closed)";
        }

        // Snapshot: closeInternal() may null the field on another thread
        // between a null check and a second read.
        InitResponse initResponse = this.m_initResponse;
        if (initResponse == null) {
            return "GrpcConnectionV1(Not Initialized)";
        }
        return "GrpcConnectionV1(scope=\"" + this.f_sScope + "\" , protocol=\"" + this.m_sProtocol + "\" , version=" + initResponse.getProtocolVersion() + ")";
    }

    /**
     * Send a request on the given stream and wait for its result, applying
     * the configured request timeout.
     *
     * @param message   the request to send
     * @param observer  the request stream to send on
     * @param <T>       the expected response type
     *
     * @return the value the request completed with
     *
     * @throws RequestIncompleteException if the request fails, times out or the wait is interrupted
     */
    private <T extends Message> T send(Message message, LockingStreamObserver<ProxyRequest> observer) {
        return awaitFuture(this.<T>poll(message, observer), f_requestTimeout);
    }

    /**
     * Wait for a request future and translate every failure into a
     * {@link RequestIncompleteException}.
     *
     * @param future   the future to wait on
     * @param nMillis  the maximum wait in milliseconds; zero or less waits without a limit
     * @param <T>      the result type
     *
     * @return the future's result
     *
     * @throws RequestIncompleteException if the future fails, the wait times
     *         out, or the thread is interrupted (the interrupt status is
     *         restored before throwing)
     */
    private <T extends Message> T awaitFuture(CompletableFuture<T> future, long nMillis) {
        try {
            if (nMillis > 0L) {
                return future.get(nMillis, TimeUnit.MILLISECONDS);
            }
            return future.get();
        }
        catch (ExecutionException e) {
            Throwable cause = e.getCause();
            // gRPC status failures are not logged here; anything else is
            if (!(cause instanceof StatusRuntimeException)) {
                Logger.err((Throwable)cause);
            }
            throw new RequestIncompleteException(e.getMessage(), (Throwable)e);
        }
        catch (InterruptedException e) {
            // wrapping the exception would otherwise hide the interrupt from callers
            Thread.currentThread().interrupt();
            throw new RequestIncompleteException(e.getMessage(), (Throwable)e);
        }
        catch (TimeoutException e) {
            throw new RequestIncompleteException(e.getMessage(), (Throwable)e);
        }
    }

    /**
     * Send a request on the given stream and expose its outcome as a future.
     *
     * @param message  the request to send
     * @param sender   the request stream to send on
     * @param <T>      the expected response type
     *
     * @return a future completed with the last value received when the
     *         request completes, or failed when the request fails
     */
    private <T extends Message> CompletableFuture<T> poll(Message message, LockingStreamObserver<ProxyRequest> sender) {
        ResponseHandler<T> handler = new ResponseHandler<>();
        poll(message, handler, sender);
        return handler.getFuture();
    }

    /**
     * Wrap a message in a {@link ProxyRequest} with a fresh id, register the
     * observer under that id and send the request on the given stream.
     * <p>
     * Init and heartbeat messages are set on their dedicated request fields;
     * every other message is packed into an {@code Any}. If building or
     * sending fails, the observer is unregistered and then notified through
     * {@code onError}, so a failed request leaves no entry behind for
     * {@code closeInternal} to fail a second time.
     *
     * @param message   the message to send
     * @param observer  receives the responses, completion or error for the request
     * @param sender    the request stream to send on
     * @param <T>       the response type the observer accepts
     */
    private <T extends Message> void poll(Message message, StreamObserver<T> observer, LockingStreamObserver<ProxyRequest> sender) {
        long nId = this.m_nMessageId.incrementAndGet();
        try {
            ProxyRequest.Builder builder = ProxyRequest.newBuilder().setId(nId);
            if (message instanceof InitRequest) {
                builder.setInit((InitRequest)message);
            } else if (message instanceof HeartbeatMessage) {
                builder.setHeartbeat((HeartbeatMessage)message);
            } else {
                builder.setMessage(Any.pack((Message)message));
            }
            ProxyRequest request = builder.build();

            // register before sending so a fast response always finds its handler
            this.m_mapFuture.put(nId, observer);
            sender.onNext(request);
        }
        catch (Exception e) {
            // the request never went out: drop the registration before reporting
            this.m_mapFuture.remove(nId);
            observer.onError((Throwable)e);
        }
    }

    /**
     * Verify that {@link #close()} has not been called on this connection.
     *
     * @throws IllegalStateException if the connection has been closed
     */
    protected void assertActive() {
        if (m_fActive) {
            return;
        }
        throw new IllegalStateException("This connection has been closed");
    }

    /**
     * Return the request stream, opening it and performing the init handshake
     * first when none has been established.
     * <p>
     * The handshake runs under {@code f_lock}: a stub is created on the
     * channel, a sub-channel stream is opened with this object as the response
     * observer, an init request is sent and the init response is awaited for
     * the request timeout, or 30 seconds when no timeout is configured. On
     * success the stream is published in {@code m_observer} and, when a
     * heartbeat interval is configured, the first heartbeat is scheduled.
     * <p>
     * NOTE(review): when the handshake fails with a {@link ConnectException}
     * root cause, the code sleeps and then {@code continue}s. The loop
     * condition is then tested against the local {@code observer}, which was
     * already assigned, so the loop ends and that stream is returned without
     * {@code m_observer} being set. Confirm whether a real retry was intended.
     *
     * @return the request stream to send on
     *
     * @throws IllegalStateException if the connection has been closed
     * @throws RuntimeException      if the handshake fails for a reason other
     *                               than a connect failure
     */
    protected LockingStreamObserver<ProxyRequest> ensureConnected() {
        this.assertActive();
        LockingStreamObserver observer = this.m_observer;
        if (observer == null) {
            this.f_lock.lock();
            // presumably bounds the Blocking call below by the request timeout - TODO confirm
            try (Timeout ignored = Timeout.after((long)this.f_requestTimeout);){
                // re-read under the lock: another thread may have connected meanwhile
                observer = this.m_observer;
                while (observer == null) {
                    ProxyServiceGrpc.ProxyServiceStub stub = this.createStub(this.m_channel);
                    observer = LockingStreamObserver.ensureLockingObserver((StreamObserver)SafeStreamObserver.ensureSafeObserver((StreamObserver)stub.subChannel((StreamObserver)this)));
                    InitRequest request = InitRequest.newBuilder().setScope(this.f_sScope).setFormat(this.m_serializer.getName()).setProtocol(this.m_sProtocol).setProtocolVersion(this.m_dependencies.getVersion()).setSupportedProtocolVersion(this.m_dependencies.getSupportedVersion()).build();
                    ResponseHandler handler = new ResponseHandler();
                    long nMillis = this.f_requestTimeout <= 0L ? 30000L : this.f_requestTimeout;
                    // published so that onError() can fail the handshake and wake this thread
                    this.m_connectFuture = handler.getFuture();
                    try {
                        this.poll((Message)request, handler, (LockingStreamObserver<ProxyRequest>)observer);
                        this.awaitFuture(this.m_connectFuture, nMillis);
                    }
                    catch (Exception e) {
                        Throwable rootCause = Exceptions.getRootCause((Throwable)e);
                        if (rootCause instanceof ConnectException) {
                            try {
                                Blocking.sleep((long)1000L);
                            }
                            catch (InterruptedException interrupted) {
                                // do not lose the interrupt: restore the flag for the caller
                                Thread.currentThread().interrupt();
                            }
                            continue;
                        }
                        throw Exceptions.ensureRuntimeException((Throwable)e);
                    }
                    this.m_connectFuture = null;
                    this.m_observer = observer;
                    if (this.f_nHeartbeatInterval <= 0L) continue;
                    Daemons.commonPool().schedule((Runnable)new HeartbeatTask(this.f_fHeartbeatAck), this.f_nHeartbeatInterval);
                }
            }
            catch (InterruptedException e) {
                throw Exceptions.ensureRuntimeException((Throwable)e, (String)"Timed out reconnecting");
            }
            finally {
                this.f_lock.unlock();
            }
        }
        return observer;
    }

    /**
     * Verify that the connection is still active and that the proxy's init
     * response has been received.
     *
     * @throws IllegalStateException if the connection has been closed or has
     *                               no init response
     */
    protected void assertInit() {
        assertActive();
        boolean fInitialized = m_initResponse != null;
        if (!fInitialized) {
            throw new IllegalStateException("Connection has not been intialized");
        }
    }

    /**
     * Create the proxy service stub used to open the request stream; the stub
     * carries no deadline.
     *
     * @param channel  the channel to create the stub on
     *
     * @return a new stub without a deadline
     */
    private ProxyServiceGrpc.ProxyServiceStub createStub(Channel channel) {
        ProxyServiceGrpc.ProxyServiceStub stub = createStubWithoutDeadline(channel);
        return stub;
    }

    /**
     * Create a proxy service stub, applying a deadline only when a positive
     * one is requested.
     *
     * @param channel    the channel to create the stub on
     * @param nDeadline  the deadline in milliseconds; zero or any negative
     *                   value (including -1) yields a stub without a deadline
     *
     * @return a new stub, with the deadline applied when {@code nDeadline} is positive
     */
    private ProxyServiceGrpc.ProxyServiceStub createStubWithDeadline(Channel channel, long nDeadline) {
        ProxyServiceGrpc.ProxyServiceStub stub = createStubWithoutDeadline(channel);
        return nDeadline > 0L
                ? stub.withDeadlineAfter(nDeadline, TimeUnit.MILLISECONDS)
                : stub;
    }

    /**
     * Create a new proxy service stub on the given channel.
     *
     * @param channel  the channel to create the stub on
     *
     * @return the stub returned by {@code ProxyServiceGrpc.newStub}
     */
    private ProxyServiceGrpc.ProxyServiceStub createStubWithoutDeadline(Channel channel) {
        ProxyServiceGrpc.ProxyServiceStub stub = ProxyServiceGrpc.newStub(channel);
        return stub;
    }

    private static class ResponseHandler<T extends Message>
    implements StreamObserver<Message> {
        private final CompletableFuture<T> f_future = new CompletableFuture();
        private T m_value;

        private ResponseHandler() {
        }

        public void onNext(Message value) {
            this.m_value = value;
        }

        public void onError(Throwable t) {
            this.f_future.completeExceptionally(t);
        }

        public void onCompleted() {
            this.f_future.complete(this.m_value);
        }

        private CompletableFuture<T> getFuture() {
            return this.f_future;
        }
    }

    /**
     * A self-rescheduling task that sends a heartbeat message while the
     * connection is connected. Once it runs and finds the connection not
     * connected, it does nothing and does not reschedule itself.
     * <p>
     * NOTE(review): each heartbeat goes through {@code poll(Message)}, which
     * registers a response handler under a new id, while the HEARTBEAT branch
     * of {@code onNext} returns without removing a handler. Confirm those
     * registrations are released some other way.
     */
    protected class HeartbeatTask
            implements Runnable {
        /** The heartbeat message, built once and re-sent on every run. */
        private final HeartbeatMessage f_message;

        /**
         * Build the heartbeat message from the connection's UUID, using empty
         * bytes when no UUID is set.
         *
         * @param fAck  the value of the message's ack flag
         */
        public HeartbeatTask(boolean fAck) {
            ByteString bytes;
            if (m_uuid == null) {
                bytes = ByteString.EMPTY;
            } else {
                bytes = ByteString.copyFrom(m_uuid.toByteArray());
            }
            f_message = HeartbeatMessage.newBuilder()
                    .setUuid(bytes)
                    .setAck(fAck)
                    .build();
        }

        /**
         * Send one heartbeat and schedule the next, provided the connection
         * is connected.
         */
        @Override
        public void run() {
            if (!isConnected()) {
                return;
            }

            m_nLastHeartbeatTime = SafeClock.INSTANCE.getSafeTimeMillis();
            f_cHeartbeat.increment();
            poll(f_message);
            Daemons.commonPool().schedule(this, f_nHeartbeatInterval);
        }
    }
}

