/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.rabbitmq.client;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.utility.Utility;
import java.io.IOException;
import org.apache.seatunnel.common.Handover;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueueingConsumer
extends DefaultConsumer {
    private static final Logger log = LoggerFactory.getLogger(QueueingConsumer.class);
    private final Handover<Delivery> handover;
    private volatile ShutdownSignalException shutdown;
    private volatile ConsumerCancelledException cancelled;
    private static final Delivery POISON = new Delivery(null, null, null);

    public QueueingConsumer(Channel channel, Handover<Delivery> handover) {
        this(channel, Integer.MAX_VALUE, handover);
    }

    public QueueingConsumer(Channel channel, int capacity, Handover<Delivery> handover) {
        super(channel);
        this.handover = handover;
    }

    private void checkShutdown() {
        if (this.shutdown != null) {
            throw Utility.fixStackTrace(this.shutdown);
        }
    }

    @Override
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
        this.shutdown = sig;
        try {
            this.handover.produce((Object)POISON);
        }
        catch (InterruptedException | Handover.ClosedException e) {
            throw new RabbitmqConnectorException((SeaTunnelErrorCode)RabbitmqConnectorErrorCode.HANDLE_SHUTDOWN_SIGNAL_FAILED, e);
        }
    }

    @Override
    public void handleCancel(String consumerTag) throws IOException {
        this.cancelled = new ConsumerCancelledException();
        this.handover.produce((Object)POISON);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        this.checkShutdown();
        this.handover.produce((Object)new Delivery(envelope, properties, body));
    }
}

