/*
 * Decompiled with CFR 0.152.
 */
package org.redisson.reactive;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.RedissonKeys;
import org.redisson.api.RFuture;
import org.redisson.api.RKeysReactive;
import org.redisson.api.RType;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandReactiveService;
import org.redisson.connection.MasterSlaveEntry;
import reactor.fn.Supplier;
import reactor.rx.Stream;
import reactor.rx.Streams;
import reactor.rx.subscription.ReactiveSubscription;

public class RedissonKeysReactive
implements RKeysReactive {
    private final CommandReactiveService commandExecutor;
    private final RedissonKeys instance;

    public RedissonKeysReactive(CommandReactiveService commandExecutor) {
        this.instance = new RedissonKeys(commandExecutor);
        this.commandExecutor = commandExecutor;
    }

    @Override
    public Publisher<Integer> getSlot(final String key) {
        return this.commandExecutor.reactive(new Supplier<RFuture<Integer>>(){

            @Override
            public RFuture<Integer> get() {
                return RedissonKeysReactive.this.instance.getSlotAsync(key);
            }
        });
    }

    @Override
    public Publisher<String> getKeysByPattern(String pattern) {
        ArrayList<Publisher<String>> publishers = new ArrayList<Publisher<String>>();
        for (MasterSlaveEntry entry : this.commandExecutor.getConnectionManager().getEntrySet()) {
            publishers.add(this.createKeysIterator(entry, pattern));
        }
        return Streams.merge(publishers);
    }

    @Override
    public Publisher<String> getKeys() {
        return this.getKeysByPattern(null);
    }

    private Publisher<ListScanResult<String>> scanIterator(MasterSlaveEntry entry, long startPos, String pattern) {
        if (pattern == null) {
            return this.commandExecutor.writeReactive(entry, (Codec)StringCodec.INSTANCE, RedisCommands.SCAN, startPos);
        }
        return this.commandExecutor.writeReactive(entry, (Codec)StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern);
    }

    private Publisher<String> createKeysIterator(final MasterSlaveEntry entry, final String pattern) {
        return new Stream<String>(){

            @Override
            public void subscribe(Subscriber<? super String> t) {
                t.onSubscribe(new ReactiveSubscription<String>((Stream)this, t){
                    private List<String> firstValues;
                    private long nextIterPos;
                    private long currentIndex;

                    @Override
                    protected void onRequest(long n) {
                        this.currentIndex = n;
                        this.nextValues();
                    }

                    protected void nextValues() {
                        final 1 m = this;
                        RedissonKeysReactive.this.scanIterator(entry, this.nextIterPos, pattern).subscribe(new Subscriber<ListScanResult<String>>(){

                            @Override
                            public void onSubscribe(Subscription s) {
                                s.request(Long.MAX_VALUE);
                            }

                            @Override
                            public void onNext(ListScanResult<String> res) {
                                long prevIterPos = nextIterPos;
                                if (nextIterPos == 0L && firstValues == null) {
                                    firstValues = (List)res.getValues();
                                } else if (res.getValues().equals(firstValues)) {
                                    m.onComplete();
                                    currentIndex = 0L;
                                    return;
                                }
                                nextIterPos = res.getPos();
                                if (prevIterPos == nextIterPos) {
                                    nextIterPos = -1L;
                                }
                                for (String val : res.getValues()) {
                                    m.onNext(val);
                                    currentIndex--;
                                    if (currentIndex != 0L) continue;
                                    m.onComplete();
                                    return;
                                }
                                if (nextIterPos == -1L) {
                                    m.onComplete();
                                    currentIndex = 0L;
                                }
                            }

                            @Override
                            public void onError(Throwable error) {
                                m.onError(error);
                            }

                            @Override
                            public void onComplete() {
                                if (currentIndex == 0L) {
                                    return;
                                }
                                this.nextValues();
                            }
                        });
                    }
                });
            }
        };
    }

    @Override
    public Publisher<Collection<String>> findKeysByPattern(final String pattern) {
        return this.commandExecutor.reactive(new Supplier<RFuture<Collection<String>>>(){

            @Override
            public RFuture<Collection<String>> get() {
                return RedissonKeysReactive.this.instance.findKeysByPatternAsync(pattern);
            }
        });
    }

    @Override
    public Publisher<String> randomKey() {
        return this.commandExecutor.reactive(new Supplier<RFuture<String>>(){

            @Override
            public RFuture<String> get() {
                return RedissonKeysReactive.this.instance.randomKeyAsync();
            }
        });
    }

    @Override
    public Publisher<Long> deleteByPattern(final String pattern) {
        return this.commandExecutor.reactive(new Supplier<RFuture<Long>>(){

            @Override
            public RFuture<Long> get() {
                return RedissonKeysReactive.this.instance.deleteByPatternAsync(pattern);
            }
        });
    }

    @Override
    public Publisher<Long> delete(final String ... keys) {
        return this.commandExecutor.reactive(new Supplier<RFuture<Long>>(){

            @Override
            public RFuture<Long> get() {
                return RedissonKeysReactive.this.instance.deleteAsync(keys);
            }
        });
    }

    @Override
    public Publisher<Void> flushdb() {
        return this.commandExecutor.reactive(new Supplier<RFuture<Void>>(){

            @Override
            public RFuture<Void> get() {
                return RedissonKeysReactive.this.instance.flushdbAsync();
            }
        });
    }

    @Override
    public Publisher<Void> flushall() {
        return this.commandExecutor.reactive(new Supplier<RFuture<Void>>(){

            @Override
            public RFuture<Void> get() {
                return RedissonKeysReactive.this.instance.flushallAsync();
            }
        });
    }

    @Override
    public Publisher<Boolean> move(final String name, final int database) {
        return this.commandExecutor.reactive(new Supplier<RFuture<Boolean>>(){

            @Override
            public RFuture<Boolean> get() {
                return RedissonKeysReactive.this.instance.moveAsync(name, database);
            }
        });
    }

    @Override
    public Publisher<Void> migrate(final String name, final String host, final int port, final int database, final long timeout) {
        return this.commandExecutor.reactive(new Supplier<RFuture<Void>>(){

            @Override
            public RFuture<Void> get() {
                return RedissonKeysReactive.this.instance.migrateAsync(name, host, port, database, timeout);
            }
        });
    }

    @Override
    public Publisher<Void> copy(final String name, final String host, final int port, final int database, final long timeout) {
        return this.commandExecutor.reactive(new Supplier<RFuture<Void>>(){

            @Override
            public RFuture<Void> get() {
                return RedissonKeysReactive.this.instance.copyAsync(name, host, port, database, timeout);
            }
        });
    }

    @Override
    public Publisher<Boolean> expire(final String name, final long timeToLive, final TimeUnit timeUnit) {
        return this.commandExecutor.reactive(new Supplier<RFuture<Boolean>>(){

            @Override
            public RFuture<Boolean> get() {
                return RedissonKeysReactive.this.instance.expireAsync(name, timeToLive, timeUnit);
            }
        });
    }

    @Override
    public Publisher<Boolean> expireAt(final String name, final long timestamp) {
        return this.commandExecutor.reactive(new Supplier<RFuture<Boolean>>(){

            @Override
            public RFuture<Boolean> get() {
                return RedissonKeysReactive.this.instance.expireAtAsync(name, timestamp);
            }
        });
    }

    @Override
    public Publisher<Boolean> clearExpire(final String name) {
        return this.commandExecutor.reactive(new Supplier<RFuture<Boolean>>(){

            @Override
            public RFuture<Boolean> get() {
                return RedissonKeysReactive.this.instance.clearExpireAsync(name);
            }
        });
    }

    @Override
    public Publisher<Boolean> renamenx(final String oldName, final String newName) {
        return this.commandExecutor.reactive(new Supplier<RFuture<Boolean>>(){

            @Override
            public RFuture<Boolean> get() {
                return RedissonKeysReactive.this.instance.renamenxAsync(oldName, newName);
            }
        });
    }

    @Override
    public Publisher<Void> rename(final String currentName, final String newName) {
        return this.commandExecutor.reactive(new Supplier<RFuture<Void>>(){

            @Override
            public RFuture<Void> get() {
                return RedissonKeysReactive.this.instance.renameAsync(currentName, newName);
            }
        });
    }

    @Override
    public Publisher<Long> remainTimeToLive(final String name) {
        return this.commandExecutor.reactive(new Supplier<RFuture<Long>>(){

            @Override
            public RFuture<Long> get() {
                return RedissonKeysReactive.this.instance.remainTimeToLiveAsync(name);
            }
        });
    }

    @Override
    public Publisher<Long> touch(final String ... names) {
        return this.commandExecutor.reactive(new Supplier<RFuture<Long>>(){

            @Override
            public RFuture<Long> get() {
                return RedissonKeysReactive.this.instance.touchAsync(names);
            }
        });
    }

    @Override
    public Publisher<Long> countExists(final String ... names) {
        return this.commandExecutor.reactive(new Supplier<RFuture<Long>>(){

            @Override
            public RFuture<Long> get() {
                return RedissonKeysReactive.this.instance.countExistsAsync(names);
            }
        });
    }

    @Override
    public Publisher<RType> getType(final String key) {
        return this.commandExecutor.reactive(new Supplier<RFuture<RType>>(){

            @Override
            public RFuture<RType> get() {
                return RedissonKeysReactive.this.instance.getTypeAsync(key);
            }
        });
    }

    @Override
    public Publisher<Long> unlink(final String ... keys) {
        return this.commandExecutor.reactive(new Supplier<RFuture<Long>>(){

            @Override
            public RFuture<Long> get() {
                return RedissonKeysReactive.this.instance.unlinkAsync(keys);
            }
        });
    }

    @Override
    public Publisher<Long> count() {
        return this.commandExecutor.reactive(new Supplier<RFuture<Long>>(){

            @Override
            public RFuture<Long> get() {
                return RedissonKeysReactive.this.instance.countAsync();
            }
        });
    }
}

