/*
 * Decompiled with CFR 0.152.
 */
package org.redisson.command;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.redisson.RedissonReference;
import org.redisson.SlotCallback;
import org.redisson.api.RFuture;
import org.redisson.cache.LRUCacheMap;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisException;
import org.redisson.client.RedisRedirectException;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.command.RedisExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link CommandAsyncExecutor} implementation that runs each command through a
 * {@link RedisExecutor} and exposes the outcome as an {@link RFuture}.
 *
 * <p>Besides the plain read/write entry points it offers fan-out helpers that send a
 * command to every {@link MasterSlaveEntry} known to the {@link ConnectionManager},
 * script evaluation with an optional EVALSHA fast path, and blocking helpers that
 * wait for a future on the calling thread.
 */
public class CommandAsyncService
implements CommandAsyncExecutor {
    static final Logger log = LoggerFactory.getLogger(CommandAsyncService.class);

    /** Name prefix of the threads on which the blocking {@code get} methods refuse to run. */
    private static final String NETTY_THREAD_PREFIX = "redisson-netty";

    /** Script text to hex-encoded SHA-1 digest, so a script is not re-hashed on every call. */
    private static final Map<String, String> SHA_CACHE = new LRUCacheMap<String, String>(500, 0L, 0L);

    final ConnectionManager connectionManager;
    /** May be {@code null}; a {@code null} builder disables reference substitution in the encode methods. */
    final RedissonObjectBuilder objectBuilder;
    final RedissonObjectBuilder.ReferenceType referenceType;

    /**
     * @param connectionManager source of entries, slot calculation, codec and configuration
     * @param objectBuilder     builder used to turn values into references; may be {@code null}
     * @param referenceType     reference type handed to every {@link RedisExecutor}
     */
    public CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) {
        this.connectionManager = connectionManager;
        this.objectBuilder = objectBuilder;
        this.referenceType = referenceType;
    }

    /** Returns the connection manager this service was created with. */
    @Override
    public ConnectionManager getConnectionManager() {
        return this.connectionManager;
    }

    /** Reference substitution is active only when an object builder was supplied. */
    private boolean isRedissonReferenceSupportEnabled() {
        return this.objectBuilder != null;
    }

    /**
     * Rejects blocking calls made from a thread whose name starts with
     * {@code redisson-netty}.
     *
     * @throws IllegalStateException if the current thread's name has that prefix
     */
    private void checkSyncCallAllowed() {
        if (Thread.currentThread().getName().startsWith(NETTY_THREAD_PREFIX)) {
            throw new IllegalStateException("Sync methods can't be invoked from async/rx/reactive listeners");
        }
    }

    /** Blocks until {@code future} completes; see {@link #get(CompletableFuture)}. */
    @Override
    public void syncSubscription(CompletableFuture<?> future) {
        this.get(future);
    }

    /** Blocks until {@code future} completes; see {@link #getInterrupted(CompletableFuture)}. */
    @Override
    public void syncSubscriptionInterrupted(CompletableFuture<?> future) throws InterruptedException {
        this.getInterrupted(future);
    }

    /**
     * Returns the value of {@code future} without blocking.
     *
     * @return the completed value, or {@code null} when the future is not yet complete
     *         or reading it throws
     */
    @Override
    public <V> V getNow(CompletableFuture<V> future) {
        try {
            return future.getNow(null);
        }
        catch (Exception ignored) {
            // Deliberate best-effort: a failed or cancelled future is reported as "no value".
            return null;
        }
    }

    /** Propagates the outcome (value or failure) of {@code future1} to {@code future2}. */
    @Override
    public <V> void transfer(CompletableFuture<V> future1, CompletableFuture<V> future2) {
        future1.whenComplete((res, e) -> {
            if (e != null) {
                future2.completeExceptionally(e);
                return;
            }
            future2.complete(res);
        });
    }

    /**
     * Blocks until {@code future} completes and returns its value.
     *
     * <p>On interruption the future is cancelled and the interrupt flag is restored
     * before a {@link RedisException} is thrown.
     *
     * @throws IllegalStateException if called from a {@code redisson-netty} thread
     * @throws RedisException        if the wait is interrupted or the future failed
     */
    @Override
    public <V> V get(RFuture<V> future) {
        this.checkSyncCallAllowed();
        try {
            return future.toCompletableFuture().get();
        }
        catch (InterruptedException e) {
            future.cancel(true);
            Thread.currentThread().interrupt();
            throw new RedisException(e);
        }
        catch (ExecutionException e) {
            throw this.convertException(e);
        }
    }

    /**
     * Blocks until {@code future} completes and returns its value.
     *
     * <p>On interruption the future is cancelled and the interrupt flag is restored
     * before a {@link RedisException} is thrown.
     *
     * @throws IllegalStateException if called from a {@code redisson-netty} thread
     * @throws RedisException        if the wait is interrupted or the future failed
     */
    @Override
    public <V> V get(CompletableFuture<V> future) {
        this.checkSyncCallAllowed();
        try {
            return future.get();
        }
        catch (InterruptedException e) {
            future.cancel(true);
            Thread.currentThread().interrupt();
            throw new RedisException(e);
        }
        catch (ExecutionException e) {
            throw this.convertException(e);
        }
    }

    /**
     * Blocks until {@code future} completes, letting interruption reach the caller.
     *
     * @throws InterruptedException if the wait is interrupted; the future is completed
     *                              exceptionally with that exception first
     * @throws RedisException       if the future failed
     */
    @Override
    public <V> V getInterrupted(RFuture<V> future) throws InterruptedException {
        try {
            return future.toCompletableFuture().get();
        }
        catch (InterruptedException e) {
            future.toCompletableFuture().completeExceptionally(e);
            throw e;
        }
        catch (ExecutionException e) {
            throw this.convertException(e);
        }
    }

    /**
     * Blocks until {@code future} completes, letting interruption reach the caller.
     *
     * @throws InterruptedException if the wait is interrupted; the future is completed
     *                              exceptionally with that exception first
     * @throws RedisException       if the future failed
     */
    @Override
    public <V> V getInterrupted(CompletableFuture<V> future) throws InterruptedException {
        try {
            return future.get();
        }
        catch (InterruptedException e) {
            future.completeExceptionally(e);
            throw e;
        }
        catch (ExecutionException e) {
            throw this.convertException(e);
        }
    }

    /** Creates the promise handed to a {@link RedisExecutor}; subclasses may override. */
    protected <R> CompletableFuture<R> createPromise() {
        return new CompletableFuture<R>();
    }

    /** Runs a read-only command against {@code client} within {@code entry}. */
    @Override
    public <T, R> RFuture<R> readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params) {
        return this.async(true, new NodeSource(entry, client), codec, command, params, false, false);
    }

    /** Runs a read-only command against {@code client}, using the slot calculated from {@code name}. */
    @Override
    public <T, R> RFuture<R> readAsync(RedisClient client, String name, Codec codec, RedisCommand<T> command, Object ... params) {
        int slot = this.connectionManager.calcSlot(name);
        return this.async(true, new NodeSource(slot, client), codec, command, params, false, false);
    }

    /** Runs a read-only command against {@code client}, using the slot calculated from {@code key}. */
    @Override
    public <T, R> RFuture<R> readAsync(RedisClient client, byte[] key, Codec codec, RedisCommand<T> command, Object ... params) {
        int slot = this.connectionManager.calcSlot(key);
        return this.async(true, new NodeSource(slot, client), codec, command, params, false, false);
    }

    /** Runs a read-only command against {@code client}. */
    @Override
    public <T, R> RFuture<R> readAsync(RedisClient client, Codec codec, RedisCommand<T> command, Object ... params) {
        return this.async(true, new NodeSource(client), codec, command, params, false, false);
    }

    /** Same as {@link #readAllAsync(Codec, RedisCommand, Object...)} with the connection manager's codec. */
    @Override
    public <T, R> RFuture<Collection<R>> readAllAsync(RedisCommand<T> command, Object ... params) {
        return this.readAllAsync(this.connectionManager.getCodec(), command, params);
    }

    /**
     * Runs a read-only command on every entry and merges the results into one collection.
     *
     * <p>A per-entry result that is itself a {@link Collection} is flattened into the
     * merged collection; any other result (including {@code null}) is added as a single
     * element.
     */
    @Override
    public <T, R> RFuture<Collection<R>> readAllAsync(Codec codec, RedisCommand<T> command, Object ... params) {
        Collection<MasterSlaveEntry> nodes = this.connectionManager.getEntrySet();
        List<CompletableFuture<R>> futures = new ArrayList<CompletableFuture<R>>();
        for (MasterSlaveEntry entry : nodes) {
            RFuture<R> f = this.async(true, new NodeSource(entry), codec, command, params, true, false);
            futures.add(f.toCompletableFuture());
        }
        CompletableFuture<Void> allDone = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        CompletableFuture<Collection<R>> resFuture = allDone.thenApply(r -> {
            List<R> results = new ArrayList<R>();
            for (CompletableFuture<R> f : futures) {
                R res = f.getNow(null);
                if (res instanceof Collection) {
                    results.addAll((Collection<? extends R>) res);
                } else {
                    results.add(res);
                }
            }
            return results;
        });
        return new CompletableFutureWrapper<Collection<R>>(resFuture);
    }

    /**
     * Runs a read-only command on the entries in random order until one of them
     * returns a non-null result.
     */
    @Override
    public <T, R> RFuture<R> readRandomAsync(Codec codec, RedisCommand<T> command, Object ... params) {
        CompletableFuture<R> mainPromise = this.createPromise();
        List<MasterSlaveEntry> nodes = new ArrayList<MasterSlaveEntry>(this.connectionManager.getEntrySet());
        Collections.shuffle(nodes);
        this.retryReadRandomAsync(codec, command, mainPromise, nodes, params);
        return new CompletableFutureWrapper<R>(mainPromise);
    }

    /** Runs a read-only command on the single given entry via the random-read path. */
    @Override
    public <T, R> RFuture<R> readRandomAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params) {
        CompletableFuture<R> mainPromise = this.createPromise();
        // retryReadRandomAsync removes entries from the list, so it must be mutable;
        // Collections.singletonList would throw UnsupportedOperationException on remove.
        List<MasterSlaveEntry> nodes = new ArrayList<MasterSlaveEntry>(1);
        nodes.add(entry);
        this.retryReadRandomAsync(codec, command, mainPromise, nodes, params);
        return new CompletableFutureWrapper<R>(mainPromise);
    }

    /**
     * Takes the first entry off {@code nodes}, runs the command on it and either
     * completes {@code mainPromise} or moves on to the next entry.
     *
     * @param nodes mutable, non-empty list of remaining candidates; consumed by this method
     */
    private <R, T> void retryReadRandomAsync(Codec codec, RedisCommand<T> command, CompletableFuture<R> mainPromise, List<MasterSlaveEntry> nodes, Object ... params) {
        MasterSlaveEntry entry = nodes.remove(0);
        RFuture<R> attemptPromise = this.async(true, new NodeSource(entry), codec, command, params, false, false);
        attemptPromise.whenComplete((res, e) -> {
            if (e != null) {
                mainPromise.completeExceptionally(e);
                return;
            }
            if (res != null) {
                mainPromise.complete(res);
                return;
            }
            // A null result means "nothing here": try the next entry, if any is left.
            if (nodes.isEmpty()) {
                mainPromise.complete(null);
            } else {
                this.retryReadRandomAsync(codec, command, mainPromise, nodes, params);
            }
        });
    }

    /** Runs a write command on every entry, discarding the per-entry results. */
    @Override
    public <T> RFuture<Void> writeAllAsync(RedisCommand<T> command, Object ... params) {
        return this.writeAllAsync(command, null, params);
    }

    /** Runs a write command on every entry with the connection manager's codec. */
    @Override
    public <R, T> RFuture<R> writeAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params) {
        return this.allAsync(false, this.connectionManager.getCodec(), command, callback, params);
    }

    /** Runs a write command on every entry with the given codec. */
    @Override
    public <R, T> RFuture<R> writeAllAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params) {
        return this.allAsync(false, codec, command, callback, params);
    }

    /** Runs a read-only command on every entry with the connection manager's codec. */
    @Override
    public <R, T> RFuture<R> readAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params) {
        return this.allAsync(true, this.connectionManager.getCodec(), command, callback, params);
    }

    /**
     * Sends {@code command} to every entry and aggregates the results through
     * {@code callback}.
     *
     * <p>The returned future fails on the first error that is not a
     * {@link RedisRedirectException}. Otherwise it completes, once all entries have
     * answered, with {@code callback.onFinish()} or with {@code null} when
     * {@code callback} is {@code null}.
     */
    private <T, R> RFuture<R> allAsync(boolean readOnlyMode, Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params) {
        RPromise<R> mainPromise = new RedissonPromise<R>();
        Collection<MasterSlaveEntry> nodes = this.connectionManager.getEntrySet();
        AtomicInteger counter = new AtomicInteger(nodes.size());
        BiConsumer<T, Throwable> listener = (result, u) -> {
            boolean redirected = u instanceof RedisRedirectException;
            if (u != null && !redirected) {
                mainPromise.tryFailure(u);
                return;
            }
            T slotResult = result;
            if (redirected) {
                slotResult = command.getConvertor().convert(result);
            }
            if (callback != null) {
                callback.onSlotResult(slotResult);
            }
            if (counter.decrementAndGet() == 0) {
                mainPromise.trySuccess(callback != null ? callback.onFinish() : null);
            }
        };
        for (MasterSlaveEntry entry : nodes) {
            RFuture<T> promise = this.async(readOnlyMode, new NodeSource(entry), codec, command, params, true, false);
            promise.whenComplete(listener);
        }
        return mainPromise;
    }

    /**
     * Unwraps an {@link ExecutionException}: a {@link RedisException} cause is returned
     * as is, any other cause is wrapped in a new {@link RedisException}.
     */
    @Override
    public RedisException convertException(ExecutionException e) {
        Throwable cause = e.getCause();
        if (cause instanceof RedisException) {
            return (RedisException)cause;
        }
        return new RedisException("Unexpected exception while processing command", cause);
    }

    /** Builds a node source from the slot calculated for {@code key}. */
    private NodeSource getNodeSource(String key) {
        int slot = this.connectionManager.calcSlot(key);
        return new NodeSource(slot);
    }

    /** Builds a node source from the slot calculated for {@code key}. */
    private NodeSource getNodeSource(byte[] key) {
        int slot = this.connectionManager.calcSlot(key);
        return new NodeSource(slot);
    }

    /** Runs a read-only command routed by {@code key}. */
    @Override
    public <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
        NodeSource source = this.getNodeSource(key);
        return this.async(true, source, codec, command, params, false, false);
    }

    /** Runs a read-only command routed by {@code key}. */
    @Override
    public <T, R> RFuture<R> readAsync(byte[] key, Codec codec, RedisCommand<T> command, Object ... params) {
        NodeSource source = this.getNodeSource(key);
        return this.async(true, source, codec, command, params, false, false);
    }

    /** Runs a read-only command on the given entry. */
    @Override
    public <T, R> RFuture<R> readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params) {
        return this.async(true, new NodeSource(entry), codec, command, params, false, false);
    }

    /** Runs a write command on the entry the connection manager associates with {@code client}. */
    @Override
    public <T, R> RFuture<R> writeAsync(RedisClient client, Codec codec, RedisCommand<T> command, Object ... params) {
        MasterSlaveEntry entry = this.getConnectionManager().getEntry(client);
        return this.writeAsync(entry, codec, command, params);
    }

    /** Runs a write command on the given entry. */
    @Override
    public <T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params) {
        return this.async(false, new NodeSource(entry), codec, command, params, false, false);
    }

    /** Runs a read-only command routed by {@code key}, with the connection manager's codec. */
    @Override
    public <T, R> RFuture<R> readAsync(String key, RedisCommand<T> command, Object ... params) {
        return this.readAsync(key, this.connectionManager.getCodec(), command, params);
    }

    /** Evaluates {@code script} in read-only mode, routed by {@code key}. */
    @Override
    public <T, R> RFuture<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        NodeSource source = this.getNodeSource(key);
        return this.evalAsync(source, true, codec, evalCommandType, script, keys, false, params);
    }

    /** Evaluates {@code script} in read-only mode on the given entry. */
    @Override
    public <T, R> RFuture<R> evalReadAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        return this.evalAsync(new NodeSource(entry), true, codec, evalCommandType, script, keys, false, params);
    }

    /** Evaluates {@code script} in read-only mode against {@code client}, using the slot calculated from {@code name}. */
    @Override
    public <T, R> RFuture<R> evalReadAsync(RedisClient client, String name, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        int slot = this.connectionManager.calcSlot(name);
        return this.evalAsync(new NodeSource(slot, client), true, codec, evalCommandType, script, keys, false, params);
    }

    /** Evaluates {@code script} in write mode, routed by {@code key}. */
    @Override
    public <T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        NodeSource source = this.getNodeSource(key);
        return this.evalAsync(source, false, codec, evalCommandType, script, keys, false, params);
    }

    /** Like {@link #evalWriteAsync(String, Codec, RedisCommand, String, List, Object...)} but passes {@code noRetry = true} to the executor. */
    @Override
    public <T, R> RFuture<R> evalWriteNoRetryAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        NodeSource source = this.getNodeSource(key);
        return this.evalAsync(source, false, codec, evalCommandType, script, keys, true, params);
    }

    /** Evaluates {@code script} in write mode on the given entry. */
    @Override
    public <T, R> RFuture<R> evalWriteAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        return this.evalAsync(new NodeSource(entry), false, codec, evalCommandType, script, keys, false, params);
    }

    /** Evaluates {@code script} in write mode on every entry. */
    @Override
    public <T, R> RFuture<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params) {
        return this.evalAllAsync(false, command, callback, script, keys, params);
    }

    /**
     * Evaluates {@code script} on every entry and aggregates the results through
     * {@code callback}.
     *
     * <p>The command arguments are the script, the number of keys, the keys and then
     * {@code params}. The returned future fails on the first error that is not a
     * {@link RedisRedirectException}; otherwise it completes, once all entries have
     * answered, with {@code callback.onFinish()} (or {@code null} when
     * {@code callback} is {@code null}, matching the non-script fan-out).
     */
    public <T, R> RFuture<R> evalAllAsync(boolean readOnlyMode, RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params) {
        RPromise<R> mainPromise = new RedissonPromise<R>();
        Collection<MasterSlaveEntry> entries = this.connectionManager.getEntrySet();
        AtomicInteger counter = new AtomicInteger(entries.size());
        BiConsumer<T, Throwable> listener = (t, u) -> {
            if (u != null && !(u instanceof RedisRedirectException)) {
                mainPromise.tryFailure(u);
                return;
            }
            if (callback != null) {
                callback.onSlotResult(t);
            }
            if (counter.decrementAndGet() == 0 && !mainPromise.isDone()) {
                mainPromise.trySuccess(callback != null ? callback.onFinish() : null);
            }
        };
        List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
        args.add(script);
        args.add(keys.size());
        args.addAll(keys);
        args.addAll(Arrays.asList(params));
        for (MasterSlaveEntry entry : entries) {
            // A fresh array per entry, so executors never share one argument array.
            RFuture<T> promise = this.async(readOnlyMode, new NodeSource(entry), this.connectionManager.getCodec(), command, args.toArray(), true, false);
            promise.whenComplete(listener);
        }
        return mainPromise;
    }

    /**
     * Sends SCRIPT LOAD for {@code script} to {@code client}: as a write on the owning
     * entry when {@code client} is that entry's own client, otherwise as a read
     * addressed directly to {@code client}.
     */
    private RFuture<String> loadScript(RedisClient client, String script) {
        MasterSlaveEntry entry = this.getConnectionManager().getEntry(client);
        if (entry.getClient().equals(client)) {
            return this.writeAsync(entry, StringCodec.INSTANCE, RedisCommands.SCRIPT_LOAD, script);
        }
        return this.readAsync(client, StringCodec.INSTANCE, RedisCommands.SCRIPT_LOAD, script);
    }

    /** Whether EVAL should be attempted as EVALSHA first, as configured by {@code useScriptCache}. */
    protected boolean isEvalCacheActive() {
        return this.getConnectionManager().getCfg().isUseScriptCache();
    }

    /**
     * Returns the hex-encoded SHA-1 digest of {@code script}, cached per script text.
     *
     * <p>The script is hashed as UTF-8 rather than in the platform default charset, so
     * the digest does not vary with the JVM's locale settings.
     * NOTE(review): assumes scripts are sent to the server as UTF-8 - confirm against StringCodec.
     *
     * @throws IllegalStateException if the digest cannot be computed
     */
    private String calcSHA(String script) {
        String digest = SHA_CACHE.get(script);
        if (digest != null) {
            return digest;
        }
        try {
            MessageDigest mdigest = MessageDigest.getInstance("SHA-1");
            byte[] hash = mdigest.digest(script.getBytes(StandardCharsets.UTF_8));
            digest = ByteBufUtil.hexDump(hash);
            SHA_CACHE.put(script, digest);
            return digest;
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * Returns a copy of {@code params} in which every {@link ByteBuf} is replaced by a
     * newly allocated buffer with the same readable bytes; the source buffers' reader
     * indices are left unchanged. Other elements are copied by reference.
     */
    private Object[] copy(Object[] params) {
        List<Object> result = new ArrayList<Object>(params.length);
        for (Object object : params) {
            if (!(object instanceof ByteBuf)) {
                result.add(object);
                continue;
            }
            ByteBuf source = (ByteBuf)object;
            ByteBuf duplicate = ByteBufAllocator.DEFAULT.buffer(source.readableBytes());
            int readerIndex = source.readerIndex();
            duplicate.writeBytes(source);
            // writeBytes advanced the source's reader index; put it back.
            source.readerIndex(readerIndex);
            result.add(duplicate);
        }
        return result.toArray();
    }

    /**
     * Evaluates a script.
     *
     * <p>When the script cache is active and the command is {@code EVAL}, the script is
     * first invoked as {@code EVALSHA} with its SHA-1. If that fails with an error whose
     * message starts with {@code NOSCRIPT}, the script is loaded on the client that
     * served the attempt and {@code EVALSHA} is retried there with copies of the
     * parameters. In every other case the command is sent once with the script text.
     */
    private <T, R> RFuture<R> evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, boolean noRetry, Object ... params) {
        if (this.isEvalCacheActive() && evalCommandType.getName().equals("EVAL")) {
            CompletableFuture<R> mainPromise = new CompletableFuture<R>();
            // Copies are kept for the possible retry; presumably the first attempt
            // consumes or releases the original buffers.
            Object[] pps = this.copy(params);
            CompletableFuture<R> promise = new CompletableFuture<R>();
            String sha1 = this.calcSHA(script);
            RedisCommand<T> cmd = new RedisCommand<T>(evalCommandType, "EVALSHA");
            List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
            args.add(sha1);
            args.add(keys.size());
            args.addAll(keys);
            args.addAll(Arrays.asList(params));
            RedisExecutor<T, R> executor = new RedisExecutor<T, R>(readOnlyMode, nodeSource, codec, cmd, args.toArray(), promise, false, this.connectionManager, this.objectBuilder, this.referenceType, noRetry);
            executor.execute();
            promise.whenComplete((res, e) -> {
                if (e == null) {
                    this.free(pps);
                    mainPromise.complete(res);
                    return;
                }
                // getMessage() may be null; an NPE here would leave mainPromise
                // forever incomplete and the copied buffers unreleased.
                String message = e.getMessage();
                if (message == null || !message.startsWith("NOSCRIPT")) {
                    this.free(pps);
                    mainPromise.completeExceptionally(e);
                    return;
                }
                RFuture<String> loadFuture = this.loadScript(executor.getRedisClient(), script);
                loadFuture.whenComplete((r, ex) -> {
                    if (ex != null) {
                        this.free(pps);
                        mainPromise.completeExceptionally(ex);
                        return;
                    }
                    RedisCommand<T> command = new RedisCommand<T>(evalCommandType, "EVALSHA");
                    List<Object> newargs = new ArrayList<Object>(2 + keys.size() + params.length);
                    newargs.add(sha1);
                    newargs.add(keys.size());
                    newargs.addAll(keys);
                    newargs.addAll(Arrays.asList(pps));
                    // Pin the retry to the client the script was just loaded on.
                    NodeSource ns = nodeSource;
                    if (ns.getRedisClient() == null) {
                        ns = new NodeSource(nodeSource, executor.getRedisClient());
                    }
                    RFuture<R> future = this.async(readOnlyMode, ns, codec, command, newargs.toArray(), false, noRetry);
                    future.whenComplete((re, retryError) -> {
                        if (retryError != null) {
                            mainPromise.completeExceptionally(retryError);
                        } else {
                            mainPromise.complete(re);
                        }
                    });
                });
            });
            return new CompletableFutureWrapper<R>(mainPromise);
        }
        List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
        args.add(script);
        args.add(keys.size());
        args.addAll(keys);
        args.addAll(Arrays.asList(params));
        return this.async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), false, noRetry);
    }

    /** Runs a write command routed by {@code key}, with the connection manager's codec. */
    @Override
    public <T, R> RFuture<R> writeAsync(String key, RedisCommand<T> command, Object ... params) {
        return this.writeAsync(key, this.connectionManager.getCodec(), command, params);
    }

    /** Runs a write command routed by {@code key}. */
    @Override
    public <T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
        NodeSource source = this.getNodeSource(key);
        return this.async(false, source, codec, command, params, false, false);
    }

    /** Runs a write command routed by {@code key}. */
    @Override
    public <T, R> RFuture<R> writeAsync(byte[] key, Codec codec, RedisCommand<T> command, Object ... params) {
        NodeSource source = this.getNodeSource(key);
        return this.async(false, source, codec, command, params, false, false);
    }

    /**
     * Core dispatch: creates a promise, starts a {@link RedisExecutor} for the command
     * and returns the promise wrapped as an {@link RFuture}.
     */
    @Override
    public <V, R> RFuture<R> async(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand<V> command, Object[] params, boolean ignoreRedirect, boolean noRetry) {
        CompletableFuture<R> mainPromise = this.createPromise();
        RedisExecutor<V, R> executor = new RedisExecutor<V, R>(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, this.connectionManager, this.objectBuilder, this.referenceType, noRetry);
        executor.execute();
        return new CompletableFutureWrapper<R>(mainPromise);
    }

    /** Releases every element of {@code params} via {@link ReferenceCountUtil#safeRelease(Object)}. */
    private void free(Object[] params) {
        for (Object obj : params) {
            ReferenceCountUtil.safeRelease(obj);
        }
    }

    /** Read-only variant of the batched multi-key execution. */
    @Override
    public <T, R> RFuture<R> readBatchedAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, String ... keys) {
        return this.executeBatchedAsync(true, codec, command, callback, keys);
    }

    /** Write variant of the batched multi-key execution. */
    @Override
    public <T, R> RFuture<R> writeBatchedAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, String ... keys) {
        return this.executeBatchedAsync(false, codec, command, callback, keys);
    }

    /**
     * Runs a multi-key command.
     *
     * <p>Outside cluster mode the command is sent once with all keys. In cluster mode
     * the keys are grouped by owning entry and then by slot; one command per slot group
     * is queued on a {@link CommandBatchService} (this instance if it already is one,
     * otherwise a new batch per entry that is executed immediately). Non-null results
     * of successful commands are fed to {@code callback.onSlotResult}, and the returned
     * future completes with {@code callback.onFinish()}.
     */
    private <T, R> RFuture<R> executeBatchedAsync(boolean readOnly, Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, String[] keys) {
        if (!this.connectionManager.isClusterMode()) {
            Object[] params = callback.createParams(Arrays.asList(keys));
            if (readOnly) {
                return this.readAsync((String)null, codec, command, params);
            }
            return this.writeAsync((String)null, codec, command, params);
        }
        Map<MasterSlaveEntry, Map<Integer, List<String>>> entry2keys = Arrays.stream(keys).collect(Collectors.groupingBy(k -> {
            int slot = this.connectionManager.calcSlot(k);
            return this.connectionManager.getEntry(slot);
        }, Collectors.groupingBy(k -> this.connectionManager.calcSlot(k), Collectors.toList())));
        boolean alreadyBatch = this instanceof CommandBatchService;
        List<CompletableFuture<T>> futures = new ArrayList<CompletableFuture<T>>();
        for (Map.Entry<MasterSlaveEntry, Map<Integer, List<String>>> entry : entry2keys.entrySet()) {
            CommandBatchService executorService = alreadyBatch ? (CommandBatchService)this : new CommandBatchService(this);
            for (List<String> groupedKeys : entry.getValue().values()) {
                RedisCommand<T> c = command;
                RedisCommand<T> newCommand = callback.createCommand(groupedKeys);
                if (newCommand != null) {
                    c = newCommand;
                }
                RFuture<T> f;
                if (readOnly) {
                    f = executorService.readAsync(entry.getKey(), codec, c, callback.createParams(groupedKeys));
                } else {
                    f = executorService.writeAsync(entry.getKey(), codec, c, callback.createParams(groupedKeys));
                }
                futures.add(f.toCompletableFuture());
            }
            // A batch owned by the caller is executed by the caller, not here.
            if (!alreadyBatch) {
                executorService.executeAsync();
            }
        }
        CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        CompletableFuture<R> result = future.whenComplete((res, e) -> futures.forEach(f -> {
            if (!f.isCompletedExceptionally() && f.getNow(null) != null) {
                callback.onSlotResult(f.getNow(null));
            }
        })).thenApply(r -> callback.onFinish());
        return new CompletableFutureWrapper<R>(result);
    }

    /** Returns the object builder this service was created with; may be {@code null}. */
    @Override
    public RedissonObjectBuilder getObjectBuilder() {
        return this.objectBuilder;
    }

    /**
     * Returns the reference the object builder produces for {@code value}, or
     * {@code value} itself when reference support is disabled or no reference applies.
     */
    private Object replaceWithReference(Object value) {
        if (this.isRedissonReferenceSupportEnabled()) {
            RedissonReference reference = this.getObjectBuilder().toReference(value);
            if (reference != null) {
                return reference;
            }
        }
        return value;
    }

    /**
     * Encodes {@code value} with the codec's value encoder, after reference substitution.
     *
     * @throws IllegalArgumentException if the encoder fails with an {@link IOException}
     */
    @Override
    public ByteBuf encode(Codec codec, Object value) {
        Object encodable = this.replaceWithReference(value);
        try {
            return codec.getValueEncoder().encode(encodable);
        }
        catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * Encodes {@code value} with the codec's map-key encoder, after reference substitution.
     *
     * @throws IllegalArgumentException if the encoder fails with an {@link IOException}
     */
    @Override
    public ByteBuf encodeMapKey(Codec codec, Object value) {
        Object encodable = this.replaceWithReference(value);
        try {
            return codec.getMapKeyEncoder().encode(encodable);
        }
        catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * Encodes {@code value} with the codec's map-value encoder, after reference substitution.
     *
     * @throws IllegalArgumentException if the encoder fails with an {@link IOException}
     */
    @Override
    public ByteBuf encodeMapValue(Codec codec, Object value) {
        Object encodable = this.replaceWithReference(value);
        try {
            return codec.getMapValueEncoder().encode(encodable);
        }
        catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * Polls {@code name} and {@code queueNames} for an element.
     *
     * <p>In cluster mode with at least one extra queue, the queues are polled one at a
     * time in round-robin order (see {@code poll}), with {@code secondsTimeout} as the
     * number of empty attempts allowed. Otherwise a single command is sent with all
     * queue names followed by {@code secondsTimeout}.
     */
    @Override
    public <V> RFuture<V> pollFromAnyAsync(String name, Codec codec, RedisCommand<Object> command, long secondsTimeout, String ... queueNames) {
        if (this.connectionManager.isClusterMode() && queueNames.length > 0) {
            RPromise<V> result = new RedissonPromise<V>();
            List<String> names = new ArrayList<String>(queueNames.length + 1);
            names.add(name);
            names.addAll(Arrays.asList(queueNames));
            AtomicReference<Iterator<String>> ref = new AtomicReference<Iterator<String>>(names.iterator());
            AtomicLong counter = new AtomicLong(secondsTimeout);
            this.poll(name, codec, result, ref, names, counter, command);
            return result;
        }
        List<Object> params = new ArrayList<Object>(queueNames.length + 2);
        params.add(name);
        params.addAll(Arrays.asList(queueNames));
        params.add(secondsTimeout);
        return this.writeAsync(name, codec, command, params.toArray());
    }

    /**
     * One round-robin polling step: sends {@code command} for the next queue name with
     * the arguments {@code (queueName, 1)} and, on a null result, decrements
     * {@code counter} and moves on to the next name, restarting the iteration when the
     * names are exhausted.
     *
     * <p>{@code result} completes with the first non-null element, with {@code null}
     * once {@code counter} reaches zero, or exceptionally on the first failure.
     * NOTE(review): the literal {@code 1} is presumably a one-second per-queue timeout,
     * which would make {@code counter} an approximate seconds budget - confirm.
     */
    private <V> void poll(String name, Codec codec, RPromise<V> result, AtomicReference<Iterator<String>> ref, List<String> names, AtomicLong counter, RedisCommand<Object> command) {
        if (!ref.get().hasNext()) {
            ref.set(names.iterator());
            this.poll(name, codec, result, ref, names, counter, command);
            return;
        }
        String currentName = ref.get().next();
        RFuture<V> future = this.writeAsync(currentName, codec, command, currentName, 1);
        future.whenComplete((res, e) -> {
            if (e != null) {
                result.tryFailure(e);
                return;
            }
            if (res != null) {
                result.trySuccess(res);
                return;
            }
            if (counter.decrementAndGet() == 0L) {
                result.trySuccess(null);
                return;
            }
            this.poll(name, codec, result, ref, names, counter, command);
        });
    }
}

