/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.redis.connection.lettuce;

import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.data.redis.connection.ReactivePubSubCommands;
import org.springframework.data.redis.connection.ReactiveSubscription;
import org.springframework.data.redis.connection.SubscriptionListener;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveSubscription;
import org.springframework.data.redis.connection.util.ByteArrayWrapper;
import org.springframework.data.redis.util.ByteUtils;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class LettuceReactivePubSubCommands
implements ReactivePubSubCommands {
    private final LettuceReactiveRedisConnection connection;
    private final Map<ByteArrayWrapper, Target> channels = new ConcurrentHashMap<ByteArrayWrapper, Target>();
    private final Map<ByteArrayWrapper, Target> patterns = new ConcurrentHashMap<ByteArrayWrapper, Target>();

    LettuceReactivePubSubCommands(LettuceReactiveRedisConnection connection) {
        this.connection = connection;
    }

    public Map<ByteArrayWrapper, Target> getChannels() {
        return this.channels;
    }

    public Map<ByteArrayWrapper, Target> getPatterns() {
        return this.patterns;
    }

    @Override
    public Mono<ReactiveSubscription> createSubscription(SubscriptionListener listener) {
        return this.connection.getPubSubConnection().map(pubSubConnection -> new LettuceReactiveSubscription(listener, (StatefulRedisPubSubConnection<ByteBuffer, ByteBuffer>)pubSubConnection, this, this.connection.translateException()));
    }

    @Override
    public Flux<Long> publish(Publisher<ReactiveSubscription.ChannelMessage<ByteBuffer, ByteBuffer>> messageStream) {
        Assert.notNull(messageStream, (String)"ChannelMessage stream must not be null");
        return this.connection.getCommands().flatMapMany(commands -> Flux.from((Publisher)messageStream).flatMap(message -> commands.publish((Object)((ByteBuffer)message.getChannel()), (Object)((ByteBuffer)message.getMessage()))));
    }

    @Override
    public Mono<Void> subscribe(ByteBuffer ... channels) {
        Assert.notNull((Object)channels, (String)"Channels must not be null");
        Target.trackSubscriptions(channels, this.channels);
        return this.doWithPubSub(commands -> commands.subscribe((Object[])channels));
    }

    public Mono<Void> unsubscribe(ByteBuffer ... channels) {
        Assert.notNull(this.patterns, (String)"Patterns must not be null");
        ByteBuffer[] actualUnsubscribe = Target.trackUnsubscriptions(channels, this.channels);
        if (actualUnsubscribe.length == 0 && channels.length != 0) {
            return Mono.empty();
        }
        return this.doWithPubSub(commands -> commands.unsubscribe((Object[])actualUnsubscribe));
    }

    @Override
    public Mono<Void> pSubscribe(ByteBuffer ... patterns) {
        Assert.notNull((Object)patterns, (String)"Patterns must not be null");
        Target.trackSubscriptions(patterns, this.patterns);
        return this.doWithPubSub(commands -> commands.psubscribe((Object[])patterns));
    }

    public Mono<Void> pUnsubscribe(ByteBuffer ... patterns) {
        Assert.notNull((Object)patterns, (String)"Patterns must not be null");
        ByteBuffer[] actualUnsubscribe = Target.trackUnsubscriptions(patterns, this.patterns);
        if (actualUnsubscribe.length == 0 && patterns.length != 0) {
            return Mono.empty();
        }
        return this.doWithPubSub(commands -> commands.punsubscribe((Object[])actualUnsubscribe));
    }

    private <T> Mono<T> doWithPubSub(Function<RedisPubSubReactiveCommands<ByteBuffer, ByteBuffer>, Mono<T>> function) {
        return this.connection.getPubSubConnection().flatMap(pubSubConnection -> (Mono)function.apply(pubSubConnection.reactive())).onErrorMap(this.connection.translateException());
    }

    static class Target {
        private static final AtomicLongFieldUpdater<Target> SUBSCRIBERS = AtomicLongFieldUpdater.newUpdater(Target.class, "subscribers");
        private final byte[] raw;
        private volatile long subscribers;

        Target(byte[] raw) {
            this.raw = raw;
        }

        public static void trackSubscriptions(ByteBuffer[] targets, Map<ByteArrayWrapper, Target> targetMap) {
            Target.doWithTargets(targets, targetMap, Target::allocate);
        }

        public static ByteBuffer[] trackUnsubscriptions(ByteBuffer[] targets, Map<ByteArrayWrapper, Target> targetMap) {
            return Target.doWithTargets(targets, targetMap, Target::deallocate);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        static ByteBuffer[] doWithTargets(ByteBuffer[] targets, Map<ByteArrayWrapper, Target> targetMap, BiFunction<ByteBuffer, Map<ByteArrayWrapper, Target>, Boolean> f) {
            ArrayList<ByteBuffer> toSubscribe = new ArrayList<ByteBuffer>(targets.length);
            Map<ByteArrayWrapper, Target> map = targetMap;
            synchronized (map) {
                for (ByteBuffer target : targets) {
                    if (!f.apply(target, targetMap).booleanValue()) continue;
                    toSubscribe.add(target);
                }
            }
            return toSubscribe.toArray(new ByteBuffer[0]);
        }

        boolean increment() {
            return SUBSCRIBERS.incrementAndGet(this) == 1L;
        }

        boolean decrement() {
            long l = SUBSCRIBERS.get(this);
            if (l > 0L && SUBSCRIBERS.compareAndSet(this, l, l - 1L)) {
                return l == 1L;
            }
            return false;
        }

        static boolean allocate(ByteBuffer buffer, Map<ByteArrayWrapper, Target> targets) {
            byte[] raw = ByteUtils.getBytes(buffer);
            ByteArrayWrapper wrapper = new ByteArrayWrapper(raw);
            Target targetToUse = targets.get(wrapper);
            if (targetToUse == null) {
                targetToUse = new Target(raw);
                targets.put(wrapper, targetToUse);
            }
            return targetToUse.increment();
        }

        static boolean deallocate(ByteBuffer buffer, Map<ByteArrayWrapper, Target> targets) {
            byte[] raw = ByteUtils.getBytes(buffer);
            ByteArrayWrapper wrapper = new ByteArrayWrapper(raw);
            Target targetToUse = targets.get(wrapper);
            if (targetToUse == null) {
                return false;
            }
            if (targetToUse.decrement()) {
                targets.remove(wrapper);
                return true;
            }
            return false;
        }

        public String toString() {
            return "%s: Subscribers: %s".formatted(new String(this.raw), SUBSCRIBERS.get(this));
        }
    }
}

