/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.redis.core;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;
import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.NullUnmarked;
import org.reactivestreams.Publisher;
import org.springframework.core.convert.ConversionService;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.ReactiveStreamCommands;
import org.springframework.data.redis.connection.RedisStreamCommands;
import org.springframework.data.redis.connection.convert.Converters;
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.ReactiveStreamOperations;
import org.springframework.data.redis.core.StreamObjectMapper;
import org.springframework.data.redis.hash.HashMapper;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@NullUnmarked
class DefaultReactiveStreamOperations<K, HK, HV>
implements ReactiveStreamOperations<K, HK, HV> {
    private final ReactiveRedisTemplate<?, ?> template;
    private final RedisSerializationContext<K, ?> serializationContext;
    private final StreamObjectMapper objectMapper;

    DefaultReactiveStreamOperations(@NonNull ReactiveRedisTemplate<?, ?> template, final @NonNull RedisSerializationContext<K, ?> serializationContext, @NonNull HashMapper<? super K, ? super HK, ? super HV> hashMapper) {
        this.template = template;
        this.serializationContext = serializationContext;
        this.objectMapper = new StreamObjectMapper(this, hashMapper){
            final /* synthetic */ DefaultReactiveStreamOperations this$0;
            {
                this.this$0 = this$0;
                super(mapper);
            }

            @Override
            protected HashMapper<?, ?, ?> doGetHashMapper(final @NonNull ConversionService conversionService, final @NonNull Class<?> targetType) {
                if (this.this$0.objectMapper.isSimpleType(targetType) || ClassUtils.isAssignable(ByteBuffer.class, targetType)) {
                    return new HashMapper<Object, Object, Object>(this){
                        final /* synthetic */ 1 this$1;
                        {
                            this.this$1 = this$1;
                        }

                        @Override
                        public Map<Object, Object> toHash(@NonNull Object object) {
                            Object key = "payload";
                            Object value = object;
                            if (serializationContext.getHashKeySerializationPair() == null) {
                                key = key.toString().getBytes(StandardCharsets.UTF_8);
                            }
                            if (serializationContext.getHashValueSerializationPair() == null) {
                                value = conversionService.convert(value, byte[].class);
                            }
                            return Collections.singletonMap(key, value);
                        }

                        @Override
                        public Object fromHash(@NonNull Map<Object, Object> hash) {
                            Object value = hash.values().iterator().next();
                            if (ClassUtils.isAssignableValue((Class)targetType, (Object)value)) {
                                return value;
                            }
                            Object deserialized = this.this$1.this$0.deserializeHashValue((ByteBuffer)value);
                            if (ClassUtils.isAssignableValue((Class)targetType, deserialized)) {
                                return value;
                            }
                            return conversionService.convert(deserialized, targetType);
                        }
                    };
                }
                return super.doGetHashMapper(conversionService, targetType);
            }
        };
    }

    @Override
    public Mono<Long> acknowledge(@NonNull K key, @NonNull String group, RecordId ... recordIds) {
        Assert.notNull(key, (String)"Key must not be null");
        Assert.hasText((String)group, (String)"Group must not be null or empty");
        Assert.notNull((Object)recordIds, (String)"MessageIds must not be null");
        Assert.notEmpty((Object[])recordIds, (String)"MessageIds must not be empty");
        return this.createMono(streamCommands -> streamCommands.xAck(this.rawKey(key), group, recordIds));
    }

    @Override
    public Mono<RecordId> add(@NonNull Record<K, ?> record) {
        Assert.notNull(record.getStream(), (String)"Key must not be null");
        Assert.notNull(record.getValue(), (String)"Body must not be null");
        MapRecord input = StreamObjectMapper.toMapRecord(this, record);
        return this.createMono(streamCommands -> streamCommands.xAdd(this.serializeRecord(input)));
    }

    @Override
    public Mono<RecordId> add(@NonNull Record<K, ?> record,  @NonNull RedisStreamCommands.XAddOptions xAddOptions) {
        Assert.notNull(record.getStream(), (String)"Key must not be null");
        Assert.notNull(record.getValue(), (String)"Body must not be null");
        Assert.notNull((Object)xAddOptions, (String)"XAddOptions must not be null");
        MapRecord input = StreamObjectMapper.toMapRecord(this, record);
        return this.createMono(streamCommands -> streamCommands.xAdd(this.serializeRecord(input), xAddOptions));
    }

    @Override
    public Flux<MapRecord<K, HK, HV>> claim(@NonNull K key, @NonNull String consumerGroup, @NonNull String newOwner,  @NonNull RedisStreamCommands.XClaimOptions xClaimOptions) {
        return this.createFlux(streamCommands -> streamCommands.xClaim(this.rawKey(key), consumerGroup, newOwner, xClaimOptions).map(this::deserializeRecord));
    }

    @Override
    public Mono<Long> delete(@NonNull K key, RecordId ... recordIds) {
        Assert.notNull(key, (String)"Key must not be null");
        Assert.notNull((Object)recordIds, (String)"MessageIds must not be null");
        return this.createMono(streamCommands -> streamCommands.xDel(this.rawKey(key), recordIds));
    }

    @Override
    public Mono<String> createGroup(@NonNull K key, @NonNull ReadOffset readOffset, @NonNull String group) {
        Assert.notNull(key, (String)"Key must not be null");
        Assert.notNull((Object)readOffset, (String)"ReadOffset must not be null");
        Assert.notNull((Object)group, (String)"Group must not be null");
        return this.createMono(streamCommands -> streamCommands.xGroupCreate(this.rawKey(key), group, readOffset, true));
    }

    @Override
    public Mono<String> deleteConsumer(@NonNull K key, @NonNull Consumer consumer) {
        Assert.notNull(key, (String)"Key must not be null");
        Assert.notNull((Object)consumer, (String)"Consumer must not be null");
        return this.createMono(streamCommands -> streamCommands.xGroupDelConsumer(this.rawKey(key), consumer));
    }

    @Override
    public Mono<String> destroyGroup(@NonNull K key, @NonNull String group) {
        Assert.notNull(key, (String)"Key must not be null");
        Assert.notNull((Object)group, (String)"Group must not be null");
        return this.createMono(streamCommands -> streamCommands.xGroupDestroy(this.rawKey(key), group));
    }

    @Override
    public Flux<StreamInfo.XInfoConsumer> consumers(@NonNull K key, @NonNull String group) {
        Assert.notNull(key, (String)"Key must not be null");
        Assert.notNull((Object)group, (String)"Group must not be null");
        return this.createFlux(streamCommands -> streamCommands.xInfoConsumers(this.rawKey(key), group));
    }

    @Override
    public Mono<StreamInfo.XInfoStream> info(@NonNull K key) {
        Assert.notNull(key, (String)"Key must not be null");
        return this.createMono(streamCommands -> streamCommands.xInfo(this.rawKey(key)));
    }

    @Override
    public Flux<StreamInfo.XInfoGroup> groups(@NonNull K key) {
        Assert.notNull(key, (String)"Key must not be null");
        return this.createFlux(streamCommands -> streamCommands.xInfoGroups(this.rawKey(key)));
    }

    @Override
    public Mono<PendingMessages> pending(@NonNull K key, @NonNull String group, @NonNull Range<?> range, long count) {
        ByteBuffer rawKey = this.rawKey(key);
        return this.createMono(streamCommands -> streamCommands.xPending(rawKey, group, range, (Long)count));
    }

    @Override
    public Mono<PendingMessages> pending(@NonNull K key, @NonNull Consumer consumer, @NonNull Range<?> range, long count) {
        ByteBuffer rawKey = this.rawKey(key);
        return this.createMono(streamCommands -> streamCommands.xPending(rawKey, consumer, range, (Long)count));
    }

    @Override
    public Mono<PendingMessagesSummary> pending(@NonNull K key, @NonNull String group) {
        ByteBuffer rawKey = this.rawKey(key);
        return this.createMono(streamCommands -> streamCommands.xPending(rawKey, group));
    }

    @Override
    public Mono<Long> size(@NonNull K key) {
        Assert.notNull(key, (String)"Key must not be null");
        return this.createMono(streamCommands -> streamCommands.xLen(this.rawKey(key)));
    }

    @Override
    public Flux<MapRecord<K, HK, HV>> range(@NonNull K key, @NonNull Range<String> range, @NonNull Limit limit) {
        Assert.notNull(key, (String)"Key must not be null");
        Assert.notNull(range, (String)"Range must not be null");
        Assert.notNull((Object)limit, (String)"Limit must not be null");
        return this.createFlux(streamCommands -> streamCommands.xRange(this.rawKey(key), range, limit).map(this::deserializeRecord));
    }

    @Override
    public Flux<MapRecord<K, HK, HV>> read(@NonNull StreamReadOptions readOptions, StreamOffset<K> ... streams) {
        Assert.notNull((Object)readOptions, (String)"StreamReadOptions must not be null");
        Assert.notNull(streams, (String)"Streams must not be null");
        return this.createFlux(streamCommands -> {
            StreamOffset<ByteBuffer>[] streamOffsets = this.rawStreamOffsets(streams);
            return streamCommands.xRead(readOptions, streamOffsets).map(this::deserializeRecord);
        });
    }

    @Override
    public Flux<MapRecord<K, HK, HV>> read(@NonNull Consumer consumer, @NonNull StreamReadOptions readOptions, StreamOffset<K> ... streams) {
        Assert.notNull((Object)consumer, (String)"Consumer must not be null");
        Assert.notNull((Object)readOptions, (String)"StreamReadOptions must not be null");
        Assert.notNull(streams, (String)"Streams must not be null");
        return this.createFlux(streamCommands -> {
            StreamOffset<ByteBuffer>[] streamOffsets = this.rawStreamOffsets(streams);
            return streamCommands.xReadGroup(consumer, readOptions, streamOffsets).map(this::deserializeRecord);
        });
    }

    @Override
    public Flux<MapRecord<K, HK, HV>> reverseRange(@NonNull K key, @NonNull Range<String> range, @NonNull Limit limit) {
        Assert.notNull(key, (String)"Key must not be null");
        Assert.notNull(range, (String)"Range must not be null");
        Assert.notNull((Object)limit, (String)"Limit must not be null");
        return this.createFlux(streamCommands -> streamCommands.xRevRange(this.rawKey(key), range, limit).map(this::deserializeRecord));
    }

    @Override
    public Mono<Long> trim(@NonNull K key, long count) {
        return this.trim(key, count, false);
    }

    @Override
    public Mono<Long> trim(@NonNull K key, long count, boolean approximateTrimming) {
        Assert.notNull(key, (String)"Key must not be null");
        return this.createMono(streamCommands -> streamCommands.xTrim(this.rawKey(key), count, approximateTrimming));
    }

    @Override
    public <V> HashMapper<V, HK, HV> getHashMapper(@NonNull Class<V> targetType) {
        return this.objectMapper.getHashMapper(targetType);
    }

    private StreamOffset<ByteBuffer>[] rawStreamOffsets(StreamOffset<K>[] streams) {
        return (StreamOffset[])Arrays.stream(streams).map(it -> StreamOffset.create(this.rawKey(it.getKey()), it.getOffset())).toArray(StreamOffset[]::new);
    }

    private <T> Mono<T> createMono(Function<ReactiveStreamCommands, Publisher<T>> function) {
        Assert.notNull(function, (String)"Function must not be null");
        return this.template.doCreateMono(connection -> (Publisher)function.apply(connection.streamCommands()));
    }

    private <T> Flux<T> createFlux(Function<ReactiveStreamCommands, Publisher<T>> function) {
        Assert.notNull(function, (String)"Function must not be null");
        return this.template.doCreateFlux(connection -> (Publisher)function.apply(connection.streamCommands()));
    }

    private ByteBuffer rawKey(K key) {
        return this.serializationContext.getKeySerializationPair().write(key);
    }

    private ByteBuffer rawHashKey(HK key) {
        try {
            return this.serializationContext.getHashKeySerializationPair().write(key);
        }
        catch (IllegalStateException illegalStateException) {
            return ByteBuffer.wrap((byte[])this.objectMapper.getConversionService().convert(key, byte[].class));
        }
    }

    private ByteBuffer rawValue(HV value) {
        try {
            return this.serializationContext.getHashValueSerializationPair().write(value);
        }
        catch (IllegalStateException illegalStateException) {
            return ByteBuffer.wrap((byte[])this.objectMapper.getConversionService().convert(value, byte[].class));
        }
    }

    private HK readHashKey(ByteBuffer buffer) {
        return this.serializationContext.getHashKeySerializationPair().getReader().read(buffer);
    }

    private K readKey(ByteBuffer buffer) {
        return this.serializationContext.getKeySerializationPair().read(buffer);
    }

    private HV deserializeHashValue(ByteBuffer buffer) {
        return this.serializationContext.getHashValueSerializationPair().read(buffer);
    }

    @Override
    public MapRecord<K, HK, HV> deserializeRecord(@NonNull ByteBufferRecord record) {
        return record.map(it -> it.mapEntries(this::deserializeRecordFields).withStreamKey(this.readKey((ByteBuffer)record.getStream())));
    }

    private Map.Entry<HK, HV> deserializeRecordFields(Map.Entry<ByteBuffer, ByteBuffer> it) {
        return Converters.entryOf(this.readHashKey(it.getKey()), this.deserializeHashValue(it.getValue()));
    }

    private ByteBufferRecord serializeRecord(MapRecord<K, ? extends HK, ? extends HV> record) {
        return ByteBufferRecord.of(record.map(it -> it.mapEntries(this::serializeRecordFields).withStreamKey(this.rawKey(record.getStream()))));
    }

    private Map.Entry<ByteBuffer, ByteBuffer> serializeRecordFields(Map.Entry<? extends HK, ? extends HV> it) {
        return Converters.entryOf(this.rawHashKey(it.getKey()), this.rawValue(it.getValue()));
    }
}

