/*
 * Decompiled with CFR 0.152.
 */
package org.redisson;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.redisson.RedissonExpirable;
import org.redisson.api.PendingEntry;
import org.redisson.api.PendingResult;
import org.redisson.api.RFuture;
import org.redisson.api.RStream;
import org.redisson.api.StreamConsumer;
import org.redisson.api.StreamGroup;
import org.redisson.api.StreamInfo;
import org.redisson.api.StreamMessageId;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.CodecDecoder;
import org.redisson.client.protocol.decoder.ListMultiDecoder2;
import org.redisson.client.protocol.decoder.ObjectMapDecoder;
import org.redisson.client.protocol.decoder.StreamInfoDecoder;
import org.redisson.command.CommandAsyncExecutor;

public class RedissonStream<K, V>
extends RedissonExpirable
implements RStream<K, V> {
    private static final RedisCommand<Map<StreamMessageId, Map<Object, Object>>> EVAL_XRANGE = new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("EVAL", RedisCommands.XRANGE.getReplayMultiDecoder());

    public RedissonStream(Codec codec, CommandAsyncExecutor connectionManager, String name) {
        super(codec, connectionManager, name);
    }

    public RedissonStream(CommandAsyncExecutor connectionManager, String name) {
        super(connectionManager, name);
    }

    protected void checkKey(Object key) {
        if (key == null) {
            throw new NullPointerException("key can't be null");
        }
    }

    protected void checkValue(Object value) {
        if (value == null) {
            throw new NullPointerException("value can't be null");
        }
    }

    @Override
    public void createGroup(String groupName) {
        this.get(this.createGroupAsync(groupName));
    }

    @Override
    public RFuture<Void> createGroupAsync(String groupName) {
        return this.createGroupAsync(groupName, StreamMessageId.NEWEST);
    }

    @Override
    public void createGroup(String groupName, StreamMessageId id) {
        this.get(this.createGroupAsync(groupName, id));
    }

    @Override
    public RFuture<Void> createGroupAsync(String groupName, StreamMessageId id) {
        return this.commandExecutor.writeAsync(this.getName(), (Codec)StringCodec.INSTANCE, RedisCommands.XGROUP, "CREATE", this.getName(), groupName, id, "MKSTREAM");
    }

    @Override
    public RFuture<Long> ackAsync(String groupName, StreamMessageId ... ids) {
        ArrayList<Object> params = new ArrayList<Object>();
        params.add(this.getName());
        params.add(groupName);
        for (StreamMessageId id : ids) {
            params.add(id);
        }
        return this.commandExecutor.writeAsync(this.getName(), (Codec)StringCodec.INSTANCE, RedisCommands.XACK, params.toArray());
    }

    @Override
    public long ack(String groupName, StreamMessageId ... id) {
        return this.get(this.ackAsync(groupName, id));
    }

    @Override
    public RFuture<PendingResult> listPendingAsync(String groupName) {
        return this.getPendingInfoAsync(groupName);
    }

    @Override
    public PendingResult listPending(String groupName) {
        return this.getPendingInfo(groupName);
    }

    @Override
    public RFuture<PendingResult> getPendingInfoAsync(String groupName) {
        return this.commandExecutor.readAsync(this.getName(), (Codec)StringCodec.INSTANCE, RedisCommands.XPENDING, this.getName(), groupName);
    }

    @Override
    public PendingResult getPendingInfo(String groupName) {
        return this.get(this.listPendingAsync(groupName));
    }

    @Override
    public RFuture<List<PendingEntry>> listPendingAsync(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count) {
        return this.commandExecutor.readAsync(this.getName(), (Codec)StringCodec.INSTANCE, RedisCommands.XPENDING_ENTRIES, this.getName(), groupName, startId, endId, count, consumerName);
    }

    @Override
    public RFuture<List<PendingEntry>> listPendingAsync(String groupName, StreamMessageId startId, StreamMessageId endId, int count) {
        return this.commandExecutor.readAsync(this.getName(), (Codec)StringCodec.INSTANCE, RedisCommands.XPENDING_ENTRIES, this.getName(), groupName, startId, endId, count);
    }

    @Override
    public List<PendingEntry> listPending(String groupName, StreamMessageId startId, StreamMessageId endId, int count) {
        return this.get(this.listPendingAsync(groupName, startId, endId, count));
    }

    @Override
    public List<PendingEntry> listPending(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count) {
        return this.get(this.listPendingAsync(groupName, consumerName, startId, endId, count));
    }

    @Override
    public List<StreamMessageId> fastClaim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId ... ids) {
        return this.get(this.fastClaimAsync(groupName, consumerName, idleTime, idleTimeUnit, ids));
    }

    @Override
    public RFuture<List<StreamMessageId>> fastClaimAsync(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId ... ids) {
        ArrayList<Object> params = new ArrayList<Object>();
        params.add(this.getName());
        params.add(groupName);
        params.add(consumerName);
        params.add(idleTimeUnit.toMillis(idleTime));
        for (StreamMessageId id : ids) {
            params.add(id.toString());
        }
        params.add("JUSTID");
        return this.commandExecutor.readAsync(this.getName(), (Codec)StringCodec.INSTANCE, RedisCommands.XCLAIM_IDS, params.toArray());
    }

    @Override
    public RFuture<Map<StreamMessageId, Map<K, V>>> claimAsync(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId ... ids) {
        ArrayList<Object> params = new ArrayList<Object>();
        params.add(this.getName());
        params.add(groupName);
        params.add(consumerName);
        params.add(idleTimeUnit.toMillis(idleTime));
        for (StreamMessageId id : ids) {
            params.add(id.toString());
        }
        return this.commandExecutor.readAsync(this.getName(), this.codec, RedisCommands.XCLAIM, params.toArray());
    }

    @Override
    public Map<StreamMessageId, Map<K, V>> claim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId ... ids) {
        return this.get(this.claimAsync(groupName, consumerName, idleTime, idleTimeUnit, ids));
    }

    @Override
    public RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, StreamMessageId ... ids) {
        return this.readGroupAsync(groupName, consumerName, 0, ids);
    }

    @Override
    public RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId ... ids) {
        return this.readGroupAsync(groupName, consumerName, count, 0L, null, ids);
    }

    @Override
    public RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId ... ids) {
        return this.readGroupAsync(groupName, consumerName, 0, timeout, unit, ids);
    }

    @Override
    public RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId ... ids) {
        ArrayList<Object> params = new ArrayList<Object>();
        params.add("GROUP");
        params.add(groupName);
        params.add(consumerName);
        if (count > 0) {
            params.add("COUNT");
            params.add(count);
        }
        if (timeout > 0L) {
            params.add("BLOCK");
            params.add(this.toSeconds(timeout, unit) * 1000L);
        }
        params.add("STREAMS");
        params.add(this.getName());
        if (ids.length == 0) {
            params.add(">");
        }
        for (StreamMessageId id : ids) {
            params.add(id.toString());
        }
        if (timeout > 0L) {
            return this.commandExecutor.readAsync(this.getName(), this.codec, RedisCommands.XREADGROUP_BLOCKING_SINGLE, params.toArray());
        }
        return this.commandExecutor.readAsync(this.getName(), this.codec, RedisCommands.XREADGROUP_SINGLE, params.toArray());
    }

    @Override
    public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
        return this.readGroup(groupName, consumerName, 0, id, keyToId);
    }

    @Override
    public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
        return this.readGroupAsync(groupName, consumerName, 0, id, keyToId);
    }

    @Override
    public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
        return this.get(this.readGroupAsync(groupName, consumerName, count, id, keyToId));
    }

    @Override
    public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
        return this.readGroupAsync(groupName, consumerName, count, -1L, null, id, keyToId);
    }

    @Override
    public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, String key2, StreamMessageId id2) {
        return this.readGroupAsync(groupName, consumerName, count, timeout, unit, id, Collections.singletonMap(key2, id2));
    }

    @Override
    public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
        HashMap<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
        params.put(key2, id2);
        params.put(key3, id3);
        return this.readGroupAsync(groupName, consumerName, count, timeout, unit, id, params);
    }

    @Override
    public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
        return this.readGroupAsync(groupName, consumerName, 0, timeout, unit, id, keyToId);
    }

    @Override
    public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
        return this.get(this.readGroupAsync(groupName, consumerName, count, timeout, unit, id, keyToId));
    }

    @Override
    public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
        return this.readGroup(groupName, consumerName, 0, timeout, unit, id, keyToId);
    }

    @Override
    public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2) {
        return this.readGroupAsync(groupName, consumerName, id, Collections.singletonMap(key2, id2));
    }

    @Override
    public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
        HashMap<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
        params.put(key2, id2);
        params.put(key3, id3);
        return this.readGroupAsync(groupName, consumerName, id, params);
    }

    @Override
    public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2) {
        return this.readGroupAsync(groupName, consumerName, count, id, Collections.singletonMap(key2, id2));
    }

    @Override
    public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
        HashMap<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
        params.put(key2, id2);
        params.put(key3, id3);
        return this.readGroupAsync(groupName, consumerName, count, id, params);
    }

    @Override
    public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, String key2, StreamMessageId id2) {
        return this.readGroupAsync(groupName, consumerName, timeout, unit, id, Collections.singletonMap(key2, id2));
    }

    @Override
    public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
        HashMap<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
        params.put(key2, id2);
        params.put(key3, id3);
        return this.readGroupAsync(groupName, consumerName, timeout, unit, id, params);
    }

    @Override
    public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2) {
        return this.get(this.readGroupAsync(groupName, consumerName, id, key2, id2));
    }

    @Override
    public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamMessageId id, String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
        return this.get(this.readGroupAsync(groupName, consumerName, id, key2, id2, key3, id3));
    }

    @Override
    public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2) {
        return this.get(this.readGroupAsync(groupName, consumerName, count, id, key2, id2));
    }

    @Override
    public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamMessageId id, String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
        return this.get(this.readGroupAsync(groupName, consumerName, count, id, key2, id2, key3, id3));
    }

    @Override
    public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, String key2, StreamMessageId id2) {
        return this.get(this.readGroupAsync(groupName, consumerName, timeout, unit, id, key2, id2));
    }

    @Override
    public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId id, String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
        return this.get(this.readGroupAsync(groupName, consumerName, timeout, unit, id, key2, id2, key3, id3));
    }

    @Override
    public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, String key2, StreamMessageId id2) {
        return this.get(this.readGroupAsync(groupName, consumerName, count, timeout, unit, id, key2, id2));
    }

    @Override
    public Map<String, Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
        return this.get(this.readGroupAsync(groupName, consumerName, count, timeout, unit, id, key2, id2, key3, id3));
    }

    public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
        ArrayList<Object> params = new ArrayList<Object>();
        params.add("GROUP");
        params.add(groupName);
        params.add(consumerName);
        if (count > 0) {
            params.add("COUNT");
            params.add(count);
        }
        if (timeout > 0L) {
            params.add("BLOCK");
            params.add(this.toSeconds(timeout, unit) * 1000L);
        }
        params.add("STREAMS");
        params.add(this.getName());
        for (String key : keyToId.keySet()) {
            params.add(key);
        }
        if (id == null) {
            params.add(">");
        } else {
            params.add(id);
        }
        for (StreamMessageId nextId : keyToId.values()) {
            params.add(nextId.toString());
        }
        if (timeout > 0L) {
            return this.commandExecutor.readAsync(this.getName(), this.codec, RedisCommands.XREADGROUP_BLOCKING, params.toArray());
        }
        return this.commandExecutor.readAsync(this.getName(), this.codec, RedisCommands.XREADGROUP, params.toArray());
    }

    @Override
    public Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, StreamMessageId ... ids) {
        return this.get(this.readGroupAsync(groupName, consumerName, ids));
    }

    @Override
    public Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, int count, StreamMessageId ... ids) {
        return this.get(this.readGroupAsync(groupName, consumerName, count, ids));
    }

    @Override
    public Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId ... ids) {
        return this.get(this.readGroupAsync(groupName, consumerName, timeout, unit, ids));
    }

    @Override
    public Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId ... ids) {
        return this.get(this.readGroupAsync(groupName, consumerName, count, timeout, unit, ids));
    }

    @Override
    public StreamMessageId addAll(Map<K, V> entries) {
        return this.addAll(entries, 0, false);
    }

    @Override
    public RFuture<StreamMessageId> addAllAsync(Map<K, V> entries) {
        return this.addAllAsync(entries, 0, false);
    }

    @Override
    public void addAll(StreamMessageId id, Map<K, V> entries) {
        this.addAll(id, entries, 0, false);
    }

    @Override
    public RFuture<Void> addAllAsync(StreamMessageId id, Map<K, V> entries) {
        return this.addAllAsync(id, entries, 0, false);
    }

    @Override
    public StreamMessageId addAll(Map<K, V> entries, int trimLen, boolean trimStrict) {
        return this.get(this.addAllAsync(entries, trimLen, trimStrict));
    }

    @Override
    public RFuture<StreamMessageId> addAllAsync(Map<K, V> entries, int trimLen, boolean trimStrict) {
        return this.addAllCustomAsync(null, entries, trimLen, trimStrict);
    }

    @Override
    public void addAll(StreamMessageId id, Map<K, V> entries, int trimLen, boolean trimStrict) {
        this.get(this.addAllAsync(id, entries, trimLen, trimStrict));
    }

    private <R> RFuture<R> addAllCustomAsync(StreamMessageId id, Map<K, V> entries, int trimLen, boolean trimStrict) {
        ArrayList<Object> params = new ArrayList<Object>(entries.size() * 2 + 1);
        params.add(this.getName());
        if (trimLen > 0) {
            params.add("MAXLEN");
            if (!trimStrict) {
                params.add("~");
            }
            params.add(trimLen);
        }
        if (id == null) {
            params.add("*");
        } else {
            params.add(id.toString());
        }
        for (Map.Entry<K, V> t : entries.entrySet()) {
            this.checkKey(t.getKey());
            this.checkValue(t.getValue());
            params.add(this.encodeMapKey(t.getKey()));
            params.add(this.encodeMapValue(t.getValue()));
        }
        if (id == null) {
            return this.commandExecutor.writeAsync(this.getName(), (Codec)StringCodec.INSTANCE, RedisCommands.XADD, params.toArray());
        }
        return this.commandExecutor.writeAsync(this.getName(), (Codec)StringCodec.INSTANCE, RedisCommands.XADD_VOID, params.toArray());
    }

    @Override
    public RFuture<Void> addAllAsync(StreamMessageId id, Map<K, V> entries, int trimLen, boolean trimStrict) {
        return this.addAllCustomAsync(id, entries, trimLen, trimStrict);
    }

    @Override
    public long size() {
        return this.get(this.sizeAsync());
    }

    @Override
    public RFuture<Long> sizeAsync() {
        return this.commandExecutor.writeAsync(this.getName(), (Codec)StringCodec.INSTANCE, RedisCommands.XLEN, this.getName());
    }

    @Override
    public Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, Map<String, StreamMessageId> keyToId) {
        return this.read(0, id, keyToId);
    }

    @Override
    public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMessageId id, Map<String, StreamMessageId> keyToId) {
        return this.readAsync(0, id, keyToId);
    }

    @Override
    public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
        return this.get(this.readAsync(count, id, keyToId));
    }

    @Override
    public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
        return this.readAsync(count, -1L, null, id, keyToId);
    }

    @Override
    public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id, String key2, StreamMessageId id2) {
        return this.readAsync(count, timeout, unit, id, Collections.singletonMap(key2, id2));
    }

    @Override
    public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id, String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
        HashMap<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
        params.put(key2, id2);
        params.put(key3, id3);
        return this.readAsync(count, timeout, unit, id, params);
    }

    @Override
    public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
        return this.readAsync(0, timeout, unit, id, keyToId);
    }

    @Override
    public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
        return this.get(this.readAsync(count, timeout, unit, id, keyToId));
    }

    @Override
    public Map<String, Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
        return this.read(0, timeout, unit, id, keyToId);
    }

    @Override
    public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMessageId id, String key2, StreamMessageId id2) {
        return this.readAsync(id, Collections.singletonMap(key2, id2));
    }

    @Override
    public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(StreamMessageId id, String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
        HashMap<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
        params.put(key2, id2);
        params.put(key3, id3);
        return this.readAsync(id, params);
    }

    @Override
    public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, StreamMessageId id, String key2, StreamMessageId id2) {
        return this.readAsync(count, id, Collections.singletonMap(key2, id2));
    }

    @Override
    public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, StreamMessageId id, String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
        HashMap<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
        params.put(key2, id2);
        params.put(key3, id3);
        return this.readAsync(count, id, params);
    }

    @Override
    public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, String key2, StreamMessageId id2) {
        return this.readAsync(timeout, unit, id, Collections.singletonMap(key2, id2));
    }

    @Override
    public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamMessageId id, String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
        HashMap<String, StreamMessageId> params = new HashMap<String, StreamMessageId>(2);
        params.put(key2, id2);
        params.put(key3, id3);
        return this.readAsync(timeout, unit, id, params);
    }

    @Override
    public Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, String key2, StreamMessageId id2) {
        return this.get(this.readAsync(id, key2, id2));
    }

    @Override
    public Map<String, Map<StreamMessageId, Map<K, V>>> read(StreamMessageId id, String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
        return this.get(this.readAsync(id, key2, id2, key3, id3));
    }

    @Override
    public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId id, String key2, StreamMessageId id2) {
        return this.get(this.readAsync(count, id, key2, id2));
    }

    @Override
    public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId id, String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
        return this.get(this.readAsync(count, id, key2, id2, key3, id3));
    }

    @Override
    public Map<String, Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId id, String key2, StreamMessageId id2) {
        return this.get(this.readAsync(timeout, unit, id, key2, id2));
    }

    @Override
    public Map<String, Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId id, String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
        return this.get(this.readAsync(timeout, unit, id, key2, id2, key3, id3));
    }

    @Override
    public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String key2, StreamMessageId id2) {
        return this.get(this.readAsync(count, timeout, unit, id, key2, id2));
    }

    @Override
    public Map<String, Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId id, String key2, StreamMessageId id2, String key3, StreamMessageId id3) {
        return this.get(this.readAsync(count, timeout, unit, id, key2, id2, key3, id3));
    }

    @Override
    public RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId id, Map<String, StreamMessageId> keyToId) {
        ArrayList<Object> params = new ArrayList<Object>();
        if (count > 0) {
            params.add("COUNT");
            params.add(count);
        }
        if (timeout > 0L) {
            params.add("BLOCK");
            params.add(this.toSeconds(timeout, unit) * 1000L);
        }
        params.add("STREAMS");
        params.add(this.getName());
        for (String key : keyToId.keySet()) {
            params.add(key);
        }
        params.add(id);
        for (StreamMessageId nextId : keyToId.values()) {
            params.add(nextId.toString());
        }
        if (timeout > 0L) {
            return this.commandExecutor.readAsync(this.getName(), this.codec, RedisCommands.XREAD_BLOCKING, params.toArray());
        }
        return this.commandExecutor.readAsync(this.getName(), this.codec, RedisCommands.XREAD, params.toArray());
    }

    @Override
    public RFuture<StreamMessageId> addAsync(K key, V value) {
        return this.addAsync(key, value, 0, false);
    }

    @Override
    public RFuture<Void> addAsync(StreamMessageId id, K key, V value) {
        return this.addAsync(id, key, value, 0, false);
    }

    @Override
    public RFuture<StreamMessageId> addAsync(K key, V value, int trimLen, boolean trimStrict) {
        return this.addCustomAsync(null, key, value, trimLen, trimStrict);
    }

    private <R> RFuture<R> addCustomAsync(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict) {
        LinkedList<Object> params = new LinkedList<Object>();
        params.add(this.getName());
        if (trimLen > 0) {
            params.add("MAXLEN");
            if (!trimStrict) {
                params.add("~");
            }
            params.add(trimLen);
        }
        if (id == null) {
            params.add("*");
        } else {
            params.add(id.toString());
        }
        this.checkKey(key);
        this.checkValue(value);
        params.add(this.encodeMapKey(key));
        params.add(this.encodeMapValue(value));
        if (id == null) {
            return this.commandExecutor.writeAsync(this.getName(), (Codec)StringCodec.INSTANCE, RedisCommands.XADD, params.toArray());
        }
        return this.commandExecutor.writeAsync(this.getName(), (Codec)StringCodec.INSTANCE, RedisCommands.XADD_VOID, params.toArray());
    }

    @Override
    public RFuture<Void> addAsync(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict) {
        return this.addCustomAsync(id, key, value, trimLen, trimStrict);
    }

    @Override
    public StreamMessageId add(K key, V value) {
        return this.get(this.addAsync(key, value));
    }

    @Override
    public void add(StreamMessageId id, K key, V value) {
        this.get(this.addAsync(id, key, value));
    }

    @Override
    public StreamMessageId add(K key, V value, int trimLen, boolean trimStrict) {
        return this.get(this.addAsync(key, value, trimLen, trimStrict));
    }

    @Override
    public void add(StreamMessageId id, K key, V value, int trimLen, boolean trimStrict) {
        this.get(this.addAsync(id, key, value, trimLen, trimStrict));
    }

    @Override
    public RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(int count, StreamMessageId ... ids) {
        return this.readAsync(count, 0L, null, ids);
    }

    @Override
    public Map<StreamMessageId, Map<K, V>> read(int count, long timeout, TimeUnit unit, StreamMessageId ... ids) {
        return this.get(this.readAsync(count, timeout, unit, ids));
    }

    @Override
    public Map<StreamMessageId, Map<K, V>> read(int count, StreamMessageId ... ids) {
        return this.get(this.readAsync(count, ids));
    }

    @Override
    public RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId ... ids) {
        ArrayList<Object> params = new ArrayList<Object>();
        if (count > 0) {
            params.add("COUNT");
            params.add(count);
        }
        if (timeout > 0L) {
            params.add("BLOCK");
            params.add(this.toSeconds(timeout, unit) * 1000L);
        }
        params.add("STREAMS");
        params.add(this.getName());
        for (StreamMessageId id : ids) {
            params.add(id.toString());
        }
        if (timeout > 0L) {
            return this.commandExecutor.readAsync(this.getName(), this.codec, RedisCommands.XREAD_BLOCKING_SINGLE, params.toArray());
        }
        return this.commandExecutor.readAsync(this.getName(), this.codec, RedisCommands.XREAD_SINGLE, params.toArray());
    }

    @Override
    public RFuture<Map<StreamMessageId, Map<K, V>>> rangeAsync(int count, StreamMessageId startId, StreamMessageId endId) {
        LinkedList<Object> params = new LinkedList<Object>();
        params.add(this.getName());
        params.add(startId);
        params.add(endId);
        if (count > 0) {
            params.add("COUNT");
            params.add(count);
        }
        return this.commandExecutor.readAsync(this.getName(), this.codec, RedisCommands.XRANGE, params.toArray());
    }

    @Override
    public Map<StreamMessageId, Map<K, V>> range(int count, StreamMessageId startId, StreamMessageId endId) {
        return this.get(this.rangeAsync(count, startId, endId));
    }

    @Override
    public RFuture<Map<StreamMessageId, Map<K, V>>> rangeReversedAsync(int count, StreamMessageId startId, StreamMessageId endId) {
        LinkedList<Object> params = new LinkedList<Object>();
        params.add(this.getName());
        params.add(startId);
        params.add(endId);
        if (count > 0) {
            params.add("COUNT");
            params.add(count);
        }
        return this.commandExecutor.readAsync(this.getName(), this.codec, RedisCommands.XREVRANGE, params.toArray());
    }

    @Override
    public Map<StreamMessageId, Map<K, V>> rangeReversed(int count, StreamMessageId startId, StreamMessageId endId) {
        return this.get(this.rangeReversedAsync(count, startId, endId));
    }

    @Override
    public RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(StreamMessageId ... ids) {
        return this.readAsync(0, ids);
    }

    @Override
    public RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(long timeout, TimeUnit unit, StreamMessageId ... ids) {
        return this.readAsync(0, timeout, unit, ids);
    }

    @Override
    public RFuture<Map<StreamMessageId, Map<K, V>>> rangeAsync(StreamMessageId startId, StreamMessageId endId) {
        return this.rangeAsync(0, startId, endId);
    }

    @Override
    public RFuture<Map<StreamMessageId, Map<K, V>>> rangeReversedAsync(StreamMessageId startId, StreamMessageId endId) {
        return this.rangeReversedAsync(0, startId, endId);
    }

    @Override
    public Map<StreamMessageId, Map<K, V>> read(StreamMessageId ... ids) {
        return this.read(0, ids);
    }

    @Override
    public Map<StreamMessageId, Map<K, V>> read(long timeout, TimeUnit unit, StreamMessageId ... ids) {
        return this.read(0, timeout, unit, ids);
    }

    @Override
    public Map<StreamMessageId, Map<K, V>> range(StreamMessageId startId, StreamMessageId endId) {
        return this.range(0, startId, endId);
    }

    @Override
    public Map<StreamMessageId, Map<K, V>> rangeReversed(StreamMessageId startId, StreamMessageId endId) {
        return this.rangeReversed(0, startId, endId);
    }

    @Override
    public RFuture<Long> removeAsync(StreamMessageId ... ids) {
        ArrayList<Object> params = new ArrayList<Object>();
        params.add(this.getName());
        for (StreamMessageId id : ids) {
            params.add(id);
        }
        return this.commandExecutor.writeAsync(this.getName(), (Codec)StringCodec.INSTANCE, RedisCommands.XDEL, params.toArray());
    }

    @Override
    public long remove(StreamMessageId ... ids) {
        return this.get(this.removeAsync(ids));
    }

    @Override
    public RFuture<Long> trimAsync(int count) {
        return this.commandExecutor.writeAsync(this.getName(), (Codec)StringCodec.INSTANCE, RedisCommands.XTRIM, "MAXLEN", count);
    }

    @Override
    public RFuture<Long> trimNonStrictAsync(int count) {
        return this.commandExecutor.writeAsync(this.getName(), (Codec)StringCodec.INSTANCE, RedisCommands.XTRIM, "MAXLEN", "~", count);
    }

    @Override
    public long trim(int count) {
        return this.get(this.trimAsync(count));
    }

    @Override
    public long trimNonStrict(int count) {
        return this.get(this.trimNonStrictAsync(count));
    }

    @Override
    public RFuture<Void> removeGroupAsync(String groupName) {
        return this.commandExecutor.writeAsync(this.getName(), (Codec)StringCodec.INSTANCE, RedisCommands.XGROUP, "DESTROY", this.getName(), groupName);
    }

    @Override
    public void removeGroup(String groupName) {
        this.get(this.removeGroupAsync(groupName));
    }

    @Override
    public RFuture<Long> removeConsumerAsync(String groupName, String consumerName) {
        return this.commandExecutor.writeAsync(this.getName(), (Codec)StringCodec.INSTANCE, RedisCommands.XGROUP_LONG, "DELCONSUMER", this.getName(), groupName, consumerName);
    }

    @Override
    public long removeConsumer(String groupName, String consumerName) {
        return this.get(this.removeConsumerAsync(groupName, consumerName));
    }

    @Override
    public RFuture<Void> updateGroupMessageIdAsync(String groupName, StreamMessageId id) {
        return this.commandExecutor.writeAsync(this.getName(), (Codec)StringCodec.INSTANCE, RedisCommands.XGROUP, "SETID", this.getName(), groupName, id);
    }

    @Override
    public void updateGroupMessageId(String groupName, StreamMessageId id) {
        this.get(this.updateGroupMessageIdAsync(groupName, id));
    }

    @Override
    public StreamInfo<K, V> getInfo() {
        return this.get(this.getInfoAsync());
    }

    @Override
    public RFuture<StreamInfo<K, V>> getInfoAsync() {
        RedisCommand<Object> xinfoStreamCommand = new RedisCommand<Object>("XINFO", "STREAM", new ListMultiDecoder2(new StreamInfoDecoder(), new CodecDecoder(), new ObjectMapDecoder(this.getCodec(), false)));
        return this.commandExecutor.readAsync(this.getName(), (Codec)StringCodec.INSTANCE, xinfoStreamCommand, this.getName());
    }

    @Override
    public List<StreamGroup> listGroups() {
        return this.get(this.listGroupsAsync());
    }

    @Override
    public RFuture<List<StreamGroup>> listGroupsAsync() {
        return this.commandExecutor.readAsync(this.getName(), (Codec)StringCodec.INSTANCE, RedisCommands.XINFO_GROUPS, this.getName());
    }

    @Override
    public List<StreamConsumer> listConsumers(String groupName) {
        return this.get(this.listConsumersAsync(groupName));
    }

    @Override
    public RFuture<List<StreamConsumer>> listConsumersAsync(String groupName) {
        return this.commandExecutor.readAsync(this.getName(), (Codec)StringCodec.INSTANCE, RedisCommands.XINFO_CONSUMERS, this.getName(), groupName);
    }

    @Override
    public RFuture<Map<StreamMessageId, Map<K, V>>> pendingRangeAsync(String groupName, StreamMessageId startId, StreamMessageId endId, int count) {
        return this.commandExecutor.evalReadAsync(this.getName(), this.codec, EVAL_XRANGE, "local pendingData = redis.call('xpending', KEYS[1], ARGV[1], ARGV[2], ARGV[3], ARGV[4]);local result = {}; for i = 1, #pendingData, 1 do local value = redis.call('xrange', KEYS[1], pendingData[i][1], pendingData[i][1]);table.insert(result, value[1]);end; return result;", Collections.singletonList(this.getName()), groupName, startId, endId, count);
    }

    @Override
    public RFuture<Map<StreamMessageId, Map<K, V>>> pendingRangeAsync(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count) {
        return this.commandExecutor.evalReadAsync(this.getName(), this.codec, EVAL_XRANGE, "local pendingData = redis.call('xpending', KEYS[1], ARGV[1], ARGV[2], ARGV[3], ARGV[4], ARGV[5]);local result = {}; for i = 1, #pendingData, 1 do local value = redis.call('xrange', KEYS[1], pendingData[i][1], pendingData[i][1]);table.insert(result, value[1]);end; return result;", Collections.singletonList(this.getName()), groupName, startId, endId, count, consumerName);
    }

    @Override
    public Map<StreamMessageId, Map<K, V>> pendingRange(String groupName, String consumerName, StreamMessageId startId, StreamMessageId endId, int count) {
        return this.get(this.pendingRangeAsync(groupName, consumerName, startId, endId, count));
    }

    @Override
    public Map<StreamMessageId, Map<K, V>> pendingRange(String groupName, StreamMessageId startId, StreamMessageId endId, int count) {
        return this.get(this.pendingRangeAsync(groupName, startId, endId, count));
    }
}

