/*
 * Decompiled with CFR 0.152.
 */
package com.sap.cloud.servicesdk.xbem.adapter.amqp10.driver;

import com.sap.cloud.servicesdk.xbem.adapter.amqp10.AmqpMessagingEndpoint;
import com.sap.cloud.servicesdk.xbem.adapter.amqp10.driver.MessagingAmqpConnectionException;
import com.sap.cloud.servicesdk.xbem.adapter.amqp10.driver.engine.MessageConfig;
import com.sap.cloud.servicesdk.xbem.adapter.amqp10.driver.engine.MessageSubscription;
import com.sap.cloud.servicesdk.xbem.api.Message;
import com.sap.cloud.servicesdk.xbem.api.MessagingException;
import com.sap.cloud.servicesdk.xbem.api.MessagingRuntimeException;
import com.sap.cloud.servicesdk.xbem.api.MessagingService;
import java.io.Serializable;
import java.util.Spliterator;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AmqpMessagingConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(AmqpMessagingConsumer.class);
    public static final int DEFAULT_RECONNECT_ATTEMPTS = 5;
    public static final int DEFAULT_RECONNECT_ATTEMPTS_INTERVAL = 1000;
    private final Object subscriptionLock = new Object();
    private final AmqpMessagingEndpoint endpoint;
    private final MessagingService.Config config;
    private MessageSubscription subscription;
    private String reconnectId = null;

    public AmqpMessagingConsumer(MessagingService.Config config, AmqpMessagingEndpoint messagingEndpoint) {
        this.endpoint = messagingEndpoint;
        this.config = config;
    }

    AmqpMessagingConsumer(MessagingService.Config config, AmqpMessagingEndpoint messagingEndpoint, MessageSubscription subscription) {
        this.endpoint = messagingEndpoint;
        this.config = config;
        this.subscription = subscription;
    }

    public boolean isActive() {
        if (this.subscription == null) {
            return this.isReconnecting();
        }
        return this.isReconnecting() || this.subscription.isActive();
    }

    Message<?> receive() throws MessagingException {
        if (this.subscription == null) {
            this.initalizeSubscription();
        }
        try {
            return this.subscription.getOrWait();
        }
        catch (MessagingException e) {
            return this.handleReceiverError(e);
        }
    }

    private Message<?> handleReceiverError(MessagingException error) {
        if (error instanceof MessagingAmqpConnectionException && ((Boolean)this.config.getSetting(MessagingService.Setting.RECONNECT_AFTER_FAILURE, (Object)true)).booleanValue()) {
            return this.handleReconnect((MessagingAmqpConnectionException)error);
        }
        throw new MessagingRuntimeException((Throwable)error);
    }

    private Message handleReconnect(MessagingAmqpConnectionException exception) {
        String tmpReconnectId;
        int attempts = (Integer)this.config.getSetting(MessagingService.Setting.RECONNECT_ATTEMPTS, (Object)5);
        int intervalMs = (Integer)this.config.getSetting(MessagingService.Setting.RECONNECT_ATTEMPTS_INTERVAL_MS, (Object)1000);
        this.reconnectId = tmpReconnectId = UUID.randomUUID().toString();
        while (attempts > 0 && tmpReconnectId.equals(this.reconnectId)) {
            try {
                this.subscription = null;
                this.initalizeSubscription();
                this.reconnectId = null;
                return this.receive();
            }
            catch (MessagingException | MessagingRuntimeException e) {
                if (!tmpReconnectId.equals(this.reconnectId)) continue;
                this.silentWait(intervalMs);
                LOG.error("Failed re-connect attempt with exception message: " + e.getMessage(), e);
                LOG.warn("Failed re-connect attempt (start retry after {}ms with {} remaining attempts).", (Object)intervalMs, (Object)(--attempts));
            }
        }
        this.reconnectId = null;
        throw new MessagingRuntimeException("Failed to reconnect after '" + attempts + "' attempts. Initial connection error messages: " + exception.getMessage(), (Throwable)((Object)exception));
    }

    private boolean isReconnecting() {
        return this.reconnectId != null;
    }

    private void silentWait(int waitInMs) {
        try {
            TimeUnit.MILLISECONDS.sleep(waitInMs);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close(boolean force, long time, TimeUnit unit) throws MessagingException {
        Object object = this.subscriptionLock;
        synchronized (object) {
            if (this.subscription == null) {
                LOG.warn("Called close on not created or already closed subscription.");
            } else {
                this.subscription.close(time, unit);
                this.subscription = null;
            }
        }
    }

    private synchronized void initalizeSubscription() throws MessagingException {
        if (this.subscription == null) {
            MessageConfig cfg = MessageConfig.forReceiver(this.config, this.endpoint).create();
            this.subscription = MessageSubscription.with(this.endpoint.getClientId(), cfg);
            this.subscription.connect();
        }
    }

    public <T extends Serializable> Stream<Message<T>> provideStream(Class<T> clazz) {
        return StreamSupport.stream(new MessageSpliterator<T>(this, clazz), false);
    }

    public boolean isConnected() {
        if (this.subscription == null) {
            return this.isReconnecting();
        }
        return this.isReconnecting() || this.subscription.isConnected();
    }

    public boolean isClosed() {
        if (this.subscription == null) {
            return !this.isReconnecting();
        }
        return this.subscription.isClosed();
    }

    private static class MessageSpliterator<T extends Serializable>
    implements Spliterator<Message<? extends T>> {
        private AmqpMessagingConsumer consumer;

        MessageSpliterator(AmqpMessagingConsumer consumer, Class<T> cls) {
            this.consumer = consumer;
        }

        @Override
        public boolean tryAdvance(Consumer<? super Message<? extends T>> action) {
            try {
                Message<?> msg = this.consumer.receive();
                if (msg == null) {
                    if (this.consumer.isClosed()) {
                        return false;
                    }
                    return this.consumer.config.isDaemonMode();
                }
                action.accept(msg);
                return true;
            }
            catch (MessagingException e) {
                LOG.error("Failure on message receive stream (tryAdvance)");
                LOG.debug("Failure on message receive stream (tryAdvance) with exception: {}", (Object)e.toString());
                throw new MessagingRuntimeException("Failure on message receive stream (tryAdvance)", (Throwable)e);
            }
        }

        @Override
        public Spliterator<Message<? extends T>> trySplit() {
            return null;
        }

        @Override
        public long estimateSize() {
            return 0L;
        }

        @Override
        public int characteristics() {
            return 0;
        }
    }
}

