/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shenyu.protocol.mqtt;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.shenyu.common.utils.Singleton;
import org.apache.shenyu.protocol.mqtt.MessageType;
import org.apache.shenyu.protocol.mqtt.repositories.SubscribeRepository;
import org.apache.shenyu.protocol.mqtt.repositories.TopicRepository;

public class Publish
extends MessageType {
    @Override
    public void publish(ChannelHandlerContext ctx, MqttPublishMessage msg) {
        if (this.isConnected()) {
            return;
        }
        String topic = msg.variableHeader().topicName();
        ByteBuf payload = msg.payload();
        String message = this.byteBufToString(payload);
        MqttQoS mqttQoS = msg.fixedHeader().qosLevel();
        ((TopicRepository)Singleton.INST.get(TopicRepository.class)).add(topic, message);
        int packetId = msg.variableHeader().packetId();
        CompletableFuture.runAsync(() -> this.send(topic, payload, packetId));
        switch (mqttQoS.value()) {
            case 0: {
                break;
            }
            case 1: {
                this.qos1(ctx, packetId);
                break;
            }
            case 2: {
                this.qos2(ctx, packetId);
                break;
            }
        }
    }

    private void qos0() {
    }

    private void qos1(ChannelHandlerContext ctx, int packetId) {
        MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_LEAST_ONCE, false, 0);
        MqttMessageIdVariableHeader mqttMsgIdVariableHeader = MqttMessageIdVariableHeader.from((int)packetId);
        MqttPubAckMessage mqttPubAckMessage = new MqttPubAckMessage(mqttFixedHeader, mqttMsgIdVariableHeader);
        ctx.writeAndFlush((Object)mqttPubAckMessage);
    }

    private void qos2(ChannelHandlerContext ctx, int packetId) {
        MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.EXACTLY_ONCE, false, 0);
        MqttMessageIdVariableHeader mqttMsgIdVariableHeader = MqttMessageIdVariableHeader.from((int)packetId);
        MqttPubAckMessage mqttPubAckMessage = new MqttPubAckMessage(mqttFixedHeader, mqttMsgIdVariableHeader);
        ctx.writeAndFlush((Object)mqttPubAckMessage);
    }

    private String byteBufToString(ByteBuf byteBuf) {
        if (byteBuf.hasArray()) {
            return new String(byteBuf.array(), byteBuf.arrayOffset() + byteBuf.readerIndex(), byteBuf.readableBytes());
        }
        byte[] bytes = new byte[byteBuf.readableBytes()];
        byteBuf.getBytes(byteBuf.readerIndex(), bytes);
        return new String(bytes, 0, byteBuf.readableBytes());
    }

    private void send(String topic, ByteBuf payload, int packetId) {
        List<Channel> channels = ((SubscribeRepository)Singleton.INST.get(SubscribeRepository.class)).get(topic);
        channels.parallelStream().forEach(channel -> {
            if (channel.isActive()) {
                MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 0);
                MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(topic, packetId);
                MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(mqttFixedHeader, mqttPublishVariableHeader, Unpooled.wrappedBuffer((ByteBuf)payload));
                channel.writeAndFlush((Object)mqttPublishMessage);
            }
        });
    }
}

