/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.storage.redis;

import io.debezium.DebeziumException;
import io.debezium.storage.redis.RedisClient;
import io.debezium.storage.redis.RedisClientConnectionException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.ClusterPipeline;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.exceptions.JedisClusterOperationException;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.resps.StreamEntry;
import redis.clients.jedis.util.JedisClusterCRC16;

public class JedisClusterClient
implements RedisClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(JedisClusterClient.class);
    private static final int MAX_ATTEMPTS = 10;
    private static final long DELAY_MILLIS = 200L;
    private final JedisCluster jedisCluster;

    public JedisClusterClient(JedisCluster jedisCluster) {
        this.jedisCluster = jedisCluster;
        this.ensureClusterInitialized();
    }

    @Override
    public void disconnect() {
        this.tryErrors(() -> ((JedisCluster)this.jedisCluster).close());
    }

    @Override
    public void close() {
        this.tryErrors(() -> ((JedisCluster)this.jedisCluster).close());
    }

    @Override
    public String xadd(String key, Map<String, String> hash) {
        return this.tryErrors(() -> this.jedisCluster.xadd(key, (StreamEntryID)null, hash).toString());
    }

    @Override
    public List<String> xadd(List<AbstractMap.SimpleEntry<String, Map<String, String>>> hashes) {
        return this.tryErrors(() -> {
            try {
                this.validateKeysForPipelining(hashes);
                this.jedisCluster.ping();
                ClusterPipeline pipeline = this.jedisCluster.pipelined();
                ArrayList responses = new ArrayList(hashes.size());
                hashes.forEach(hash -> responses.add(pipeline.xadd((String)hash.getKey(), StreamEntryID.NEW_ENTRY, (Map)hash.getValue())));
                pipeline.sync();
                return responses.stream().map(r -> ((StreamEntryID)r.get()).toString()).toList();
            }
            catch (JedisDataException jde) {
                if (jde.getMessage().equals("LOADING Redis is loading the dataset in memory")) {
                    LOGGER.error("Redis cluster is starting", (Throwable)jde);
                }
                LOGGER.error("Unexpected JedisDataException", (Throwable)jde);
                throw new DebeziumException((Throwable)jde);
            }
            catch (JedisClusterOperationException crossSlotException) {
                LOGGER.warn("Cross-slot operation detected, falling back to sequential processing", (Throwable)crossSlotException);
                return this.fallbackSequentialXadd(hashes);
            }
            return Collections.emptyList();
        });
    }

    private void validateKeysForPipelining(List<AbstractMap.SimpleEntry<String, Map<String, String>>> hashes) {
        if (hashes.isEmpty()) {
            return;
        }
        String firstKey = hashes.get(0).getKey();
        int expectedSlot = JedisClusterCRC16.getSlot((String)firstKey);
        for (AbstractMap.SimpleEntry<String, Map<String, String>> hash : hashes) {
            int slot = JedisClusterCRC16.getSlot((String)hash.getKey());
            if (slot == expectedSlot) continue;
            throw new JedisClusterOperationException("Keys must belong to the same slot for pipelining");
        }
    }

    private List<String> fallbackSequentialXadd(List<AbstractMap.SimpleEntry<String, Map<String, String>>> hashes) {
        ArrayList<String> results = new ArrayList<String>();
        for (AbstractMap.SimpleEntry<String, Map<String, String>> hash : hashes) {
            try {
                String id = this.jedisCluster.xadd(hash.getKey(), StreamEntryID.NEW_ENTRY, hash.getValue()).toString();
                results.add(id);
            }
            catch (Exception e) {
                LOGGER.error("Failed to add entry for key: " + hash.getKey(), (Throwable)e);
                results.add(null);
            }
        }
        return results;
    }

    @Override
    public List<Map<String, String>> xrange(String key) {
        return this.tryErrors(() -> this.jedisCluster.xrange(key, (StreamEntryID)null, (StreamEntryID)null).stream().map(StreamEntry::getFields).toList());
    }

    @Override
    public long xlen(String key) {
        return this.tryErrors(() -> this.jedisCluster.xlen(key));
    }

    @Override
    public Map<String, String> hgetAll(String key) {
        return this.tryErrors(() -> this.jedisCluster.hgetAll(key));
    }

    @Override
    public long hset(byte[] key, byte[] field, byte[] value) {
        return this.tryErrors(() -> this.jedisCluster.hset(key, field, value));
    }

    @Override
    public long waitReplicas(int replicas, long timeout) {
        throw new UnsupportedOperationException("waitReplicas is not directly supported in cluster mode. Each shard manages replication independently.");
    }

    @Override
    public String info(String section) {
        return this.tryErrors(() -> this.jedisCluster.info(section));
    }

    @Override
    public String clientList() {
        throw new UnsupportedOperationException("clientList is not directly supported in cluster mode.");
    }

    public String toString() {
        return "JedisClusterClient [jedisCluster=" + String.valueOf(this.jedisCluster) + "]";
    }

    private void tryErrors(Runnable runnable) {
        this.tryErrors(() -> {
            runnable.run();
            return null;
        });
    }

    private void ensureClusterInitialized() {
        int tries = 0;
        while (tries < 10) {
            try {
                this.jedisCluster.ping();
                return;
            }
            catch (JedisClusterOperationException | JedisConnectionException e) {
                if (++tries >= 10) {
                    throw new RedisClientConnectionException(e);
                }
                try {
                    Thread.sleep(200L);
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RedisClientConnectionException(e);
                }
            }
        }
    }

    private <R> R tryErrors(Supplier<R> supplier) {
        try {
            return supplier.get();
        }
        catch (JedisClusterOperationException | JedisConnectionException e) {
            throw new RedisClientConnectionException(e);
        }
    }
}

