/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.jdbc.channel;

import java.util.Optional;
import java.util.concurrent.Executor;
import org.jspecify.annotations.Nullable;
import org.springframework.core.log.LogAccessor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.integration.channel.AbstractSubscribableChannel;
import org.springframework.integration.dispatcher.MessageDispatcher;
import org.springframework.integration.dispatcher.UnicastingDispatcher;
import org.springframework.integration.jdbc.channel.PostgresChannelMessageTableSubscriber;
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler;
import org.springframework.util.ReflectionUtils;

public class PostgresSubscribableChannel
extends AbstractSubscribableChannel
implements PostgresChannelMessageTableSubscriber.Subscription {
    private static final LogAccessor LOGGER = new LogAccessor(PostgresSubscribableChannel.class);
    private static final Optional<?> FALLBACK_STUB = Optional.of(new Object());
    private final JdbcChannelMessageStore jdbcChannelMessageStore;
    private final Object groupId;
    private final PostgresChannelMessageTableSubscriber messageTableSubscriber;
    private final UnicastingDispatcher dispatcher = new UnicastingDispatcher();
    private @Nullable TransactionTemplate transactionTemplate;
    private RetryTemplate retryTemplate = RetryTemplate.builder().maxAttempts(1).build();
    private ErrorHandler errorHandler = ReflectionUtils::rethrowRuntimeException;
    private Executor executor;
    private volatile boolean hasHandlers;

    public PostgresSubscribableChannel(JdbcChannelMessageStore jdbcChannelMessageStore, Object groupId, PostgresChannelMessageTableSubscriber messageTableSubscriber) {
        Assert.notNull((Object)jdbcChannelMessageStore, (String)"A jdbcChannelMessageStore must be provided.");
        Assert.notNull((Object)groupId, (String)"A groupId must be set.");
        Assert.notNull((Object)messageTableSubscriber, (String)"A messageTableSubscriber must be set.");
        this.jdbcChannelMessageStore = jdbcChannelMessageStore;
        this.groupId = groupId;
        this.messageTableSubscriber = messageTableSubscriber;
    }

    public void setDispatcherExecutor(Executor executor) {
        Assert.notNull((Object)executor, (String)"An executor must be provided.");
        this.executor = executor;
    }

    public void setTransactionManager(PlatformTransactionManager transactionManager) {
        Assert.notNull((Object)transactionManager, (String)"A platform transaction manager must be provided.");
        this.transactionTemplate = new TransactionTemplate(transactionManager);
    }

    public void setRetryTemplate(RetryTemplate retryTemplate) {
        Assert.notNull((Object)retryTemplate, (String)"A retry template must be provided.");
        this.retryTemplate = retryTemplate;
    }

    public void setErrorHandler(ErrorHandler errorHandler) {
        Assert.notNull((Object)errorHandler, (String)"'errorHandler' must not be null.");
        this.errorHandler = errorHandler;
    }

    protected void onInit() {
        super.onInit();
        if (this.executor == null) {
            this.executor = new SimpleAsyncTaskExecutor(this.getBeanName() + "-dispatcher-");
        }
    }

    public boolean subscribe(MessageHandler handler) {
        boolean subscribed = super.subscribe(handler);
        if (this.dispatcher.getHandlerCount() == 1) {
            this.messageTableSubscriber.subscribe(this);
            this.hasHandlers = true;
            this.notifyUpdate();
        }
        return subscribed;
    }

    public boolean unsubscribe(MessageHandler handle) {
        boolean unsubscribed = super.unsubscribe(handle);
        if (this.dispatcher.getHandlerCount() == 0) {
            this.messageTableSubscriber.unsubscribe(this);
            this.hasHandlers = false;
        }
        return unsubscribed;
    }

    protected MessageDispatcher getDispatcher() {
        return this.dispatcher;
    }

    protected boolean doSend(Message<?> message, long timeout) {
        this.jdbcChannelMessageStore.addMessageToGroup(this.groupId, message);
        return true;
    }

    @Override
    public void notifyUpdate() {
        this.executor.execute(() -> {
            Optional<?> dispatchedMessage;
            while ((dispatchedMessage = this.pollAndDispatchMessage()).isPresent()) {
            }
        });
    }

    private Optional<?> pollAndDispatchMessage() {
        try {
            return this.doPollAndDispatchMessage();
        }
        catch (Exception ex) {
            try {
                this.errorHandler.handleError((Throwable)ex);
            }
            catch (Exception ex1) {
                LOGGER.error((Throwable)ex, (CharSequence)"Exception during message dispatch");
            }
            return FALLBACK_STUB;
        }
    }

    private Optional<?> doPollAndDispatchMessage() {
        if (this.hasHandlers) {
            TransactionTemplate transactionTemplateToUse = this.transactionTemplate;
            if (transactionTemplateToUse != null) {
                return (Optional)this.retryTemplate.execute(context -> (Optional)transactionTemplateToUse.execute(status -> this.pollMessage().filter(message -> {
                    if (!this.hasHandlers) {
                        status.setRollbackOnly();
                        return false;
                    }
                    return true;
                }).map(this::dispatch)));
            }
            return this.pollMessage().map(message -> (Message)this.retryTemplate.execute(context -> this.dispatch((Message<?>)message)));
        }
        return Optional.empty();
    }

    private Optional<Message<?>> pollMessage() {
        return Optional.ofNullable(this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId));
    }

    private Message<?> dispatch(Message<?> message) {
        this.dispatcher.dispatch(message);
        return message;
    }

    @Override
    public String getRegion() {
        return this.jdbcChannelMessageStore.getRegion();
    }

    @Override
    public Object getGroupId() {
        return this.groupId;
    }
}

