/*
 * Decompiled with CFR 0.152.
 */
package com.oracle.coherence.grpc.proxy.common;

import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.oracle.coherence.common.base.Logger;
import com.oracle.coherence.grpc.ErrorsHelper;
import com.oracle.coherence.grpc.GrpcService;
import com.oracle.coherence.grpc.GrpcServiceProtocol;
import com.oracle.coherence.grpc.LockingStreamObserver;
import com.oracle.coherence.grpc.LoggingStreamObserver;
import com.oracle.coherence.grpc.SafeStreamObserver;
import com.oracle.coherence.grpc.WrapperMemberIdentity;
import com.oracle.coherence.grpc.messages.common.v1.Complete;
import com.oracle.coherence.grpc.messages.common.v1.ErrorMessage;
import com.oracle.coherence.grpc.messages.common.v1.HeartbeatMessage;
import com.oracle.coherence.grpc.messages.proxy.v1.InitRequest;
import com.oracle.coherence.grpc.messages.proxy.v1.InitResponse;
import com.oracle.coherence.grpc.messages.proxy.v1.ProxyRequest;
import com.oracle.coherence.grpc.messages.proxy.v1.ProxyResponse;
import com.oracle.coherence.grpc.proxy.common.ProxyServiceInterceptor;
import com.tangosol.coherence.component.util.daemon.queueProcessor.service.peer.acceptor.GrpcAcceptor;
import com.tangosol.coherence.component.util.daemon.queueProcessor.service.peer.acceptor.grpcAcceptor.GrpcConnection;
import com.tangosol.io.Serializer;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.Member;
import com.tangosol.net.MemberIdentity;
import com.tangosol.net.messaging.Protocol;
import com.tangosol.util.SafeClock;
import com.tangosol.util.UUID;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Objects;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

public class ProxyServiceChannel
implements StreamObserver<ProxyRequest>,
Closeable {
    // Lock held for the duration of init().
    // NOTE(review): this is static, so a single lock is shared by every channel instance - confirm that is intended.
    private static final Lock f_lock = new ReentrantLock();
    // The service passed to the constructor; this channel registers itself with it as a Closeable
    // and obtains the GrpcAcceptor from it during init().
    private final GrpcService f_service;
    // The response observer, wrapped by the constructor (optional logging, then locking, then "safe" wrappers).
    private final StreamObserver<ProxyResponse> f_observer;
    // Supplies the Member whose id and UUID are reported to the client in the InitResponse.
    private final Supplier<Member> f_memberSupplier;
    // The remote address obtained from ProxyServiceInterceptor when this instance is constructed.
    private final SocketAddress f_remoteAddress = ProxyServiceInterceptor.getRemoteAddress();
    // The client UUID assigned at the end of the member setup in init(); null means "not initialized".
    private UUID m_clientUUID;
    // Counter used only by createClientUUID() to build distinct UUIDs.
    private static final AtomicInteger f_cClient = new AtomicInteger();
    // The protocol selected by init(); null until an INIT request has loaded one.
    private GrpcServiceProtocol<Message, Message> m_protocol;
    // The protobuf type that MESSAGE payloads are unpacked into; taken from the protocol in init().
    private Class<? extends Message> m_clzRequest;
    // The connection opened on the GrpcAcceptor in init(); null until then.
    private GrpcConnection m_connection;

    /**
     * Creates a channel without an explicit member supplier.
     * <p>
     * Delegates to the three-argument constructor with a {@code null} supplier, which
     * makes that constructor fall back to the local cluster member.
     *
     * @param service   the service this channel registers itself with
     * @param observer  the observer that responses are sent to
     */
    public ProxyServiceChannel(GrpcService service, StreamObserver<ProxyResponse> observer) {
        this(service, observer, null);
    }

    protected ProxyServiceChannel(GrpcService service, StreamObserver<ProxyResponse> observer, Supplier<Member> memberSupplier) {
        this.f_memberSupplier = Objects.requireNonNullElse(memberSupplier, () -> CacheFactory.getCluster().getLocalMember());
        this.f_service = service;
        if (GrpcService.LOG_MESSAGES) {
            observer = new LoggingStreamObserver(observer, "ProxyServiceChannel");
        }
        this.f_observer = SafeStreamObserver.ensureSafeObserver((StreamObserver)LockingStreamObserver.ensureLockingObserver(observer));
        service.addCloseable((Closeable)this);
    }

    /**
     * Handles a single request from the client stream.
     * <ul>
     *   <li>{@code INIT} requests are passed to {@link #init(long, InitRequest)};</li>
     *   <li>{@code HEARTBEAT} requests are answered with a heartbeat response only when
     *       the request asks for an acknowledgement;</li>
     *   <li>{@code MESSAGE} requests require a completed init; the payload is unpacked and
     *       handed to the protocol, and any failure while doing so is reported to the client
     *       as an error response for that request id (the stream stays open);</li>
     *   <li>any other request case is rejected with an {@link UnsupportedOperationException}.</li>
     * </ul>
     * Any failure that is not handled per-request terminates the response stream with an error,
     * notifies the protocol (if one has been loaded) and closes the connection (if one is open).
     *
     * @param request  the request received from the client
     */
    public void onNext(ProxyRequest request) {
        if (GrpcService.LOG_MESSAGES) {
            Logger.info("ProxyServiceChannel: onNext() called request=" + request);
        }
        try {
            long nId = request.getId();
            ProxyRequest.RequestCase requestCase = request.getRequestCase();
            switch (requestCase) {
                case INIT:
                    this.init(nId, request.getInit());
                    break;
                case HEARTBEAT:
                    if (request.getHeartbeat().getAck()) {
                        this.f_observer.onNext(ProxyResponse.newBuilder().setId(nId).setHeartbeat(HeartbeatMessage.getDefaultInstance()).build());
                    }
                    break;
                case MESSAGE:
                    // An uninitialized channel is a fatal error, so this check stays outside the
                    // per-request error handling done by dispatchMessage().
                    this.assertInit();
                    this.dispatchMessage(nId, request);
                    break;
                default:
                    throw new UnsupportedOperationException("Unsupported request type: " + String.valueOf(requestCase));
            }
        }
        catch (Throwable t) {
            this.f_observer.onError(ProxyServiceChannel.toStatusException(t));
            if (this.m_protocol != null) {
                this.m_protocol.onError(t);
            }
            if (this.m_connection != null) {
                this.m_connection.close();
            }
        }
    }

    /**
     * Unpacks the payload of a MESSAGE request and passes it to the protocol; any failure
     * is sent back to the client as an error response carrying the request id.
     */
    private void dispatchMessage(long nId, ProxyRequest request) {
        try {
            Message message;
            Any any = request.getMessage();
            try {
                message = any.unpack(this.m_clzRequest);
            }
            catch (InvalidProtocolBufferException e) {
                String sMsg = String.format("Failed to unpack protobuf message of type %s expected type %s uri=%s", request.getClass().getSimpleName(), this.m_clzRequest, any.getTypeUrl());
                throw new IllegalArgumentException(sMsg, e);
            }
            ForwardingStreamObserver observer = new ForwardingStreamObserver(nId);
            this.m_protocol.onRequest(message, SafeStreamObserver.ensureSafeObserver(observer));
        }
        catch (Throwable e) {
            this.sendError(nId, e);
        }
    }

    /**
     * Converts a failure into the exception used to terminate the response stream.
     * <p>
     * A status cause is not transmitted to the client, so wrapping every failure as
     * {@code INTERNAL.withCause(t)} would hide both the code and the description of a
     * status raised on purpose (for example the {@code FAILED_PRECONDITION} errors thrown
     * during init). A status found in the causal chain is therefore kept as is; anything
     * else becomes {@code INTERNAL} with the failure's message as the description.
     */
    private static Throwable toStatusException(Throwable t) {
        Status status = Status.fromThrowable(t);
        if (status.getCode() == Status.Code.UNKNOWN) {
            // fromThrowable() falls back to UNKNOWN when the chain carries no status
            String sDescription = t.getMessage() != null ? t.getMessage() : t.toString();
            status = Status.INTERNAL.withDescription(sDescription).withCause(t);
        }
        return status.asRuntimeException();
    }

    /**
     * Handles an error signalled on the request stream.
     * <p>
     * The error is forwarded to the response observer, then to the protocol if one has
     * been loaded (otherwise it is logged unless it is a cancellation), and finally the
     * connection is closed if one is open.
     *
     * @param t  the error reported on the request stream
     */
    public void onError(Throwable t) {
        if (GrpcService.LOG_MESSAGES) {
            Logger.info("ProxyServiceChannel: onError() called error=" + t);
        }
        f_observer.onError(t);

        GrpcServiceProtocol<Message, Message> protocol = m_protocol;
        if (protocol == null) {
            ErrorsHelper.logIfNotCancelled(t);
        } else {
            protocol.onError(t);
        }

        GrpcConnection connection = m_connection;
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * Handles completion of the request stream.
     * <p>
     * Closes the protocol (if loaded), unregisters this channel from the service and
     * closes the connection (if open), in that order.
     */
    public void onCompleted() {
        if (GrpcService.LOG_MESSAGES) {
            Logger.info("ProxyServiceChannel: onCompleted() called");
        }

        GrpcServiceProtocol<Message, Message> protocol = m_protocol;
        if (protocol != null) {
            protocol.close();
        }

        f_service.removeCloseable((Closeable) this);

        GrpcConnection connection = m_connection;
        if (connection != null) {
            connection.close(true, null, false);
        }
    }

    /**
     * Returns the serializer of the protocol loaded by {@code init()}.
     *
     * @return the protocol's serializer
     * @throws NullPointerException if no protocol has been loaded yet
     */
    public Serializer getSerializer() {
        GrpcServiceProtocol<Message, Message> protocol = m_protocol;
        return protocol.getSerializer();
    }

    /**
     * Closes this channel by running the same logic as {@link #onCompleted()}.
     *
     * @throws IOException declared by {@link Closeable}; nothing in this method throws it directly
     */
    @Override
    public void close() throws IOException {
        this.onCompleted();
    }

    /**
     * Returns the connection opened by {@code init()}.
     *
     * @return the connection, or {@code null} if no connection has been opened yet
     */
    public GrpcConnection getConnection() {
        return this.m_connection;
    }

    /**
     * Wraps this channel in an {@link AsyncWrapper} bound to the given executor.
     *
     * @param executor  the executor handed to the wrapper
     * @return a new wrapper around this channel
     */
    protected StreamObserver<ProxyRequest> async(Executor executor) {
        return new AsyncWrapper(executor, this);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Initializes this channel from the client's {@link InitRequest}.
     * <p>
     * While holding the lock this loads the requested protocol, opens a connection on the
     * acceptor, registers the protocol's Extend protocols, negotiates the protocol version,
     * configures a member for the client, initializes the protocol and finally sends the
     * {@link InitResponse} back under the request id.
     *
     * @param nId      the id of the init request, echoed in the response
     * @param request  the init request
     * @throws IllegalStateException if the channel has already been initialized
     */
    protected void init(long nId, InitRequest request) {
        if (GrpcService.LOG_MESSAGES) {
            Logger.info("ProxyServiceChannel: init() called nId=" + nId + " request=" + request);
        }
        f_lock.lock();
        try {
            if (m_clientUUID != null) {
                throw new IllegalStateException("The client connection is already initialized");
            }

            // load the protocol named by the client
            String sName = request.getProtocol();
            m_protocol = ProxyServiceChannel.loadProtocol(sName).orElseThrow(() -> Status.FAILED_PRECONDITION.withDescription("Failed to load proxy protocol " + sName).asRuntimeException());

            // open the connection and wire up its observer and message factories
            GrpcAcceptor acceptor = f_service.getGrpcAcceptor();
            GrpcConnection connection = acceptor.openConnection();
            m_connection = connection;
            connection.setStreamObserver(new ForwardingStreamObserver(0L));
            connection.setMessageFactoryMap(registerExtendProtocols(acceptor));

            int nVersion = getSupportedVersion(request);
            m_clzRequest = m_protocol.getRequestType();

            // describe the client as a member, using the remote address when it is an inet address
            com.tangosol.coherence.component.net.Member clientMember = new com.tangosol.coherence.component.net.Member();
            InetSocketAddress inetRemote = f_remoteAddress instanceof InetSocketAddress
                    ? (InetSocketAddress) f_remoteAddress
                    : null;
            InetAddress address = inetRemote == null ? null : inetRemote.getAddress();
            int nPort = inetRemote == null ? 0 : inetRemote.getPort();
            clientMember.configure((MemberIdentity) new WrapperMemberIdentity(request.getIdentity()), address, nPort);
            connection.setMember((Member) clientMember);
            m_clientUUID = clientMember.getUuid();

            // hand over to the protocol
            long nObserverId = m_protocol.getObserverId(nId, (Message) request);
            ForwardingStreamObserver protocolObserver = new ForwardingStreamObserver(nObserverId);
            m_protocol.init(f_service, request, nVersion, m_clientUUID, protocolObserver, connection);

            // tell the client what was negotiated
            Member proxyMember = f_memberSupplier.get();
            InitResponse.Builder builder = InitResponse.newBuilder();
            builder.setVersion(CacheFactory.VERSION);
            builder.setEncodedVersion(CacheFactory.VERSION_ENCODED);
            builder.setProtocolVersion(nVersion);
            builder.setUuid(ByteString.copyFrom(m_clientUUID.toByteArray()));
            builder.setProxyMemberId(proxyMember.getId());
            builder.setProxyMemberUuid(ByteString.copyFrom(proxyMember.getUuid().toByteArray()));
            InitResponse response = builder.build();
            f_observer.onNext(ProxyResponse.newBuilder().setId(nId).setInit(response).build());
        }
        finally {
            f_lock.unlock();
        }
    }

    /**
     * Registers each of the current protocol's Extend protocols with the acceptor and
     * returns their message factories keyed by protocol name (empty when there are none).
     */
    private HashMap<String, Protocol.MessageFactory> registerExtendProtocols(GrpcAcceptor acceptor) {
        Protocol[] aProtocol = m_protocol.getExtendProtocols();
        HashMap<String, Protocol.MessageFactory> mapFactory = new HashMap<>();
        if (aProtocol == null) {
            return mapFactory;
        }
        for (Protocol protocol : aProtocol) {
            acceptor.registerGrpcProtocol(protocol);
            mapFactory.put(protocol.getName(), protocol.getMessageFactory(protocol.getCurrentVersion()));
        }
        return mapFactory;
    }

    /**
     * Creates a UUID for a client.
     * <p>
     * When the remote address is an {@link InetSocketAddress} the UUID is built from the
     * safe clock time, that address and port, and the next value of the client counter;
     * otherwise a default UUID is created.
     *
     * @return a new UUID
     */
    private UUID createClientUUID() {
        SocketAddress remote = f_remoteAddress;
        if (!(remote instanceof InetSocketAddress)) {
            return new UUID();
        }
        InetSocketAddress inetRemote = (InetSocketAddress) remote;
        return new UUID(SafeClock.INSTANCE.getSafeTimeMillis(), inetRemote.getAddress(), inetRemote.getPort(), f_cClient.incrementAndGet());
    }

    /**
     * Negotiates the protocol version to use with the client.
     * <p>
     * Equal versions are used as is. Otherwise the lower of the two current versions is
     * chosen, provided it is not below the minimum version supported by the other side.
     *
     * @param request  the init request carrying the client's current and minimum versions
     * @return the version both sides support
     * @throws io.grpc.StatusRuntimeException with {@code FAILED_PRECONDITION} when the
     *         version ranges do not overlap
     */
    private int getSupportedVersion(InitRequest request) {
        String sProtocol = request.getProtocol();
        int nClient = request.getProtocolVersion();
        int nClientMin = request.getSupportedProtocolVersion();
        int nServer = m_protocol.getVersion();
        int nServerMin = m_protocol.getSupportedVersion();

        if (nServer == nClient) {
            return nServer;
        }
        if (nServer > nClient && nClient >= nServerMin) {
            // the client is older, but still within the range this proxy supports
            return nClient;
        }
        if (nClient > nServer && nServer >= nClientMin) {
            // this proxy is older, but still within the range the client supports
            return nServer;
        }
        throw Status.FAILED_PRECONDITION.withDescription("Cannot support protocol version for " + sProtocol + " requested=" + nClientMin + ".." + nClient + " supported=" + nServerMin + ".." + nServer).asRuntimeException();
    }

    /**
     * Verifies that an init request has assigned the client UUID.
     *
     * @throws IllegalStateException if the client UUID has not been set
     */
    protected void assertInit() {
        if (m_clientUUID != null) {
            return;
        }
        throw new IllegalStateException("The client connection has not been initialized");
    }

    /**
     * Sends an error response for the given request id, leaving the stream open.
     * <p>
     * The error message is built from the throwable using the current protocol's serializer.
     *
     * @param nId     the id to set on the response
     * @param thrown  the error to report
     */
    protected void sendError(long nId, Throwable thrown) {
        ErrorMessage error = ErrorsHelper.createErrorMessage(thrown, m_protocol.getSerializer());
        ProxyResponse response = ProxyResponse.newBuilder()
                .setId(nId)
                .setError(error)
                .build();
        f_observer.onNext(response);
    }

    /**
     * Looks up a protocol implementation by name using the {@link ServiceLoader}.
     * <p>
     * Names are compared ignoring case; when several implementations match, the one with
     * the highest priority is returned.
     *
     * @param sProtocol  the protocol name requested by the client
     * @return the matching protocol, or an empty {@link Optional} when none matches
     */
    protected static Optional<GrpcServiceProtocol> loadProtocol(String sProtocol) {
        Comparator<GrpcServiceProtocol> byPriority = Comparator.comparingInt(GrpcServiceProtocol::getPriority);
        return ServiceLoader.load(GrpcServiceProtocol.class)
                .stream()
                .map(ServiceLoader.Provider::get)
                .filter(candidate -> sProtocol.equalsIgnoreCase(candidate.getProtocol()))
                .max(byPriority);
    }

    protected class ForwardingStreamObserver<Resp extends Message>
    implements StreamObserver<Resp> {
        private final long m_nId;

        public ForwardingStreamObserver(long nId) {
            this.m_nId = nId;
        }

        public void onNext(Resp response) {
            ProxyServiceChannel.this.f_observer.onNext((Object)ProxyResponse.newBuilder().setId(this.m_nId).setMessage(Any.pack(response)).build());
        }

        public void onError(Throwable t) {
            ProxyServiceChannel.this.sendError(this.m_nId, t);
        }

        public void onCompleted() {
            ProxyServiceChannel.this.f_observer.onNext((Object)ProxyResponse.newBuilder().setId(this.m_nId).setComplete(Complete.getDefaultInstance()).build());
        }
    }

    /**
     * A request observer that hands each request to an {@link Executor} before it reaches
     * the wrapped channel.
     * <p>
     * Only {@code onNext} goes through the executor; {@code onError} and
     * {@code onCompleted} call the wrapped channel directly on the calling thread.
     */
    public static class AsyncWrapper
    implements StreamObserver<ProxyRequest> {
        /** The executor that runs the wrapped channel's request handling. */
        private final Executor f_executor;
        /** The channel that requests are delegated to. */
        private final ProxyServiceChannel f_wrapped;

        /**
         * @param executor  the executor used to run request handling
         * @param channel   the channel to delegate to
         */
        public AsyncWrapper(Executor executor, ProxyServiceChannel channel) {
            f_executor = executor;
            f_wrapped = channel;
        }

        /**
         * @return the channel this wrapper delegates to
         */
        public ProxyServiceChannel getWrapped() {
            return f_wrapped;
        }

        /** Submits handling of the request to the executor. */
        public void onNext(ProxyRequest request) {
            Runnable task = () -> f_wrapped.onNext(request);
            f_executor.execute(task);
        }

        /** Passes the error straight to the wrapped channel. */
        public void onError(Throwable t) {
            f_wrapped.onError(t);
        }

        /** Passes completion straight to the wrapped channel. */
        public void onCompleted() {
            f_wrapped.onCompleted();
        }
    }
}

