package org.apache.doris.mysql;

import java.io.IOException;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectProcessor;
import org.apache.doris.qe.ConnectScheduler;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.xnio.ChannelListener;
import org.xnio.Options;
import org.xnio.StreamConnection;
import org.xnio.channels.AcceptingChannel;

/* loaded from: input_file:org/apache/doris/mysql/AcceptListener.class */
public class AcceptListener implements ChannelListener<AcceptingChannel<StreamConnection>> {
    private static final Logger LOG = LogManager.getLogger(AcceptListener.class);
    private final ConnectScheduler connectScheduler;

    /* loaded from: input_file:org/apache/doris/mysql/AcceptListener$AfterConnectedException.class */
    private static class AfterConnectedException extends Exception {
        public AfterConnectedException(String str) {
            super(str);
        }
    }

    public AcceptListener(ConnectScheduler connectScheduler) {
        this.connectScheduler = connectScheduler;
    }

    public void handleEvent(AcceptingChannel<StreamConnection> acceptingChannel) {
        try {
            StreamConnection accept = acceptingChannel.accept();
            if (accept == null) {
                return;
            }
            accept.setOption(Options.KEEP_ALIVE, true);
            LOG.debug("Connection established. remote={}", accept.getPeerAddress());
            ConnectContext connectContext = new ConnectContext(accept);
            connectContext.setEnv(Env.getCurrentEnv());
            this.connectScheduler.submit(connectContext);
            acceptingChannel.getWorker().execute(() -> {
                try {
                    connectContext.setThreadLocalInfo();
                    connectContext.setConnectScheduler(this.connectScheduler);
                    if (!MysqlProto.negotiate(connectContext)) {
                        throw new AfterConnectedException("mysql negotiate failed");
                    }
                    if (!this.connectScheduler.registerConnection(connectContext)) {
                        connectContext.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, "Reach limit of connections");
                        MysqlProto.sendResponsePacket(connectContext);
                        throw new AfterConnectedException("Reach limit of connections");
                    }
                    MysqlProto.sendResponsePacket(connectContext);
                    accept.setCloseListener(streamConnection -> {
                        this.connectScheduler.unregisterConnection(connectContext);
                    });
                    connectContext.setStartTime();
                    connectContext.setUserQueryTimeout(connectContext.getEnv().getAuth().getQueryTimeout(connectContext.getQualifiedUser()));
                    connectContext.setUserInsertTimeout(connectContext.getEnv().getAuth().getInsertTimeout(connectContext.getQualifiedUser()));
                    connectContext.startAcceptQuery(new ConnectProcessor(connectContext));
                } catch (AfterConnectedException e) {
                    connectContext.cleanup();
                } catch (Throwable th) {
                    if (connectContext.getCurrentUserIdentity() != null) {
                        LOG.warn("connect processor exception because ", th);
                    } else if (th instanceof Error) {
                        LOG.error("connect processor exception because ", th);
                    } else {
                        LOG.debug("connect processor exception because ", th);
                    }
                    connectContext.cleanup();
                } finally {
                    ConnectContext.remove();
                }
            });
        } catch (IOException e) {
            LOG.warn("Connection accept failed.", e);
        }
    }
}
