/*
 * Decompiled with CFR 0.152.
 */
package org.redisson.spring.data.connection;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.connection.ConnectionManager;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService;
import org.redisson.spring.data.connection.RedissonBaseReactive;
import org.springframework.data.redis.connection.ReactiveSubscription;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

public class RedissonReactiveSubscription
implements ReactiveSubscription {
    // Channel subscriptions tracked by this instance: channel name -> entries stored by subscribe(...).
    private final Map<ChannelName, Collection<PubSubConnectionEntry>> channels = new ConcurrentHashMap<ChannelName, Collection<PubSubConnectionEntry>>();
    // Pattern subscriptions tracked by this instance: pattern name -> entries stored by pSubscribe(...).
    private final Map<ChannelName, Collection<PubSubConnectionEntry>> patterns = new ConcurrentHashMap<ChannelName, Collection<PubSubConnectionEntry>>();
    // Incremented when a (p)subscribe/(p)unsubscribe Mono is created and decremented when its
    // futures complete; receive() registers its listener set-up through this counter.
    private final ListenableCounter monosListener = new ListenableCounter();
    // Service used to issue the subscribe / psubscribe / unsubscribe calls.
    private final PublishSubscribeService subscribeService;
    // Holds the Flux handed out by receive(); set at most once via compareAndSet.
    private final AtomicReference<Flux<ReactiveSubscription.Message<ByteBuffer, ByteBuffer>>> flux = new AtomicReference();
    // Most recent listener-removal callback assigned by receive(); disposed by cancel().
    private volatile Disposable disposable;

    /**
     * Creates a subscription that issues its commands through the
     * publish/subscribe service obtained from the given connection manager.
     *
     * @param connectionManager provider of the {@link PublishSubscribeService}
     */
    public RedissonReactiveSubscription(ConnectionManager connectionManager) {
        PublishSubscribeService service = connectionManager.getSubscribeService();
        this.subscribeService = service;
    }

    /**
     * Subscribes to the given channels.
     * <p>
     * The pending-operation counter is incremented as soon as this method is
     * called; it is decremented when all subscribe futures created for a
     * subscriber of the returned {@link Mono} have completed, successfully or not.
     *
     * @param channels names of the channels to subscribe to
     * @return a {@link Mono} that issues the subscribe calls when subscribed to and
     *         completes once every one of them has completed
     */
    public Mono<Void> subscribe(ByteBuffer ... channels) {
        this.monosListener.acquire();
        return Mono.defer(() -> {
            ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>(channels.length);
            for (ByteBuffer channel : channels) {
                ChannelName cn = this.toChannelName(channel);
                CompletionStage f = this.subscribeService.subscribe((Codec)ByteArrayCodec.INSTANCE, cn, new RedisPubSubListener[0]);
                f = f.whenComplete((res, e) -> {
                    // Track the channel only if the subscription succeeded: a failed stage has
                    // no result, and the backing ConcurrentHashMap rejects null values.
                    if (e == null) {
                        this.channels.put(cn, (Collection<PubSubConnectionEntry>)res);
                    }
                });
                futures.add((CompletableFuture)f);
            }
            CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
            // Release the counter whether the subscriptions succeeded or failed.
            future = future.whenComplete((r, e) -> this.monosListener.release());
            return Mono.fromFuture(future);
        });
    }

    /**
     * Converts a buffer holding a channel or pattern name into a {@link ChannelName}.
     *
     * @param channel buffer containing the name bytes
     * @return channel name built from the bytes extracted by
     *         {@code RedissonBaseReactive.toByteArray}
     */
    protected ChannelName toChannelName(ByteBuffer channel) {
        byte[] rawName = RedissonBaseReactive.toByteArray(channel);
        return new ChannelName(rawName);
    }

    /**
     * Subscribes to the given channel patterns.
     * <p>
     * The pending-operation counter is incremented as soon as this method is
     * called; it is decremented when all psubscribe futures created for a
     * subscriber of the returned {@link Mono} have completed, successfully or not.
     *
     * @param patterns patterns to subscribe to
     * @return a {@link Mono} that issues the psubscribe calls when subscribed to and
     *         completes once every one of them has completed
     */
    public Mono<Void> pSubscribe(ByteBuffer ... patterns) {
        this.monosListener.acquire();
        return Mono.defer(() -> {
            ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>(patterns.length);
            for (ByteBuffer pattern : patterns) {
                ChannelName cn = this.toChannelName(pattern);
                CompletionStage f = this.subscribeService.psubscribe(cn, (Codec)ByteArrayCodec.INSTANCE, new RedisPubSubListener[0]);
                f = f.whenComplete((res, e) -> {
                    // Track the pattern only if the subscription succeeded: a failed stage has
                    // no result, and the backing ConcurrentHashMap rejects null values.
                    if (e == null) {
                        this.patterns.put(cn, (Collection<PubSubConnectionEntry>)res);
                    }
                });
                futures.add((CompletableFuture)f);
            }
            CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
            // Release the counter whether the subscriptions succeeded or failed.
            future = future.whenComplete((r, e) -> this.monosListener.release());
            return Mono.fromFuture(future);
        });
    }

    /**
     * Unsubscribes from every channel currently tracked by this subscription.
     *
     * @return the {@link Mono} produced by {@link #unsubscribe(ByteBuffer...)} for
     *         the distinct names of all tracked channels
     */
    public Mono<Void> unsubscribe() {
        ByteBuffer[] tracked = this.channels.keySet().stream()
                .map(ChannelName::getName)
                .map(ByteBuffer::wrap)
                .distinct()
                .toArray(ByteBuffer[]::new);
        return this.unsubscribe(tracked);
    }

    /**
     * Unsubscribes from the given channels.
     * <p>
     * After each unsubscribe call completes, entries that no longer have
     * listeners for the channel are dropped from the tracked state, and the
     * channel itself is forgotten once no entry remains. Channels that are not
     * tracked are skipped instead of failing with a {@link NullPointerException}.
     *
     * @param channels names of the channels to unsubscribe from
     * @return a {@link Mono} that issues the unsubscribe calls when subscribed to
     *         and completes once every one of them has completed
     */
    public Mono<Void> unsubscribe(ByteBuffer ... channels) {
        this.monosListener.acquire();
        return Mono.defer(() -> {
            ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>(channels.length);
            for (ByteBuffer channel : channels) {
                ChannelName cn = this.toChannelName(channel);
                CompletionStage f = this.subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE);
                f = f.whenComplete((res, e) -> {
                    synchronized (this.channels) {
                        Collection<PubSubConnectionEntry> entries = this.channels.get(cn);
                        if (entries == null) {
                            // Nothing tracked for this channel.
                            return;
                        }
                        // Build the surviving entries in a fresh list rather than removing from the
                        // stored collection while iterating it; this also avoids relying on the
                        // stored collection being modifiable.
                        ArrayList<PubSubConnectionEntry> remaining = new ArrayList<PubSubConnectionEntry>(entries.size());
                        for (PubSubConnectionEntry entry : entries) {
                            if (entry.hasListeners(cn)) {
                                remaining.add(entry);
                            }
                        }
                        if (remaining.size() == entries.size()) {
                            // No entry was dropped, so the tracked state stays as it is.
                            return;
                        }
                        if (remaining.isEmpty()) {
                            this.channels.remove(cn);
                        } else {
                            this.channels.put(cn, remaining);
                        }
                    }
                });
                futures.add((CompletableFuture)f);
            }
            CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
            // Release the counter whether the calls succeeded or failed.
            future = future.whenComplete((r, e) -> this.monosListener.release());
            return Mono.fromFuture(future);
        });
    }

    /**
     * Unsubscribes from every pattern currently tracked by this subscription.
     * <p>
     * Delegates to {@link #pUnsubscribe(ByteBuffer...)} so that the pattern
     * unsubscribe command is issued and the pattern bookkeeping is updated;
     * routing the patterns through the channel variant would do neither.
     *
     * @return the {@link Mono} produced by {@link #pUnsubscribe(ByteBuffer...)} for
     *         the distinct names of all tracked patterns
     */
    public Mono<Void> pUnsubscribe() {
        ByteBuffer[] tracked = this.patterns.keySet().stream()
                .map(b -> ByteBuffer.wrap(b.getName()))
                .distinct()
                .toArray(ByteBuffer[]::new);
        return this.pUnsubscribe(tracked);
    }

    /**
     * Unsubscribes from the given channel patterns.
     * <p>
     * Patterns that are not tracked are skipped instead of failing with a
     * {@link NullPointerException}.
     *
     * @param patterns patterns to unsubscribe from
     * @return a {@link Mono} that issues the punsubscribe calls when subscribed to
     *         and completes once every one of them has completed
     */
    public Mono<Void> pUnsubscribe(ByteBuffer ... patterns) {
        this.monosListener.acquire();
        return Mono.defer(() -> {
            ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>(patterns.length);
            for (ByteBuffer pattern : patterns) {
                ChannelName cn = this.toChannelName(pattern);
                CompletionStage f = this.subscribeService.unsubscribe(cn, PubSubType.PUNSUBSCRIBE);
                f = f.whenComplete((res, e) -> {
                    synchronized (this.patterns) {
                        Collection<PubSubConnectionEntry> entries = this.patterns.get(cn);
                        if (entries == null) {
                            // Nothing tracked for this pattern.
                            return;
                        }
                        // NOTE(review): the pattern is forgotten when some entry still HAS listeners,
                        // whereas unsubscribe(ByteBuffer...) drops entries WITHOUT listeners. The
                        // existing condition is kept as is - confirm which of the two is intended.
                        boolean anyWithListeners = entries.stream().anyMatch(en -> en.hasListeners(cn));
                        if (anyWithListeners) {
                            this.patterns.remove(cn);
                        }
                    }
                });
                futures.add((CompletableFuture)f);
            }
            CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
            // Release the counter whether the calls succeeded or failed.
            future = future.whenComplete((r, e) -> this.monosListener.release());
            return Mono.fromFuture(future);
        });
    }

    /**
     * Returns the names of the channels currently tracked by this subscription.
     *
     * @return newly collected set with one buffer wrapping each tracked channel name
     */
    public Set<ByteBuffer> getChannels() {
        Set<ChannelName> tracked = this.channels.keySet();
        return tracked.stream()
                .map(ChannelName::getName)
                .map(ByteBuffer::wrap)
                .collect(Collectors.toSet());
    }

    /**
     * Returns the patterns currently tracked by this subscription.
     *
     * @return newly collected set with one buffer wrapping each tracked pattern name
     */
    public Set<ByteBuffer> getPatterns() {
        Set<ChannelName> tracked = this.patterns.keySet();
        return tracked.stream()
                .map(ChannelName::getName)
                .map(ByteBuffer::wrap)
                .collect(Collectors.toSet());
    }

    /**
     * Returns the stream of messages received on the tracked channels and patterns.
     * <p>
     * The {@link Flux} is created once and cached. On a request signal it
     * registers, through the pending-operation counter, a callback that creates
     * a pub/sub listener, attaches it to every entry currently tracked in
     * {@code channels} and {@code patterns}, and installs a {@link Disposable}
     * that detaches the listener again. Messages whose channel or pattern is no
     * longer tracked are ignored.
     *
     * @return the shared message stream of this subscription
     */
    public Flux<ReactiveSubscription.Message<ByteBuffer, ByteBuffer>> receive() {
        Flux<ReactiveSubscription.Message<ByteBuffer, ByteBuffer>> cached = this.flux.get();
        if (cached != null) {
            return cached;
        }
        // NOTE(review): the set-up below runs for every request signal delivered to onRequest,
        // not only the first - confirm that repeated listener registration is intended.
        Flux<ReactiveSubscription.Message<ByteBuffer, ByteBuffer>> created = Flux.create(emitter -> emitter.onRequest(n -> this.monosListener.addListener(() -> {
            BaseRedisPubSubListener listener = new BaseRedisPubSubListener() {

                public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) {
                    if (!RedissonReactiveSubscription.this.patterns.containsKey(new ChannelName(pattern.toString()))) {
                        return;
                    }
                    // NOTE(review): getBytes() uses the platform default charset - confirm it
                    // matches the encoding ChannelName uses for its name.
                    emitter.next(new ReactiveSubscription.PatternMessage<ByteBuffer, ByteBuffer, ByteBuffer>(
                            ByteBuffer.wrap(pattern.toString().getBytes()),
                            ByteBuffer.wrap(channel.toString().getBytes()),
                            ByteBuffer.wrap((byte[])message)));
                }

                public void onMessage(CharSequence channel, Object msg) {
                    if (!RedissonReactiveSubscription.this.channels.containsKey(new ChannelName(channel.toString()))) {
                        return;
                    }
                    emitter.next(new ReactiveSubscription.ChannelMessage<ByteBuffer, ByteBuffer>(
                            ByteBuffer.wrap(channel.toString().getBytes()),
                            ByteBuffer.wrap((byte[])msg)));
                }
            };
            // Remember how to detach this listener so that cancel() and sink disposal can undo it.
            this.disposable = () -> {
                for (Map.Entry<ChannelName, Collection<PubSubConnectionEntry>> tracked : this.channels.entrySet()) {
                    for (PubSubConnectionEntry connectionEntry : tracked.getValue()) {
                        connectionEntry.removeListener(tracked.getKey(), listener);
                    }
                }
                for (Map.Entry<ChannelName, Collection<PubSubConnectionEntry>> tracked : this.patterns.entrySet()) {
                    for (PubSubConnectionEntry connectionEntry : tracked.getValue()) {
                        connectionEntry.removeListener(tracked.getKey(), listener);
                    }
                }
            };
            for (Map.Entry<ChannelName, Collection<PubSubConnectionEntry>> tracked : this.channels.entrySet()) {
                for (PubSubConnectionEntry connectionEntry : tracked.getValue()) {
                    connectionEntry.addListener(tracked.getKey(), listener);
                }
            }
            for (Map.Entry<ChannelName, Collection<PubSubConnectionEntry>> tracked : this.patterns.entrySet()) {
                for (PubSubConnectionEntry connectionEntry : tracked.getValue()) {
                    connectionEntry.addListener(tracked.getKey(), listener);
                }
            }
            emitter.onDispose(this.disposable);
        })));
        // Publish the new Flux unless another caller won the race; everyone shares the winner.
        if (this.flux.compareAndSet(null, created)) {
            return created;
        }
        return this.flux.get();
    }

    /**
     * Cancels this subscription: unsubscribes from all channels, then from all
     * patterns, and finally disposes the listener-removal callback installed by
     * {@link #receive()}, if there is one.
     *
     * @return a {@link Mono} completing after the three steps have run in order
     */
    public Mono<Void> cancel() {
        Mono<Void> channelsDone = this.unsubscribe();
        Mono<Void> patternsDone = this.pUnsubscribe();
        Mono<Void> detachListener = Mono.fromRunnable(() -> {
            Disposable current = this.disposable;
            if (current != null) {
                current.dispose();
            }
        });
        return channelsDone.then(patternsDone).then(detachListener);
    }

    /**
     * Counter of pending operations with a single deferred callback.
     * <p>
     * {@link #acquire()} and {@link #release()} bracket an operation. A callback
     * passed to {@link #addListener(Runnable)} runs immediately when nothing is
     * pending; otherwise it is stored (replacing any previously stored callback)
     * and run once by the {@code release()} call that brings the count back to zero.
     * <p>
     * All access to the count and to the stored callback happens while holding
     * this object's monitor.
     */
    public static class ListenableCounter {
        private int state;
        private Runnable r;

        /** Registers one more pending operation. */
        public synchronized void acquire() {
            ++this.state;
        }

        /**
         * Marks one pending operation as finished and, when the count reaches
         * zero, runs the stored callback (if any) exactly once.
         * <p>
         * The callback is taken and cleared under the monitor and then run
         * outside of it. Clearing before running means a callback registered
         * from within the running callback is kept rather than wiped.
         */
        public void release() {
            Runnable pending;
            synchronized (this) {
                --this.state;
                if (this.state != 0) {
                    return;
                }
                pending = this.r;
                this.r = null;
            }
            if (pending != null) {
                pending.run();
            }
        }

        /**
         * Runs {@code r} right away if no operation is pending; otherwise stores
         * it to be run when the count next returns to zero.
         * <p>
         * The immediate run happens while holding the monitor, as this method is
         * {@code synchronized}.
         *
         * @param r callback to run once no operation is pending
         */
        public synchronized void addListener(Runnable r) {
            if (this.state != 0) {
                this.r = r;
                return;
            }
            r.run();
        }
    }
}

