/*
 * Decompiled with CFR 0.152.
 */
package dev.miku.r2dbc.mysql.client;

import dev.miku.r2dbc.mysql.ConnectionContext;
import dev.miku.r2dbc.mysql.MySqlSslConfiguration;
import dev.miku.r2dbc.mysql.client.Client;
import dev.miku.r2dbc.mysql.client.ClientExceptions;
import dev.miku.r2dbc.mysql.client.EnvelopeSlicer;
import dev.miku.r2dbc.mysql.client.Lifecycle;
import dev.miku.r2dbc.mysql.client.MessageDuplexCodec;
import dev.miku.r2dbc.mysql.client.RequestQueue;
import dev.miku.r2dbc.mysql.client.RequestTask;
import dev.miku.r2dbc.mysql.client.SslBridgeHandler;
import dev.miku.r2dbc.mysql.client.SslState;
import dev.miku.r2dbc.mysql.message.client.ClientMessage;
import dev.miku.r2dbc.mysql.message.client.ExitMessage;
import dev.miku.r2dbc.mysql.message.server.ServerMessage;
import dev.miku.r2dbc.mysql.message.server.WarningMessage;
import dev.miku.r2dbc.mysql.util.AssertUtils;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.ReferenceCounted;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.r2dbc.spi.R2dbcException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;
import reactor.netty.Connection;
import reactor.netty.FutureMono;
import reactor.util.context.Context;

final class ReactorNettyClient
implements Client {
    private static final Logger logger = LoggerFactory.getLogger(ReactorNettyClient.class);
    private static final boolean DEBUG_ENABLED = logger.isDebugEnabled();
    private static final boolean INFO_ENABLED = logger.isInfoEnabled();
    private final Connection connection;
    private final ConnectionContext context;
    private final EmitterProcessor<ClientMessage> requestProcessor = EmitterProcessor.create((boolean)false);
    private final EmitterProcessor<ServerMessage> responseProcessor = EmitterProcessor.create((boolean)false);
    private final RequestQueue requestQueue = new RequestQueue();
    private final AtomicBoolean closing = new AtomicBoolean();

    ReactorNettyClient(Connection connection, MySqlSslConfiguration ssl, ConnectionContext context) {
        AssertUtils.requireNonNull(connection, "connection must not be null");
        AssertUtils.requireNonNull(context, "context must not be null");
        AssertUtils.requireNonNull(ssl, "ssl must not be null");
        this.connection = connection;
        this.context = context;
        connection.addHandlerLast("R2dbcMySqlEnvelopeSlicer", (ChannelHandler)new EnvelopeSlicer()).addHandlerLast("R2dbcMySqlMessageDuplexCodec", (ChannelHandler)new MessageDuplexCodec(context, this.closing, this.requestQueue));
        if (ssl.getSslMode().startSsl()) {
            connection.addHandlerFirst("R2dbcMySqlSslBridgeHandler", (ChannelHandler)new SslBridgeHandler(context, ssl));
        }
        if (InternalLoggerFactory.getInstance(ReactorNettyClient.class).isTraceEnabled()) {
            logger.debug("Connection tracking logging is enabled");
            connection.addHandlerFirst(LoggingHandler.class.getSimpleName(), (ChannelHandler)new LoggingHandler(ReactorNettyClient.class, LogLevel.TRACE));
        }
        ResponseSink sink = new ResponseSink();
        connection.inbound().receiveObject().doOnNext(it -> {
            if (it instanceof ServerMessage) {
                if (it instanceof ReferenceCounted) {
                    ((ReferenceCounted)it).retain();
                }
            } else {
                throw ClientExceptions.unsupportedProtocol(it.getClass().getTypeName());
            }
            sink.next((ServerMessage)it);
        }).onErrorResume(this::resumeError).subscribe((CoreSubscriber)new ResponseSubscriber(sink));
        this.requestProcessor.concatMap(message -> {
            if (DEBUG_ENABLED) {
                logger.debug("Request: {}", message);
            }
            return connection.outbound().sendObject(message);
        }).onErrorResume(this::resumeError).doAfterTerminate(this::handleClose).subscribe();
    }

    @Override
    public Flux<ServerMessage> exchange(ClientMessage request, Predicate<ServerMessage> complete) {
        AssertUtils.requireNonNull(request, "request must not be null");
        return Mono.create(sink -> {
            if (!this.isConnected()) {
                if (request instanceof Disposable) {
                    ((Disposable)request).dispose();
                }
                sink.error((Throwable)ClientExceptions.exchangeClosed());
                return;
            }
            boolean[] completed = new boolean[]{false};
            Flux responses = this.responseProcessor.doOnSubscribe(ignored -> this.requestProcessor.onNext((Object)request)).handle((message, response) -> {
                if (complete.test((ServerMessage)message)) {
                    completed[0] = true;
                    response.next(message);
                    response.complete();
                } else {
                    response.next(message);
                }
            }).doOnCancel(() -> ReactorNettyClient.exchangeCancel(completed));
            this.requestQueue.submit(RequestTask.wrap(request, sink, responses));
        }).flatMapMany(ReactorNettyClient.identity()).doAfterTerminate((Runnable)this.requestQueue);
    }

    @Override
    public Flux<ServerMessage> exchange(Flux<? extends ClientMessage> requests, Predicate<ServerMessage> complete) {
        AssertUtils.requireNonNull(requests, "requests must not be null");
        return Mono.create(sink -> {
            if (!this.isConnected()) {
                requests.subscribe(request -> {
                    if (request instanceof Disposable) {
                        ((Disposable)request).dispose();
                    }
                }, arg_0 -> this.requestProcessor.onError(arg_0));
                sink.error((Throwable)ClientExceptions.exchangeClosed());
                return;
            }
            boolean[] completed = new boolean[]{false};
            Flux responses = this.responseProcessor.doOnSubscribe(ignored -> requests.subscribe(request -> {
                if (this.isConnected()) {
                    this.requestProcessor.onNext(request);
                } else if (request instanceof Disposable) {
                    ((Disposable)request).dispose();
                }
            }, arg_0 -> this.requestProcessor.onError(arg_0))).handle((message, response) -> {
                if (complete.test((ServerMessage)message)) {
                    completed[0] = true;
                    response.next(message);
                    response.complete();
                } else {
                    response.next(message);
                }
            }).doOnCancel(() -> ReactorNettyClient.exchangeCancel(completed));
            this.requestQueue.submit(RequestTask.wrap(requests, sink, responses));
        }).flatMapMany(ReactorNettyClient.identity()).doAfterTerminate((Runnable)this.requestQueue);
    }

    @Override
    public Mono<Void> close() {
        return Mono.create(sink -> {
            if (!this.closing.compareAndSet(false, true)) {
                sink.success();
                return;
            }
            this.requestQueue.submit(RequestTask.wrap(sink, Mono.fromRunnable(() -> this.requestProcessor.onNext((Object)ExitMessage.getInstance()))));
        }).flatMap(ReactorNettyClient.identity()).onErrorResume(e -> {
            logger.error("Exit message sending failed, force closing", e);
            return Mono.empty();
        }).then(this.forceClose());
    }

    private <T> Mono<T> resumeError(Throwable e) {
        this.drainError(ClientExceptions.wrap(e));
        this.requestProcessor.onComplete();
        logger.error("Error: {}", (Object)e.getMessage(), (Object)e);
        return this.close();
    }

    @Override
    public Mono<Void> forceClose() {
        return FutureMono.deferFuture(() -> this.connection.channel().close());
    }

    @Override
    public ByteBufAllocator getByteBufAllocator() {
        return this.connection.outbound().alloc();
    }

    @Override
    public boolean isConnected() {
        return !this.closing.get() && this.connection.channel().isOpen();
    }

    @Override
    public void sslUnsupported() {
        this.connection.channel().pipeline().fireUserEventTriggered((Object)SslState.UNSUPPORTED);
    }

    @Override
    public void loginSuccess() {
        this.connection.channel().pipeline().fireUserEventTriggered((Object)Lifecycle.COMMAND);
    }

    public String toString() {
        return String.format("ReactorNettyClient(%s){connectionId=%d}", this.closing.get() ? "closing or closed" : "activating", this.context.getConnectionId());
    }

    private void drainError(R2dbcException e) {
        this.requestQueue.dispose();
        this.responseProcessor.onError((Throwable)e);
    }

    private void handleClose() {
        if (this.closing.compareAndSet(false, true)) {
            logger.warn("Connection has been closed by peer");
            this.drainError(ClientExceptions.unexpectedClosed());
        } else {
            this.drainError(ClientExceptions.expectedClosed());
        }
    }

    private static void exchangeCancel(boolean[] completed) {
        if (!completed[0]) {
            logger.error("Exchange cancelled while exchange is active. This is likely a bug leading to unpredictable outcome.");
        }
    }

    private static <T> Function<T, T> identity() {
        return Identity.INSTANCE;
    }

    private static final class Identity
    implements Function<Object, Object> {
        private static final Identity INSTANCE = new Identity();

        private Identity() {
        }

        @Override
        public Object apply(Object o) {
            return o;
        }
    }

    private final class ResponseSink
    implements SynchronousSink<ServerMessage> {
        private ResponseSink() {
        }

        public void complete() {
            throw new UnsupportedOperationException();
        }

        public Context currentContext() {
            return ReactorNettyClient.this.responseProcessor.currentContext();
        }

        public void error(Throwable e) {
            ReactorNettyClient.this.responseProcessor.onError((Throwable)ClientExceptions.wrap(e));
        }

        public void next(ServerMessage message) {
            if (INFO_ENABLED) {
                if (message instanceof WarningMessage) {
                    int warnings = ((WarningMessage)message).getWarnings();
                    if (warnings != 0) {
                        logger.info("Response: {}, reports {} warning(s)", (Object)message, (Object)warnings);
                    }
                } else if (DEBUG_ENABLED) {
                    logger.debug("Response: {}", (Object)message);
                }
            }
            ReactorNettyClient.this.responseProcessor.onNext((Object)message);
        }
    }

    private final class ResponseSubscriber
    implements CoreSubscriber<Object> {
        private final ResponseSink sink;

        private ResponseSubscriber(ResponseSink sink) {
            this.sink = sink;
        }

        public Context currentContext() {
            return ReactorNettyClient.this.responseProcessor.currentContext();
        }

        public void onSubscribe(Subscription s) {
            ReactorNettyClient.this.responseProcessor.onSubscribe(s);
        }

        public void onNext(Object message) {
        }

        public void onError(Throwable t) {
            this.sink.error(t);
        }

        public void onComplete() {
            ReactorNettyClient.this.handleClose();
        }
    }
}

