/*
 * Decompiled with CFR 0.152.
 */
package org.redisson.command;

import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.redisson.RedissonReference;
import org.redisson.SlotCallback;
import org.redisson.api.BatchOptions;
import org.redisson.api.BatchResult;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
import org.redisson.api.options.ObjectParams;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisException;
import org.redisson.client.RedisNodeNotFoundException;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.command.RedisExecutor;
import org.redisson.config.DefaultCommandMapper;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
import org.redisson.connection.ServiceManager;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.CompletableFutureWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommandAsyncService
implements CommandAsyncExecutor {
    /** Class-wide SLF4J logger. */
    static final Logger log = LoggerFactory.getLogger(CommandAsyncService.class);
    /** Codec used by the overloads of this class that do not take an explicit codec argument. */
    final Codec codec;
    /** Source of slot calculation, node entries and the service configuration. */
    final ConnectionManager connectionManager;
    /** May be null; isRedissonReferenceSupportEnabled() reports whether it is set. */
    final RedissonObjectBuilder objectBuilder;
    /** Passed through unchanged to every RedisExecutor created by this service. */
    final RedissonObjectBuilder.ReferenceType referenceType;
    // Retry / timeout settings passed to every RedisExecutor created by this service.
    // They come from the service configuration unless overridden through ObjectParams.
    private final int retryAttempts;
    private final int retryInterval;
    private final int responseTimeout;
    /** Change-tracking flag passed to every RedisExecutor; chosen via copy(boolean). */
    private final boolean trackChanges;
    /** Shared by all instances; cleared once EVALSHA_RO is answered with "ERR unknown command". */
    private static final AtomicBoolean EVAL_SHA_RO_SUPPORTED = new AtomicBoolean(true);
    // NOTE(review): neither assigned nor read in the visible part of this file.
    private List<String> waitSupportedCommands;
    /** Captures the command name in each redis.call('NAME' or redis.call("NAME" occurrence of a script. */
    private static final Pattern COMMANDS_PATTERN = Pattern.compile("redis\\.call\\(['\"]{1}([\\w.]+)['\"]{1}");
    /** Shared by all instances; cleared once SORT_RO is answered with "ERR unknown command". */
    private static final AtomicBoolean SORT_RO_SUPPORTED = new AtomicBoolean(true);

    /**
     * Creates a new service that shares this instance's settings but uses the
     * supplied change-tracking flag.
     *
     * @param trackChanges change-tracking flag of the copy
     * @return the new executor
     */
    @Override
    public CommandAsyncExecutor copy(boolean trackChanges) {
        CommandAsyncExecutor prototype = this;
        return new CommandAsyncService(prototype, trackChanges);
    }

    /**
     * Copy constructor: takes every setting from {@code executor} except the
     * change-tracking flag.
     *
     * @param executor     service to copy from; must be a {@code CommandAsyncService}
     * @param trackChanges change-tracking flag of the new instance
     * @throws ClassCastException if {@code executor} is not a {@code CommandAsyncService}
     */
    protected CommandAsyncService(CommandAsyncExecutor executor, boolean trackChanges) {
        CommandAsyncService prototype = (CommandAsyncService) executor;

        // shared collaborators
        this.connectionManager = prototype.connectionManager;
        this.codec = prototype.codec;
        this.objectBuilder = prototype.objectBuilder;
        this.referenceType = prototype.referenceType;

        // retry / timeout settings
        this.retryAttempts = prototype.retryAttempts;
        this.retryInterval = prototype.retryInterval;
        this.responseTimeout = prototype.responseTimeout;

        this.trackChanges = trackChanges;
    }

    /**
     * Creates a new service that shares this instance's collaborators and takes its
     * retry / timeout settings from {@code objectParams} where they are set.
     *
     * @param objectParams per-object overrides
     * @return the new executor
     */
    @Override
    public CommandAsyncExecutor copy(ObjectParams objectParams) {
        CommandAsyncExecutor prototype = this;
        return new CommandAsyncService(prototype, objectParams);
    }

    /**
     * Copy constructor with per-object overrides. Collaborators are taken from
     * {@code executor}; each retry / timeout setting is taken from
     * {@code objectParams} when set there, otherwise from the service configuration.
     * Change tracking is always disabled on the new instance.
     *
     * @param executor     service to copy from; must be a {@code CommandAsyncService}
     * @param objectParams overrides: retry attempts apply when {@code >= 0}, retry
     *                     interval and timeout apply when {@code > 0}
     * @throws ClassCastException if {@code executor} is not a {@code CommandAsyncService}
     */
    protected CommandAsyncService(CommandAsyncExecutor executor, ObjectParams objectParams) {
        CommandAsyncService prototype = (CommandAsyncService) executor;
        this.codec = prototype.codec;
        this.connectionManager = prototype.connectionManager;
        this.objectBuilder = prototype.objectBuilder;
        this.referenceType = prototype.referenceType;

        // zero is a valid override for the attempt count, hence ">=" here only
        if (objectParams.getRetryAttempts() >= 0) {
            this.retryAttempts = objectParams.getRetryAttempts();
        } else {
            this.retryAttempts = this.connectionManager.getServiceManager().getConfig().getRetryAttempts();
        }

        if (objectParams.getRetryInterval() > 0) {
            this.retryInterval = objectParams.getRetryInterval();
        } else {
            this.retryInterval = this.connectionManager.getServiceManager().getConfig().getRetryInterval();
        }

        if (objectParams.getTimeout() > 0) {
            this.responseTimeout = objectParams.getTimeout();
        } else {
            this.responseTimeout = this.connectionManager.getServiceManager().getConfig().getTimeout();
        }

        this.trackChanges = false;
    }

    /**
     * Creates a service whose codec, retry attempts, retry interval and response
     * timeout all come from the connection manager's service configuration.
     * Change tracking is disabled.
     *
     * @param connectionManager connection manager to execute commands through
     * @param objectBuilder     object builder, stored as given
     * @param referenceType     reference type, stored as given
     */
    protected CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) {
        ServiceManager serviceManager = connectionManager.getServiceManager();

        this.connectionManager = connectionManager;
        this.objectBuilder = objectBuilder;
        this.referenceType = referenceType;
        this.codec = serviceManager.getCfg().getCodec();
        this.retryAttempts = serviceManager.getConfig().getRetryAttempts();
        this.retryInterval = serviceManager.getConfig().getRetryInterval();
        this.responseTimeout = serviceManager.getConfig().getTimeout();
        this.trackChanges = false;
    }

    /**
     * @return the connection manager this service executes commands through
     */
    @Override
    public ConnectionManager getConnectionManager() {
        return connectionManager;
    }

    /**
     * @return {@code true} when an object builder has been configured
     */
    private boolean isRedissonReferenceSupportEnabled() {
        return null != objectBuilder;
    }

    /**
     * Returns the future's current result without blocking.
     *
     * @param future future to inspect
     * @return the completed value, or {@code null} when the future is not yet
     *         completed or reading it throws an exception
     */
    @Override
    public <V> V getNow(CompletableFuture<V> future) {
        V value;
        try {
            value = future.getNow(null);
        } catch (Exception ignored) {
            // deliberate best effort: a failed or cancelled future reads as "no value"
            value = null;
        }
        return value;
    }

    /**
     * Propagates the outcome of {@code future1} to {@code future2}: a failure is
     * forwarded with {@code completeExceptionally}, a result with {@code complete}.
     *
     * @param future1 source stage
     * @param future2 future to complete
     */
    @Override
    public <V> void transfer(CompletionStage<V> future1, CompletableFuture<V> future2) {
        future1.whenComplete((value, failure) -> {
            if (failure == null) {
                future2.complete(value);
            } else {
                future2.completeExceptionally(failure);
            }
        });
    }

    /**
     * Blocks until {@code future} completes and returns its result.
     *
     * @param future future to wait for
     * @return the result of the future
     * @throws IllegalStateException if called from a thread whose name starts with
     *                               {@code redisson-netty}
     * @throws RedisException        if the wait is interrupted (the future is cancelled
     *                               and the interrupt flag restored) or the future failed
     */
    @Override
    public <V> V get(RFuture<V> future) {
        String threadName = Thread.currentThread().getName();
        if (threadName.startsWith("redisson-netty")) {
            throw new IllegalStateException("Sync methods can't be invoked from async/rx/reactive listeners");
        }

        try {
            return (V) future.toCompletableFuture().get();
        } catch (InterruptedException interrupted) {
            future.cancel(true);
            // keep the interrupt visible to the caller
            Thread.currentThread().interrupt();
            throw new RedisException(interrupted);
        } catch (ExecutionException failed) {
            throw convertException(failed);
        }
    }

    /**
     * Blocks until {@code future} completes and returns its result.
     *
     * @param future future to wait for
     * @return the result of the future
     * @throws IllegalStateException if called from a thread whose name starts with
     *                               {@code redisson-netty}
     * @throws RedisException        if the wait is interrupted (the future is cancelled
     *                               and the interrupt flag restored) or the future failed
     */
    @Override
    public <V> V get(CompletableFuture<V> future) {
        String threadName = Thread.currentThread().getName();
        if (threadName.startsWith("redisson-netty")) {
            throw new IllegalStateException("Sync methods can't be invoked from async/rx/reactive listeners");
        }

        try {
            return future.get();
        } catch (InterruptedException interrupted) {
            future.cancel(true);
            // keep the interrupt visible to the caller
            Thread.currentThread().interrupt();
            throw new RedisException(interrupted);
        } catch (ExecutionException failed) {
            throw convertException(failed);
        }
    }

    /**
     * Blocks until {@code future} completes, letting an interrupt propagate.
     * On interrupt the future is completed exceptionally with the
     * {@link InterruptedException} before it is rethrown.
     *
     * @param future future to wait for
     * @return the result of the future
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws RedisException       if the future failed
     */
    @Override
    public <V> V getInterrupted(RFuture<V> future) throws InterruptedException {
        try {
            return (V) future.toCompletableFuture().get();
        } catch (InterruptedException interrupted) {
            future.toCompletableFuture().completeExceptionally(interrupted);
            throw interrupted;
        } catch (ExecutionException failed) {
            throw convertException(failed);
        }
    }

    /**
     * Blocks until {@code future} completes, letting an interrupt propagate.
     * On interrupt the future is completed exceptionally with the
     * {@link InterruptedException} before it is rethrown.
     *
     * @param future future to wait for
     * @return the result of the future
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws RedisException       if the future failed
     */
    @Override
    public <V> V getInterrupted(CompletableFuture<V> future) throws InterruptedException {
        try {
            return future.get();
        } catch (InterruptedException interrupted) {
            future.completeExceptionally(interrupted);
            throw interrupted;
        } catch (ExecutionException failed) {
            throw convertException(failed);
        }
    }

    /**
     * Creates the promise a command execution completes. Protected so subclasses
     * can supply a different future.
     *
     * @return a new, uncompleted future
     */
    protected <R> CompletableFuture<R> createPromise() {
        // diamond instead of the raw type: no unchecked conversion on return
        return new CompletableFuture<>();
    }

    /**
     * Executes {@code command} in read-only mode on the given client of the given entry.
     *
     * @param client  node to send the command to
     * @param entry   entry the client belongs to
     * @param codec   codec for the command
     * @param command command to execute
     * @param params  command arguments
     * @return future of the command result
     */
    @Override
    public <T, R> RFuture<R> readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params) {
        NodeSource target = new NodeSource(entry, client);
        return async(true, target, codec, command, params, false, false);
    }

    /**
     * Executes {@code command} in read-only mode on the given client, using the slot
     * calculated from {@code name}.
     *
     * @param client  node to send the command to
     * @param name    name the slot is calculated from
     * @param codec   codec for the command
     * @param command command to execute
     * @param params  command arguments
     * @return future of the command result
     */
    @Override
    public <T, R> RFuture<R> readAsync(RedisClient client, String name, Codec codec, RedisCommand<T> command, Object ... params) {
        NodeSource target = new NodeSource(connectionManager.calcSlot(name), client);
        return async(true, target, codec, command, params, false, false);
    }

    /**
     * Executes {@code command} in read-only mode on the given client, using the slot
     * calculated from the binary {@code key}.
     *
     * @param client  node to send the command to
     * @param key     key the slot is calculated from
     * @param codec   codec for the command
     * @param command command to execute
     * @param params  command arguments
     * @return future of the command result
     */
    @Override
    public <T, R> RFuture<R> readAsync(RedisClient client, byte[] key, Codec codec, RedisCommand<T> command, Object ... params) {
        NodeSource target = new NodeSource(connectionManager.calcSlot(key), client);
        return async(true, target, codec, command, params, false, false);
    }

    /**
     * Executes {@code command} in read-only mode on the given client.
     *
     * @param client  node to send the command to
     * @param codec   codec for the command
     * @param command command to execute
     * @param params  command arguments
     * @return future of the command result
     */
    @Override
    public <T, R> RFuture<R> readAsync(RedisClient client, Codec codec, RedisCommand<T> command, Object ... params) {
        NodeSource target = new NodeSource(client);
        return async(true, target, codec, command, params, false, false);
    }

    @Override
    public <T, R> RFuture<R> readRandomAsync(Codec codec, RedisCommand<T> command, Object ... params) {
        CompletableFuture<R> mainPromise = this.createPromise();
        List<RedisClient> nodes = this.connectionManager.getEntrySet().stream().map((? super T e) -> e.getClient()).collect(Collectors.toList());
        Collections.shuffle(nodes);
        this.retryReadRandomAsync(codec, command, mainPromise, nodes, params);
        return new CompletableFutureWrapper<R>(mainPromise);
    }

    /**
     * Single-node variant of the random read: executes {@code command} in read-only
     * mode on {@code client} only.
     *
     * @param client  node to send the command to
     * @param codec   codec for the command
     * @param command command to execute
     * @param params  command arguments
     * @return future of the command result
     */
    @Override
    public <T, R> RFuture<R> readRandomAsync(RedisClient client, Codec codec, RedisCommand<T> command, Object ... params) {
        // must be mutable: retryReadRandomAsync removes the node it is about to try
        List<RedisClient> singleNode = new ArrayList<RedisClient>(1);
        singleNode.add(client);

        CompletableFuture<R> mainPromise = this.createPromise();
        this.retryReadRandomAsync(codec, command, mainPromise, singleNode, params);
        return new CompletableFutureWrapper<R>(mainPromise);
    }

    /**
     * Tries the first node of {@code nodes} (removing it from the list) and, while
     * the answer is {@code null} and nodes remain, moves on to the next one.
     *
     * @param codec       codec for the command
     * @param command     command to execute
     * @param mainPromise completed with the first non-null result, with {@code null}
     *                    once the list is exhausted, or exceptionally on the first failure
     * @param nodes       remaining candidates; must be mutable and non-empty
     * @param params      command arguments
     */
    private <R, T> void retryReadRandomAsync(Codec codec, RedisCommand<T> command, CompletableFuture<R> mainPromise, List<RedisClient> nodes, Object ... params) {
        RedisClient candidate = nodes.remove(0);
        MasterSlaveEntry owner = connectionManager.getEntry(candidate);
        RFuture<R> attempt = async(true, new NodeSource(owner, candidate), codec, command, params, false, false);
        attempt.whenComplete((value, failure) -> {
            if (failure != null) {
                mainPromise.completeExceptionally(failure);
                return;
            }
            if (value != null) {
                mainPromise.complete(value);
                return;
            }
            // null answer: fall through to the next node, if any is left
            if (nodes.isEmpty()) {
                mainPromise.complete(null);
                return;
            }
            retryReadRandomAsync(codec, command, mainPromise, nodes, params);
        });
    }

    /**
     * Executes {@code command} in write mode on every entry, using
     * {@code StringCodec.INSTANCE}, and discards the individual results.
     *
     * @param command command to execute
     * @param params  command arguments
     * @return future completed when all per-entry executions have completed
     */
    @Override
    public <T> RFuture<Void> writeAllVoidAsync(RedisCommand<T> command, Object ... params) {
        // Typed list: on a raw List, toArray(T[]) is erased to Object[], which allOf() rejects.
        List<CompletableFuture<Void>> futures = this.writeAllAsync(StringCodec.INSTANCE, command, params);
        CompletableFuture<Void> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        return new CompletableFutureWrapper<Void>(all);
    }

    /**
     * Executes {@code command} in write mode on every entry using this service's codec.
     *
     * @param command command to execute
     * @param params  command arguments
     * @return one future per entry
     */
    @Override
    public <R> List<CompletableFuture<R>> writeAllAsync(RedisCommand<?> command, Object ... params) {
        Codec defaultCodec = this.codec;
        return writeAllAsync(defaultCodec, command, params);
    }

    @Override
    public <R> List<CompletableFuture<R>> writeAllAsync(Codec codec, RedisCommand<?> command, Object ... params) {
        List<CompletableFuture<R>> futures = this.connectionManager.getEntrySet().stream().map((? super T e) -> {
            RFuture f = this.async(false, new NodeSource((MasterSlaveEntry)e), codec, command, params, true, false);
            return f.toCompletableFuture();
        }).collect(Collectors.toList());
        return futures;
    }

    @Override
    public <R> List<CompletableFuture<R>> readAllAsync(Codec codec, RedisCommand<?> command, Object ... params) {
        List<CompletableFuture<R>> futures = this.connectionManager.getEntrySet().stream().map((? super T e) -> {
            RFuture f = this.async(true, new NodeSource((MasterSlaveEntry)e), codec, command, params, true, false);
            return f.toCompletableFuture();
        }).collect(Collectors.toList());
        return futures;
    }

    /**
     * Executes {@code command} in read-only mode on every entry using this service's codec.
     *
     * @param command command to execute
     * @param params  command arguments
     * @return one future per entry
     */
    @Override
    public <R> List<CompletableFuture<R>> readAllAsync(RedisCommand<?> command, Object ... params) {
        Codec defaultCodec = this.codec;
        return readAllAsync(defaultCodec, command, params);
    }

    /**
     * Executes {@code command} on the given entry in write mode and, in read-only
     * mode, on each of its non-frozen connection entries of type {@code SLAVE}.
     * Redirects are ignored; this service's codec is used.
     *
     * @param entry   entry whose nodes receive the command
     * @param command command to execute
     * @param params  command arguments
     * @return the write-mode future first, followed by one future per matching slave
     */
    @Override
    public <R> List<CompletableFuture<R>> executeAllAsync(MasterSlaveEntry entry, RedisCommand<?> command, Object ... params) {
        List<CompletableFuture<R>> futures = new ArrayList<>();
        RFuture<R> promise = this.async(false, new NodeSource(entry), this.codec, command, params, true, false);
        futures.add(promise.toCompletableFuture());
        entry.getAllEntries().stream()
                .filter(c -> c.getNodeType() == NodeType.SLAVE && !c.isFreezed())
                .forEach(c -> {
                    RFuture<R> slavePromise = this.async(true, new NodeSource(entry, c.getClient()), this.codec, command, params, true, false);
                    futures.add(slavePromise.toCompletableFuture());
                });
        return futures;
    }

    /**
     * For every entry: executes {@code command} on the entry in write mode and, in
     * read-only mode, on each of its non-frozen connection entries of type
     * {@code SLAVE}. Redirects are ignored; this service's codec is used.
     *
     * @param command command to execute
     * @param params  command arguments
     * @return all futures, in entry order: write-mode future, then that entry's slave futures
     */
    @Override
    public <R> List<CompletableFuture<R>> executeAllAsync(RedisCommand<?> command, Object ... params) {
        Collection<MasterSlaveEntry> nodes = this.connectionManager.getEntrySet();
        List<CompletableFuture<R>> futures = new ArrayList<>();
        for (MasterSlaveEntry entry : nodes) {
            RFuture<R> promise = this.async(false, new NodeSource(entry), this.codec, command, params, true, false);
            futures.add(promise.toCompletableFuture());
            entry.getAllEntries().stream()
                    .filter(c -> c.getNodeType() == NodeType.SLAVE && !c.isFreezed())
                    .forEach(c -> {
                        RFuture<R> slavePromise = this.async(true, new NodeSource(entry, c.getClient()), this.codec, command, params, true, false);
                        futures.add(slavePromise.toCompletableFuture());
                    });
        }
        return futures;
    }

    /**
     * Unwraps an {@link ExecutionException} into a {@code RedisException}.
     *
     * @param e exception thrown by {@code Future.get()}
     * @return the cause itself when it already is a {@code RedisException},
     *         otherwise a new {@code RedisException} wrapping the cause
     */
    @Override
    public RedisException convertException(ExecutionException e) {
        Throwable cause = e.getCause();
        return cause instanceof RedisException
                ? (RedisException) cause
                : new RedisException("Unexpected exception while processing command", cause);
    }

    /**
     * @param key key the slot is calculated from
     * @return a node source addressing the slot of {@code key}
     */
    private NodeSource getNodeSource(String key) {
        return new NodeSource(connectionManager.calcSlot(key));
    }

    /**
     * @param key binary key the slot is calculated from
     * @return a node source addressing the slot of {@code key}
     */
    private NodeSource getNodeSource(byte[] key) {
        return new NodeSource(connectionManager.calcSlot(key));
    }

    /**
     * @param key buffer holding the key the slot is calculated from
     * @return a node source addressing the slot of {@code key}
     */
    private NodeSource getNodeSource(ByteBuf key) {
        return new NodeSource(connectionManager.calcSlot(key));
    }

    /**
     * Executes {@code command} in read-only mode on the slot of {@code key}.
     *
     * @param key     key the slot is calculated from
     * @param codec   codec for the command
     * @param command command to execute
     * @param params  command arguments
     * @return future of the command result
     */
    @Override
    public <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
        return async(true, getNodeSource(key), codec, command, params, false, false);
    }

    /**
     * Executes {@code command} in read-only mode on the slot of the binary {@code key}.
     *
     * @param key     key the slot is calculated from
     * @param codec   codec for the command
     * @param command command to execute
     * @param params  command arguments
     * @return future of the command result
     */
    @Override
    public <T, R> RFuture<R> readAsync(byte[] key, Codec codec, RedisCommand<T> command, Object ... params) {
        return async(true, getNodeSource(key), codec, command, params, false, false);
    }

    /**
     * Executes {@code command} in read-only mode on the slot of the buffered {@code key}.
     *
     * @param key     buffer holding the key the slot is calculated from
     * @param codec   codec for the command
     * @param command command to execute
     * @param params  command arguments
     * @return future of the command result
     */
    @Override
    public <T, R> RFuture<R> readAsync(ByteBuf key, Codec codec, RedisCommand<T> command, Object ... params) {
        return async(true, getNodeSource(key), codec, command, params, false, false);
    }

    /**
     * Executes {@code command} in read-only mode on the given entry.
     *
     * @param entry   entry to execute on
     * @param codec   codec for the command
     * @param command command to execute
     * @param params  command arguments
     * @return future of the command result
     */
    @Override
    public <T, R> RFuture<R> readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params) {
        NodeSource target = new NodeSource(entry);
        return async(true, target, codec, command, params, false, false);
    }

    /**
     * Executes {@code command} in write mode on the entry the connection manager
     * associates with {@code client}.
     *
     * @param client  client used to look up the entry
     * @param codec   codec for the command
     * @param command command to execute
     * @param params  command arguments
     * @return future of the command result
     */
    @Override
    public <T, R> RFuture<R> writeAsync(RedisClient client, Codec codec, RedisCommand<T> command, Object ... params) {
        MasterSlaveEntry owner = getConnectionManager().getEntry(client);
        return writeAsync(owner, codec, command, params);
    }

    /**
     * Executes {@code command} in write mode on the given entry.
     *
     * @param entry   entry to execute on
     * @param codec   codec for the command
     * @param command command to execute
     * @param params  command arguments
     * @return future of the command result
     */
    @Override
    public <T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params) {
        NodeSource target = new NodeSource(entry);
        return async(false, target, codec, command, params, false, false);
    }

    /**
     * Executes {@code command} in read-only mode on the slot of {@code key}, using
     * this service's codec.
     *
     * @param key     key the slot is calculated from
     * @param command command to execute
     * @param params  command arguments
     * @return future of the command result
     */
    @Override
    public <T, R> RFuture<R> readAsync(String key, RedisCommand<T> command, Object ... params) {
        Codec defaultCodec = this.codec;
        return readAsync(key, defaultCodec, command, params);
    }

    /**
     * Evaluates {@code script} in read-only mode on the slot of {@code key}.
     *
     * @param key             key the slot is calculated from
     * @param codec           codec for the command
     * @param evalCommandType eval command describing the result conversion
     * @param script          script text
     * @param keys            script keys
     * @param params          script arguments
     * @return future of the script result
     */
    @Override
    public <T, R> RFuture<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        return evalAsync(getNodeSource(key), true, codec, evalCommandType, script, keys, false, params);
    }

    /**
     * Evaluates {@code script} in read-only mode on the given entry.
     *
     * @param entry           entry to execute on
     * @param codec           codec for the command
     * @param evalCommandType eval command describing the result conversion
     * @param script          script text
     * @param keys            script keys
     * @param params          script arguments
     * @return future of the script result
     */
    @Override
    public <T, R> RFuture<R> evalReadAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        NodeSource target = new NodeSource(entry);
        return evalAsync(target, true, codec, evalCommandType, script, keys, false, params);
    }

    /**
     * Evaluates {@code script} in read-only mode on the given client, using the
     * slot calculated from {@code name}.
     *
     * @param client          node to send the command to
     * @param name            name the slot is calculated from
     * @param codec           codec for the command
     * @param evalCommandType eval command describing the result conversion
     * @param script          script text
     * @param keys            script keys
     * @param params          script arguments
     * @return future of the script result
     */
    @Override
    public <T, R> RFuture<R> evalReadAsync(RedisClient client, String name, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        NodeSource target = new NodeSource(connectionManager.calcSlot(name), client);
        return evalAsync(target, true, codec, evalCommandType, script, keys, false, params);
    }

    /**
     * Evaluates {@code script} in read-only mode on the given client of the given entry.
     *
     * @param client          node to send the command to
     * @param entry           entry the client belongs to
     * @param codec           codec for the command
     * @param evalCommandType eval command describing the result conversion
     * @param script          script text
     * @param keys            script keys
     * @param params          script arguments
     * @return future of the script result
     */
    @Override
    public <T, R> RFuture<R> evalReadAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        NodeSource target = new NodeSource(entry, client);
        return evalAsync(target, true, codec, evalCommandType, script, keys, false, params);
    }

    /**
     * Evaluates {@code script} in write mode on the slot of {@code key}.
     *
     * @param key             key the slot is calculated from
     * @param codec           codec for the command
     * @param evalCommandType eval command describing the result conversion
     * @param script          script text
     * @param keys            script keys
     * @param params          script arguments
     * @return future of the script result
     */
    @Override
    public <T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        return evalAsync(getNodeSource(key), false, codec, evalCommandType, script, keys, false, params);
    }

    /**
     * Evaluates {@code script} in write mode on the slot of the buffered {@code key}.
     *
     * @param key             buffer holding the key the slot is calculated from
     * @param codec           codec for the command
     * @param evalCommandType eval command describing the result conversion
     * @param script          script text
     * @param keys            script keys
     * @param params          script arguments
     * @return future of the script result
     */
    @Override
    public <T, R> RFuture<R> evalWriteAsync(ByteBuf key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        return evalAsync(getNodeSource(key), false, codec, evalCommandType, script, keys, false, params);
    }

    /**
     * Evaluates {@code script} in read-only mode on the slot of the buffered {@code key}.
     *
     * @param key             buffer holding the key the slot is calculated from
     * @param codec           codec for the command
     * @param evalCommandType eval command describing the result conversion
     * @param script          script text
     * @param keys            script keys
     * @param params          script arguments
     * @return future of the script result
     */
    @Override
    public <T, R> RFuture<R> evalReadAsync(ByteBuf key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        return evalAsync(getNodeSource(key), true, codec, evalCommandType, script, keys, false, params);
    }

    /**
     * Evaluates {@code script} in write mode on the slot of {@code key} with the
     * {@code noRetry} flag set.
     *
     * @param key             key the slot is calculated from
     * @param codec           codec for the command
     * @param evalCommandType eval command describing the result conversion
     * @param script          script text
     * @param keys            script keys
     * @param params          script arguments
     * @return future of the script result
     */
    @Override
    public <T, R> RFuture<R> evalWriteNoRetryAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        return evalAsync(getNodeSource(key), false, codec, evalCommandType, script, keys, true, params);
    }

    /**
     * Evaluates {@code script} in write mode on the given entry.
     *
     * @param entry           entry to execute on
     * @param codec           codec for the command
     * @param evalCommandType eval command describing the result conversion
     * @param script          script text
     * @param keys            script keys
     * @param params          script arguments
     * @return future of the script result
     */
    @Override
    public <T, R> RFuture<R> evalWriteAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        NodeSource target = new NodeSource(entry);
        return evalAsync(target, false, codec, evalCommandType, script, keys, false, params);
    }

    /**
     * Sends SCRIPT LOAD for {@code script} to the node behind {@code client}: in
     * write mode through the entry when {@code client} is the entry's own client,
     * otherwise in read-only mode directly on {@code client}.
     *
     * @param client node that must receive the script
     * @param script script text
     * @return future of the SCRIPT LOAD reply
     */
    private RFuture<String> loadScript(RedisClient client, String script) {
        MasterSlaveEntry owner = getConnectionManager().getEntry(client);
        boolean isEntryClient = owner.getClient().equals(client);
        if (!isEntryClient) {
            return readAsync(client, (Codec) StringCodec.INSTANCE, RedisCommands.SCRIPT_LOAD, script);
        }
        return writeAsync(owner, (Codec) StringCodec.INSTANCE, RedisCommands.SCRIPT_LOAD, script);
    }

    /**
     * @return the {@code useScriptCache} setting of the service configuration
     */
    protected boolean isEvalCacheActive() {
        boolean useScriptCache = connectionManager.getServiceManager().getCfg().isUseScriptCache();
        return useScriptCache;
    }

    /**
     * Copies a parameter list, duplicating every {@code ByteBuf} element with
     * {@code ByteBuf.copy()} and passing all other elements through by reference.
     *
     * @param params parameters to copy
     * @return a new list of the same size and order
     */
    protected final List<Object> copy(List<Object> params) {
        List<Object> duplicate = new ArrayList<Object>(params.size());
        for (Object param : params) {
            duplicate.add(param instanceof ByteBuf ? ((ByteBuf) param).copy() : param);
        }
        return duplicate;
    }

    /**
     * Array variant of {@code copy(List)}: {@code ByteBuf} elements are duplicated,
     * everything else is passed through by reference.
     *
     * @param params parameters to copy
     * @return a new array of the same length and order
     */
    protected final Object[] copy(Object[] params) {
        List<Object> duplicate = copy(Arrays.asList(params));
        return duplicate.toArray();
    }

    /**
     * @return current value of the shared EVALSHA_RO support flag
     */
    @Override
    public boolean isEvalShaROSupported() {
        boolean supported = EVAL_SHA_RO_SUPPORTED.get();
        return supported;
    }

    /**
     * Sets the EVALSHA_RO support flag. The flag is static, so the change is seen
     * by every instance of this class.
     *
     * @param value new flag value
     */
    @Override
    public void setEvalShaROSupported(boolean value) {
        final boolean supported = value;
        EVAL_SHA_RO_SUPPORTED.set(supported);
    }

    /**
     * Rewrites the command names used in {@code redis.call(...)} invocations of a
     * script through the configured command mapper. With a
     * {@code DefaultCommandMapper} the script is returned untouched.
     *
     * @param script script text
     * @return the script with mapped command names
     */
    private String map(String script) {
        if (this.getServiceManager().getConfig().getCommandMapper() instanceof DefaultCommandMapper) {
            return script;
        }

        String rewritten = script;
        // the matcher keeps scanning the original text while replacements go into "rewritten"
        Matcher calls = COMMANDS_PATTERN.matcher(script);
        while (calls.find()) {
            String name = calls.group(1);
            String mappedName = this.getServiceManager().getConfig().getCommandMapper().map(name);
            if (!name.equalsIgnoreCase(mappedName)) {
                // NOTE(review): replaces every occurrence of the name in the script text,
                // not only the one inside the matched redis.call(...) - confirm intended.
                rewritten = rewritten.replace(name, mappedName);
            }
        }
        return rewritten;
    }

    /**
     * Evaluates a script on the node described by {@code nodeSource}.
     *
     * <p>When the script cache is active and the command is {@code EVAL}, the script
     * is first invoked by SHA digest ({@code EVALSHA_RO} for read-only calls while
     * that command is believed to be supported, otherwise {@code EVALSHA}):
     * <ul>
     *   <li>{@code EVALSHA_RO} answered with "ERR unknown command": the support flag
     *       is cleared and the evaluation is repeated, now using {@code EVALSHA};</li>
     *   <li>"NOSCRIPT": the script is loaded on the node that answered and the digest
     *       call is repeated on that node;</li>
     *   <li>any other failure is passed on to the returned future.</li>
     * </ul>
     * Otherwise the script text is sent as is with {@code evalCommandType}.
     *
     * <p>{@code ByteBuf} keys and parameters are copied up front so they can be sent
     * a second time; the copies are released on every path that does not resend them.
     *
     * @param nodeSource      node to execute on
     * @param readOnlyMode    whether to execute in read-only mode
     * @param codec           codec for the command
     * @param evalCommandType eval command describing the result conversion
     * @param script          script text (command names are mapped before use)
     * @param keys            script keys
     * @param noRetry         passed to the executor as its no-retry flag
     * @param params          script arguments
     * @return future of the script result
     */
    public <T, R> RFuture<R> evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, boolean noRetry, Object ... params) {
        String mappedScript = this.map(script);
        if (!this.isEvalCacheActive() || !evalCommandType.getName().equals("EVAL")) {
            List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
            args.add(mappedScript);
            args.add(keys.size());
            args.addAll(keys);
            args.addAll(Arrays.asList(params));
            return this.async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), false, noRetry);
        }

        CompletableFuture<R> mainPromise = new CompletableFuture<R>();
        List<Object> keysCopy = this.copy(keys);
        Object[] paramsCopy = this.copy(params);
        CompletableFuture<R> promise = new CompletableFuture<R>();
        String sha1 = this.getServiceManager().calcSHA(mappedScript);
        // Remember which variant is used: only a rejected EVALSHA_RO may be retried.
        boolean readOnlyVariant = readOnlyMode && EVAL_SHA_RO_SUPPORTED.get();
        RedisCommand<T> cmd = new RedisCommand<T>(evalCommandType, readOnlyVariant ? "EVALSHA_RO" : "EVALSHA");
        List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
        args.add(sha1);
        args.add(keys.size());
        args.addAll(keys);
        args.addAll(Arrays.asList(params));
        RedisExecutor<T, R> executor = new RedisExecutor<T, R>(readOnlyMode, nodeSource, codec, cmd, args.toArray(), promise, false, this.connectionManager, this.objectBuilder, this.referenceType, noRetry, this.retryAttempts, this.retryInterval, this.responseTimeout, this.trackChanges);
        executor.execute();
        promise.whenComplete((res, e) -> {
            if (e == null) {
                this.free(keysCopy);
                this.free(paramsCopy);
                mainPromise.complete(res);
                return;
            }

            // Throwable.getMessage() may be null (e.g. cancellation). An NPE thrown here
            // would be swallowed by the callback and leave mainPromise uncompleted.
            String message = e.getMessage();
            if (readOnlyVariant && message != null && message.startsWith("ERR unknown command")) {
                EVAL_SHA_RO_SUPPORTED.set(false);
                // Restart from the unmapped script: the nested call applies the mapping itself.
                RFuture<R> future = this.evalAsync(nodeSource, readOnlyMode, codec, evalCommandType, script, keysCopy, noRetry, paramsCopy);
                this.transfer(future.toCompletableFuture(), mainPromise);
            } else if (message != null && message.startsWith("NOSCRIPT")) {
                RFuture<String> loadFuture = this.loadScript(executor.getRedisClient(), mappedScript);
                loadFuture.whenComplete((r, ex) -> {
                    if (ex != null) {
                        this.free(keysCopy);
                        this.free(paramsCopy);
                        mainPromise.completeExceptionally(ex);
                        return;
                    }
                    List<Object> newargs = new ArrayList<Object>(2 + keysCopy.size() + paramsCopy.length);
                    newargs.add(sha1);
                    newargs.add(keysCopy.size());
                    newargs.addAll(keysCopy);
                    newargs.addAll(Arrays.asList(paramsCopy));
                    // pin the retry to the node that now has the script
                    NodeSource ns = nodeSource;
                    if (ns.getRedisClient() == null) {
                        ns = new NodeSource(nodeSource, executor.getRedisClient());
                    }
                    RFuture<R> future = this.async(readOnlyMode, ns, codec, cmd, newargs.toArray(), false, noRetry);
                    this.transfer(future.toCompletableFuture(), mainPromise);
                });
            } else {
                this.free(keysCopy);
                this.free(paramsCopy);
                mainPromise.completeExceptionally(e);
            }
        });
        return new CompletableFutureWrapper<R>(mainPromise);
    }

    /**
     * Executes {@code command} in write mode on the slot of {@code key}, using this
     * service's codec.
     *
     * @param key     key the slot is calculated from
     * @param command command to execute
     * @param params  command arguments
     * @return future of the command result
     */
    @Override
    public <T, R> RFuture<R> writeAsync(String key, RedisCommand<T> command, Object ... params) {
        Codec defaultCodec = this.codec;
        return writeAsync(key, defaultCodec, command, params);
    }

    /**
     * Executes {@code command} in write mode on the slot of {@code key}.
     *
     * @param key     key the slot is calculated from
     * @param codec   codec for the command
     * @param command command to execute
     * @param params  command arguments
     * @return future of the command result
     */
    @Override
    public <T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
        return async(false, getNodeSource(key), codec, command, params, false, false);
    }

    /**
     * Executes {@code command} in write mode on the slot of the binary {@code key}.
     *
     * @param key     key the slot is calculated from
     * @param codec   codec for the command
     * @param command command to execute
     * @param params  command arguments
     * @return future of the command result
     */
    @Override
    public <T, R> RFuture<R> writeAsync(byte[] key, Codec codec, RedisCommand<T> command, Object ... params) {
        return async(false, getNodeSource(key), codec, command, params, false, false);
    }

    /**
     * Executes {@code command} in write mode on the slot of the buffered {@code key}.
     *
     * @param key     buffer holding the key the slot is calculated from
     * @param codec   codec for the command
     * @param command command to execute
     * @param params  command arguments
     * @return future of the command result
     */
    @Override
    public <T, R> RFuture<R> writeAsync(ByteBuf key, Codec codec, RedisCommand<T> command, Object ... params) {
        return async(false, getNodeSource(key), codec, command, params, false, false);
    }

    /**
     * Central entry point: wraps the command in a {@code RedisExecutor} and runs it.
     *
     * <p>A read-only {@code SORT} gets special treatment. While {@code SORT_RO} is
     * believed to be supported it is sent as {@code SORT_RO}; if the server answers
     * "ERR unknown command" the support flag is cleared and the original command is
     * re-run in write mode. Once the flag is cleared, a read-only {@code SORT} is
     * executed in write mode directly.
     *
     * @param readOnlyMode   whether to execute in read-only mode
     * @param source         node to execute on
     * @param codec          codec for the command
     * @param command        command to execute
     * @param params         command arguments
     * @param ignoreRedirect passed to the executor as its ignore-redirect flag
     * @param noRetry        passed to the executor as its no-retry flag
     * @return future of the command result
     */
    public <V, R> RFuture<R> async(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand<V> command, Object[] params, boolean ignoreRedirect, boolean noRetry) {
        RedisCommand<V> cmnd = this.getServiceManager().resp3(command);
        if (readOnlyMode && cmnd.getName().equals("SORT")) {
            if (SORT_RO_SUPPORTED.get()) {
                RedisCommand<V> cmd = new RedisCommand<V>("SORT_RO", cmnd.getReplayMultiDecoder());
                CompletableFuture<R> mainPromise = this.createPromise();
                RedisExecutor<V, R> executor = new RedisExecutor<V, R>(true, source, codec, cmd, params, mainPromise, ignoreRedirect, this.connectionManager, this.objectBuilder, this.referenceType, noRetry, this.retryAttempts, this.retryInterval, this.responseTimeout, this.trackChanges);
                executor.execute();
                CompletableFuture<R> result = new CompletableFuture<R>();
                mainPromise.whenComplete((r, e) -> {
                    // getMessage() may be null; an NPE here would leave "result" uncompleted
                    String message = e == null ? null : e.getMessage();
                    if (message != null && message.startsWith("ERR unknown command")) {
                        SORT_RO_SUPPORTED.set(false);
                        RFuture<R> future = this.async(false, source, codec, command, params, ignoreRedirect, noRetry);
                        this.transfer(future.toCompletableFuture(), result);
                        return;
                    }
                    this.transfer(mainPromise, result);
                });
                return new CompletableFutureWrapper<R>(result);
            }
            // SORT_RO is known to be unsupported: run the plain SORT in write mode
            readOnlyMode = false;
        }
        CompletableFuture<R> mainPromise = this.createPromise();
        RedisExecutor<V, R> executor = new RedisExecutor<V, R>(readOnlyMode, source, codec, cmnd, params, mainPromise, ignoreRedirect, this.connectionManager, this.objectBuilder, this.referenceType, noRetry, this.retryAttempts, this.retryInterval, this.responseTimeout, this.trackChanges);
        executor.execute();
        return new CompletableFutureWrapper<R>(mainPromise);
    }

    /**
     * Passes every element of {@code params} to {@code ReferenceCountUtil.safeRelease}.
     *
     * @param params objects to release
     */
    private void free(Object[] params) {
        for (int i = 0; i < params.length; i++) {
            ReferenceCountUtil.safeRelease(params[i]);
        }
    }

    /**
     * Passes every element of {@code params} to {@code ReferenceCountUtil.safeRelease}.
     *
     * @param params objects to release
     */
    private void free(List<Object> params) {
        Iterator<Object> remaining = params.iterator();
        while (remaining.hasNext()) {
            ReferenceCountUtil.safeRelease(remaining.next());
        }
    }

    /**
     * Read-only variant of the batched execution; delegates to
     * {@code executeBatchedAsync} with {@code readOnly = true}.
     *
     * @param codec    codec for the command
     * @param command  command to execute
     * @param callback callback passed on to the batched execution
     * @param keys     keys to operate on
     * @return future of the result
     */
    @Override
    public <T, R> RFuture<R> readBatchedAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, Object ... keys) {
        final boolean readOnly = true;
        return executeBatchedAsync(readOnly, codec, command, callback, keys);
    }

    /**
     * Write variant of the batched execution; delegates to
     * {@code executeBatchedAsync} with {@code readOnly = false}.
     *
     * @param codec    codec for the command
     * @param command  command to execute
     * @param callback callback passed on to the batched execution
     * @param keys     keys to operate on
     * @return future of the result
     */
    @Override
    public <T, R> RFuture<R> writeBatchedAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, Object ... keys) {
        final boolean readOnly = false;
        return executeBatchedAsync(readOnly, codec, command, callback, keys);
    }

    /**
     * Write variant of the batched script evaluation; delegates to
     * {@code evalBatchedAsync} with {@code readOnly = false}.
     *
     * @param codec    codec for the command
     * @param command  eval command
     * @param script   script text
     * @param keys     keys to distribute over the nodes
     * @param callback builds per-group keys/params and merges the results
     * @return future of the merged result
     */
    @Override
    public <T, R> RFuture<R> evalWriteBatchedAsync(Codec codec, RedisCommand<T> command, String script, List<Object> keys, SlotCallback<T, R> callback) {
        final boolean readOnly = false;
        return evalBatchedAsync(readOnly, codec, command, script, keys, callback);
    }

    /**
     * Read-only variant of the batched script evaluation; delegates to
     * {@code evalBatchedAsync} with {@code readOnly = true}.
     *
     * @param codec    codec for the command
     * @param command  eval command
     * @param script   script text
     * @param keys     keys to distribute over the nodes
     * @param callback builds per-group keys/params and merges the results
     * @return future of the merged result
     */
    @Override
    public <T, R> RFuture<R> evalReadBatchedAsync(Codec codec, RedisCommand<T> command, String script, List<Object> keys, SlotCallback<T, R> callback) {
        final boolean readOnly = true;
        return evalBatchedAsync(readOnly, codec, command, script, keys, callback);
    }

    private <T, R> RFuture<R> evalBatchedAsync(boolean readOnly, Codec codec, RedisCommand<T> command, String script, List<Object> keys, SlotCallback<T, R> callback) {
        if (!this.getServiceManager().getCfg().isClusterConfig()) {
            Object[] keysArray = callback.createKeys(null, keys);
            Object[] paramsArray = callback.createParams(Collections.emptyList());
            if (readOnly) {
                return this.evalReadAsync((String)null, codec, command, script, Arrays.asList(keysArray), paramsArray);
            }
            return this.evalWriteAsync((String)null, codec, command, script, Arrays.asList(keysArray), paramsArray);
        }
        Map<Object, Map<Object, Object>> entry2keys = keys.isEmpty() ? this.connectionManager.getEntrySet().stream().collect(Collectors.toMap(Function.identity(), e -> Collections.singletonMap(0, new ArrayList()))) : keys.stream().collect(Collectors.groupingBy(k -> {
            int slot;
            if (k instanceof String) {
                slot = this.connectionManager.calcSlot((String)k);
            } else if (k instanceof ByteBuf) {
                slot = this.connectionManager.calcSlot((ByteBuf)k);
            } else {
                throw new IllegalArgumentException();
            }
            return this.connectionManager.getWriteEntry(slot);
        }, Collectors.groupingBy(k -> {
            if (k instanceof String) {
                return this.connectionManager.calcSlot((String)k);
            }
            if (k instanceof ByteBuf) {
                return this.connectionManager.calcSlot((ByteBuf)k);
            }
            throw new IllegalArgumentException();
        }, Collectors.toList())));
        ArrayList futures = new ArrayList();
        for (Map.Entry<Object, Map<Object, Object>> entry : entry2keys.entrySet()) {
            CommandBatchService executorService = this instanceof CommandBatchService ? (CommandBatchService)this : new CommandBatchService(this);
            for (List list : entry.getValue().values()) {
                RedisCommand<T> c = command;
                RedisCommand<T> newCommand = callback.createCommand(list);
                if (newCommand != null) {
                    c = newCommand;
                }
                Object[] keysArray = callback.createKeys((MasterSlaveEntry)entry.getKey(), list);
                Object[] paramsArray = callback.createParams(Collections.emptyList());
                RFuture<R> f = readOnly ? executorService.evalReadAsync((MasterSlaveEntry)entry.getKey(), codec, c, script, Arrays.asList(keysArray), paramsArray) : executorService.evalWriteAsync((MasterSlaveEntry)entry.getKey(), codec, c, script, Arrays.asList(keysArray), paramsArray);
                futures.add(f.toCompletableFuture());
            }
            if (this instanceof CommandBatchService) continue;
            executorService.executeAsync();
        }
        CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        CompletionStage result = future.thenApply(r -> {
            List res = futures.stream().map((? super T e) -> e.join()).collect(Collectors.toList());
            return callback.onResult(res);
        });
        return new CompletableFutureWrapper(result);
    }

    private <T, R> RFuture<R> executeBatchedAsync(boolean readOnly, Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, Object[] keys) {
        if (!this.getServiceManager().getCfg().isClusterConfig()) {
            Object[] params = callback.createParams(Arrays.asList(keys));
            CompletionStage<Object> f = readOnly ? this.readAsync((String)null, codec, command, params) : this.writeAsync((String)null, codec, command, params);
            f = f.thenApply(r -> callback.onResult(Collections.singletonList(r)));
            return new CompletableFutureWrapper<R>(f);
        }
        Map entry2keys = Arrays.stream(keys).collect(Collectors.groupingBy(k -> {
            int slot;
            if (k instanceof String) {
                slot = this.connectionManager.calcSlot((String)k);
            } else if (k instanceof ByteBuf) {
                slot = this.connectionManager.calcSlot((ByteBuf)k);
            } else if (k instanceof byte[]) {
                slot = this.connectionManager.calcSlot((byte[])k);
            } else {
                throw new IllegalArgumentException();
            }
            return this.connectionManager.getWriteEntry(slot);
        }, Collectors.groupingBy(k -> {
            if (k instanceof String) {
                return this.connectionManager.calcSlot((String)k);
            }
            if (k instanceof ByteBuf) {
                return this.connectionManager.calcSlot((ByteBuf)k);
            }
            if (k instanceof byte[]) {
                return this.connectionManager.calcSlot((byte[])k);
            }
            throw new IllegalArgumentException();
        }, Collectors.toList())));
        ArrayList futures = new ArrayList();
        ArrayList mainFutures = new ArrayList();
        for (Map.Entry entry : entry2keys.entrySet()) {
            CommandBatchService executorService = this instanceof CommandBatchService ? (CommandBatchService)this : new CommandBatchService(this);
            for (List<Object> list : entry.getValue().values()) {
                RedisCommand<T> c = command;
                RedisCommand<T> newCommand = callback.createCommand(list);
                if (newCommand != null) {
                    c = newCommand;
                }
                Object[] params = callback.createParams(list);
                RFuture<R> f = readOnly ? executorService.readAsync(entry.getKey(), codec, c, params) : executorService.writeAsync(entry.getKey(), codec, c, params);
                futures.add(f.toCompletableFuture());
            }
            if (this instanceof CommandBatchService) continue;
            RFuture<BatchResult<?>> f = executorService.executeAsync();
            mainFutures.add(f.toCompletableFuture());
        }
        CompletableFuture<Void> future = !mainFutures.isEmpty() ? CompletableFuture.allOf(mainFutures.toArray(new CompletableFuture[0])) : CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        CompletionStage result = future.thenApply(r -> {
            List res = futures.stream().filter(e -> !e.isCompletedExceptionally() && e.getNow(null) != null).map((? super T e) -> e.join()).collect(Collectors.toList());
            return callback.onResult(res);
        });
        return new CompletableFutureWrapper(result);
    }

    /**
     * @return the object builder held by this executor
     */
    @Override
    public RedissonObjectBuilder getObjectBuilder() {
        return objectBuilder;
    }

    /**
     * @return the service manager exposed by the underlying connection manager
     */
    @Override
    public ServiceManager getServiceManager() {
        return connectionManager.getServiceManager();
    }

    /**
     * Encodes {@code value} with the codec's value encoder. When Redisson reference support is
     * enabled and the object builder yields a reference for the value, the reference is encoded instead.
     *
     * @throws IllegalArgumentException wrapping the {@link IOException} raised by the encoder
     */
    @Override
    public ByteBuf encode(Codec codec, Object value) {
        Object target = value;
        if (isRedissonReferenceSupportEnabled()) {
            RedissonReference ref = getObjectBuilder().toReference(value);
            if (ref != null) {
                target = ref;
            }
        }
        try {
            return codec.getValueEncoder().encode(target);
        } catch (IOException cause) {
            throw new IllegalArgumentException(cause);
        }
    }

    /**
     * Encodes {@code value} with the codec's map-key encoder. When Redisson reference support is
     * enabled and the object builder yields a reference for the value, the reference is encoded instead.
     *
     * @throws IllegalArgumentException wrapping the {@link IOException} raised by the encoder
     */
    @Override
    public ByteBuf encodeMapKey(Codec codec, Object value) {
        Object target = value;
        if (isRedissonReferenceSupportEnabled()) {
            RedissonReference ref = getObjectBuilder().toReference(value);
            if (ref != null) {
                target = ref;
            }
        }
        try {
            return codec.getMapKeyEncoder().encode(target);
        } catch (IOException cause) {
            throw new IllegalArgumentException(cause);
        }
    }

    /**
     * Encodes {@code value} with the codec's map-value encoder. When Redisson reference support is
     * enabled and the object builder yields a reference for the value, the reference is encoded instead.
     *
     * @throws IllegalArgumentException wrapping the {@link IOException} raised by the encoder
     */
    @Override
    public ByteBuf encodeMapValue(Codec codec, Object value) {
        Object target = value;
        if (isRedissonReferenceSupportEnabled()) {
            RedissonReference ref = getObjectBuilder().toReference(value);
            if (ref != null) {
                target = ref;
            }
        }
        try {
            return codec.getMapValueEncoder().encode(target);
        } catch (IOException cause) {
            throw new IllegalArgumentException(cause);
        }
    }

    @Override
    public <V> RFuture<V> pollFromAnyAsync(String name, Codec codec, RedisCommand<?> command, long secondsTimeout, String ... queueNames) {
        List mappedNames = Arrays.stream(queueNames).map((? super T m) -> this.connectionManager.getServiceManager().getConfig().getNameMapper().map((String)m)).collect(Collectors.toList());
        if (this.getServiceManager().getCfg().isClusterConfig() && queueNames.length > 0) {
            AtomicReference<Iterator<String>> ref = new AtomicReference<Iterator<String>>();
            ArrayList<String> names = new ArrayList<String>();
            names.add(name);
            names.addAll(mappedNames);
            ref.set(names.iterator());
            AtomicLong counter = new AtomicLong(secondsTimeout);
            CompletionStage<V> result = this.poll(codec, ref, names, counter, command);
            return new CompletableFutureWrapper<V>(result);
        }
        ArrayList<Object> params = new ArrayList<Object>(queueNames.length + 1);
        params.add(name);
        params.addAll(mappedNames);
        params.add(secondsTimeout);
        return this.writeAsync(name, codec, command, params.toArray());
    }

    /**
     * Polls the queues in {@code names} one after another, each with a timeout argument of 1,
     * until a non-null value arrives or {@code counter} reaches zero after a decrement.
     *
     * @param ref     holder of the iterator marking the current position in {@code names}; reset when exhausted
     * @param counter remaining attempts; decremented after every empty poll
     * @return stage completed with the first non-null value, or {@code null} once the counter hits zero
     */
    private <V> CompletionStage<V> poll(Codec codec, AtomicReference<Iterator<String>> ref, List<String> names, AtomicLong counter, RedisCommand<?> command) {
        Iterator<String> cursor = ref.get();
        if (!cursor.hasNext()) {
            // Reached the end of the list: start another round from the first queue.
            ref.set(names.iterator());
            return poll(codec, ref, names, counter, command);
        }

        String queue = cursor.next();
        RFuture<V> attempt = writeAsync(queue, codec, command, queue, 1);
        return attempt.thenCompose(value -> {
            if (value != null) {
                return CompletableFuture.completedFuture(value);
            }
            boolean exhausted = counter.decrementAndGet() == 0L;
            if (exhausted) {
                return CompletableFuture.completedFuture(null);
            }
            return poll(codec, ref, names, counter, command);
        });
    }

    /**
     * Decorates {@code stage} so that a "None of slaves were synced" failure first runs the
     * compensating action from {@code supplier} and then fails with the original cause.
     *
     * <p>Successful results pass through unchanged. Any other failure is rethrown as a
     * {@link CompletionException} around its cause (or around the exception itself when it has none).
     * If the compensating action fails for a different reason, the original cause is attached to
     * that failure as suppressed; the returned stage still fails with the original cause.
     *
     * @param stage    stage whose failure is inspected
     * @param supplier produces the compensating stage; invoked only on a no-sync failure
     * @return stage with the same result as {@code stage}, or failed as described above
     */
    @Override
    public <T> CompletionStage<T> handleNoSync(CompletionStage<T> stage, Supplier<CompletionStage<?>> supplier) {
        return stage.<CompletionStage<T>>handle((r, ex) -> {
            if (ex == null) {
                return CompletableFuture.completedFuture(r);
            }

            Throwable cause = ex.getCause();
            if (!isNoSyncedSlavesFailure(cause)) {
                throw new CompletionException(cause != null ? cause : ex);
            }

            return supplier.get().<T>handle((r1, e) -> {
                if (e != null && !isNoSyncedSlavesFailure(e.getCause())) {
                    if (e.getCause() != null) {
                        e.getCause().addSuppressed(cause);
                    } else {
                        e.addSuppressed(cause);
                    }
                }
                // Whatever the compensation outcome, surface the original no-sync failure.
                throw new CompletionException(cause);
            });
        }).thenCompose(f -> f);
    }

    /**
     * Tells whether {@code t} is the "None of slaves were synced" failure. A prefix match is used
     * because the message raised by {@code syncedEval} carries a trailing hint after that text,
     * which an exact comparison would never match.
     */
    private static boolean isNoSyncedSlavesFailure(Throwable t) {
        return t != null
                && t.getMessage() != null
                && t.getMessage().startsWith("None of slaves were synced");
    }

    /**
     * Runs {@link #syncedEval(String, Codec, RedisCommand, String, List, Object...)} through the
     * service manager's {@code execute} wrapper.
     */
    @Override
    public <T> RFuture<T> syncedEvalWithRetry(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        ServiceManager serviceManager = getServiceManager();
        return serviceManager.execute(() -> syncedEval(key, codec, evalCommandType, script, keys, params));
    }

    /**
     * Synced eval in {@code WAIT} mode with the configured slaves sync timeout and the retry flag off.
     */
    @Override
    public <T> RFuture<T> syncedEvalNoRetry(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        long slavesSyncTimeout = getServiceManager().getCfg().getSlavesSyncTimeout();
        return syncedEval(slavesSyncTimeout, CommandAsyncExecutor.SyncMode.WAIT, false, key, codec, evalCommandType, script, keys, params);
    }

    /**
     * Synced eval in {@code WAIT} mode with the configured slaves sync timeout and the retry flag on.
     */
    @Override
    public <T> RFuture<T> syncedEval(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        long slavesSyncTimeout = getServiceManager().getCfg().getSlavesSyncTimeout();
        return syncedEval(slavesSyncTimeout, CommandAsyncExecutor.SyncMode.WAIT, true, key, codec, evalCommandType, script, keys, params);
    }

    /**
     * Synced eval with caller-supplied timeout and sync mode, retry flag off.
     */
    @Override
    public <T> RFuture<T> syncedEvalNoRetry(long timeout, CommandAsyncExecutor.SyncMode syncMode, String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        final boolean retry = false;
        return syncedEval(timeout, syncMode, retry, key, codec, evalCommandType, script, keys, params);
    }

    /**
     * Synced eval with caller-supplied timeout and sync mode, retry flag on.
     */
    @Override
    public <T> RFuture<T> syncedEvalWithRetry(long timeout, CommandAsyncExecutor.SyncMode syncMode, String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        final boolean retry = true;
        return syncedEval(timeout, syncMode, retry, key, codec, evalCommandType, script, keys, params);
    }

    /**
     * Shared implementation behind the public {@code syncedEval*} methods: evaluates the script as a
     * write and, when possible, does so through a batch created with sync options so the result
     * reports how many slaves were synced.
     *
     * <p>Flow:
     * <ol>
     *   <li>Falls back to a plain eval write when the config is single-server, when this executor is
     *       a {@code CommandBatchService}, or when the cached {@code waitSupportedCommands} list
     *       shows that the command needed by {@code syncMode} is missing.</li>
     *   <li>If {@code waitSupportedCommands} is not cached yet, probes the server with
     *       {@code WAIT 0 0} and {@code WAITAOF 0 0 0} in one batch and caches the outcome.</li>
     *   <li>Determines slave count and AOF state (cached on the entry, or via {@code INFO_ALL}),
     *       then runs the eval in a batch built by
     *       {@code createCommandBatchService(availableSlaves, aofEnabled, timeout)}.</li>
     * </ol>
     *
     * @param timeout  value forwarded to {@code createCommandBatchService} as the sync timeout
     * @param syncMode consulted only in the initial short-circuit check
     * @param retry    in the short-circuit path, selects {@code evalWriteAsync} ({@code true}) or
     *                 {@code evalWriteNoRetryAsync} ({@code false})
     * @param key      key used to locate the entry and to route the commands
     * @return future with the script result
     */
    private <T> RFuture<T> syncedEval(long timeout, CommandAsyncExecutor.SyncMode syncMode, boolean retry, String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        // Short-circuit: no sync wrapper is applicable, so issue a plain eval write.
        if (this.getServiceManager().getCfg().isSingleConfig() || this instanceof CommandBatchService || this.waitSupportedCommands != null && this.waitSupportedCommands.isEmpty() && syncMode == CommandAsyncExecutor.SyncMode.AUTO || this.waitSupportedCommands != null && !this.waitSupportedCommands.contains(RedisCommands.WAIT.getName()) && syncMode == CommandAsyncExecutor.SyncMode.WAIT || this.waitSupportedCommands != null && !this.waitSupportedCommands.contains(RedisCommands.WAITAOF.getName()) && syncMode == CommandAsyncExecutor.SyncMode.WAIT_AOF) {
            if (retry) {
                return this.evalWriteAsync(key, codec, evalCommandType, script, keys, params);
            }
            return this.evalWriteNoRetryAsync(key, codec, evalCommandType, script, keys, params);
        }
        // Already-completed placeholder, replaced by the probe batch when support is still unknown.
        Future<Object> waitFuture = CompletableFuture.completedFuture(null);
        if (this.waitSupportedCommands == null) {
            // Probe both commands with zero arguments to find out which ones the server knows.
            CommandBatchService ee = this.createCommandBatchService(BatchOptions.defaults());
            ee.writeAsync(key, RedisCommands.WAIT, 0, 0);
            ee.writeAsync(key, RedisCommands.WAITAOF, 0, 0, 0);
            waitFuture = ee.executeAsync();
        }
        CompletionStage resFuture = waitFuture.handle((r2, ex2) -> {
            Future<Object> replicationFuture;
            MasterSlaveEntry e;
            if (ex2 != null) {
                // Probe failed: start from both command names and drop each one that an error message quotes.
                ArrayList<String> commands = new ArrayList<String>(Arrays.asList(RedisCommands.WAIT.getName(), RedisCommands.WAITAOF.getName()));
                ArrayList<String> msgs = new ArrayList<String>(2);
                // NOTE(review): a null message here would cause an NPE in the loop below — confirm probe failures always carry one.
                msgs.add(ex2.getMessage());
                for (Throwable throwable : ex2.getSuppressed()) {
                    msgs.add(throwable.getMessage());
                }
                block1: for (String msg : msgs) {
                    for (String command : commands) {
                        if (!msg.contains("'" + command + "'")) continue;
                        // Removing while iterating is safe only because the inner loop is abandoned right after.
                        commands.remove(command);
                        continue block1;
                    }
                }
                if (ex2.getMessage().startsWith("ERR unknown command")) {
                    // Cache what is left as supported and fall back to a plain eval write.
                    this.waitSupportedCommands = commands;
                    // NOTE(review): this fallback always uses evalWriteAsync, regardless of the retry flag — confirm intended.
                    RFuture f = this.evalWriteAsync(key, codec, evalCommandType, script, keys, params);
                    return f;
                }
                throw new CompletionException((Throwable)ex2);
            }
            if (this.waitSupportedCommands == null) {
                // Probe succeeded: both commands are available.
                this.waitSupportedCommands = Arrays.asList(RedisCommands.WAIT.getName(), RedisCommands.WAITAOF.getName());
            }
            if ((e = this.connectionManager.getEntry(key)) == null) {
                throw new CompletionException(new RedisNodeNotFoundException("entry for " + key + " hasn't been discovered yet"));
            }
            int slaves = e.getAvailableSlaves();
            if (slaves != -1) {
                // Slave count is cached on the entry: synthesize the two fields read below instead of querying the server.
                HashMap<String, String> map = new HashMap<String, String>(2);
                map.put("connected_slaves", "" + slaves);
                if (e.isAofEnabled()) {
                    map.put("aof_enabled", "1");
                } else {
                    map.put("aof_enabled", "0");
                }
                replicationFuture = CompletableFuture.completedFuture(map);
            } else {
                // -1 marks the cached value as unknown: fetch the fields from the server via INFO_ALL.
                replicationFuture = this.writeAsync(e, (Codec)StringCodec.INSTANCE, RedisCommands.INFO_ALL, new Object[0]);
            }
            CompletionStage resultFuture = replicationFuture.thenCompose(r -> {
                int availableSlaves = Integer.parseInt(r.getOrDefault("connected_slaves", "0"));
                boolean aofEnabled = "1".equals(r.getOrDefault("aof_enabled", "0"));
                // Refresh the values cached on the entry.
                e.setAvailableSlaves(availableSlaves);
                e.setAofEnabled(aofEnabled);
                CommandBatchService executorService = this.createCommandBatchService(availableSlaves, aofEnabled, timeout);
                RFuture result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);
                // If the factory handed back this very executor (presumably possible via an override), the caller drives execution.
                if (executorService == this) {
                    return result;
                }
                RFuture<BatchResult<?>> future = executorService.executeAsync();
                CompletionStage<Object> f = future.handle((res, ex) -> {
                    if (ex != null) {
                        throw new CompletionException((Throwable)ex);
                    }
                    // Synced count differs from the expected count: invalidate the cached slave count.
                    if (res.getSyncedSlaves() < availableSlaves || res.getSyncedSlaves() > availableSlaves) {
                        e.setAvailableSlaves(-1);
                    }
                    // With checkLockSyncedSlaves enabled, zero synced slaves while some are available is a failure.
                    if (this.getServiceManager().getCfg().isCheckLockSyncedSlaves() && res.getSyncedSlaves() == 0 && availableSlaves > 0) {
                        throw new CompletionException(new IllegalStateException("None of slaves were synced. Try to increase slavesSyncTimeout setting or set checkLockSyncedSlaves = false."));
                    }
                    return this.getNow(result.toCompletableFuture());
                });
                return f;
            });
            return resultFuture;
        }).thenCompose(f -> f);
        return new CompletableFutureWrapper(resFuture);
    }

    /**
     * Builds a batch service whose options request syncing: {@code syncAOF(1, availableSlaves, timeout)}
     * when AOF is enabled, {@code sync(availableSlaves, timeout)} otherwise.
     *
     * @param availableSlaves slave count passed to the sync option
     * @param aofEnabled      selects the AOF-aware sync option
     * @param timeout         sync timeout in milliseconds
     * @return batch service created from those options
     */
    protected CommandBatchService createCommandBatchService(int availableSlaves, boolean aofEnabled, long timeout) {
        BatchOptions batchOptions = BatchOptions.defaults();
        Duration syncTimeout = Duration.ofMillis(timeout);
        if (!aofEnabled) {
            batchOptions.sync(availableSlaves, syncTimeout);
        } else {
            batchOptions.syncAOF(1, availableSlaves, syncTimeout);
        }
        return createCommandBatchService(batchOptions);
    }

    /**
     * Creates a batch service bound to this executor with the given options.
     */
    @Override
    public CommandBatchService createCommandBatchService(BatchOptions options) {
        // Typed as the interface so the (CommandAsyncExecutor, BatchOptions) constructor is selected.
        CommandAsyncExecutor executor = this;
        return new CommandBatchService(executor, options);
    }

    /**
     * @return the value of this executor's {@code trackChanges} flag
     */
    @Override
    public boolean isTrackChanges() {
        return trackChanges;
    }
}

