/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.core.cache;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.jctools.maps.NonBlockingHashMap;
import org.jetlinks.core.cache.ReactiveCacheContainer;
import org.jetlinks.core.utils.Reactors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

class DefaultReactiveCacheContainer<K, V>
implements ReactiveCacheContainer<K, V> {
    private static final Logger log = LoggerFactory.getLogger(DefaultReactiveCacheContainer.class);
    private final Map<K, Container<K, V>> cache = new NonBlockingHashMap();

    DefaultReactiveCacheContainer() {
    }

    @Override
    public Mono<V> compute(K key, BiFunction<K, V, Mono<V>> compute) {
        return this.cache.compute(key, (? super K k, ? super V old) -> {
            if (old == null) {
                Mono loader = (Mono)compute.apply(k, null);
                return new Container<Object, Mono>(k, (DefaultReactiveCacheContainer<Object, Mono>)this, loader);
            }
            old.update(loaded -> (Mono)compute.apply(k, loaded));
            return old;
        }).ref();
    }

    @Override
    public Mono<V> computeIfAbsent(K key, Function<K, Mono<V>> compute) {
        return this.cache.computeIfAbsent(key, (? super K k) -> {
            Mono loader = (Mono)compute.apply(k);
            return new Container<Object, Mono>(k, (DefaultReactiveCacheContainer<Object, Mono>)this, loader);
        }).ref();
    }

    @Override
    public Mono<V> get(K key, Mono<V> defaultValue) {
        Container<K, V> container = this.cache.get(key);
        if (container != null) {
            return container.ref().switchIfEmpty(defaultValue);
        }
        return defaultValue;
    }

    @Override
    public V put(K key, V value) {
        Container<K, V> container = this.cache.put(key, new Container<K, V>(key, this, value));
        if (container != null) {
            container.dispose();
            return (V)container.loaded;
        }
        return null;
    }

    @Override
    public boolean containsKey(K key) {
        return this.cache.containsKey(key);
    }

    @Override
    public V getNow(K key) {
        Container<K, V> container = this.cache.get(key);
        if (container != null) {
            return (V)container.loaded;
        }
        return null;
    }

    @Override
    public V remove(K key) {
        Container<K, V> container = this.cache.remove(key);
        if (null != container) {
            container.dispose();
        }
        return container == null ? null : (V)container.loaded;
    }

    @Override
    public Flux<V> values() {
        return Flux.fromIterable(this.cache.values()).flatMap(Container::ref);
    }

    @Override
    public List<V> valuesNow() {
        return this.cache.values().stream().filter(c -> c.loaded != null).map(c -> c.loaded).collect(Collectors.toList());
    }

    @Override
    public void clear() {
        HashMap<K, Container<K, V>> cache = new HashMap<K, Container<K, V>>(this.cache);
        this.cache.clear();
        for (Container value : cache.values()) {
            value.dispose();
        }
    }

    public void dispose() {
        this.cache.values().forEach(Container::dispose);
        this.cache.clear();
    }

    static class Container<K, T>
    implements Disposable {
        private static final AtomicReferenceFieldUpdater<Container, Mono> LOADER = AtomicReferenceFieldUpdater.newUpdater(Container.class, Mono.class, "loader");
        protected volatile Mono<T> loader;
        private static final AtomicReferenceFieldUpdater<Container, Sinks.One> AWAIT = AtomicReferenceFieldUpdater.newUpdater(Container.class, Sinks.One.class, "await");
        private volatile Sinks.One<T> await;
        private static final AtomicReferenceFieldUpdater<Container, Object> LOADED = AtomicReferenceFieldUpdater.newUpdater(Container.class, Object.class, "loaded");
        public volatile T loaded;
        private final Disposable.Swap disposable = Disposables.swap();
        private final DefaultReactiveCacheContainer<K, T> main;
        private final K key;

        public Container(K key, DefaultReactiveCacheContainer<K, T> main, Mono<T> loader) {
            this.key = key;
            this.main = main;
            this.update(ignore -> loader);
        }

        public Container(K key, DefaultReactiveCacheContainer<K, T> main, T loaded) {
            this.key = key;
            this.main = main;
            this.loaded = loaded;
            this.update(ignore -> Mono.just((Object)loaded));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void update(Function<T, Mono<T>> ref) {
            Container container = this;
            synchronized (container) {
                Sinks.One await;
                Object loader = LOADER.getAndSet(this, null);
                loader = loader != null ? loader.flatMap(ref) : ((await = AWAIT.get(this)) != null ? await.asMono().flatMap(ref) : ref.apply(this.loaded));
                AWAIT.compareAndSet(this, null, Sinks.one());
                LOADER.set(this, (Mono)loader);
            }
        }

        private void afterLoaded(T data) {
            if (data != this.loaded && this.loaded instanceof Disposable) {
                ((Disposable)this.loaded).dispose();
            }
            this.loaded = data;
            Sinks.One await = AWAIT.getAndSet(this, null);
            if (await != null) {
                await.emitValue(data, Reactors.emitFailureHandler());
            }
        }

        public void dispose() {
            this.disposable.dispose();
            T loaded = this.loaded;
            if (loaded instanceof Disposable) {
                ((Disposable)loaded).dispose();
            }
        }

        private void loadError(Throwable err) {
            Sinks.One await = AWAIT.getAndSet(this, null);
            if (await != null) {
                await.emitError(err, Reactors.emitFailureHandler());
            }
            ((DefaultReactiveCacheContainer)this.main).cache.remove(this.key, this);
        }

        private void loadEmpty() {
            Sinks.One await = AWAIT.getAndSet(this, null);
            if (await != null) {
                await.emitEmpty(Reactors.emitFailureHandler());
            }
            ((DefaultReactiveCacheContainer)this.main).cache.remove(this.key, this);
        }

        private Mono<T> tryLoad(ContextView contextView) {
            Mono loader = LOADER.getAndSet(this, null);
            if (loader != null) {
                Sinks.One async = Sinks.one();
                loader.switchIfEmpty(Mono.fromRunnable(this::loadEmpty)).subscribe(loaded -> {
                    this.afterLoaded(loaded);
                    async.emitValue(LOADED.get(this), Reactors.emitFailureHandler());
                }, err -> {
                    this.loadError((Throwable)err);
                    async.emitError(err, Reactors.emitFailureHandler());
                }, () -> async.emitEmpty(Reactors.emitFailureHandler()), Context.of(DefaultReactiveCacheContainer.class, (Object)this));
                return async.asMono();
            }
            Sinks.One sink = AWAIT.get(this);
            if (sink == null) {
                return Mono.fromSupplier(() -> this.loaded);
            }
            return sink.asMono();
        }

        public Mono<T> ref() {
            return Mono.deferContextual(ctx -> {
                if (ctx.getOrEmpty(DefaultReactiveCacheContainer.class).orElse(null) == this) {
                    log.warn("recursive call reactive cache [{}]", this.key);
                    return Mono.justOrEmpty(this.loaded);
                }
                return this.tryLoad((ContextView)ctx);
            });
        }
    }
}

