/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.redis.service;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.redis.RedisConnectionPool;
import org.apache.nifi.redis.util.RedisAction;
import org.apache.nifi.redis.util.RedisUtils;
import org.apache.nifi.util.Tuple;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.core.types.Expiration;

/**
 * {@link DistributedMapCacheClient} backed by Redis string keys.
 *
 * <p>Connections are borrowed from the configured {@link RedisConnectionPool} for the duration of a single
 * operation and closed afterwards (see {@link #withConnection(RedisAction)}). Keys and values are converted to
 * byte arrays with the caller-supplied serializers before being sent to Redis.
 *
 * <p>The configured TTL is stored in whole seconds; a configured value of {@code 0} is normalised to
 * {@link #NO_EXPIRATION}, which the methods of this class treat as "do not set an expiry".
 */
@Tags(value={"redis", "distributed", "cache", "map"})
@CapabilityDescription(value="An implementation of DistributedMapCacheClient that uses Redis as the backing cache. This service is intended to be used when a non-atomic DistributedMapCacheClient is required.")
public class SimpleRedisDistributedMapCacheClientService
extends AbstractControllerService
implements DistributedMapCacheClient {
    /** Sentinel TTL (in seconds) meaning that entries are written without an expiry. */
    private static final long NO_EXPIRATION = -1L;

    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(RedisUtils.REDIS_CONNECTION_POOL, RedisUtils.TTL);

    // Both fields are assigned in onEnabled/onDisabled and read by the cache operations, so they are
    // volatile to make the values written during enablement visible to the calling threads.
    private volatile RedisConnectionPool redisConnectionPool;
    private volatile Long ttl;

    /**
     * @return the TTL in seconds captured by {@link #onEnabled(ConfigurationContext)}, {@code -1} when entries
     *         should not expire, or {@code null} if the service has never been enabled
     */
    protected Long getTtl() {
        return this.ttl;
    }

    /**
     * @return the connection-pool and TTL property descriptors supported by this service
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /**
     * Captures the connection pool and the TTL (converted to seconds) from the configuration.
     *
     * @param context the configuration to read the properties from
     */
    @OnEnabled
    public void onEnabled(ConfigurationContext context) {
        this.redisConnectionPool = context.getProperty(RedisUtils.REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);

        final Long configuredTtl = context.getProperty(RedisUtils.TTL).asTimePeriod(TimeUnit.SECONDS);
        // A missing or zero TTL both mean "never expire"; avoid unboxing a null value.
        this.ttl = (configuredTtl == null || configuredTtl == 0L) ? NO_EXPIRATION : configuredTtl;
    }

    /** Drops the reference to the connection pool; later operations fail until the service is enabled again. */
    @OnDisabled
    public void onDisabled() {
        this.redisConnectionPool = null;
    }

    /**
     * Stores the value only if the key does not exist yet.
     *
     * <p>The write and the expiry are sent as one conditional {@code set} call so that a failure can never
     * leave a freshly created key without its TTL. This uses the same {@code set(key, value, Expiration,
     * SetOption)} API as {@link #put}, including passing {@link #NO_EXPIRATION} through
     * {@code Expiration.seconds} when no TTL is configured.
     *
     * @return {@code true} if the key was absent and the value was stored, {@code false} otherwise
     * @throws IOException if serialization fails
     */
    public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
        return this.withConnection(redisConnection -> {
            final Tuple<byte[], byte[]> kv = this.serialize(key, value, keySerializer, valueSerializer);
            final Boolean stored = redisConnection.stringCommands().set(
                    kv.getKey(), kv.getValue(), Expiration.seconds(this.ttl), RedisStringCommands.SetOption.ifAbsent());
            // Treat a null reply the same as "not stored" instead of failing on unboxing.
            return Boolean.TRUE.equals(stored);
        });
    }

    /**
     * Returns the current value for the key, or stores the given value if the key is absent.
     *
     * <p>The key is watched, read, and then conditionally written inside a MULTI/EXEC transaction. When
     * {@code exec()} yields {@code null} or an empty list the attempt is retried for as long as the service
     * is enabled.
     *
     * @return the deserialized existing value, or {@code null} if the key was absent and the new value was
     *         stored; {@code null} is also returned if the service is disabled before an attempt succeeds
     * @throws IOException if serialization fails or the transaction yields a non-Boolean first result
     */
    public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException {
        return this.withConnection(redisConnection -> {
            final Tuple<byte[], byte[]> kv = this.serialize(key, value, keySerializer, valueSerializer);
            final long ttlSeconds = this.ttl;
            do {
                // Watch the key and read the current value before opening the transaction.
                redisConnection.watch(kv.getKey());
                final byte[] existingValue = redisConnection.stringCommands().get(kv.getKey());

                redisConnection.multi();
                redisConnection.stringCommands().setNX(kv.getKey(), kv.getValue());
                // Only attach a TTL when this call is the one expected to create the key.
                if (ttlSeconds != NO_EXPIRATION && existingValue == null) {
                    redisConnection.keyCommands().expire(kv.getKey(), ttlSeconds);
                }

                final List<Object> results = redisConnection.exec();
                if (results == null || results.isEmpty()) {
                    // No transaction results: try again while the service is still enabled.
                    continue;
                }

                final Object firstResult = results.get(0);
                if (firstResult instanceof Boolean) {
                    final boolean absent = (Boolean) firstResult;
                    return absent ? null : valueDeserializer.deserialize(existingValue);
                }
                throw new IOException("Unexpected result from Redis transaction: Expected Boolean result, but got " + firstResult.getClass().getName() + " with value " + firstResult.toString());
            } while (this.isEnabled());
            return null;
        });
    }

    /**
     * @return {@code true} if Redis reports that the serialized key exists
     * @throws IOException if serialization fails
     */
    public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException {
        return this.withConnection(redisConnection -> {
            final byte[] k = this.serialize(key, keySerializer);
            return Boolean.TRUE.equals(redisConnection.keyCommands().exists(k));
        });
    }

    /**
     * Stores the value unconditionally (upsert), passing the configured TTL as the expiration.
     *
     * @throws IOException if serialization fails
     */
    public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
        this.withConnection(redisConnection -> {
            final Tuple<byte[], byte[]> kv = this.serialize(key, value, keySerializer, valueSerializer);
            redisConnection.stringCommands().set(kv.getKey(), kv.getValue(), Expiration.seconds(this.ttl), RedisStringCommands.SetOption.upsert());
            return null;
        });
    }

    /**
     * Stores all entries with a single {@code mSet} call and, when a TTL is configured, applies the expiry to
     * each key with a separate {@code expire} call afterwards. Nothing is sent to Redis for an empty map.
     *
     * @throws IOException if serialization fails
     */
    public <K, V> void putAll(Map<K, V> keysAndValues, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
        this.withConnection(redisConnection -> {
            final long ttlSeconds = this.ttl;
            final Map<byte[], byte[]> values = new HashMap<>();
            for (final Map.Entry<K, V> entry : keysAndValues.entrySet()) {
                final Tuple<byte[], byte[]> kv = this.serialize(entry.getKey(), entry.getValue(), keySerializer, valueSerializer);
                values.put(kv.getKey(), kv.getValue());
            }
            if (this.getLogger().isDebugEnabled()) {
                this.getLogger().debug(String.format("Queued up %d tuples to mset on Redis connection.", values.size()));
            }
            if (!values.isEmpty()) {
                redisConnection.stringCommands().mSet(values);
                if (ttlSeconds != NO_EXPIRATION) {
                    for (final byte[] k : values.keySet()) {
                        redisConnection.keyCommands().expire(k, ttlSeconds);
                    }
                }
            }
            return null;
        });
    }

    /**
     * @return the deserialized value stored under the key, or {@code null} if Redis returns no value
     * @throws IOException if serialization or deserialization fails
     */
    public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
        return this.withConnection(redisConnection -> {
            final byte[] k = this.serialize(key, keySerializer);
            final byte[] v = redisConnection.stringCommands().get(k);
            return v == null ? null : valueDeserializer.deserialize(v);
        });
    }

    /** No-op: connections are borrowed and closed per operation, so there is nothing to release here. */
    public void close() {
    }

    /**
     * Deletes the key.
     *
     * @return {@code true} if Redis reports that at least one key was removed
     * @throws IOException if serialization fails
     */
    public <K> boolean remove(K key, Serializer<K> keySerializer) throws IOException {
        return this.withConnection(redisConnection -> {
            final byte[] k = this.serialize(key, keySerializer);
            final Long numRemoved = redisConnection.keyCommands().del(k);
            return numRemoved != null && numRemoved > 0L;
        });
    }

    /**
     * Renames the legacy property names to the names of the current descriptors.
     *
     * @param config the property configuration being migrated
     */
    public void migrateProperties(PropertyConfiguration config) {
        config.renameProperty("redis-connection-pool", RedisUtils.REDIS_CONNECTION_POOL.getName());
        config.renameProperty("redis-cache-ttl", RedisUtils.TTL.getName());
    }

    /**
     * Copies a list of keys into an array, preserving order.
     *
     * @param keys the keys to copy
     * @return a new array holding the same key references
     */
    protected byte[][] getKeys(List<byte[]> keys) {
        return keys.toArray(new byte[keys.size()][]);
    }

    /**
     * Serializes a key/value pair.
     *
     * @return a tuple of the serialized key and the serialized value
     * @throws IOException if either serializer fails
     */
    protected <K, V> Tuple<byte[], byte[]> serialize(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        keySerializer.serialize(key, out);
        final byte[] k = out.toByteArray();

        // Reuse the same buffer for the value; toByteArray() above already copied the key bytes.
        out.reset();
        valueSerializer.serialize(value, out);
        final byte[] v = out.toByteArray();
        return new Tuple<>(k, v);
    }

    /**
     * Serializes a single key.
     *
     * @return the serialized key bytes
     * @throws IOException if the serializer fails
     */
    protected <K> byte[] serialize(K key, Serializer<K> keySerializer) throws IOException {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        keySerializer.serialize(key, out);
        return out.toByteArray();
    }

    /**
     * Runs the action against a connection obtained from the pool and always closes that connection.
     * A failure while closing is logged as a warning and does not replace the action's result or exception.
     *
     * @param action the work to perform with the connection
     * @return whatever the action returns
     * @throws IOException if the action throws it
     * @throws IllegalStateException if no connection pool is set (the service is not enabled)
     */
    protected <T> T withConnection(RedisAction<T> action) throws IOException {
        // Read the volatile field once so a concurrent onDisabled() cannot null it between check and use.
        final RedisConnectionPool pool = this.redisConnectionPool;
        if (pool == null) {
            throw new IllegalStateException("Redis connection pool is not available because the service is not enabled");
        }

        RedisConnection redisConnection = null;
        try {
            redisConnection = pool.getConnection();
            return action.execute(redisConnection);
        }
        finally {
            if (redisConnection != null) {
                try {
                    redisConnection.close();
                }
                catch (Exception e) {
                    this.getLogger().warn("Error closing connection", e);
                }
            }
        }
    }
}

