/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shenyu.protocol.mqtt;

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GenericFutureListener;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.shenyu.common.utils.Singleton;
import org.apache.shenyu.protocol.mqtt.MessageType;
import org.apache.shenyu.protocol.mqtt.repositories.SubscribeRepository;
import org.apache.shenyu.protocol.mqtt.repositories.TopicRepository;

/**
 * Handles MQTT SUBSCRIBE packets.
 *
 * <p>For each incoming SUBSCRIBE the handler records the subscriptions in the
 * {@link SubscribeRepository}, pushes any non-empty message that the
 * {@link TopicRepository} currently holds for an accepted topic, and finally
 * answers with a SUBACK carrying one return code per requested topic filter.
 */
public class Subscribe extends MessageType {

    /**
     * Processes a SUBSCRIBE packet received on the given channel.
     *
     * @param ctx the handler context whose channel sent the packet
     * @param msg the decoded SUBSCRIBE message
     */
    @Override
    public void subscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) {
        Channel channel = ctx.channel();
        // NOTE(review): the channel is closed when isConnected() returns true, which reads
        // inverted. isConnected() is declared in MessageType (not visible here) - confirm
        // its polarity before changing this guard.
        if (this.isConnected()) {
            channel.close().addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
            return;
        }

        List<MqttTopicSubscription> mqttTopicSubscriptions = msg.payload().topicSubscriptions();
        int packetId = msg.variableHeader().messageId();

        // Walk the request once, building the accepted topic names and, in parallel,
        // one SUBACK return code per requested filter (same order as the request).
        List<String> ackTopics = new ArrayList<>(mqttTopicSubscriptions.size());
        List<Integer> returnCodes = new ArrayList<>(mqttTopicSubscriptions.size());
        for (MqttTopicSubscription subscription : mqttTopicSubscriptions) {
            if (subscription.qualityOfService() == MqttQoS.FAILURE) {
                // Rejected filter: keep its slot in the SUBACK so positions still line up.
                returnCodes.add(MqttQoS.FAILURE.value());
                continue;
            }
            ackTopics.add(subscription.topicName());
            // Every accepted subscription is granted QoS 0.
            returnCodes.add(MqttQoS.AT_MOST_ONCE.value());
        }

        // NOTE(review): the full list, including any FAILURE entries, is registered;
        // whether the repository filters them is not visible from here.
        ((SubscribeRepository) Singleton.INST.get(SubscribeRepository.class)).add(channel, mqttTopicSubscriptions);

        for (String ackTopic : ackTopics) {
            String message = ((TopicRepository) Singleton.INST.get(TopicRepository.class)).get(ackTopic);
            if (StringUtils.isNotEmpty(message)) {
                this.sendSubMessage(ackTopic, message, packetId, channel);
            }
        }
        this.sendSubAckMessage(packetId, returnCodes, channel);
    }

    /**
     * Writes a SUBACK for the given packet id.
     *
     * @param packetId    the packet identifier taken from the SUBSCRIBE
     * @param returnCodes one return code per topic filter of the SUBSCRIBE, in request order
     * @param channel     the channel to answer on
     */
    private void sendSubAckMessage(int packetId, List<Integer> returnCodes, Channel channel) {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttSubAckPayload payload = new MqttSubAckPayload(returnCodes);
        MqttSubAckMessage mqttSubAckMessage = new MqttSubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(packetId), payload);
        channel.writeAndFlush(mqttSubAckMessage);
    }

    /**
     * Writes a QoS 0 PUBLISH, with the retain flag set, carrying {@code message} as UTF-8 bytes.
     *
     * @param topic    the topic name placed in the PUBLISH variable header
     * @param message  the payload text
     * @param packetId the packet identifier placed in the PUBLISH variable header
     * @param channel  the channel to write to
     */
    private void sendSubMessage(String topic, String message, int packetId, Channel channel) {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, true, 0);
        MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topic, packetId);
        MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(fixedHeader, varHeader, Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
        channel.writeAndFlush(mqttPublishMessage);
    }
}

