/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.redis;

import com.google.common.collect.Lists;
import com.google.common.net.HostAndPort;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.SocketOptions;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.codec.RedisCodec;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.io.redis.RedisAbstractConfig;
import org.apache.pulsar.io.redis.sink.RedisSinkConfig;

public class RedisSession {
    private final AbstractRedisClient client;
    private final StatefulConnection connection;
    private final RedisClusterAsyncCommands<byte[], byte[]> asyncCommands;

    public RedisSession(AbstractRedisClient client, StatefulConnection connection, RedisClusterAsyncCommands<byte[], byte[]> asyncCommands) {
        this.client = client;
        this.connection = connection;
        this.asyncCommands = asyncCommands;
    }

    public AbstractRedisClient client() {
        return this.client;
    }

    public StatefulConnection connection() {
        return this.connection;
    }

    public RedisClusterAsyncCommands<byte[], byte[]> asyncCommands() {
        return this.asyncCommands;
    }

    public void close() throws Exception {
        if (null != this.connection) {
            this.connection.close();
        }
        if (null != this.client) {
            this.client.shutdown();
        }
    }

    public static RedisSession create(RedisSinkConfig config) {
        RedisSession redisSession;
        RedisAbstractConfig.ClientMode clientMode;
        ByteArrayCodec codec = new ByteArrayCodec();
        SocketOptions socketOptions = SocketOptions.builder().tcpNoDelay(config.isTcpNoDelay()).connectTimeout(Duration.ofMillis(config.getConnectTimeout())).keepAlive(config.isKeepAlive()).build();
        try {
            clientMode = RedisAbstractConfig.ClientMode.valueOf(config.getClientMode().toUpperCase());
        }
        catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Illegal Redis client mode, valid values are: " + String.valueOf(Arrays.asList(RedisAbstractConfig.ClientMode.values())));
        }
        List<RedisURI> redisURIs = RedisSession.redisURIs(config.getHostAndPorts(), config);
        if (clientMode == RedisAbstractConfig.ClientMode.STANDALONE) {
            ClientOptions.Builder clientOptions = ClientOptions.builder().socketOptions(socketOptions).requestQueueSize(config.getRequestQueue()).autoReconnect(config.isAutoReconnect());
            RedisClient client = RedisClient.create((RedisURI)redisURIs.get(0));
            client.setOptions(clientOptions.build());
            StatefulRedisConnection connection = client.connect((RedisCodec)codec);
            redisSession = new RedisSession((AbstractRedisClient)client, (StatefulConnection)connection, (RedisClusterAsyncCommands<byte[], byte[]>)connection.async());
        } else if (clientMode == RedisAbstractConfig.ClientMode.CLUSTER) {
            ClusterClientOptions.Builder clientOptions = ClusterClientOptions.builder().requestQueueSize(config.getRequestQueue()).autoReconnect(config.isAutoReconnect());
            RedisClusterClient client = RedisClusterClient.create(redisURIs);
            client.setOptions(clientOptions.build());
            StatefulRedisClusterConnection connection = client.connect((RedisCodec)codec);
            redisSession = new RedisSession((AbstractRedisClient)client, (StatefulConnection)connection, (RedisClusterAsyncCommands<byte[], byte[]>)connection.async());
        } else {
            throw new UnsupportedOperationException(String.format("%s is not supported", config.getClientMode()));
        }
        return redisSession;
    }

    private static List<RedisURI> redisURIs(List<HostAndPort> hostAndPorts, RedisSinkConfig config) {
        ArrayList redisURIs = Lists.newArrayList();
        for (HostAndPort hostAndPort : hostAndPorts) {
            RedisURI.Builder builder = RedisURI.builder();
            builder.withHost(hostAndPort.getHost());
            builder.withPort(hostAndPort.getPort());
            builder.withDatabase(config.getRedisDatabase());
            if (!StringUtils.isBlank((CharSequence)config.getRedisPassword())) {
                builder.withPassword(config.getRedisPassword());
            }
            redisURIs.add(builder.build());
        }
        return redisURIs;
    }
}

