/*
 * Decompiled with CFR 0.152.
 */
package org.mule.extension.redis.internal.service;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.mule.extension.redis.internal.error.exceptions.RedisConnectionException;
import org.mule.extension.redis.internal.error.exceptions.UnableToUnsubscribeException;
import org.mule.extension.redis.internal.service.ChannelSubscription;
import org.mule.extension.redis.internal.service.MessagingAPIService;
import org.mule.extension.redis.internal.service.RedisClientAdapter;
import org.mule.extension.redis.internal.service.dto.MessageDTO;
import org.mule.extension.redis.internal.utils.RedisUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.BinaryJedisPubSub;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.util.SafeEncoder;

public class RedisMessagingAPIService
implements MessagingAPIService<MessageDTO> {
    private static final Logger logger = LoggerFactory.getLogger(RedisMessagingAPIService.class);
    private RedisClientAdapter client;

    public RedisMessagingAPIService(RedisClientAdapter client) {
        Objects.requireNonNull(client, "Invalid client. Client cannot be null.");
        this.client = client;
    }

    @Override
    public ChannelSubscription subscribeToChannel(Consumer<MessageDTO> consumer, Consumer<RuntimeException> errorHandler, List<String> channels) {
        this.checkPreconditionsForChannelList(channels);
        ExecutorService executorExecutingListener = Executors.newSingleThreadExecutor();
        ChannelMessageListener channelListener = new ChannelMessageListener(consumer);
        executorExecutingListener.submit(() -> this.executeWithExceptionHandling(this.client, actualClient -> actualClient.psubscribe(channelListener, RedisUtils.getPatternsFromChannels(channels)), errorHandler));
        return () -> {
            try {
                if (channelListener.isSubscribed()) {
                    logger.debug("Unsubscribing channel listener.");
                    channelListener.punsubscribe();
                    logger.debug("Successfully unsubscribed from channel.");
                }
            }
            catch (JedisConnectionException e) {
                errorHandler.accept(new UnableToUnsubscribeException("Unknown error while trying to unsubscribe.", e));
            }
            try {
                executorExecutingListener.shutdown();
                executorExecutingListener.awaitTermination(3L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                logger.error("Unable to gracefully shutdown executor service.", (Throwable)e);
            }
        };
    }

    private void checkPreconditionsForChannelList(List<String> channels) {
        Objects.requireNonNull(channels, "The list of channels cannot be null.");
        if (channels.isEmpty()) {
            throw new IllegalArgumentException("The list of channels cannot be empty.");
        }
    }

    private void executeWithExceptionHandling(RedisClientAdapter client, Consumer<RedisClientAdapter> methodToExecute, Consumer<RuntimeException> errorHandler) {
        try {
            methodToExecute.accept(client);
        }
        catch (JedisConnectionException e) {
            errorHandler.accept(new RedisConnectionException("Unable to establish connection with server.", e));
        }
    }

    public static class ChannelMessageListener
    extends BinaryJedisPubSub {
        private Consumer<MessageDTO> consumer;

        public ChannelMessageListener(Consumer<MessageDTO> consumer) {
            this.consumer = consumer;
        }

        public void onPMessage(byte[] pattern, byte[] channel, byte[] message) {
            this.consumer.accept(new MessageDTO(SafeEncoder.encode((byte[])channel), SafeEncoder.encode((byte[])message)));
        }
    }
}

