/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.mqtt.server;

import com.alibaba.mqtt.server.callback.SendCallback;
import com.alibaba.mqtt.server.common.SendResult;
import com.alibaba.mqtt.server.config.ChannelConfig;
import com.alibaba.mqtt.server.config.ProducerConfig;
import com.alibaba.mqtt.server.network.AbstractChannel;
import com.alibaba.mqtt.server.util.ThreadFactoryImpl;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Publishes MQTT messages through an AMQP channel running in publisher-confirm mode and
 * reports the outcome of each publish to a {@link SendCallback}.
 *
 * <p>Every publish is registered under the channel's next publish sequence number. The entry
 * is settled by whichever happens first: a broker ack/nack delivered to the confirm listener,
 * the periodic timeout scan, a publish failure, or {@link #stop()}. Entries are always removed
 * from the pending map <em>before</em> the callback is invoked, so each callback is notified at
 * most once by this class.
 */
public class ServerProducer extends AbstractChannel {
    /** Extra slack added to the configured send timeout before a pending publish is failed. */
    private static final long TIMEOUT_GRACE_MILLIS = 1000L;
    /** Name prefix handed to the thread factory of the timeout scanner. */
    private static final String SCANNER_THREAD_PREFIX = "scan_server_producer_callback_";

    /**
     * Pending callbacks keyed by publish sequence number. Declared with the concrete type so the
     * atomic {@code remove(key, value)} is available without an extra import.
     */
    private final ConcurrentHashMap<Long, SendCallbackWrapper> sendCallbackMap = new ConcurrentHashMap<Long, SendCallbackWrapper>(32);
    private final ProducerConfig producerConfig;
    /** Runs the timeout scan; replaced by a fresh executor if the producer is started again after a stop. */
    private ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl(SCANNER_THREAD_PREFIX));

    /**
     * @param channelConfig  configuration forwarded to the {@link AbstractChannel} superclass
     * @param producerConfig supplies the send timeout used for pending publishes
     */
    public ServerProducer(ChannelConfig channelConfig, ProducerConfig producerConfig) {
        super(channelConfig);
        this.producerConfig = producerConfig;
    }

    /**
     * Starts the underlying channel, enables publisher confirms, registers the confirm listener
     * and schedules the once-per-second timeout scan.
     *
     * @throws IOException      propagated from the superclass or the channel
     * @throws TimeoutException propagated from the superclass
     */
    @Override
    public void start() throws IOException, TimeoutException {
        super.start();
        this.channel.confirmSelect();
        this.channel.addConfirmListener(new ConfirmListener() {

            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                ServerProducer.this.settle(deliveryTag, multiple, true);
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                ServerProducer.this.settle(deliveryTag, multiple, false);
            }
        });
        // stop() shuts the executor down; a shut-down executor rejects new tasks, so replace it.
        if (this.scheduler.isShutdown()) {
            this.scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl(SCANNER_THREAD_PREFIX));
        }
        this.scheduler.scheduleWithFixedDelay(new Runnable() {

            @Override
            public void run() {
                ServerProducer.this.expireTimedOut();
            }
        }, 1L, 1L, TimeUnit.SECONDS);
    }

    /**
     * Stops the underlying channel, shuts down the timeout scanner and fails every publish that
     * is still pending, since nothing would settle those callbacks once the scanner is gone.
     *
     * @throws IOException propagated from the superclass; cleanup still runs in that case
     */
    @Override
    public void stop() throws IOException {
        try {
            super.stop();
        } finally {
            // shutdown() (not shutdownNow()) so a callback currently running in the scanner is not interrupted.
            this.scheduler.shutdown();
            this.failAllPending();
        }
    }

    /**
     * Publishes a message asynchronously.
     *
     * @param mqttTopic    used as both the exchange and the routing key of the publish
     * @param payload      message body
     * @param sendCallback notified on ack, nack or timeout; may be {@code null} for fire-and-forget
     * @throws IOException if the publish fails; in that case {@code sendCallback} is not invoked
     */
    public void sendMessage(String mqttTopic, byte[] payload, SendCallback sendCallback) throws IOException {
        this.publish(mqttTopic, payload, ServerProducer.newMessageId(), sendCallback);
    }

    /**
     * Publishes a message and blocks until it is confirmed, rejected, or the configured send
     * timeout plus a one-second grace period elapses.
     *
     * @param mqttTopic used as both the exchange and the routing key of the publish
     * @param payload   message body
     * @return a result that is successful (with the message id set) only if the broker acked in time
     * @throws IOException if the publish fails
     */
    public SendResult sendMessage(String mqttTopic, byte[] payload) throws IOException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        SendResult sendResult = new SendResult(false);
        this.publish(mqttTopic, payload, ServerProducer.newMessageId(), new SyncSendCallBack(sendResult, countDownLatch));
        try {
            countDownLatch.await(this.producerConfig.getSendTimeoutMills() + TIMEOUT_GRACE_MILLIS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
        }
        return sendResult;
    }

    /** Builds a message id: a random UUID without dashes, upper-cased. */
    private static String newMessageId() {
        return UUID.randomUUID().toString().replaceAll("-", "").toUpperCase();
    }

    /**
     * Registers the callback (if any) under the next publish sequence number and publishes the
     * message. Registration and publish happen under the channel lock so the sequence number read
     * here is the one the publish consumes.
     */
    private void publish(String mqttTopic, byte[] payload, String msgId, SendCallback sendCallback) throws IOException {
        Channel channel = this.getChannel();
        synchronized (channel) {
            long publishSeqNo = this.channel.getNextPublishSeqNo();
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("seqId", publishSeqNo);
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(headers).messageId(msgId).build();
            SendCallbackWrapper pending = null;
            if (sendCallback != null) {
                pending = new SendCallbackWrapper(sendCallback, msgId);
                this.sendCallbackMap.put(publishSeqNo, pending);
            }
            boolean published = false;
            try {
                this.channel.basicPublish(mqttTopic, mqttTopic, true, props, payload);
                published = true;
            } finally {
                // The caller learns about the failure through the exception; drop the stale entry
                // so it is neither failed a second time by the scanner nor matched to a later publish.
                if (!published && pending != null) {
                    this.sendCallbackMap.remove(publishSeqNo, pending);
                }
            }
        }
    }

    /**
     * Settles pending publishes in response to a broker confirm.
     *
     * @param deliveryTag sequence number carried by the confirm
     * @param multiple    when {@code true} the confirm covers every pending publish whose
     *                    sequence number is less than or equal to {@code deliveryTag}
     * @param success     {@code true} for an ack, {@code false} for a nack
     */
    private void settle(long deliveryTag, boolean multiple, boolean success) {
        if (!multiple) {
            ServerProducer.dispatch(this.sendCallbackMap.remove(deliveryTag), success);
            return;
        }
        RuntimeException firstFailure = null;
        for (Map.Entry<Long, SendCallbackWrapper> entry : this.sendCallbackMap.entrySet()) {
            if (entry.getKey() > deliveryTag || !this.sendCallbackMap.remove(entry.getKey(), entry.getValue())) {
                continue;
            }
            try {
                ServerProducer.dispatch(entry.getValue(), success);
            } catch (RuntimeException e) {
                // Finish notifying the remaining callbacks, then surface the first problem.
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /** Fails every pending publish that is older than the send timeout plus the grace period. */
    private void expireTimedOut() {
        long limit = this.producerConfig.getSendTimeoutMills() + TIMEOUT_GRACE_MILLIS;
        long now = System.currentTimeMillis();
        for (Map.Entry<Long, SendCallbackWrapper> entry : this.sendCallbackMap.entrySet()) {
            SendCallbackWrapper pending = entry.getValue();
            if (now - pending.timestamp <= limit) {
                continue;
            }
            // Remove first: if a confirm settled the entry concurrently, it must not be failed as well.
            if (this.sendCallbackMap.remove(entry.getKey(), pending)) {
                ServerProducer.dispatchGuarded(pending, false);
            }
        }
    }

    /** Fails every publish that is still pending. */
    private void failAllPending() {
        for (Map.Entry<Long, SendCallbackWrapper> entry : this.sendCallbackMap.entrySet()) {
            if (this.sendCallbackMap.remove(entry.getKey(), entry.getValue())) {
                ServerProducer.dispatchGuarded(entry.getValue(), false);
            }
        }
    }

    /** Invokes the wrapped callback, if there is one; exceptions thrown by the callback propagate. */
    private static void dispatch(SendCallbackWrapper pending, boolean success) {
        if (pending == null || pending.sendCallback == null) {
            return;
        }
        if (success) {
            pending.sendCallback.onSuccess(pending.msgId);
        } else {
            pending.sendCallback.onFail();
        }
    }

    /**
     * Like {@link #dispatch} but never throws: an exception escaping a periodic task would make
     * the executor cancel all further runs, so it is handed to the current thread's
     * uncaught-exception handler instead.
     */
    private static void dispatchGuarded(SendCallbackWrapper pending, boolean success) {
        try {
            ServerProducer.dispatch(pending, success);
        } catch (RuntimeException e) {
            Thread current = Thread.currentThread();
            current.getUncaughtExceptionHandler().uncaughtException(current, e);
        }
    }

    /** Callback used by the blocking send: records the outcome and releases the waiting thread. */
    private static class SyncSendCallBack
    implements SendCallback {
        private final SendResult sendResult;
        private final CountDownLatch countDownLatch;

        public SyncSendCallBack(SendResult sendResult, CountDownLatch countDownLatch) {
            this.sendResult = sendResult;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void onSuccess(String msgId) {
            this.sendResult.setMsgId(msgId);
            this.sendResult.setSuccess(true);
            this.countDownLatch.countDown();
        }

        @Override
        public void onFail() {
            // The result was created as unsuccessful; only the waiter needs to be released.
            this.countDownLatch.countDown();
        }
    }

    /** A pending publish: its callback, its message id and the time it was registered. */
    private static class SendCallbackWrapper {
        private final SendCallback sendCallback;
        private final String msgId;
        private final long timestamp = System.currentTimeMillis();

        public SendCallbackWrapper(SendCallback sendCallback, String msgId) {
            this.sendCallback = sendCallback;
            this.msgId = msgId;
        }
    }
}

