/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.redis.inbound;

import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.BoundListOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.integration.channel.MessagePublishingErrorHandler;
import org.springframework.integration.gateway.MessagingGatewaySupport;
import org.springframework.integration.redis.event.RedisExceptionEvent;
import org.springframework.integration.support.channel.BeanFactoryChannelResolver;
import org.springframework.integration.util.ErrorHandlingTaskExecutor;
import org.springframework.jmx.export.annotation.ManagedMetric;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.core.DestinationResolver;
import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler;

@ManagedResource
public class RedisQueueInboundGateway
extends MessagingGatewaySupport
implements ApplicationEventPublisherAware {
    private static final String QUEUE_NAME_SUFFIX = ".reply";
    private static final RedisSerializer<String> stringSerializer = new StringRedisSerializer();
    public static final long DEFAULT_RECEIVE_TIMEOUT = 5000L;
    public static final long DEFAULT_RECOVERY_INTERVAL = 5000L;
    private final RedisTemplate<String, byte[]> template;
    private final BoundListOperations<String, byte[]> boundListOperations;
    private volatile ApplicationEventPublisher applicationEventPublisher;
    private volatile boolean serializerExplicitlySet;
    private volatile Executor taskExecutor;
    private volatile RedisSerializer<?> serializer = new StringRedisSerializer();
    private volatile long receiveTimeout = 5000L;
    private volatile long recoveryInterval = 5000L;
    private volatile long stopTimeout = 5000L;
    private volatile boolean active;
    private volatile boolean listening;
    private volatile boolean extractPayload = true;

    public RedisQueueInboundGateway(String queueName, RedisConnectionFactory connectionFactory) {
        Assert.hasText((String)queueName, (String)"'queueName' is required");
        Assert.notNull((Object)connectionFactory, (String)"'connectionFactory' must not be null");
        this.template = new RedisTemplate();
        this.template.setConnectionFactory(connectionFactory);
        this.template.setEnableDefaultSerializer(false);
        this.template.setKeySerializer((RedisSerializer)new StringRedisSerializer());
        this.template.afterPropertiesSet();
        this.boundListOperations = this.template.boundListOps((Object)queueName);
    }

    public void setExtractPayload(boolean extractPayload) {
        this.extractPayload = extractPayload;
    }

    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    public void setSerializer(RedisSerializer<?> serializer) {
        this.serializer = serializer;
        this.serializerExplicitlySet = true;
    }

    public void setReceiveTimeout(long receiveTimeout) {
        Assert.isTrue((receiveTimeout > 0L ? 1 : 0) != 0, (String)"'receiveTimeout' must be > 0.");
        this.receiveTimeout = receiveTimeout;
    }

    public void setStopTimeout(long stopTimeout) {
        this.stopTimeout = stopTimeout;
    }

    public void setTaskExecutor(Executor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    public void setRecoveryInterval(long recoveryInterval) {
        this.recoveryInterval = recoveryInterval;
    }

    protected void onInit() throws Exception {
        super.onInit();
        if (!this.extractPayload) {
            Assert.notNull(this.serializer, (String)"'serializer' has to be provided where 'extractPayload == false'.");
        }
        if (this.taskExecutor == null) {
            String beanName = this.getComponentName();
            this.taskExecutor = new SimpleAsyncTaskExecutor((beanName == null ? "" : beanName + "-") + this.getComponentType());
        }
        if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor) && this.getBeanFactory() != null) {
            MessagePublishingErrorHandler errorHandler = new MessagePublishingErrorHandler((DestinationResolver)new BeanFactoryChannelResolver(this.getBeanFactory()));
            errorHandler.setDefaultErrorChannel(this.getErrorChannel());
            this.taskExecutor = new ErrorHandlingTaskExecutor(this.taskExecutor, (ErrorHandler)errorHandler);
        }
    }

    public String getComponentType() {
        return "redis:queue-inbound-gateway";
    }

    private void handlePopException(Exception e) {
        this.listening = false;
        if (this.active) {
            this.logger.error((Object)("Failed to execute listening task. Will attempt to resubmit in " + this.recoveryInterval + " milliseconds."), (Throwable)e);
            this.publishException(e);
            this.sleepBeforeRecoveryAttempt();
        } else {
            this.logger.debug((Object)("Failed to execute listening task. " + e.getClass() + ": " + e.getMessage()));
        }
    }

    private void receiveAndReply() {
        byte[] value = null;
        try {
            value = (byte[])this.boundListOperations.rightPop(this.receiveTimeout, TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            this.handlePopException(e);
            return;
        }
        String uuid = null;
        if (value != null) {
            uuid = (String)stringSerializer.deserialize(value);
            try {
                value = (byte[])this.template.boundListOps((Object)uuid).rightPop(this.receiveTimeout, TimeUnit.MILLISECONDS);
            }
            catch (Exception e) {
                this.handlePopException(e);
                return;
            }
            Message requestMessage = null;
            if (value != null) {
                if (this.extractPayload) {
                    Object payload = value;
                    if (this.serializer != null) {
                        payload = this.serializer.deserialize(value);
                    }
                    requestMessage = this.getMessageBuilderFactory().withPayload(payload).build();
                } else {
                    try {
                        requestMessage = (Message)this.serializer.deserialize(value);
                    }
                    catch (Exception e) {
                        throw new MessagingException("Deserialization of Message failed.", (Throwable)e);
                    }
                }
                Message replyMessage = this.sendAndReceiveMessage(requestMessage);
                if (replyMessage != null) {
                    if (this.extractPayload) {
                        value = !(replyMessage.getPayload() instanceof byte[]) ? (replyMessage.getPayload() instanceof String && !this.serializerExplicitlySet ? stringSerializer.serialize((Object)((String)replyMessage.getPayload())) : this.serializer.serialize(replyMessage.getPayload())) : (byte[])replyMessage.getPayload();
                    } else if (this.serializer != null) {
                        value = this.serializer.serialize((Object)replyMessage);
                    }
                    this.template.boundListOps((Object)(uuid + QUEUE_NAME_SUFFIX)).leftPush((Object)value);
                }
            }
        }
    }

    protected void doStart() {
        if (!this.active) {
            this.active = true;
            this.restart();
        }
    }

    private void sleepBeforeRecoveryAttempt() {
        if (this.recoveryInterval > 0L) {
            try {
                Thread.sleep(this.recoveryInterval);
            }
            catch (InterruptedException e) {
                this.logger.debug((Object)"Thread interrupted while sleeping the recovery interval");
                Thread.currentThread().interrupt();
            }
        }
    }

    private void publishException(Exception e) {
        if (this.applicationEventPublisher != null) {
            this.applicationEventPublisher.publishEvent((ApplicationEvent)new RedisExceptionEvent((Object)this, e));
        } else if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)("No application event publisher for exception: " + e.getMessage()));
        }
    }

    private void restart() {
        this.taskExecutor.execute(new ListenerTask());
    }

    protected void doStop() {
        try {
            this.active = false;
            this.lifecycleCondition.await(Math.min(this.stopTimeout, this.receiveTimeout), TimeUnit.MICROSECONDS);
        }
        catch (InterruptedException e) {
            this.logger.debug((Object)"Thread interrupted while stopping the endpoint");
            Thread.currentThread().interrupt();
        }
        finally {
            this.listening = false;
        }
    }

    public boolean isListening() {
        return this.listening;
    }

    @ManagedMetric
    public long getQueueSize() {
        return this.boundListOperations.size();
    }

    @ManagedOperation
    public void clearQueue() {
        this.boundListOperations.getOperations().delete(this.boundListOperations.getKey());
    }

    private class ListenerTask
    implements Runnable {
        private ListenerTask() {
        }

        @Override
        public void run() {
            try {
                while (RedisQueueInboundGateway.this.active) {
                    RedisQueueInboundGateway.this.listening = true;
                    RedisQueueInboundGateway.this.receiveAndReply();
                }
            }
            finally {
                if (RedisQueueInboundGateway.this.active) {
                    RedisQueueInboundGateway.this.restart();
                } else {
                    RedisQueueInboundGateway.this.lifecycleLock.lock();
                    try {
                        RedisQueueInboundGateway.this.lifecycleCondition.signalAll();
                    }
                    finally {
                        RedisQueueInboundGateway.this.lifecycleLock.unlock();
                    }
                }
            }
        }
    }
}

