/*
 * Decompiled with CFR 0.152.
 */
package org.cojen.tupl;

import java.io.IOException;
import java.lang.ref.SoftReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.cojen.tupl.ClosedIndexException;
import org.cojen.tupl.CorruptDatabaseException;
import org.cojen.tupl.DatabaseException;
import org.cojen.tupl.EventListener;
import org.cojen.tupl.EventType;
import org.cojen.tupl.Index;
import org.cojen.tupl.LHashTable;
import org.cojen.tupl.LockMode;
import org.cojen.tupl.RedoVisitor;
import org.cojen.tupl.ReplRedoDecoder;
import org.cojen.tupl.Transaction;
import org.cojen.tupl.UnmodifiableReplicaException;
import org.cojen.tupl.Utils;
import org.cojen.tupl._LocalDatabase;
import org.cojen.tupl._LocalTransaction;
import org.cojen.tupl._Locker;
import org.cojen.tupl._RedoWriter;
import org.cojen.tupl._ReplRedoController;
import org.cojen.tupl._Tree;
import org.cojen.tupl.ext.ReplicationManager;
import org.cojen.tupl.ext.TransactionHandler;
import org.cojen.tupl.util.Latch;

final class _ReplRedoEngine
implements RedoVisitor {
    private static final long INFINITE_TIMEOUT = -1L;
    final ReplicationManager mManager;
    final _LocalDatabase mDatabase;
    final _ReplRedoController mController;
    private final LHashTable.Obj<SoftReference<Index>> mIndexes;
    private final Latch[] mLatches;
    private final int mLatchesMask;
    private final TxnTable mTransactions;
    private final int mMaxThreads;
    private final AtomicInteger mTotalThreads;
    private final AtomicInteger mIdleThreads;
    private final ConcurrentMap<DecodeTask, Object> mTaskThreadSet;
    private final Latch mDecodeLatch;
    private ReplRedoDecoder mDecoder;
    final Latch mOpLatch;
    long mDecodePosition;
    long mDecodeTransactionId;
    private static final long IDLE_TIMEOUT_NANOS = 5000000000L;
    private static int cTaskNumber;

    /**
     * Builds an engine which applies a replicated redo stream to a local database.
     *
     * @param manager replication source; its {@code readPosition()} seeds the decode position
     * @param maxThreads limit on decode tasks. Zero selects the number of available
     * processors, and a negative value selects its magnitude multiplied by the number of
     * available processors.
     * @param db database which receives the decoded operations
     * @param txns optional table of existing transactions. Every entry is visited, and those
     * for which {@code recoveryCleanup(false)} returns false are registered with this engine.
     * @throws IOException if thrown by any of the calls made while constructing
     */
    _ReplRedoEngine(ReplicationManager manager, int maxThreads, _LocalDatabase db, LHashTable.Obj<_LocalTransaction> txns) throws IOException {
        if (maxThreads <= 0) {
            int procs = Runtime.getRuntime().availableProcessors();
            maxThreads = (maxThreads == 0) ? procs : (-maxThreads * procs);
            if (maxThreads <= 0) {
                // The computed limit isn't positive (the product can overflow), so use
                // the largest possible limit instead.
                maxThreads = Integer.MAX_VALUE;
            }
        }

        mManager = manager;
        mDatabase = db;
        mController = new _ReplRedoController(this);
        mIndexes = new LHashTable.Obj<>(16);
        mDecodeLatch = new Latch();
        mOpLatch = new Latch();
        mMaxThreads = maxThreads;
        mTotalThreads = new AtomicInteger();
        mIdleThreads = new AtomicInteger();
        mTaskThreadSet = new ConcurrentHashMap<>(16, 0.75f, 1);

        // Twice as many latches as threads, rounded up to a power of two so that a simple
        // mask selects a latch.
        int latchCount = Utils.roundUpPower2(maxThreads * 2);
        if (latchCount <= 0) {
            latchCount = 0x40000000;
        }
        mLatches = new Latch[latchCount];
        mLatchesMask = mLatches.length - 1;
        for (int i = 0; i < mLatches.length; ++i) {
            mLatches[i] = new Latch();
        }

        final TxnTable txnTable = new TxnTable(txns == null ? 16 : txns.size());
        if (txns != null) {
            // NOTE(review): returning true presumably removes the visited entry from the
            // given table -- confirm against LHashTable.traverse.
            txns.traverse(entry -> {
                long scrambledTxnId = Utils.scramble(entry.key);
                Latch latch = selectLatch(scrambledTxnId);
                _LocalTransaction txn = (_LocalTransaction) entry.value;
                if (!txn.recoveryCleanup(false)) {
                    ((TxnEntry) txnTable.insert(scrambledTxnId)).init(txn, latch);
                }
                return true;
            });
        }
        mTransactions = txnTable;

        // Assign the initial decode position while holding the decode latch.
        mDecodeLatch.acquireExclusive();
        mDecodePosition = manager.readPosition();
        mDecodeLatch.releaseExclusive();
    }

    /**
     * Passes the given number to the controller's {@code initCheckpointNumber} and then
     * hands the controller back as the redo writer.
     *
     * @param redoNum checkpoint number to initialize the controller with
     * @return the controller owned by this engine
     */
    public _RedoWriter initWriter(long redoNum) {
        _ReplRedoController controller = mController;
        controller.initCheckpointNumber(redoNum);
        return controller;
    }

    /**
     * Installs a decoder and launches the first decode task. If a decoder is already
     * installed, the call has no effect.
     *
     * <p>The decode latch is held exclusively while checking for and installing the decoder,
     * and the op latch is held exclusively while the decoder is created and the first task is
     * started.
     *
     * @param initialPosition passed to the new {@code ReplRedoDecoder}
     * @param initialTxnId passed to the new decoder and recorded as the decode transaction id
     */
    public void startReceiving(long initialPosition, long initialTxnId) {
        mDecodeLatch.acquireExclusive();

        if (mDecoder != null) {
            // Already receiving; nothing else will release the decode latch on this path.
            mDecodeLatch.releaseExclusive();
            return;
        }

        mOpLatch.acquireExclusive();
        try {
            try {
                mDecoder = new ReplRedoDecoder(mManager, initialPosition, initialTxnId);
            } catch (Throwable e) {
                mDecodeLatch.releaseExclusive();
                throw e;
            }
            mDecodeTransactionId = initialTxnId;
            // nextTask() releases the decode latch on every path (normal return and thread
            // start failure), so it must not be released a second time afterwards.
            nextTask();
        } finally {
            mOpLatch.releaseExclusive();
        }
    }

    /**
     * Runs {@code recoveryCleanup(true)} on every tracked transaction, each while its latch
     * is held exclusively, and then empties all fragmented trash.
     *
     * @return always true
     * @throws IOException if thrown while cleaning up; the shared op latch is then left held
     */
    @Override
    public boolean reset() throws IOException {
        mOpLatch.acquireShared();

        mTransactions.traverse(te -> {
            Latch held = te.latch();
            try {
                te.mTxn.recoveryCleanup(true);
            } finally {
                held.releaseExclusive();
            }
            return true;
        });

        mDatabase.emptyAllFragmentedTrash(false);

        // Reached only when nothing above threw.
        opFinishedShared();
        return true;
    }

    /**
     * Changes no state; only records the decode position by way of {@code nop()}.
     *
     * @param timestamp ignored
     * @return always true
     */
    @Override
    public boolean timestamp(long timestamp) {
        return nop();
    }

    /**
     * Changes no state; only records the decode position by way of {@code nop()}.
     *
     * @param timestamp ignored
     * @return always true
     */
    @Override
    public boolean shutdown(long timestamp) {
        return nop();
    }

    /**
     * Changes no state; only records the decode position by way of {@code nop()}.
     *
     * @param timestamp ignored
     * @return always true
     */
    @Override
    public boolean close(long timestamp) {
        return nop();
    }

    /**
     * Changes no state; only records the decode position by way of {@code nop()}.
     *
     * @param timestamp ignored
     * @return always true
     */
    @Override
    public boolean endFile(long timestamp) {
        return nop();
    }

    /**
     * Applies a non-transactional store. The key is locked exclusively with a local locker
     * before the decode latch is handed off, and the store itself then runs without the
     * decode latch.
     *
     * @param indexId id of the target index; nothing is stored if it cannot be found
     * @param key key to store
     * @param value value to store
     * @return always false. NOTE(review): the decode latch has been released by
     * {@code nextTask()} at this point, so false presumably tells the decoder to stop
     * looping on this thread -- confirm against the decoder.
     * @throws IOException if the store fails; the shared op latch is then left held
     */
    @Override
    public boolean store(long indexId, byte[] key, byte[] value) throws IOException {
        Index ix = getIndex(indexId);

        mOpLatch.acquireShared();

        // Acquire the lock before another task is allowed to start decoding.
        _Locker locker = mDatabase.mLockManager.localLocker();
        locker.lockExclusive(indexId, key, -1L);

        nextTask();

        try {
            while (ix != null) {
                try {
                    ix.store(Transaction.BOGUS, key, value);
                    break;
                } catch (ClosedIndexException closed) {
                    // The cached index reference was closed, so open it again and retry.
                    ix = openIndex(indexId, null);
                }
            }
        } finally {
            locker.scopeUnlockAll();
        }

        // Reached only when nothing above threw.
        mOpLatch.releaseShared();

        notifyStore(ix, key, value);
        return false;
    }

    /**
     * Handled exactly like {@code store}, which means the key lock is still acquired.
     *
     * @return the result of {@code store}
     */
    @Override
    public boolean storeNoLock(long indexId, byte[] key, byte[] value) throws IOException {
        return store(indexId, key, value);
    }

    /**
     * Renames an index, if it can be found, and then notifies the replication manager.
     *
     * <p>A {@code RuntimeException} from the rename is reported to the database event
     * listener as a replication warning, and the manager notification is then skipped. When
     * no listener is installed the exception is dropped and the manager is still notified.
     *
     * @param txnId transaction id passed along to the database rename
     * @param indexId id of the index to rename
     * @param newName new name; a clone is given to the manager
     * @return always true
     */
    @Override
    public boolean renameIndex(long txnId, long indexId, byte[] newName) throws IOException {
        Index ix = getIndex(indexId);
        byte[] oldName = null;

        mOpLatch.acquireShared();

        if (ix != null) {
            oldName = ix.getName();
            try {
                mDatabase.renameIndex(ix, newName, txnId);
            } catch (RuntimeException e) {
                EventListener listener = mDatabase.mEventListener;
                if (listener != null) {
                    listener.notify(EventType.REPLICATION_WARNING, "Unable to rename index: %1$s", Utils.rootCause(e));
                    // Suppress the manager notification below.
                    ix = null;
                }
            }
        }

        opFinishedShared();

        if (ix != null) {
            try {
                mManager.notifyRename(ix, oldName, newName.clone());
            } catch (Throwable e) {
                Utils.uncaught(e);
            }
        }

        return true;
    }

    /**
     * Commits and exits the transaction which deleted the index, closes the index, notifies
     * the replication manager, and then starts a daemon thread for any deletion task handed
     * back by the database.
     *
     * @param txnId id of the transaction performing the delete
     * @param indexId id of the deleted index
     * @return always true
     * @throws IOException if the commit fails; the shared op latch is then left held
     */
    @Override
    public boolean deleteIndex(long txnId, long indexId) throws IOException {
        TxnEntry te = getTxnEntry(txnId);
        _LocalTransaction txn = te.mTxn;

        // The index is looked up with the transaction and then dropped from the cache.
        Index ix = getIndex(txn, indexId);
        mIndexes.remove(indexId);

        mOpLatch.acquireShared();

        Latch held = te.latch();
        try {
            try {
                txn.commit();
            } finally {
                txn.exit();
            }
        } finally {
            held.releaseExclusive();
        }

        // Reached only when nothing above threw.
        opFinishedShared();

        if (ix != null) {
            ix.close();
            try {
                mManager.notifyDrop(ix);
            } catch (Throwable e) {
                Utils.uncaught(e);
            }
        }

        Runnable task = mDatabase.replicaDeleteTree(indexId);
        if (task != null) {
            try {
                String suffix = (ix == null) ? String.valueOf(indexId) : ix.getNameString();
                Thread deletion = new Thread(task, "IndexDeletion-" + suffix);
                deletion.setDaemon(true);
                deletion.start();
            } catch (Throwable e) {
                // Failure to start the thread is only reported, never propagated.
                EventListener listener = mDatabase.mEventListener;
                if (listener != null) {
                    listener.notify(EventType.REPLICATION_WARNING, "Unable to immediately delete index: %1$s", Utils.rootCause(e));
                }
            }
        }

        return true;
    }

    /**
     * Enters a scope of the given transaction. If the transaction isn't tracked yet, it is
     * created and registered instead, without {@code enter()} being called on it.
     *
     * @param txnId id of the transaction
     * @return always true
     */
    @Override
    public boolean txnEnter(long txnId) throws IOException {
        long scrambled = Utils.scramble(txnId);
        TxnEntry existing = (TxnEntry) mTransactions.get(scrambled);

        mOpLatch.acquireShared();

        if (existing != null) {
            Latch held = existing.latch();
            try {
                existing.mTxn.enter();
            } finally {
                held.releaseExclusive();
            }
        } else {
            _LocalTransaction txn = new _LocalTransaction(mDatabase, txnId, LockMode.UPGRADABLE_READ, -1L);
            ((TxnEntry) mTransactions.insert(scrambled)).init(txn, selectLatch(scrambled));
        }

        // Reached only when nothing above threw.
        opFinishedShared();
        return true;
    }

    /**
     * Exits the current scope of the given transaction, creating the transaction first if it
     * isn't tracked. The decode latch is handed off before {@code exit()} runs.
     *
     * @param txnId id of the transaction
     * @return always false (the decode latch has already been released by {@code nextTask()})
     */
    @Override
    public boolean txnRollback(long txnId) throws IOException {
        TxnEntry entry = getTxnEntry(txnId);

        mOpLatch.acquireShared();

        Latch held = entry.latch();
        try {
            // The transaction latch is held, so another task may start decoding now.
            nextTask();
            entry.mTxn.exit();
        } finally {
            held.releaseExclusive();
        }

        // Reached only when nothing above threw.
        mOpLatch.releaseShared();
        return false;
    }

    /**
     * Stops tracking the given transaction and resets it. An unknown transaction id is
     * ignored.
     *
     * @param txnId id of the transaction
     * @return true if the transaction wasn't tracked, in which case the decode latch is still
     * held; false otherwise, after the decode latch was released by {@code nextTask()}
     */
    @Override
    public boolean txnRollbackFinal(long txnId) throws IOException {
        mOpLatch.acquireShared();

        TxnEntry entry = removeTxnEntry(txnId);
        if (entry == null) {
            opFinishedShared();
            return true;
        }

        Latch held = entry.latch();
        try {
            // The transaction latch is held, so another task may start decoding now.
            nextTask();
            entry.mTxn.reset();
        } finally {
            held.releaseExclusive();
        }

        // Reached only when nothing above threw.
        mOpLatch.releaseShared();
        return false;
    }

    /**
     * Commits and then exits the current scope of the given transaction, creating the
     * transaction first if it isn't tracked. The exit is attempted even if the commit throws.
     *
     * @param txnId id of the transaction
     * @return always true
     */
    @Override
    public boolean txnCommit(long txnId) throws IOException {
        TxnEntry entry = getTxnEntry(txnId);

        mOpLatch.acquireShared();

        Latch held = entry.latch();
        try {
            _LocalTransaction txn = entry.mTxn;
            try {
                txn.commit();
            } finally {
                txn.exit();
            }
        } finally {
            held.releaseExclusive();
        }

        // Reached only when nothing above threw.
        opFinishedShared();
        return true;
    }

    /**
     * Stops tracking the given transaction and commits all of its scopes.
     *
     * @param txnId id of the transaction
     * @return always true
     * @throws CorruptDatabaseException if the transaction isn't tracked; like any other
     * failure here, this leaves the shared op latch held
     */
    @Override
    public boolean txnCommitFinal(long txnId) throws IOException {
        mOpLatch.acquireShared();

        TxnEntry entry = removeTxnEntry(txnId);
        if (entry == null) {
            throw new CorruptDatabaseException("Transaction not found: " + txnId);
        }

        Latch held = entry.latch();
        try {
            entry.mTxn.commitAll();
        } finally {
            held.releaseExclusive();
        }

        // Reached only when nothing above threw.
        opFinishedShared();
        return true;
    }

    /**
     * Applies a store within the given transaction, creating the transaction first if it
     * isn't tracked. The upgradable lock on the key is acquired before the decode latch is
     * handed off, and the store itself then runs without the decode latch.
     *
     * @param txnId id of the transaction
     * @param indexId id of the target index; nothing is stored if it cannot be found
     * @param key key to store
     * @param value value to store
     * @return always false (the decode latch has already been released by {@code nextTask()})
     * @throws IOException if the store fails; the shared op latch is then left held
     */
    @Override
    public boolean txnStore(long txnId, long indexId, byte[] key, byte[] value) throws IOException {
        Index ix = getIndex(indexId);
        TxnEntry entry = getTxnEntry(txnId);

        mOpLatch.acquireShared();

        Latch held = entry.latch();
        try {
            _LocalTransaction txn = entry.mTxn;

            // Acquire the lock before another task is allowed to start decoding.
            txn.lockUpgradable(indexId, key, -1L);

            nextTask();

            while (ix != null) {
                try {
                    ix.store(txn, key, value);
                    break;
                } catch (ClosedIndexException closed) {
                    // The cached index reference was closed, so open it again and retry.
                    ix = openIndex(indexId, null);
                }
            }
        } finally {
            held.releaseExclusive();
        }

        // Reached only when nothing above threw.
        mOpLatch.releaseShared();

        notifyStore(ix, key, value);
        return false;
    }

    /**
     * Applies a store within the given transaction and then commits all of its scopes.
     *
     * <p>This is the last operation of the transaction, so its entry is taken out of the
     * transaction table. When no entry exists, a transaction is created just for this
     * operation and never registered. Looking the entry up with {@code getTxnEntry} instead
     * would insert an entry which nothing removes until the next {@code reset()}.
     * NOTE(review): this assumes no later operation refers to the same transaction id, as
     * the use of {@code commitAll()} suggests -- confirm against the redo writer.
     *
     * @param txnId id of the transaction
     * @param indexId id of the target index; nothing is stored if it cannot be found
     * @param key key to store
     * @param value value to store
     * @return always false (the decode latch has already been released by {@code nextTask()})
     * @throws IOException if the store or commit fails; the shared op latch is then left held
     */
    @Override
    public boolean txnStoreCommitFinal(long txnId, long indexId, byte[] key, byte[] value) throws IOException {
        Index ix = getIndex(indexId);

        // As with the other final operations, modify the transaction table only after the
        // op latch has been acquired.
        mOpLatch.acquireShared();

        TxnEntry te = removeTxnEntry(txnId);

        _LocalTransaction txn;
        Latch latch;
        if (te == null) {
            txn = new _LocalTransaction(mDatabase, txnId, LockMode.UPGRADABLE_READ, -1L);
            // Same latch that a registered entry for this id would have been given.
            latch = selectLatch(Utils.scramble(txnId));
        } else {
            txn = te.mTxn;
            latch = te.mLatch;
        }

        latch.acquireExclusive();
        try {
            // Acquire the lock before another task is allowed to start decoding.
            txn.lockUpgradable(indexId, key, -1L);

            nextTask();

            while (ix != null) {
                try {
                    ix.store(txn, key, value);
                    break;
                } catch (ClosedIndexException closed) {
                    // The cached index reference was closed, so open it again and retry.
                    ix = openIndex(indexId, null);
                }
            }

            txn.commitAll();
        } finally {
            latch.releaseExclusive();
        }

        // Reached only when nothing above threw.
        mOpLatch.releaseShared();

        notifyStore(ix, key, value);
        return false;
    }

    /**
     * Passes a custom message to the installed transaction handler, creating the transaction
     * first if it isn't tracked. Unlike most operations, the op latch is held exclusively
     * while the handler runs.
     *
     * @param txnId id of the transaction
     * @param message opaque message for the handler
     * @return always true
     * @throws DatabaseException if no custom transaction handler is installed
     */
    @Override
    public boolean txnCustom(long txnId, byte[] message) throws IOException {
        TransactionHandler handler = mDatabase.mCustomTxnHandler;
        if (handler == null) {
            throw new DatabaseException("Custom transaction handler is not installed");
        }

        TxnEntry entry = getTxnEntry(txnId);

        mOpLatch.acquireExclusive();

        Latch held = entry.latch();
        try {
            handler.redo(mDatabase, entry.mTxn, message);
        } finally {
            held.releaseExclusive();
        }

        // Reached only when nothing above threw.
        opFinishedExclusive();
        return true;
    }

    /**
     * Passes a custom message and key to the installed transaction handler, creating the
     * transaction first if it isn't tracked. The key is locked upgradable before the decode
     * latch is handed off, and the lock is made exclusive afterwards.
     *
     * @param txnId id of the transaction
     * @param message opaque message for the handler
     * @param indexId id of the index which the key belongs to
     * @param key key to lock
     * @return always false (the decode latch has already been released by {@code nextTask()})
     * @throws DatabaseException if no custom transaction handler is installed
     */
    @Override
    public boolean txnCustomLock(long txnId, byte[] message, long indexId, byte[] key) throws IOException {
        TransactionHandler handler = mDatabase.mCustomTxnHandler;
        if (handler == null) {
            throw new DatabaseException("Custom transaction handler is not installed");
        }

        TxnEntry entry = getTxnEntry(txnId);

        mOpLatch.acquireShared();

        Latch held = entry.latch();
        try {
            _LocalTransaction txn = entry.mTxn;

            // Acquire the upgradable lock before another task is allowed to start decoding.
            txn.lockUpgradable(indexId, key, -1L);

            nextTask();

            txn.lockExclusive(indexId, key, -1L);
            handler.redo(mDatabase, txn, message, indexId, key);
        } finally {
            held.releaseExclusive();
        }

        // Reached only when nothing above threw.
        mOpLatch.releaseShared();
        return false;
    }

    /**
     * Handles an operation which has no effect: briefly takes the shared op latch so that
     * the decode position and transaction id get recorded.
     *
     * @return always true
     */
    private boolean nop() {
        mOpLatch.acquireShared();
        opFinishedShared();
        return true;
    }

    /**
     * Records the decoder's position and transaction id, and then releases the shared op
     * latch. Caller must hold the op latch shared.
     */
    private void opFinishedShared() {
        doOpFinished();
        mOpLatch.releaseShared();
    }

    /**
     * Records the decoder's position and transaction id, and then releases the exclusive op
     * latch. Caller must hold the op latch exclusively.
     */
    private void opFinishedExclusive() {
        doOpFinished();
        mOpLatch.releaseExclusive();
    }

    /**
     * Copies the decoder's current input position and transaction id into
     * {@code mDecodePosition} and {@code mDecodeTransactionId}.
     */
    private void doOpFinished() {
        ReplRedoDecoder current = mDecoder;
        mDecodePosition = current.in().mPos;
        mDecodeTransactionId = current.mTxnId;
    }

    /**
     * Records the decoder's position and transaction id, starts one more decode task if no
     * task is idle and the thread limit allows it, and then releases the decode latch.
     *
     * <p>The decode latch is released on every path, including when starting the new task
     * fails, so the caller must hold it exclusively and must not release it again.
     */
    private void nextTask() {
        ReplRedoDecoder current = mDecoder;
        mDecodePosition = current.in().mPos;
        mDecodeTransactionId = current.mTxnId;

        if (mIdleThreads.get() == 0) {
            int total = mTotalThreads.get();
            // Reserve a slot in the thread count before creating the thread.
            if (total < mMaxThreads && mTotalThreads.compareAndSet(total, total + 1)) {
                DecodeTask task;
                try {
                    task = new DecodeTask();
                    task.start();
                } catch (Throwable e) {
                    mDecodeLatch.releaseExclusive();
                    // Give the reserved slot back.
                    mTotalThreads.decrementAndGet();
                    throw e;
                }
                mTaskThreadSet.put(task, this);
            }
        }

        mDecodeLatch.releaseExclusive();
    }

    /**
     * Acquires the op latch exclusively. Every operation handler in this class acquires the
     * op latch before applying changes, so they block until {@code resume()} is called.
     */
    void suspend() {
        mOpLatch.acquireExclusive();
    }

    /**
     * Releases the exclusive hold on the op latch which was acquired by {@code suspend()}.
     */
    void resume() {
        mOpLatch.releaseExclusive();
    }

    /**
     * Returns the tracked entry for the given transaction, creating and registering a new
     * transaction if none exists yet.
     *
     * @param txnId unscrambled transaction id
     * @return entry for the transaction, never null
     */
    private TxnEntry getTxnEntry(long txnId) throws IOException {
        long scrambled = Utils.scramble(txnId);

        TxnEntry entry = (TxnEntry) mTransactions.get(scrambled);
        if (entry != null) {
            return entry;
        }

        _LocalTransaction txn = new _LocalTransaction(mDatabase, txnId, LockMode.UPGRADABLE_READ, -1L);
        entry = (TxnEntry) mTransactions.insert(scrambled);
        entry.init(txn, selectLatch(scrambled));
        return entry;
    }

    /**
     * Removes the tracked entry for the given transaction.
     *
     * @param txnId unscrambled transaction id
     * @return whatever the table's {@code remove} returns; callers treat null as "not tracked"
     */
    private TxnEntry removeTxnEntry(long txnId) throws IOException {
        return (TxnEntry) mTransactions.remove(Utils.scramble(txnId));
    }

    /**
     * Returns the index with the given id, preferring the softly referenced cached instance
     * and opening the index when there is none or when the reference has been cleared.
     *
     * @param txn transaction to open the index with, if it must be opened; can be null
     * @param indexId id of the index
     * @return the index, or null if {@code openIndex} finds none
     */
    private Index getIndex(Transaction txn, long indexId) throws IOException {
        LHashTable.ObjEntry entry = (LHashTable.ObjEntry) mIndexes.get(indexId);
        if (entry != null) {
            Index cached = (Index) ((SoftReference) entry.value).get();
            if (cached != null) {
                return cached;
            }
        }
        return openIndex(txn, indexId, entry);
    }

    /**
     * Returns the index with the given id, opening it without a transaction if necessary.
     *
     * @param indexId id of the index
     * @return the index, or null if none is found
     */
    private Index getIndex(long indexId) throws IOException {
        return getIndex(null, indexId);
    }

    /**
     * Opens the index with the given id and caches it behind a soft reference.
     *
     * @param txn transaction passed to the database lookup; can be null
     * @param indexId id of the index
     * @param entry existing cache entry to refresh, or null to insert a new one
     * @return the index, or null if the database has none with the given id
     */
    private Index openIndex(Transaction txn, long indexId, LHashTable.ObjEntry<SoftReference<Index>> entry) throws IOException {
        Index ix = mDatabase.anyIndexById(txn, indexId);
        if (ix == null) {
            return null;
        }

        SoftReference<Index> ref = new SoftReference<>(ix);

        if (entry == null) {
            ((LHashTable.ObjEntry) mIndexes.insert(indexId)).value = ref;
        } else {
            entry.value = ref;
            // A reference was found cleared, so visit all entries and select those whose
            // references are cleared. NOTE(review): selected entries are presumably
            // removed by traverse -- confirm against LHashTable.
            mIndexes.traverse(e -> ((SoftReference) e.value).get() == null);
        }

        return ix;
    }

    /**
     * Opens the index with the given id, without a transaction, and caches it.
     *
     * @param indexId id of the index
     * @param entry existing cache entry to refresh, or null to insert a new one
     * @return the index, or null if none is found
     */
    private Index openIndex(long indexId, LHashTable.ObjEntry<SoftReference<Index>> entry) throws IOException {
        return openIndex(null, indexId, entry);
    }

    /**
     * Picks the latch for a transaction from the fixed latch array, using the low bits of
     * the scrambled transaction id.
     *
     * @param scrambledTxnId transaction id after scrambling
     * @return latch for the id; different ids can map to the same latch
     */
    private Latch selectLatch(long scrambledTxnId) {
        int slot = ((int) scrambledTxnId) & mLatchesMask;
        return mLatches[slot];
    }

    /**
     * Body of a decode task: waits for the decode latch and then runs the decoder with this
     * engine as the visitor.
     *
     * <p>While waiting, the task counts as idle. Each time the wait times out (5 seconds) or
     * is interrupted, the task retires if it isn't the only one left. When the decoder's
     * {@code run} returns true, the engine is reset, the decoder is discarded and the
     * controller's {@code leaderNotify} is called. Any exception from decoding or resetting
     * is reported and the database is closed.
     *
     * @return true if the calling task should call this method again; false if it should end
     */
    boolean decode() {
        mIdleThreads.incrementAndGet();
        try {
            for (;;) {
                try {
                    // Idle timeout of 5 seconds, in nanoseconds.
                    if (mDecodeLatch.tryAcquireExclusiveNanos(5000000000L)) {
                        break;
                    }
                } catch (InterruptedException e) {
                    // Handled the same as a timeout; the interrupt status is cleared.
                    Thread.interrupted();
                }

                int total = mTotalThreads.get();
                if (total > 1 && mTotalThreads.compareAndSet(total, total - 1)) {
                    // Other tasks remain, so this one isn't needed.
                    return false;
                }
            }
        } finally {
            mIdleThreads.decrementAndGet();
        }

        // The decode latch is now held exclusively.

        ReplRedoDecoder decoder = mDecoder;
        if (decoder == null) {
            mTotalThreads.decrementAndGet();
            mDecodeLatch.releaseExclusive();
            return false;
        }

        try {
            if (!decoder.run(this)) {
                // NOTE(review): the operation handlers which return false release the
                // decode latch themselves through nextTask(), so it presumably isn't held
                // here -- confirm against the decoder.
                return true;
            }
            reset();
        } catch (Throwable e) {
            EventListener listener = mDatabase.mEventListener;
            if (listener == null) {
                Utils.uncaught(e);
            } else {
                listener.notify(EventType.REPLICATION_PANIC, "Unexpected replication exception: %1$s", Utils.rootCause(e));
            }
            mTotalThreads.decrementAndGet();
            mDecodeLatch.releaseExclusive();
            Utils.closeQuietly(null, mDatabase, e);
            return false;
        }

        // Decoding is finished; a later startReceiving call can install a new decoder.
        mDecoder = null;
        mTotalThreads.decrementAndGet();
        mDecodeLatch.releaseExclusive();

        try {
            mController.leaderNotify();
        } catch (UnmodifiableReplicaException e) {
            // Deliberately ignored.
        } catch (Throwable e) {
            Utils.closeQuietly(null, mDatabase, e);
        }

        return false;
    }

    /**
     * Tells the replication manager about a store, unless the index is null or internal.
     * Anything thrown by the manager is passed to {@code Utils.uncaught} and not propagated.
     *
     * @param ix index which was stored into; can be null
     * @param key stored key
     * @param value stored value
     */
    private void notifyStore(Index ix, byte[] key, byte[] value) {
        if (ix == null || _Tree.isInternal(ix.getId())) {
            return;
        }
        try {
            mManager.notifyStore(ix, key, value);
        } catch (Throwable e) {
            Utils.uncaught(e);
        }
    }

    /**
     * Increments the shared task counter and returns its new value.
     *
     * @return the counter interpreted as an unsigned 32-bit number, so the result is never
     * negative even after the int counter wraps around
     */
    static synchronized long taskNumber() {
        int next = ++cTaskNumber;
        return Integer.toUnsignedLong(next);
    }

    /**
     * Hash table of {@link TxnEntry} instances. This class always uses scrambled
     * transaction ids as the keys.
     */
    static final class TxnTable
    extends LHashTable<TxnEntry> {
        /**
         * @param capacity passed to the superclass constructor
         */
        TxnTable(int capacity) {
            super(capacity);
        }

        /**
         * Supplies a blank entry; callers fill it in with {@code TxnEntry.init}.
         */
        @Override
        protected TxnEntry newEntry() {
            return new TxnEntry();
        }
    }

    /**
     * Table entry which pairs a transaction with the latch that guards operations on it.
     */
    static final class TxnEntry
    extends LHashTable.Entry<TxnEntry> {
        // Both fields are assigned by init.
        _LocalTransaction mTxn;
        Latch mLatch;

        TxnEntry() {
        }

        /**
         * Assigns the transaction and its latch.
         */
        void init(_LocalTransaction txn, Latch latch) {
            mTxn = txn;
            mLatch = latch;
        }

        /**
         * Acquires this entry's latch exclusively.
         *
         * @return the acquired latch, which the caller must release
         */
        Latch latch() {
            final Latch acquired = mLatch;
            acquired.acquireExclusive();
            return acquired;
        }
    }

    /**
     * Daemon thread which calls the engine's {@code decode()} until it returns false, and
     * then removes itself from the engine's task set.
     */
    final class DecodeTask
    extends Thread {
        DecodeTask() {
            super("ReplicationReceiver-" + taskNumber());
            setDaemon(true);
        }

        @Override
        public void run() {
            boolean again;
            do {
                again = decode();
            } while (again);
            mTaskThreadSet.remove(this);
        }
    }
}

