/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.redis.state;

import com.google.common.collect.Maps;
import com.google.common.primitives.UnsignedBytes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.storm.redis.common.commands.RedisCommands;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.container.RedisCommandsContainerBuilder;
import org.apache.storm.redis.common.container.RedisCommandsInstanceContainer;
import org.apache.storm.redis.state.RedisKeyValueStateIterator;
import org.apache.storm.state.DefaultStateEncoder;
import org.apache.storm.state.DefaultStateSerializer;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.state.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.util.SafeEncoder;

public class RedisKeyValueState<K, V>
implements KeyValueState<K, V> {
    public static final int ITERATOR_CHUNK_SIZE = 100;
    public static final NavigableMap<byte[], byte[]> EMPTY_PENDING_COMMIT_MAP = Maps.unmodifiableNavigableMap(new TreeMap(UnsignedBytes.lexicographicalComparator()));
    private static final Logger LOG = LoggerFactory.getLogger(RedisKeyValueState.class);
    private static final String COMMIT_TXID_KEY = "commit";
    private static final String PREPARE_TXID_KEY = "prepare";
    private final byte[] namespace;
    private final byte[] prepareNamespace;
    private final String txidNamespace;
    private final DefaultStateEncoder<K, V> encoder;
    private final RedisCommandsInstanceContainer container;
    private ConcurrentNavigableMap<byte[], byte[]> pendingPrepare;
    private NavigableMap<byte[], byte[]> pendingCommit;
    private Map<String, String> txIds;

    public RedisKeyValueState(String namespace) {
        this(namespace, new JedisPoolConfig.Builder().build());
    }

    public RedisKeyValueState(String namespace, JedisPoolConfig poolConfig) {
        this(namespace, poolConfig, (Serializer<K>)new DefaultStateSerializer(), (Serializer<V>)new DefaultStateSerializer());
    }

    public RedisKeyValueState(String namespace, JedisPoolConfig poolConfig, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this(namespace, RedisCommandsContainerBuilder.build(poolConfig), keySerializer, valueSerializer);
    }

    public RedisKeyValueState(String namespace, JedisClusterConfig jedisClusterConfig, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this(namespace, RedisCommandsContainerBuilder.build(jedisClusterConfig), keySerializer, valueSerializer);
    }

    public RedisKeyValueState(String namespace, RedisCommandsInstanceContainer container, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this.namespace = SafeEncoder.encode(namespace);
        this.prepareNamespace = SafeEncoder.encode(namespace + "$prepare");
        this.txidNamespace = namespace + "$txid";
        this.encoder = new DefaultStateEncoder(keySerializer, valueSerializer);
        this.container = container;
        this.pendingPrepare = this.createPendingPrepareMap();
        this.initTxids();
        this.initPendingCommit();
    }

    private void initTxids() {
        RedisCommands commands = null;
        try {
            commands = this.container.getInstance();
            this.txIds = commands.exists(this.txidNamespace) ? commands.hgetAll(this.txidNamespace) : new HashMap<String, String>();
            LOG.debug("initTxids, txIds {}", (Object)this.txIds);
        }
        finally {
            this.container.returnInstance(commands);
        }
    }

    private void initPendingCommit() {
        RedisCommands commands = null;
        try {
            commands = this.container.getInstance();
            if (commands.exists(this.prepareNamespace).booleanValue()) {
                LOG.debug("Loading previously prepared commit from {}", (Object)this.prepareNamespace);
                TreeMap<byte[], byte[]> pendingCommitMap = new TreeMap<byte[], byte[]>(UnsignedBytes.lexicographicalComparator());
                pendingCommitMap.putAll(commands.hgetAll(this.prepareNamespace));
                this.pendingCommit = Maps.unmodifiableNavigableMap(pendingCommitMap);
            } else {
                LOG.debug("No previously prepared commits.");
                this.pendingCommit = EMPTY_PENDING_COMMIT_MAP;
            }
        }
        finally {
            this.container.returnInstance(commands);
        }
    }

    public void put(K key, V value) {
        LOG.debug("put key '{}', value '{}'", (Object)key, (Object)value);
        byte[] redisKey = this.encoder.encodeKey(key);
        byte[] redisValue = this.encoder.encodeValue(value);
        this.pendingPrepare.put(redisKey, redisValue);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public V get(K key) {
        LOG.debug("get key '{}'", (Object)key);
        byte[] redisKey = this.encoder.encodeKey(key);
        byte[] redisValue = null;
        if (this.pendingPrepare.containsKey(redisKey)) {
            redisValue = (byte[])this.pendingPrepare.get(redisKey);
        } else if (this.pendingCommit.containsKey(redisKey)) {
            redisValue = (byte[])this.pendingCommit.get(redisKey);
        } else {
            RedisCommands commands = null;
            try {
                commands = this.container.getInstance();
                redisValue = commands.hget(this.namespace, redisKey);
            }
            finally {
                this.container.returnInstance(commands);
            }
        }
        Object value = null;
        if (redisValue != null) {
            value = this.encoder.decodeValue(redisValue);
        }
        LOG.debug("Value for key '{}' is '{}'", (Object)key, value);
        return (V)value;
    }

    public V get(K key, V defaultValue) {
        V val = this.get(key);
        return val != null ? val : defaultValue;
    }

    public V delete(K key) {
        LOG.debug("delete key '{}'", (Object)key);
        byte[] redisKey = this.encoder.encodeKey(key);
        V curr = this.get(key);
        this.pendingPrepare.put(redisKey, this.encoder.getTombstoneValue());
        return curr;
    }

    public Iterator<Map.Entry<K, V>> iterator() {
        return new RedisKeyValueStateIterator(this.namespace, this.container, this.pendingPrepare.entrySet().iterator(), this.pendingCommit.entrySet().iterator(), 100, this.encoder.getKeySerializer(), this.encoder.getValueSerializer());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void prepareCommit(long txid) {
        LOG.debug("prepareCommit txid {}", (Object)txid);
        this.validatePrepareTxid(txid);
        RedisCommands commands = null;
        try {
            ConcurrentNavigableMap<byte[], byte[]> currentPending = this.pendingPrepare;
            this.pendingPrepare = this.createPendingPrepareMap();
            commands = this.container.getInstance();
            if (commands.exists(this.prepareNamespace).booleanValue()) {
                LOG.debug("Prepared txn already exists, will merge", (Object)txid);
                for (Map.Entry e : this.pendingCommit.entrySet()) {
                    if (currentPending.containsKey(e.getKey())) continue;
                    currentPending.put((byte[])e.getKey(), (byte[])e.getValue());
                }
            }
            if (!currentPending.isEmpty()) {
                commands.hmset(this.prepareNamespace, currentPending);
            } else {
                LOG.debug("Nothing to save for prepareCommit, txid {}.", (Object)txid);
            }
            this.txIds.put(PREPARE_TXID_KEY, String.valueOf(txid));
            commands.hmset(this.txidNamespace, this.txIds);
            this.pendingCommit = Maps.unmodifiableNavigableMap(currentPending);
            this.container.returnInstance(commands);
        }
        catch (Throwable throwable) {
            this.container.returnInstance(commands);
            throw throwable;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void commit(long txid) {
        LOG.debug("commit txid {}", (Object)txid);
        this.validateCommitTxid(txid);
        RedisCommands commands = null;
        try {
            commands = this.container.getInstance();
            if (!this.pendingCommit.isEmpty()) {
                ArrayList<byte[]> keysToDelete = new ArrayList<byte[]>();
                HashMap<byte[], byte[]> keysToAdd = new HashMap<byte[], byte[]>();
                for (Map.Entry entry : this.pendingCommit.entrySet()) {
                    byte[] key = (byte[])entry.getKey();
                    byte[] value = (byte[])entry.getValue();
                    if (Arrays.equals(this.encoder.getTombstoneValue(), value)) {
                        keysToDelete.add(key);
                        continue;
                    }
                    keysToAdd.put(key, value);
                }
                if (!keysToAdd.isEmpty()) {
                    commands.hmset(this.namespace, keysToAdd);
                }
                if (!keysToDelete.isEmpty()) {
                    commands.hdel(this.namespace, (byte[][])keysToDelete.toArray((T[])new byte[0][]));
                }
            } else {
                LOG.debug("Nothing to save for commit, txid {}.", (Object)txid);
            }
            this.txIds.put(COMMIT_TXID_KEY, String.valueOf(txid));
            commands.hmset(this.txidNamespace, this.txIds);
            commands.del(this.prepareNamespace);
            this.pendingCommit = EMPTY_PENDING_COMMIT_MAP;
        }
        finally {
            this.container.returnInstance(commands);
        }
    }

    public void commit() {
        RedisCommands commands = null;
        try {
            commands = this.container.getInstance();
            if (!this.pendingPrepare.isEmpty()) {
                commands.hmset(this.namespace, this.pendingPrepare);
            } else {
                LOG.debug("Nothing to save for commit");
            }
            this.pendingPrepare = this.createPendingPrepareMap();
        }
        finally {
            this.container.returnInstance(commands);
        }
    }

    public void rollback() {
        LOG.debug("rollback");
        RedisCommands commands = null;
        try {
            commands = this.container.getInstance();
            if (commands.exists(this.prepareNamespace).booleanValue()) {
                commands.del(this.prepareNamespace);
            } else {
                LOG.debug("Nothing to rollback, prepared data is empty");
            }
            Long lastCommittedId = this.lastCommittedTxid();
            if (lastCommittedId != null) {
                this.txIds.put(PREPARE_TXID_KEY, String.valueOf(lastCommittedId));
            } else {
                this.txIds.remove(PREPARE_TXID_KEY);
            }
            if (!this.txIds.isEmpty()) {
                LOG.debug("hmset txidNamespace {}, txIds {}", (Object)this.txidNamespace, (Object)this.txIds);
                commands.hmset(this.txidNamespace, this.txIds);
            }
            this.pendingCommit = EMPTY_PENDING_COMMIT_MAP;
            this.pendingPrepare = this.createPendingPrepareMap();
        }
        finally {
            this.container.returnInstance(commands);
        }
    }

    private void validatePrepareTxid(long txid) {
        Long committedTxid = this.lastCommittedTxid();
        if (committedTxid != null && txid <= committedTxid) {
            throw new RuntimeException("Invalid txid '" + txid + "' for prepare. Txid '" + committedTxid + "' is already committed");
        }
    }

    private void validateCommitTxid(long txid) {
        Long committedTxid = this.lastCommittedTxid();
        if (committedTxid != null && txid < committedTxid) {
            throw new RuntimeException("Invalid txid '" + txid + "' txid '" + committedTxid + "' is already committed");
        }
        Long preparedTxid = this.lastPreparedTxid();
        if (preparedTxid != null && txid != preparedTxid) {
            throw new RuntimeException("Invalid txid '" + txid + "' not same as prepared txid '" + preparedTxid + "'");
        }
    }

    private Long lastCommittedTxid() {
        return this.lastId(COMMIT_TXID_KEY);
    }

    private Long lastPreparedTxid() {
        return this.lastId(PREPARE_TXID_KEY);
    }

    private Long lastId(String key) {
        Long lastId = null;
        String txId = this.txIds.get(key);
        if (txId != null) {
            lastId = Long.valueOf(txId);
        }
        return lastId;
    }

    private ConcurrentNavigableMap<byte[], byte[]> createPendingPrepareMap() {
        return new ConcurrentSkipListMap<byte[], byte[]>(UnsignedBytes.lexicographicalComparator());
    }
}

