/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.protocol.mqtt;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttMessageIdAndPropertiesVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPubReplyMessageVariableHeader;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttReasonCodeAndPropertiesVariableHeader;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckPayload;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import java.lang.invoke.MethodHandles;
import java.util.UUID;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnection;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTLogger;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTVersion;
import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.DisconnectException;
import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.InvalidClientIdException;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.actors.Actor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MQTTProtocolHandler
extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private ConnectionEntry connectionEntry;
    private MQTTConnection connection;
    private MQTTSession session;
    private ActiveMQServer server;
    private MQTTProtocolManager protocolManager;
    private ChannelHandlerContext ctx;
    private boolean stopped = false;
    private final Actor<MqttMessage> mqttMessageActor;

    public MQTTProtocolHandler(ActiveMQServer server, MQTTProtocolManager protocolManager) {
        this.server = server;
        this.protocolManager = protocolManager;
        this.mqttMessageActor = new Actor(server.getThreadPool(), this::act);
    }

    void setConnection(MQTTConnection connection, ConnectionEntry entry) throws Exception {
        this.connectionEntry = entry;
        this.connection = connection;
        this.session = new MQTTSession(this, connection, this.protocolManager, this.server.getConfiguration().getWildcardConfiguration());
    }

    void stop() {
        this.stopped = true;
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        MqttMessage message = (MqttMessage)msg;
        if (this.stopped) {
            if (this.session.getVersion() == MQTTVersion.MQTT_5) {
                this.sendDisconnect((byte)-125);
            }
            this.disconnect(true);
            return;
        }
        if (message.decoderResult().isFailure()) {
            logger.debug("Disconnecting client due to message decoding failure.", message.decoderResult().cause());
            if (this.session.getVersion() == MQTTVersion.MQTT_5) {
                this.sendDisconnect((byte)-127);
            }
            this.disconnect(true);
            return;
        }
        String interceptResult = this.protocolManager.invokeIncoming(message, this.connection);
        if (interceptResult != null) {
            logger.debug("Interceptor {} rejected MQTT control packet: {}", (Object)interceptResult, (Object)message);
            this.disconnect(true);
            return;
        }
        this.connection.dataReceived();
        if (AuditLogger.isAnyLoggingEnabled()) {
            AuditLogger.setRemoteAddress((String)this.connection.getRemoteAddress());
        }
        MQTTUtil.logMessage(this.session.getState(), message, true, this.session.getVersion());
        if (this.ctx == null) {
            this.ctx = ctx;
        }
        if (MqttMessageType.PINGREQ == message.fixedHeader().messageType()) {
            this.handlePingreq();
        } else {
            this.mqttMessageActor.act((Object)message);
        }
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public void act(MqttMessage message) {
        try {
            switch (message.fixedHeader().messageType()) {
                case AUTH: {
                    this.handleAuth(message);
                    return;
                }
                case CONNECT: {
                    this.handleConnect((MqttConnectMessage)message);
                    return;
                }
                case PUBLISH: {
                    this.handlePublish((MqttPublishMessage)message);
                    return;
                }
                case PUBACK: {
                    this.handlePuback((MqttPubAckMessage)message);
                    return;
                }
                case PUBREC: {
                    this.handlePubrec(message);
                    return;
                }
                case PUBREL: {
                    this.handlePubrel(message);
                    return;
                }
                case PUBCOMP: {
                    this.handlePubcomp(message);
                    return;
                }
                case SUBSCRIBE: {
                    this.handleSubscribe((MqttSubscribeMessage)message);
                    return;
                }
                case UNSUBSCRIBE: {
                    this.handleUnsubscribe((MqttUnsubscribeMessage)message);
                    return;
                }
                case DISCONNECT: {
                    this.disconnect(false, message);
                    return;
                }
                default: {
                    this.disconnect(true);
                    return;
                }
            }
        }
        catch (Exception e) {
            MQTTLogger.LOGGER.errorProcessingControlPacket(message.toString(), e);
            if (this.session.getVersion() == MQTTVersion.MQTT_5) {
                this.sendDisconnect((byte)-125);
            }
            this.disconnect(true);
            return;
        }
        finally {
            ReferenceCountUtil.release((Object)message);
        }
    }

    void handleAuth(MqttMessage auth) throws Exception {
        byte[] authenticationData = MQTTUtil.getProperty(byte[].class, ((MqttReasonCodeAndPropertiesVariableHeader)auth.variableHeader()).properties(), MqttProperties.MqttPropertyType.AUTHENTICATION_DATA);
        String authenticationMethod = MQTTUtil.getProperty(String.class, ((MqttReasonCodeAndPropertiesVariableHeader)auth.variableHeader()).properties(), MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD);
        MqttReasonCodeAndPropertiesVariableHeader header = (MqttReasonCodeAndPropertiesVariableHeader)auth.variableHeader();
        if (header.reasonCode() == 25 || header.reasonCode() == 24 || header.reasonCode() == 0) {
            // empty if block
        }
    }

    void handleConnect(MqttConnectMessage connect) throws Exception {
        this.session.setVersion(MQTTVersion.getVersion(connect.variableHeader().version()));
        if (!this.checkClientVersion()) {
            return;
        }
        this.session.getConnection().setClientID(connect.payload().clientIdentifier());
        if (!this.validateClientID(connect.variableHeader().isCleanSession())) {
            return;
        }
        String password = connect.payload().passwordInBytes() == null ? null : new String(connect.payload().passwordInBytes(), CharsetUtil.UTF_8);
        String username = connect.payload().userName();
        Pair<Boolean, String> validationData = null;
        try {
            validationData = this.validateUser(username, password);
            if (!((Boolean)validationData.getA()).booleanValue()) {
                return;
            }
        }
        catch (InvalidClientIdException e) {
            this.handleInvalidClientId();
            return;
        }
        if (this.handleLinkStealing() == LinkStealingResult.NEW_LINK_DENIED) {
            return;
        }
        this.protocolManager.getStateManager().addConnectedClient(this.session.getConnection().getClientID(), this.session.getConnection());
        if (this.connection.getTransportConnection().getRouter() == null || !this.protocolManager.getRoutingHandler().route(this.connection, this.session, connect)) {
            this.calculateKeepAlive(connect);
            this.session.getConnectionManager().connect(connect, (String)validationData.getB(), username, password);
        }
    }

    void disconnect(boolean error) {
        this.disconnect(error, null);
    }

    void disconnect(boolean error, MqttMessage disconnect) {
        Integer sessionExpiryInterval;
        if (disconnect != null && disconnect.variableHeader() instanceof MqttReasonCodeAndPropertiesVariableHeader && (sessionExpiryInterval = (Integer)MQTTUtil.getProperty(Integer.class, ((MqttReasonCodeAndPropertiesVariableHeader)disconnect.variableHeader()).properties(), MqttProperties.MqttPropertyType.SESSION_EXPIRY_INTERVAL, null)) != null) {
            this.session.getState().setClientSessionExpiryInterval(sessionExpiryInterval);
        }
        this.session.getConnectionManager().disconnect(error);
    }

    void sendConnack(byte returnCode) {
        this.sendConnack(returnCode, MqttProperties.NO_PROPERTIES);
    }

    void sendConnack(byte returnCode, MqttProperties properties) {
        this.sendConnack(returnCode, true, properties);
    }

    void sendConnack(byte returnCode, boolean sessionPresent, MqttProperties properties) {
        if (returnCode != 0) {
            sessionPresent = false;
        }
        this.sendToClient((MqttMessage)MqttMessageBuilders.connAck().returnCode(MqttConnectReturnCode.valueOf((byte)returnCode)).properties(properties).sessionPresent(sessionPresent).build());
    }

    void sendDisconnect(byte reasonCode) {
        this.sendToClient(MqttMessageBuilders.disconnect().reasonCode(reasonCode).build());
    }

    void handlePublish(MqttPublishMessage message) throws Exception {
        if (this.session.getVersion() == MQTTVersion.MQTT_5 && this.session.getProtocolManager().getMaximumPacketSize() != -1 && MQTTUtil.calculateMessageSize(message) > this.session.getProtocolManager().getMaximumPacketSize()) {
            this.sendDisconnect((byte)-107);
            this.disconnect(true);
            return;
        }
        try {
            this.session.getMqttPublishManager().sendToQueue(message, false);
        }
        catch (DisconnectException e) {
            this.sendDisconnect(e.getCode());
            this.disconnect(true);
        }
    }

    void sendPubAck(int messageId, byte reasonCode) {
        this.sendPublishProtocolControlMessage(messageId, MqttMessageType.PUBACK, reasonCode);
    }

    void sendPubRel(int messageId) {
        this.sendPublishProtocolControlMessage(messageId, MqttMessageType.PUBREL);
    }

    void sendPubRec(int messageId, byte reasonCode) {
        this.sendPublishProtocolControlMessage(messageId, MqttMessageType.PUBREC, reasonCode);
    }

    void sendPubComp(int messageId) {
        this.sendPublishProtocolControlMessage(messageId, MqttMessageType.PUBCOMP);
    }

    void sendPublishProtocolControlMessage(int messageId, MqttMessageType messageType) {
        this.sendPublishProtocolControlMessage(messageId, messageType, (byte)0);
    }

    void sendPublishProtocolControlMessage(int messageId, MqttMessageType messageType, byte reasonCode) {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(messageType, false, messageType == MqttMessageType.PUBREL ? MqttQoS.AT_LEAST_ONCE : MqttQoS.AT_MOST_ONCE, false, 0);
        Object variableHeader = this.session.getVersion() == MQTTVersion.MQTT_5 ? new MqttPubReplyMessageVariableHeader(messageId, reasonCode, MqttProperties.NO_PROPERTIES) : MqttMessageIdVariableHeader.from((int)messageId);
        MqttPubAckMessage pubAck = new MqttPubAckMessage(fixedHeader, variableHeader);
        this.sendToClient((MqttMessage)pubAck);
    }

    void handlePuback(MqttPubAckMessage message) throws Exception {
        this.session.getMqttPublishManager().handlePubAck(this.getMessageId((MqttMessage)message));
    }

    void handlePubrec(MqttMessage message) throws Exception {
        this.session.getMqttPublishManager().handlePubRec(this.getMessageId(message));
    }

    void handlePubrel(MqttMessage message) {
        this.session.getMqttPublishManager().handlePubRel(this.getMessageId(message));
    }

    void handlePubcomp(MqttMessage message) throws Exception {
        this.session.getMqttPublishManager().handlePubComp(this.getMessageId(message));
    }

    void handleSubscribe(MqttSubscribeMessage message) throws Exception {
        Integer subscriptionIdentifier = MQTTUtil.getProperty(Integer.class, message.idAndPropertiesVariableHeader().properties(), MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER, null);
        int[] qos = this.session.getSubscriptionManager().addSubscriptions(message.payload().topicSubscriptions(), subscriptionIdentifier);
        MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttMessageIdAndPropertiesVariableHeader variableHeader = new MqttMessageIdAndPropertiesVariableHeader(message.variableHeader().messageId(), MqttProperties.NO_PROPERTIES);
        MqttSubAckMessage subAck = new MqttSubAckMessage(header, variableHeader, new MqttSubAckPayload(qos));
        this.sendToClient((MqttMessage)subAck);
    }

    void handleUnsubscribe(MqttUnsubscribeMessage message) throws Exception {
        short[] reasonCodes = this.session.getSubscriptionManager().removeSubscriptions(message.payload().topics(), true);
        MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttUnsubAckMessage unsubAck = this.session.getVersion() == MQTTVersion.MQTT_5 ? new MqttUnsubAckMessage(header, message.variableHeader(), new MqttUnsubAckPayload(reasonCodes)) : new MqttUnsubAckMessage(header, message.variableHeader());
        this.sendToClient((MqttMessage)unsubAck);
    }

    void handlePingreq() {
        MqttMessage pingResp = new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0));
        this.sendToClient(pingResp);
    }

    protected void sendToClient(MqttMessage message) {
        if (this.protocolManager.invokeOutgoing(message, this.connection) != null) {
            return;
        }
        MQTTUtil.logMessage(this.session.getState(), message, false, this.session.getVersion());
        this.ctx.writeAndFlush((Object)message, this.ctx.voidPromise());
    }

    private int getMessageId(MqttMessage message) {
        return ((MqttMessageIdVariableHeader)message.variableHeader()).messageId();
    }

    ActiveMQServer getServer() {
        return this.server;
    }

    private void calculateKeepAlive(MqttConnectMessage connect) {
        int serverKeepAlive = this.session.getProtocolManager().getServerKeepAlive();
        int clientKeepAlive = connect.variableHeader().keepAliveTimeSeconds();
        if (serverKeepAlive == -1 || clientKeepAlive <= serverKeepAlive && clientKeepAlive != 0) {
            this.connectionEntry.ttl = (long)clientKeepAlive * 1500L;
        } else {
            this.session.setUsingServerKeepAlive(true);
        }
    }

    private boolean checkClientVersion() {
        if (this.session.getVersion() != MQTTVersion.MQTT_3_1 && this.session.getVersion() != MQTTVersion.MQTT_3_1_1 && this.session.getVersion() != MQTTVersion.MQTT_5) {
            if (this.session.getVersion().getVersion() <= MQTTVersion.MQTT_3_1_1.getVersion()) {
                this.sendConnack((byte)1);
            } else {
                this.sendConnack((byte)-124);
            }
            this.disconnect(true);
            return false;
        }
        return true;
    }

    private LinkStealingResult handleLinkStealing() throws Exception {
        LinkStealingResult result;
        String clientID = this.session.getConnection().getClientID();
        if (this.protocolManager.getStateManager().isClientConnected(clientID)) {
            MQTTConnection existingConnection = this.protocolManager.getStateManager().getConnectedClient(clientID);
            if (this.protocolManager.isAllowLinkStealing()) {
                MQTTSession existingSession = this.protocolManager.getStateManager().getSessionState(clientID).getSession();
                if (existingSession != null) {
                    if (existingSession.getVersion() == MQTTVersion.MQTT_5) {
                        existingSession.getProtocolHandler().sendDisconnect((byte)-114);
                    }
                    existingSession.getConnectionManager().disconnect(false);
                } else {
                    existingConnection.disconnect(false);
                }
                logger.debug("Existing MQTT session from {} closed due to incoming session from {} with the same client ID: {}", new Object[]{existingConnection.getRemoteAddress(), this.connection.getRemoteAddress(), this.session.getConnection().getClientID()});
                result = LinkStealingResult.EXISTING_LINK_STOLEN;
            } else {
                if (this.session.getVersion() == MQTTVersion.MQTT_5) {
                    this.sendDisconnect((byte)-128);
                }
                logger.debug("Incoming MQTT session from {} closed due to existing session from {} with the same client ID: {}", new Object[]{this.connection.getRemoteAddress(), existingConnection.getRemoteAddress(), this.session.getConnection().getClientID()});
                this.session.setStopped(true);
                this.connection.disconnect(false);
                result = LinkStealingResult.NEW_LINK_DENIED;
            }
        } else {
            result = LinkStealingResult.NO_ACTION;
        }
        return result;
    }

    private Pair<Boolean, String> validateUser(String username, String password) throws Exception {
        Boolean result;
        String validatedUser = null;
        try {
            validatedUser = this.server.validateUser(username, password, (RemotingConnection)this.session.getConnection(), this.session.getProtocolManager().getSecurityDomain());
            result = Boolean.TRUE;
        }
        catch (ActiveMQSecurityException e) {
            if (this.session.getVersion() == MQTTVersion.MQTT_5) {
                this.session.getProtocolHandler().sendConnack((byte)-122);
            } else {
                this.session.getProtocolHandler().sendConnack((byte)5);
            }
            this.disconnect(true);
            result = Boolean.FALSE;
        }
        return new Pair((Object)result, (Object)validatedUser);
    }

    private boolean validateClientID(boolean isCleanSession) {
        if (this.session.getConnection().getClientID() == null || this.session.getConnection().getClientID().isEmpty()) {
            if (isCleanSession) {
                this.session.getConnection().setClientID(UUID.randomUUID().toString());
                this.session.getConnection().setClientIdAssignedByBroker(true);
            } else {
                return this.handleInvalidClientId();
            }
        }
        return true;
    }

    private boolean handleInvalidClientId() {
        if (this.session.getVersion() == MQTTVersion.MQTT_5) {
            this.session.getProtocolHandler().sendConnack((byte)-123);
        } else {
            this.session.getProtocolHandler().sendConnack((byte)2);
        }
        this.disconnect(true);
        return false;
    }

    private static enum LinkStealingResult {
        EXISTING_LINK_STOLEN,
        NEW_LINK_DENIED,
        NO_ACTION;

    }
}

