/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.service.network;

import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.paimon.service.network.ClientHandler;
import org.apache.paimon.service.network.ClientHandlerCallback;
import org.apache.paimon.service.network.messages.MessageBody;
import org.apache.paimon.service.network.messages.MessageSerializer;
import org.apache.paimon.service.network.stats.ServiceRequestStats;
import org.apache.paimon.shade.netty4.io.netty.buffer.ByteBuf;
import org.apache.paimon.shade.netty4.io.netty.channel.Channel;
import org.apache.paimon.shade.netty4.io.netty.channel.ChannelFuture;
import org.apache.paimon.shade.netty4.io.netty.util.concurrent.Future;
import org.apache.paimon.shade.netty4.io.netty.util.concurrent.GenericFutureListener;
import org.apache.paimon.utils.FutureUtils;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class ServerConnection<REQ extends MessageBody, RESP extends MessageBody> {
    private static final Logger LOG = LoggerFactory.getLogger(ServerConnection.class);
    private final Object connectionLock;
    @GuardedBy(value="connectionLock")
    private InternalConnection<REQ, RESP> internalConnection;
    @GuardedBy(value="connectionLock")
    private boolean running = true;
    private final CompletableFuture<Void> closeFuture = new CompletableFuture();

    private ServerConnection(Object lock, InternalConnection<REQ, RESP> internalConnection) {
        this.connectionLock = lock;
        this.internalConnection = internalConnection;
        this.forwardCloseFuture();
    }

    @GuardedBy(value="connectionLock")
    private void forwardCloseFuture() {
        InternalConnection<REQ, RESP> currentConnection = this.internalConnection;
        currentConnection.getCloseFuture().whenComplete((unused, throwable) -> {
            Object object = this.connectionLock;
            synchronized (object) {
                if (this.internalConnection == currentConnection) {
                    if (throwable != null) {
                        this.closeFuture.completeExceptionally((Throwable)throwable);
                    } else {
                        this.closeFuture.complete(null);
                    }
                }
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    CompletableFuture<RESP> sendRequest(REQ request) {
        Object object = this.connectionLock;
        synchronized (object) {
            Preconditions.checkState(this.running, "Connection has already been closed.");
            return this.internalConnection.sendRequest(request);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void establishConnection(ChannelFuture future) {
        Object object = this.connectionLock;
        synchronized (object) {
            Preconditions.checkState(this.running, "Connection has already been closed.");
            this.internalConnection = this.internalConnection.establishConnection(future);
            this.forwardCloseFuture();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    CompletableFuture<Void> close() {
        Object object = this.connectionLock;
        synchronized (object) {
            if (this.running) {
                this.running = false;
                this.internalConnection.close();
            }
            return this.closeFuture;
        }
    }

    CompletableFuture<Void> getCloseFuture() {
        return this.closeFuture;
    }

    static <REQ extends MessageBody, RESP extends MessageBody> ServerConnection<REQ, RESP> createPendingConnection(String clientName, MessageSerializer<REQ, RESP> serializer, ServiceRequestStats stats) {
        Object lock = new Object();
        return new ServerConnection(lock, new PendingConnection(channel -> new EstablishedConnection(lock, clientName, serializer, (Channel)channel, stats)));
    }

    private static class EstablishedConnection<REQ extends MessageBody, RESP extends MessageBody>
    implements ClientHandlerCallback<RESP>,
    InternalConnection<REQ, RESP> {
        private final Object lock;
        private final Channel channel;
        private final ServiceRequestStats stats;
        private final ConcurrentHashMap<Long, TimestampedCompletableFuture<RESP>> pendingRequests = new ConcurrentHashMap();
        private final CompletableFuture<Void> closeFuture = new CompletableFuture();
        @GuardedBy(value="lock")
        private long requestCount = 0L;
        @GuardedBy(value="lock")
        private boolean running = true;

        EstablishedConnection(Object lock, String clientName, MessageSerializer<REQ, RESP> serializer, Channel channel, ServiceRequestStats stats) {
            this.lock = lock;
            this.channel = Preconditions.checkNotNull(channel);
            channel.pipeline().addLast(clientName + " Handler", new ClientHandler<REQ, RESP>(serializer, this));
            this.stats = stats;
            stats.reportActiveConnection();
        }

        @Override
        public CompletableFuture<Void> close() {
            return this.close(new ClosedChannelException());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private CompletableFuture<Void> close(Throwable cause) {
            Object object = this.lock;
            synchronized (object) {
                if (this.running) {
                    this.running = false;
                    this.channel.close().addListener((GenericFutureListener<? extends Future<? super Void>>)((GenericFutureListener<Future>)finished -> {
                        this.stats.reportInactiveConnection();
                        Iterator iterator = ((ConcurrentHashMap.KeySetView)this.pendingRequests.keySet()).iterator();
                        while (iterator.hasNext()) {
                            long requestId = (Long)iterator.next();
                            TimestampedCompletableFuture<RESP> pending = this.pendingRequests.remove(requestId);
                            if (pending == null || !pending.completeExceptionally(cause)) continue;
                            this.stats.reportFailedRequest();
                        }
                        if (finished.isSuccess()) {
                            this.closeFuture.completeExceptionally(cause);
                        } else {
                            LOG.warn("Something went wrong when trying to close connection due to : ", cause);
                            this.closeFuture.completeExceptionally(finished.cause());
                        }
                    }));
                }
            }
            return this.closeFuture;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public CompletableFuture<RESP> sendRequest(REQ request) {
            Object object = this.lock;
            synchronized (object) {
                if (this.running) {
                    TimestampedCompletableFuture requestPromiseTs = new TimestampedCompletableFuture(System.nanoTime());
                    try {
                        long requestId = this.requestCount++;
                        this.pendingRequests.put(requestId, requestPromiseTs);
                        this.stats.reportRequest();
                        ByteBuf buf = MessageSerializer.serializeRequest(this.channel.alloc(), requestId, request);
                        this.channel.writeAndFlush(buf).addListener(future -> {
                            TimestampedCompletableFuture<RESP> pending;
                            if (!future.isSuccess() && (pending = this.pendingRequests.remove(requestId)) != null && pending.completeExceptionally(future.cause())) {
                                this.stats.reportFailedRequest();
                            }
                        });
                    }
                    catch (Throwable t) {
                        requestPromiseTs.completeExceptionally(t);
                    }
                    return requestPromiseTs;
                }
                return FutureUtils.completedExceptionally(new ClosedChannelException());
            }
        }

        @Override
        public InternalConnection<REQ, RESP> establishConnection(ChannelFuture future) {
            throw new IllegalStateException("The connection is already established.");
        }

        @Override
        public boolean isEstablished() {
            return true;
        }

        @Override
        public CompletableFuture<Void> getCloseFuture() {
            return this.closeFuture;
        }

        @Override
        public void onRequestResult(long requestId, RESP response) {
            TimestampedCompletableFuture<RESP> pending = this.pendingRequests.remove(requestId);
            if (pending != null && !pending.isDone()) {
                long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1000000L;
                this.stats.reportSuccessfulRequest(durationMillis);
                pending.complete(response);
            }
        }

        @Override
        public void onRequestFailure(long requestId, Throwable cause) {
            TimestampedCompletableFuture<RESP> pending = this.pendingRequests.remove(requestId);
            if (pending != null && !pending.isDone()) {
                this.stats.reportFailedRequest();
                pending.completeExceptionally(cause);
            }
        }

        @Override
        public void onFailure(Throwable cause) {
            this.close(cause);
        }

        private static final class TimestampedCompletableFuture<RESP extends MessageBody>
        extends CompletableFuture<RESP> {
            private final long timestampInNanos;

            TimestampedCompletableFuture(long timestampInNanos) {
                this.timestampInNanos = timestampInNanos;
            }

            public long getTimestamp() {
                return this.timestampInNanos;
            }
        }
    }

    private static final class PendingConnection<REQ extends MessageBody, RESP extends MessageBody>
    implements InternalConnection<REQ, RESP> {
        private final Function<Channel, EstablishedConnection<REQ, RESP>> connectionFactory;
        private final CompletableFuture<Void> closeFuture = new CompletableFuture();
        private final ArrayDeque<PendingRequest<REQ, RESP>> queuedRequests = new ArrayDeque();
        @Nullable
        private Throwable failureCause = null;
        private boolean running = true;

        private PendingConnection(Function<Channel, EstablishedConnection<REQ, RESP>> connectionFactory) {
            this.connectionFactory = connectionFactory;
        }

        @Override
        public CompletableFuture<RESP> sendRequest(REQ request) {
            if (this.failureCause != null) {
                return FutureUtils.completedExceptionally(this.failureCause);
            }
            if (!this.running) {
                return FutureUtils.completedExceptionally(new ClosedChannelException());
            }
            PendingRequest pending = new PendingRequest(request);
            this.queuedRequests.add(pending);
            return pending;
        }

        @Override
        public InternalConnection<REQ, RESP> establishConnection(ChannelFuture future) {
            if (future.isSuccess()) {
                return this.createEstablishedConnection(future.channel());
            }
            this.close(future.cause());
            return this;
        }

        @Override
        public boolean isEstablished() {
            return false;
        }

        @Override
        public CompletableFuture<Void> getCloseFuture() {
            return this.closeFuture;
        }

        private InternalConnection<REQ, RESP> createEstablishedConnection(Channel channel) {
            if (this.failureCause != null || !this.running) {
                channel.close();
                return this;
            }
            EstablishedConnection<REQ, RESP> establishedConnection = this.connectionFactory.apply(channel);
            while (!this.queuedRequests.isEmpty()) {
                PendingRequest<REQ, RESP> pending = this.queuedRequests.poll();
                FutureUtils.forward(establishedConnection.sendRequest(pending.getRequest()), pending);
            }
            return establishedConnection;
        }

        @Override
        public CompletableFuture<Void> close() {
            return this.close(new ClosedChannelException());
        }

        private CompletableFuture<Void> close(Throwable cause) {
            if (this.running) {
                this.running = false;
                this.failureCause = cause;
                for (PendingRequest<REQ, RESP> pendingRequest : this.queuedRequests) {
                    pendingRequest.completeExceptionally(cause);
                }
                this.queuedRequests.clear();
                this.closeFuture.completeExceptionally(cause);
            }
            return this.closeFuture;
        }

        private static final class PendingRequest<REQ extends MessageBody, RESP extends MessageBody>
        extends CompletableFuture<RESP> {
            private final REQ request;

            private PendingRequest(REQ request) {
                this.request = request;
            }

            public REQ getRequest() {
                return this.request;
            }
        }
    }

    static interface InternalConnection<REQ, RESP> {
        public CompletableFuture<RESP> sendRequest(REQ var1);

        public InternalConnection<REQ, RESP> establishConnection(ChannelFuture var1);

        public boolean isEstablished();

        public CompletableFuture<Void> getCloseFuture();

        public CompletableFuture<Void> close();
    }
}

