/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.cache.redis.runtime;

import io.quarkus.arc.Arc;
import io.quarkus.arc.ArcContainer;
import io.quarkus.cache.CacheException;
import io.quarkus.cache.CompositeCacheKey;
import io.quarkus.cache.redis.runtime.RedisCache;
import io.quarkus.cache.redis.runtime.RedisCacheInfo;
import io.quarkus.cache.runtime.AbstractCache;
import io.quarkus.redis.client.RedisClientName;
import io.quarkus.redis.runtime.datasource.Marshaller;
import io.quarkus.runtime.BlockingOperationControl;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.subscription.UniEmitter;
import io.smallrye.mutiny.unchecked.Unchecked;
import io.smallrye.mutiny.unchecked.UncheckedFunction;
import io.smallrye.mutiny.vertx.MutinyHelper;
import io.vertx.core.Vertx;
import io.vertx.core.http.ConnectionPoolTooBusyException;
import io.vertx.mutiny.redis.client.Command;
import io.vertx.mutiny.redis.client.Redis;
import io.vertx.mutiny.redis.client.RedisConnection;
import io.vertx.mutiny.redis.client.Request;
import io.vertx.mutiny.redis.client.Response;
import jakarta.enterprise.util.TypeLiteral;
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;
import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.jboss.logging.Logger;

public class RedisCacheImpl
extends AbstractCache
implements RedisCache {
    private static final Logger log = Logger.getLogger(RedisCacheImpl.class);
    private final io.vertx.mutiny.core.Vertx vertx;
    private final Redis redis;
    private final RedisCacheInfo cacheInfo;
    private final Type classOfValue;
    private final Type classOfKey;
    private final Marshaller marshaller;
    private final Supplier<Boolean> blockingAllowedSupplier;

    public RedisCacheImpl(RedisCacheInfo cacheInfo, Optional<String> redisClientName) {
        this(cacheInfo, (io.vertx.mutiny.core.Vertx)Arc.container().select(io.vertx.mutiny.core.Vertx.class, new Annotation[0]).get(), RedisCacheImpl.determineRedisClient(redisClientName), BlockingOperationControl::isBlockingAllowed);
    }

    private static Redis determineRedisClient(Optional<String> redisClientName) {
        ArcContainer container = Arc.container();
        if (redisClientName.isPresent()) {
            return (Redis)container.select(Redis.class, new Annotation[]{RedisClientName.Literal.of((String)redisClientName.get())}).get();
        }
        return (Redis)container.select(Redis.class, new Annotation[0]).get();
    }

    public RedisCacheImpl(RedisCacheInfo cacheInfo, io.vertx.mutiny.core.Vertx vertx, Redis redis, Supplier<Boolean> blockingAllowedSupplier) {
        this.vertx = vertx;
        this.cacheInfo = cacheInfo;
        this.blockingAllowedSupplier = blockingAllowedSupplier;
        this.classOfKey = this.cacheInfo.keyType;
        if (this.cacheInfo.valueType != null) {
            this.classOfValue = this.cacheInfo.valueType;
            this.marshaller = new Marshaller(new Type[]{this.classOfValue, this.classOfKey});
        } else {
            this.classOfValue = null;
            this.marshaller = new Marshaller(new Type[]{this.classOfKey});
        }
        this.marshaller.add(CompositeCacheKey.class);
        this.redis = redis;
    }

    private static boolean isRecomputableError(Throwable error) {
        return error instanceof ConnectException || error instanceof ConnectionPoolTooBusyException;
    }

    public String getName() {
        return Objects.requireNonNullElse(this.cacheInfo.name, "default-redis-cache");
    }

    public Object getDefaultKey() {
        return "default-cache-key";
    }

    @Override
    public Class<?> getDefaultValueType() {
        return this.classOfValue instanceof Class ? (Class)this.classOfValue : null;
    }

    private <K> String encodeKey(K key) {
        return new String(this.marshaller.encode(key), StandardCharsets.UTF_8);
    }

    private <K, V> Uni<V> computeValue(final K key, final Function<K, V> valueLoader, boolean isWorkerThread) {
        if (isWorkerThread) {
            return Uni.createFrom().item(new Supplier<V>(){

                @Override
                public V get() {
                    return valueLoader.apply(key);
                }
            }).runSubscriptionOn(MutinyHelper.blockingExecutor((Vertx)this.vertx.getDelegate(), (boolean)false));
        }
        return Uni.createFrom().item(valueLoader.apply(key));
    }

    @Override
    public <K, V> Uni<V> get(K key, Function<K, V> valueLoader) {
        this.enforceDefaultType("get");
        return this.get(key, this.classOfValue, valueLoader);
    }

    @Override
    public <K, V> Uni<V> get(K key, Class<V> clazz, Function<K, V> valueLoader) {
        return this.get(key, (Type)clazz, valueLoader);
    }

    @Override
    public <K, V> Uni<V> get(K key, TypeLiteral<V> type, Function<K, V> valueLoader) {
        return this.get(key, type.getType(), valueLoader);
    }

    private <K, V> Uni<V> get(final K key, final Type type, final Function<K, V> valueLoader) {
        final byte[] encodedKey = this.marshaller.encode((Object)this.computeActualKey(this.encodeKey(key)));
        final boolean isWorkerThread = this.blockingAllowedSupplier.get();
        return this.withConnection(new Function<RedisConnection, Uni<V>>(){

            @Override
            public Uni<V> apply(final RedisConnection connection) {
                Uni startingPoint = RedisCacheImpl.this.cacheInfo.useOptimisticLocking ? RedisCacheImpl.this.watch(connection, encodedKey).chain(new GetFromConnectionSupplier(connection, type, encodedKey, RedisCacheImpl.this.marshaller)) : new GetFromConnectionSupplier(connection, type, encodedKey, RedisCacheImpl.this.marshaller).get();
                return startingPoint.chain(Unchecked.function((UncheckedFunction)new UncheckedFunction<V, Uni<? extends V>>(){

                    public Uni<V> apply(V cached) throws Exception {
                        if (cached != null) {
                            if (RedisCacheImpl.this.cacheInfo.useOptimisticLocking) {
                                return connection.send(Request.cmd((Command)Command.UNWATCH)).replaceWith(cached);
                            }
                            return Uni.createFrom().item(new StaticSupplier(cached));
                        }
                        Uni uni = RedisCacheImpl.this.computeValue(key, valueLoader, isWorkerThread);
                        return uni.onItem().call(new Function<V, Uni<?>>(){

                            @Override
                            public Uni<?> apply(V value) {
                                if (value == null) {
                                    throw new IllegalArgumentException("Cannot cache `null` value");
                                }
                                byte[] encodedValue = RedisCacheImpl.this.marshaller.encode(value);
                                Uni result = RedisCacheImpl.this.cacheInfo.useOptimisticLocking ? RedisCacheImpl.this.multi(connection, RedisCacheImpl.this.set(connection, encodedKey, encodedValue)).replaceWith(value) : RedisCacheImpl.this.set(connection, encodedKey, encodedValue).replaceWith(value);
                                if (isWorkerThread) {
                                    return result.runSubscriptionOn(MutinyHelper.blockingExecutor((Vertx)RedisCacheImpl.this.vertx.getDelegate(), (boolean)false));
                                }
                                return result;
                            }
                        });
                    }
                }));
            }
        }).onFailure(RedisCacheImpl::isRecomputableError).recoverWithUni(new Function<Throwable, Uni<? extends V>>(){

            @Override
            public Uni<? extends V> apply(Throwable e) {
                log.warn((Object)"Unable to connect to Redis, recomputing cached value", e);
                return RedisCacheImpl.this.computeValue(key, valueLoader, isWorkerThread);
            }
        });
    }

    @Override
    public <K, V> Uni<V> getAsync(K key, Function<K, Uni<V>> valueLoader) {
        this.enforceDefaultType("getAsync");
        return this.getAsync(key, this.classOfValue, valueLoader);
    }

    @Override
    public <K, V> Uni<V> getAsync(K key, Class<V> clazz, Function<K, Uni<V>> valueLoader) {
        return this.getAsync(key, (Type)clazz, valueLoader);
    }

    @Override
    public <K, V> Uni<V> getAsync(K key, TypeLiteral<V> type, Function<K, Uni<V>> valueLoader) {
        return this.getAsync(key, type.getType(), valueLoader);
    }

    private <K, V> Uni<V> getAsync(final K key, final Type type, final Function<K, Uni<V>> valueLoader) {
        final byte[] encodedKey = this.marshaller.encode((Object)this.computeActualKey(this.encodeKey(key)));
        return this.withConnection(new Function<RedisConnection, Uni<V>>(){

            @Override
            public Uni<V> apply(RedisConnection connection) {
                Uni startingPoint = RedisCacheImpl.this.cacheInfo.useOptimisticLocking ? RedisCacheImpl.this.watch(connection, encodedKey).chain(new GetFromConnectionSupplier(connection, type, encodedKey, RedisCacheImpl.this.marshaller)) : new GetFromConnectionSupplier(connection, type, encodedKey, RedisCacheImpl.this.marshaller).get();
                return startingPoint.chain(cached -> {
                    if (cached != null) {
                        if (RedisCacheImpl.this.cacheInfo.useOptimisticLocking) {
                            return connection.send(Request.cmd((Command)Command.UNWATCH)).replaceWith(cached);
                        }
                        return Uni.createFrom().item(new StaticSupplier<Object>(cached));
                    }
                    Uni getter = (Uni)valueLoader.apply(key);
                    return getter.chain(value -> {
                        byte[] encodedValue = RedisCacheImpl.this.marshaller.encode(value);
                        if (RedisCacheImpl.this.cacheInfo.useOptimisticLocking) {
                            return RedisCacheImpl.this.multi(connection, RedisCacheImpl.this.set(connection, encodedKey, encodedValue)).replaceWith(value);
                        }
                        return RedisCacheImpl.this.set(connection, encodedKey, encodedValue).replaceWith(value);
                    });
                });
            }
        }).onFailure(RedisCacheImpl::isRecomputableError).recoverWithUni(e -> {
            log.warn((Object)"Unable to connect to Redis, recomputing cached value", e);
            return (Uni)valueLoader.apply(key);
        });
    }

    @Override
    public <K, V> Uni<Void> put(K key, V value) {
        return this.put(key, new StaticSupplier<V>(value));
    }

    @Override
    public <K, V> Uni<Void> put(K key, Supplier<V> supplier) {
        final byte[] encodedKey = this.marshaller.encode((Object)this.computeActualKey(this.encodeKey(key)));
        final byte[] encodedValue = this.marshaller.encode(supplier.get());
        return this.withConnection(new Function<RedisConnection, Uni<Void>>(){

            @Override
            public Uni<Void> apply(RedisConnection connection) {
                return RedisCacheImpl.this.set(connection, encodedKey, encodedValue);
            }
        });
    }

    private void enforceDefaultType(String methodName) {
        if (this.classOfValue == null) {
            throw new UnsupportedOperationException("Cannot use `" + methodName + "` method without a default type configured. Consider using the `" + methodName + "` method accepting the type or configure the default type for the cache " + this.getName());
        }
    }

    @Override
    public <K, V> Uni<V> getOrDefault(K key, V defaultValue) {
        this.enforceDefaultType("getOrDefault");
        return this.getOrDefault(key, this.classOfValue, defaultValue);
    }

    @Override
    public <K, V> Uni<V> getOrDefault(K key, Class<V> clazz, V defaultValue) {
        return this.getOrDefault(key, (Type)clazz, defaultValue);
    }

    @Override
    public <K, V> Uni<V> getOrDefault(K key, TypeLiteral<V> type, V defaultValue) {
        return this.getOrDefault(key, type.getType(), defaultValue);
    }

    private <K, V> Uni<V> getOrDefault(K key, final Type type, V defaultValue) {
        final byte[] encodedKey = this.marshaller.encode((Object)this.computeActualKey(this.encodeKey(key)));
        return this.withConnection(new Function<RedisConnection, Uni<V>>(){

            @Override
            public Uni<V> apply(RedisConnection redisConnection) {
                return RedisCacheImpl.this.doGet(redisConnection, encodedKey, type, RedisCacheImpl.this.marshaller);
            }
        }).onItem().ifNull().continueWith(new StaticSupplier<V>(defaultValue));
    }

    @Override
    public <K, V> Uni<V> getOrNull(K key) {
        this.enforceDefaultType("getOrNull");
        return this.getOrNull(key, this.classOfValue);
    }

    @Override
    public <K, V> Uni<V> getOrNull(K key, Class<V> clazz) {
        return this.getOrNull(key, (Type)clazz);
    }

    @Override
    public <K, V> Uni<V> getOrNull(K key, TypeLiteral<V> type) {
        return this.getOrNull(key, type.getType());
    }

    private <K, V> Uni<V> getOrNull(K key, final Type type) {
        final byte[] encodedKey = this.marshaller.encode((Object)this.computeActualKey(this.encodeKey(key)));
        return this.withConnection(new Function<RedisConnection, Uni<V>>(){

            @Override
            public Uni<V> apply(RedisConnection redisConnection) {
                return RedisCacheImpl.this.doGet(redisConnection, encodedKey, type, RedisCacheImpl.this.marshaller);
            }
        });
    }

    public Uni<Void> invalidate(Object key) {
        byte[] encodedKey = this.marshaller.encode((Object)this.computeActualKey(this.encodeKey(key)));
        return this.redis.send(Request.cmd((Command)Command.DEL).arg(encodedKey)).replaceWithVoid();
    }

    public Uni<Void> invalidateAll() {
        return this.invalidateIf(AlwaysTruePredicate.INSTANCE);
    }

    public Uni<Void> invalidateIf(final Predicate<Object> predicate) {
        return Uni.createFrom().emitter((Consumer)new Consumer<UniEmitter<? super Set<String>>>(){

            @Override
            public void accept(UniEmitter<? super Set<String>> uniEmitter) {
                RedisCacheImpl.this.scanForKeys("0", new HashSet<String>(), uniEmitter);
            }
        }).chain(new Function<Set<String>, Uni<?>>(){

            @Override
            public Uni<?> apply(Set<String> setOfKeys) {
                Request req = Request.cmd((Command)Command.DEL);
                boolean hasAtLeastOneMatch = false;
                for (String key : setOfKeys) {
                    Object userKey = RedisCacheImpl.this.computeUserKey(key);
                    if (!predicate.test(userKey)) continue;
                    hasAtLeastOneMatch = true;
                    req.arg(RedisCacheImpl.this.marshaller.encode((Object)key));
                }
                if (hasAtLeastOneMatch) {
                    return RedisCacheImpl.this.redis.send(req);
                }
                return Uni.createFrom().voidItem();
            }
        }).replaceWithVoid();
    }

    private void scanForKeys(String cursor, final Set<String> result, final UniEmitter<? super Set<String>> em) {
        Request cmd = Request.cmd((Command)Command.SCAN).arg(cursor).arg("MATCH").arg(this.getKeyPattern());
        if (this.cacheInfo.invalidationScanSize.isPresent()) {
            cmd.arg("COUNT").arg(this.cacheInfo.invalidationScanSize.getAsInt());
        }
        this.redis.send(cmd).subscribe().with((Consumer)new Consumer<Response>(){

            @Override
            public void accept(Response response) {
                String newCursor = response.get(0).toString();
                Response partResponse = response.get(1);
                if (partResponse != null) {
                    result.addAll(RedisCacheImpl.this.marshaller.decodeAsList(partResponse, String.class));
                }
                if ("0".equals(newCursor)) {
                    em.complete((Object)result);
                } else {
                    RedisCacheImpl.this.scanForKeys(newCursor, result, (UniEmitter<? super Set<String>>)em);
                }
            }
        }, (Consumer)new Consumer<Throwable>(){

            @Override
            public void accept(Throwable throwable) {
                em.fail(throwable);
            }
        });
    }

    public String computeActualKey(String key) {
        return this.getKeyPrefix() + ":" + key;
    }

    Object computeUserKey(String key) {
        String prefix = this.getKeyPrefix();
        if (!key.startsWith(prefix + ":")) {
            return null;
        }
        String stripped = key.substring(prefix.length() + 1);
        return this.marshaller.decode(this.classOfKey, stripped.getBytes(StandardCharsets.UTF_8));
    }

    private String getKeyPattern() {
        return this.getKeyPrefix() + ":*";
    }

    private String getKeyPrefix() {
        if (this.cacheInfo.prefix != null) {
            return this.cacheInfo.prefix.replace("{cache-name}", this.getName());
        }
        return "cache:" + this.getName();
    }

    private <X> Uni<X> withConnection(final Function<RedisConnection, Uni<X>> function) {
        return this.redis.connect().chain(new Function<RedisConnection, Uni<? extends X>>(){

            @Override
            public Uni<X> apply(RedisConnection con) {
                Uni res;
                try {
                    res = (Uni)function.apply(con);
                }
                catch (Exception e) {
                    res = Uni.createFrom().failure((Throwable)new CacheException((Throwable)e));
                }
                return res.onTermination().call(() -> ((RedisConnection)con).close());
            }
        });
    }

    private Uni<Void> watch(RedisConnection connection, byte[] keyToWatch) {
        return connection.send(Request.cmd((Command)Command.WATCH).arg(keyToWatch)).replaceWithVoid();
    }

    private <X> Uni<X> doGet(RedisConnection connection, byte[] encoded, final Type clazz, final Marshaller marshaller) {
        if (this.cacheInfo.expireAfterAccess.isPresent()) {
            Duration duration = this.cacheInfo.expireAfterAccess.get();
            return connection.send(Request.cmd((Command)Command.GETEX).arg(encoded).arg("EX").arg(duration.toSeconds())).map(new Function<Response, X>(){

                @Override
                public X apply(Response r) {
                    return marshaller.decode(clazz, r);
                }
            });
        }
        return connection.send(Request.cmd((Command)Command.GET).arg(encoded)).map(new Function<Response, X>(){

            @Override
            public X apply(Response r) {
                return marshaller.decode(clazz, r);
            }
        });
    }

    private Uni<Void> set(RedisConnection connection, byte[] key, byte[] value) {
        Request request = Request.cmd((Command)Command.SET).arg(key).arg(value);
        if (this.cacheInfo.expireAfterWrite.isPresent()) {
            request = request.arg("EX").arg(this.cacheInfo.expireAfterWrite.get().toSeconds());
        }
        return connection.send(request).replaceWithVoid();
    }

    private Uni<Void> multi(RedisConnection connection, Uni<Void> operation) {
        return connection.send(Request.cmd((Command)Command.MULTI)).chain(() -> operation).onFailure().call(() -> this.abort(connection)).call(() -> this.exec(connection));
    }

    private Uni<Void> exec(RedisConnection connection) {
        return connection.send(Request.cmd((Command)Command.EXEC)).replaceWithVoid();
    }

    private Uni<Void> abort(RedisConnection connection) {
        return connection.send(Request.cmd((Command)Command.DISCARD)).replaceWithVoid();
    }

    private static class StaticSupplier<V>
    implements Supplier<V> {
        private final V cached;

        public StaticSupplier(V cached) {
            this.cached = cached;
        }

        @Override
        public V get() {
            return this.cached;
        }
    }

    private static class AlwaysTruePredicate
    implements Predicate<Object> {
        public static AlwaysTruePredicate INSTANCE = new AlwaysTruePredicate();

        private AlwaysTruePredicate() {
        }

        @Override
        public boolean test(Object o) {
            return true;
        }
    }

    private class GetFromConnectionSupplier<V>
    implements Supplier<Uni<? extends V>> {
        private final RedisConnection connection;
        private final Type clazz;
        private final byte[] encodedKey;
        private final Marshaller marshaller;

        public GetFromConnectionSupplier(RedisConnection connection, Type clazz, byte[] encodedKey, Marshaller marshaller) {
            this.connection = connection;
            this.clazz = clazz;
            this.encodedKey = encodedKey;
            this.marshaller = marshaller;
        }

        @Override
        public Uni<V> get() {
            return RedisCacheImpl.this.doGet(this.connection, this.encodedKey, this.clazz, this.marshaller);
        }
    }
}

