/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.mqtt.server;

import com.alibaba.mqtt.server.callback.MessageListener;
import com.alibaba.mqtt.server.callback.StatusListener;
import com.alibaba.mqtt.server.config.ChannelConfig;
import com.alibaba.mqtt.server.config.ConsumerConfig;
import com.alibaba.mqtt.server.model.MessageProperties;
import com.alibaba.mqtt.server.model.StatusNotice;
import com.alibaba.mqtt.server.network.AbstractChannel;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Channel-backed consumer that hands incoming deliveries to user callbacks on
 * worker thread pools.
 *
 * <p>Regular messages are processed on one pool and status notices on another.
 * Every delivery is consumed with manual acknowledgement: it is acked after the
 * callback returns normally and nacked (without requeue) if the callback or the
 * ack itself throws.
 *
 * <p>The pools are created in {@link #start()} and released in {@link #stop()};
 * subscriptions must therefore be registered only after {@code start()}.
 */
public class ServerConsumer
extends AbstractChannel {
    private static final Logger LOGGER = Logger.getLogger(ServerConsumer.class.getName());

    private final ConsumerConfig consumerConfig;
    private ExecutorService msgExecutor;
    private ExecutorService statusExecutor;

    /**
     * @param channelConfig  connection settings forwarded to {@link AbstractChannel}
     * @param consumerConfig supplies the thread-pool sizing used by {@link #start()}
     */
    public ServerConsumer(ChannelConfig channelConfig, ConsumerConfig consumerConfig) {
        super(channelConfig);
        this.consumerConfig = consumerConfig;
    }

    /**
     * Starts the underlying channel and then creates the two worker pools.
     *
     * @throws IOException      propagated from {@code AbstractChannel.start()}
     * @throws TimeoutException propagated from {@code AbstractChannel.start()}
     */
    @Override
    public void start() throws IOException, TimeoutException {
        super.start();
        this.msgExecutor = this.newWorkerPool();
        this.statusExecutor = this.newWorkerPool();
    }

    /**
     * Stops the underlying channel and releases both worker pools.
     *
     * <p>The pools are shut down in a {@code finally} block so their threads are
     * released even when closing the channel fails. {@code shutdown()} is used
     * (not {@code shutdownNow()}), so tasks already submitted are not interrupted.
     *
     * @throws IOException propagated from {@code AbstractChannel.stop()}
     */
    @Override
    public void stop() throws IOException {
        try {
            super.stop();
        }
        finally {
            ServerConsumer.shutdownPool(this.msgExecutor);
            ServerConsumer.shutdownPool(this.statusExecutor);
        }
    }

    /**
     * Registers {@code messageListener} as the consumer of the queue named
     * {@code firstTopic}, with manual acknowledgement.
     *
     * @param firstTopic      queue name passed to {@code basicConsume}
     * @param messageListener callback invoked on the message pool with the message id,
     *                        wrapped properties and raw body of each delivery
     * @throws IOException if registering the consumer fails
     */
    public void subscribeTopic(String firstTopic, final MessageListener messageListener) throws IOException {
        this.channel.basicConsume(firstTopic, false, (Consumer)new DefaultConsumer(this.channel){

            @Override
            public void handleDelivery(String consumerTag, final Envelope envelope, final AMQP.BasicProperties properties, final byte[] body) {
                // Hand off immediately so the listener never runs on the client's dispatch thread.
                ServerConsumer.this.msgExecutor.submit(new Runnable(){

                    @Override
                    public void run() {
                        long deliveryTag = envelope.getDeliveryTag();
                        try {
                            messageListener.process(properties.getMessageId(), new MessageProperties(properties), body);
                            ServerConsumer.this.channel.basicAck(deliveryTag, false);
                        }
                        catch (Throwable failure) {
                            ServerConsumer.this.rejectDelivery(deliveryTag, failure);
                        }
                    }
                });
            }
        });
    }

    /**
     * Registers {@code statusListener} as the consumer of the {@code "STATUS"} queue,
     * passing {@code mqttGroupId} as the {@code "GROUP_ID"} consume argument.
     *
     * @param mqttGroupId    value sent under the {@code "GROUP_ID"} argument key
     * @param statusListener callback invoked on the status pool with a
     *                       {@link StatusNotice} built from each delivery body
     * @throws IOException if registering the consumer fails
     */
    public void subscribeStatus(String mqttGroupId, final StatusListener statusListener) throws IOException {
        // Object-valued map: the RabbitMQ client's basicConsume takes Map<String, Object>
        // (assumes `channel` is a com.rabbitmq.client.Channel - confirm in AbstractChannel).
        HashMap<String, Object> arguments = new HashMap<String, Object>(4);
        arguments.put("GROUP_ID", mqttGroupId);
        this.channel.basicConsume("STATUS", false, arguments, (Consumer)new DefaultConsumer(this.channel){

            @Override
            public void handleDelivery(String consumerTag, final Envelope envelope, AMQP.BasicProperties properties, final byte[] body) throws IOException {
                ServerConsumer.this.statusExecutor.submit(new Runnable(){

                    @Override
                    public void run() {
                        long deliveryTag = envelope.getDeliveryTag();
                        try {
                            statusListener.process(new StatusNotice(body));
                            ServerConsumer.this.channel.basicAck(deliveryTag, false);
                        }
                        catch (Throwable failure) {
                            ServerConsumer.this.rejectDelivery(deliveryTag, failure);
                        }
                    }
                });
            }
        });
    }

    /**
     * Builds one worker pool sized from the consumer configuration.
     *
     * <p>NOTE(review): because the work queue is unbounded, a ThreadPoolExecutor
     * never grows past its core size, so {@code getMaxConsumeThreadNum()} has no
     * practical effect here. Left as-is to keep the existing threading behaviour.
     */
    private ExecutorService newWorkerPool() {
        return new ThreadPoolExecutor(this.consumerConfig.getMinConsumeThreadNum(), this.consumerConfig.getMaxConsumeThreadNum(), 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
    }

    /** Shuts a pool down if it was ever created; tolerates stop() before start(). */
    private static void shutdownPool(ExecutorService pool) {
        if (pool != null) {
            pool.shutdown();
        }
    }

    /**
     * Logs why a delivery failed and nacks it without requeue (best effort).
     *
     * <p>A failure of the nack itself is logged rather than rethrown: there is no
     * caller on the worker thread that could act on it.
     */
    private void rejectDelivery(long deliveryTag, Throwable cause) {
        LOGGER.log(Level.WARNING, "Processing or ack failed for delivery " + deliveryTag + "; rejecting without requeue", cause);
        try {
            this.channel.basicNack(deliveryTag, false, false);
        }
        catch (Exception nackFailure) {
            LOGGER.log(Level.WARNING, "Could not nack delivery " + deliveryTag, nackFailure);
        }
    }
}

