/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.redis.connection.lettuce;

import io.lettuce.core.Limit;
import io.lettuce.core.Range;
import io.lettuce.core.XAddArgs;
import io.lettuce.core.XReadArgs;
import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveStreamCommands;
import org.springframework.data.redis.connection.lettuce.LettuceConverters;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection;
import org.springframework.data.redis.connection.lettuce.RangeConverter;
import org.springframework.data.redis.connection.lettuce.StreamConverters;
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.util.ByteUtils;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;

class LettuceReactiveStreamCommands
implements ReactiveStreamCommands {
    private final LettuceReactiveRedisConnection connection;

    LettuceReactiveStreamCommands(LettuceReactiveRedisConnection connection) {
        Assert.notNull((Object)connection, (String)"Connection must not be null!");
        this.connection = connection;
    }

    @Override
    public Flux<ReactiveRedisConnection.NumericResponse<ReactiveStreamCommands.AcknowledgeCommand, Long>> xAck(Publisher<ReactiveStreamCommands.AcknowledgeCommand> commands) {
        return this.connection.execute(cmd -> Flux.from((Publisher)commands).concatMap(command -> {
            Assert.notNull((Object)command.getKey(), (String)"Key must not be null!");
            Assert.notNull((Object)command.getGroup(), (String)"Group must not be null!");
            Assert.notNull(command.getRecordIds(), (String)"recordIds must not be null!");
            return cmd.xack((Object)command.getKey(), (Object)ByteUtils.getByteBuffer(command.getGroup()), LettuceReactiveStreamCommands.entryIdsToString(command.getRecordIds())).map(value -> new ReactiveRedisConnection.NumericResponse<ReactiveStreamCommands.AcknowledgeCommand, Long>((ReactiveStreamCommands.AcknowledgeCommand)command, (Long)value));
        }));
    }

    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.AddStreamRecord, RecordId>> xAdd(Publisher<ReactiveStreamCommands.AddStreamRecord> commands) {
        return this.connection.execute(cmd -> Flux.from((Publisher)commands).concatMap(command -> {
            Assert.notNull((Object)command.getKey(), (String)"Key must not be null!");
            Assert.notNull(command.getBody(), (String)"Body must not be null!");
            XAddArgs args = new XAddArgs();
            if (!command.getRecord().getId().shouldBeAutoGenerated()) {
                args.id(command.getRecord().getId().getValue());
            }
            return cmd.xadd((Object)command.getKey(), args, command.getBody()).map(value -> new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.AddStreamRecord, RecordId>((ReactiveStreamCommands.AddStreamRecord)command, RecordId.of(value)));
        }));
    }

    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.DeleteCommand, Long>> xDel(Publisher<ReactiveStreamCommands.DeleteCommand> commands) {
        return this.connection.execute(cmd -> Flux.from((Publisher)commands).concatMap(command -> {
            Assert.notNull((Object)command.getKey(), (String)"Key must not be null!");
            Assert.notNull(command.getRecordIds(), (String)"recordIds must not be null!");
            return cmd.xdel((Object)command.getKey(), LettuceReactiveStreamCommands.entryIdsToString(command.getRecordIds())).map(value -> new ReactiveRedisConnection.NumericResponse<ReactiveStreamCommands.DeleteCommand, Long>((ReactiveStreamCommands.DeleteCommand)command, (Long)value));
        }));
    }

    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.GroupCommand, String>> xGroup(Publisher<ReactiveStreamCommands.GroupCommand> commands) {
        return this.connection.execute(cmd -> Flux.from((Publisher)commands).concatMap(command -> {
            Assert.notNull((Object)command.getKey(), (String)"Key must not be null!");
            Assert.notNull((Object)command.getGroupName(), (String)"GroupName must not be null!");
            if (command.getAction().equals((Object)ReactiveStreamCommands.GroupCommand.GroupCommandAction.CREATE)) {
                Assert.notNull((Object)command.getReadOffset(), (String)"ReadOffset must not be null!");
                XReadArgs.StreamOffset offset = XReadArgs.StreamOffset.from((Object)command.getKey(), (String)command.getReadOffset().getOffset());
                return cmd.xgroupCreate(offset, (Object)ByteUtils.getByteBuffer(command.getGroupName())).map(it -> new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.GroupCommand, Object>((ReactiveStreamCommands.GroupCommand)command, it));
            }
            if (command.getAction().equals((Object)ReactiveStreamCommands.GroupCommand.GroupCommandAction.DELETE_CONSUMER)) {
                return cmd.xgroupDelconsumer((Object)command.getKey(), io.lettuce.core.Consumer.from((Object)ByteUtils.getByteBuffer(command.getGroupName()), (Object)ByteUtils.getByteBuffer(command.getConsumerName()))).map(it -> new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.GroupCommand, String>((ReactiveStreamCommands.GroupCommand)command, Boolean.TRUE.equals(it) ? "OK" : "Error"));
            }
            if (command.getAction().equals((Object)ReactiveStreamCommands.GroupCommand.GroupCommandAction.DESTROY)) {
                return cmd.xgroupDestroy((Object)command.getKey(), (Object)ByteUtils.getByteBuffer(command.getGroupName())).map(it -> new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.GroupCommand, String>((ReactiveStreamCommands.GroupCommand)command, Boolean.TRUE.equals(it) ? "OK" : "Error"));
            }
            throw new IllegalArgumentException("Unknown group command " + (Object)((Object)command.getAction()));
        }));
    }

    @Override
    public Flux<ReactiveRedisConnection.NumericResponse<ReactiveRedisConnection.KeyCommand, Long>> xLen(Publisher<ReactiveRedisConnection.KeyCommand> commands) {
        return this.connection.execute(cmd -> Flux.from((Publisher)commands).concatMap(command -> {
            Assert.notNull((Object)command.getKey(), (String)"Key must not be null!");
            return cmd.xlen((Object)command.getKey()).map(value -> new ReactiveRedisConnection.NumericResponse<ReactiveRedisConnection.KeyCommand, Long>((ReactiveRedisConnection.KeyCommand)command, (Long)value));
        }));
    }

    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.RangeCommand, Flux<ByteBufferRecord>>> xRange(Publisher<ReactiveStreamCommands.RangeCommand> commands) {
        return this.connection.execute(cmd -> Flux.from((Publisher)commands).map(command -> {
            Assert.notNull((Object)command.getKey(), (String)"Key must not be null!");
            Assert.notNull(command.getRange(), (String)"Range must not be null!");
            Assert.notNull((Object)command.getLimit(), (String)"Limit must not be null!");
            Range lettuceRange = RangeConverter.toRange(command.getRange(), Function.identity());
            Limit lettuceLimit = LettuceConverters.toLimit(command.getLimit());
            return new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.RangeCommand, Flux>((ReactiveStreamCommands.RangeCommand)command, cmd.xrange((Object)command.getKey(), lettuceRange, lettuceLimit).map(it -> StreamRecords.newRecord().in(it.getStream()).withId(it.getId()).ofBuffer(it.getBody())));
        }));
    }

    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.ReadCommand, Flux<ByteBufferRecord>>> read(Publisher<ReactiveStreamCommands.ReadCommand> commands) {
        return Flux.from(commands).map(command -> {
            Assert.notNull(command.getStreamOffsets(), (String)"StreamOffsets must not be null!");
            Assert.notNull((Object)command.getReadOptions(), (String)"ReadOptions must not be null!");
            StreamReadOptions readOptions = command.getReadOptions();
            if (readOptions.getBlock() != null && readOptions.getBlock() >= 0L) {
                return new ReactiveRedisConnection.CommandResponse((ReactiveStreamCommands.ReadCommand)command, this.connection.executeDedicated(cmd -> LettuceReactiveStreamCommands.doRead(command, readOptions, (RedisClusterReactiveCommands<ByteBuffer, ByteBuffer>)cmd)));
            }
            return new ReactiveRedisConnection.CommandResponse((ReactiveStreamCommands.ReadCommand)command, this.connection.execute(cmd -> LettuceReactiveStreamCommands.doRead(command, readOptions, (RedisClusterReactiveCommands<ByteBuffer, ByteBuffer>)cmd)));
        });
    }

    private static Flux<ByteBufferRecord> doRead(ReactiveStreamCommands.ReadCommand command, StreamReadOptions readOptions, RedisClusterReactiveCommands<ByteBuffer, ByteBuffer> cmd) {
        XReadArgs.StreamOffset<T>[] streamOffsets = LettuceReactiveStreamCommands.toStreamOffsets(command.getStreamOffsets());
        XReadArgs args = StreamConverters.toReadArgs(readOptions);
        if (command.getConsumer() == null) {
            return cmd.xread(args, streamOffsets).map(it -> StreamRecords.newRecord().in(it.getStream()).withId(it.getId()).ofBuffer(it.getBody()));
        }
        io.lettuce.core.Consumer<ByteBuffer> lettuceConsumer = LettuceReactiveStreamCommands.toConsumer(command.getConsumer());
        return cmd.xreadgroup(lettuceConsumer, args, streamOffsets).map(it -> StreamRecords.newRecord().in(it.getStream()).withId(it.getId()).ofBuffer(it.getBody()));
    }

    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.RangeCommand, Flux<ByteBufferRecord>>> xRevRange(Publisher<ReactiveStreamCommands.RangeCommand> commands) {
        return this.connection.execute(cmd -> Flux.from((Publisher)commands).map(command -> {
            Assert.notNull((Object)command.getKey(), (String)"Key must not be null!");
            Assert.notNull(command.getRange(), (String)"Range must not be null!");
            Assert.notNull((Object)command.getLimit(), (String)"Limit must not be null!");
            Range lettuceRange = RangeConverter.toRange(command.getRange(), Function.identity());
            Limit lettuceLimit = LettuceConverters.toLimit(command.getLimit());
            return new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.RangeCommand, Flux>((ReactiveStreamCommands.RangeCommand)command, cmd.xrevrange((Object)command.getKey(), lettuceRange, lettuceLimit).map(it -> StreamRecords.newRecord().in(it.getStream()).withId(it.getId()).ofBuffer(it.getBody())));
        }));
    }

    @Override
    public Flux<ReactiveRedisConnection.NumericResponse<ReactiveRedisConnection.KeyCommand, Long>> xTrim(Publisher<ReactiveStreamCommands.TrimCommand> commands) {
        return this.connection.execute(cmd -> Flux.from((Publisher)commands).concatMap(command -> {
            Assert.notNull((Object)command.getKey(), (String)"Key must not be null!");
            Assert.notNull((Object)command.getCount(), (String)"Count must not be null!");
            return cmd.xtrim((Object)command.getKey(), command.getCount().longValue()).map(value -> new ReactiveRedisConnection.NumericResponse<ReactiveStreamCommands.TrimCommand, Long>((ReactiveStreamCommands.TrimCommand)command, (Long)value));
        }));
    }

    private static <T> XReadArgs.StreamOffset<T>[] toStreamOffsets(Collection<StreamOffset<T>> streams) {
        return (XReadArgs.StreamOffset[])streams.stream().map(it -> XReadArgs.StreamOffset.from(it.getKey(), (String)it.getOffset().getOffset())).toArray(XReadArgs.StreamOffset[]::new);
    }

    private static io.lettuce.core.Consumer<ByteBuffer> toConsumer(Consumer consumer) {
        return io.lettuce.core.Consumer.from((Object)ByteUtils.getByteBuffer(consumer.getGroup()), (Object)ByteUtils.getByteBuffer(consumer.getName()));
    }

    private static String[] entryIdsToString(List<RecordId> recordIds) {
        if (recordIds.size() == 1) {
            return new String[]{recordIds.get(0).getValue()};
        }
        return (String[])recordIds.stream().map(RecordId::getValue).toArray(String[]::new);
    }
}

