/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.processor;

import com.hazelcast.function.ConsumerEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.function.RunnableEx;
import com.hazelcast.jet.impl.processor.TwoPhaseSnapshotCommitUtility;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;
import javax.annotation.Nonnull;

/**
 * A {@link TwoPhaseSnapshotCommitUtility} that keeps at most one
 * <em>active</em> transaction and an unbounded list of <em>pending</em>
 * transactions.
 * <p>
 * When {@code usesTransactionLifecycle()} is {@code true}:
 * <ul>
 *   <li>{@link #finishActiveTransaction()} prepares the active transaction
 *       and moves it to the pending list;</li>
 *   <li>{@link #snapshotCommitPrepare()} writes the ids of all pending
 *       transactions to the snapshot;</li>
 *   <li>{@link #snapshotCommitFinish(boolean)} commits and releases all
 *       pending transactions if the snapshot succeeded.</li>
 * </ul>
 * When it is {@code false}, no transaction is ever put on the pending list:
 * the active transaction is only flushed or released.
 * <p>
 * This class does no synchronization of its own.
 *
 * @param <TXN_ID> type of the transaction identifier
 * @param <RES>    type of the transactional resource
 */
public class UnboundedTransactionsProcessorUtility<TXN_ID extends TwoPhaseSnapshotCommitUtility.TransactionId, RES extends TwoPhaseSnapshotCommitUtility.TransactionalResource<TXN_ID>>
        extends TwoPhaseSnapshotCommitUtility<TXN_ID, RES> {

    /** Supplies the id for every newly created transaction. */
    private final Supplier<TXN_ID> createTxnIdFn;

    /** Run once, before the first transaction is created, if the transaction lifecycle is used. */
    private final RunnableEx abortUnfinishedTransactionsAction;

    /** The transaction currently handed out by {@link #activeTransaction()}, or {@code null}. */
    private TwoPhaseSnapshotCommitUtility.LoggingNonThrowingResource<TXN_ID, RES> activeTransaction;

    /** Transactions waiting to be committed; {@code null} if the transaction lifecycle is not used. */
    private final List<TwoPhaseSnapshotCommitUtility.LoggingNonThrowingResource<TXN_ID, RES>> pendingTransactions;

    /** Ids still to be offered to the snapshot by {@link #snapshotCommitPrepare()}. */
    private final Queue<TXN_ID> snapshotQueue = new ArrayDeque<>();

    /** Whether the one-time initialization in {@link #activeTransaction()} has run. */
    private boolean initialized;

    /** Set when {@link #snapshotCommitPrepare()} returns {@code true}, cleared by {@link #snapshotCommitFinish(boolean)}. */
    private boolean snapshotInProgress;

    /**
     * Creates the utility.
     * <p>
     * The first five arguments listed below and {@code recoverAndCommitFn}
     * are passed to the superclass constructor unchanged, together with a
     * constant {@code false} and a function that always throws
     * {@link UnsupportedOperationException}.
     *
     * @param outbox            passed to the superclass
     * @param procContext       passed to the superclass
     * @param externalGuarantee passed to the superclass
     * @param createTxnIdFn     supplies the id for each new transaction
     * @param createTxnFn       passed to the superclass
     * @param recoverAndCommitFn passed to the superclass; this class invokes
     *                          it from {@link #restoreFromSnapshot(Object, Object)}
     * @param abortUnfinishedTransactionsAction action run once before the
     *                          first transaction is created, if the
     *                          transaction lifecycle is used
     */
    public UnboundedTransactionsProcessorUtility(
            @Nonnull Outbox outbox,
            @Nonnull Processor.Context procContext,
            @Nonnull ProcessingGuarantee externalGuarantee,
            @Nonnull Supplier<TXN_ID> createTxnIdFn,
            @Nonnull FunctionEx<TXN_ID, RES> createTxnFn,
            @Nonnull ConsumerEx<TXN_ID> recoverAndCommitFn,
            @Nonnull RunnableEx abortUnfinishedTransactionsAction
    ) {
        super(outbox, procContext, false, externalGuarantee, createTxnFn, recoverAndCommitFn,
                ignored -> {
                    // this class uses abortUnfinishedTransactionsAction instead
                    throw new UnsupportedOperationException();
                });
        this.createTxnIdFn = createTxnIdFn;
        this.abortUnfinishedTransactionsAction = abortUnfinishedTransactionsAction;
        this.pendingTransactions = usesTransactionLifecycle() ? new ArrayList<>() : null;
    }

    /**
     * Returns the resource of the active transaction, creating the
     * transaction first if there is none.
     * <p>
     * On the very first creation, if the transaction lifecycle is used, the
     * {@code abortUnfinishedTransactionsAction} is run; an exception thrown
     * by it is rethrown and the initialization is retried on the next call.
     * A newly created transaction is begun only if the transaction lifecycle
     * is used.
     *
     * @return the resource wrapped by the active transaction
     */
    @Override
    @Nonnull
    public RES activeTransaction() {
        if (activeTransaction != null) {
            return activeTransaction.wrapped();
        }
        if (!initialized) {
            if (usesTransactionLifecycle()) {
                try {
                    procContext().logger().fine("aborting unfinished transactions");
                    abortUnfinishedTransactionsAction.run();
                } catch (Exception e) {
                    throw ExceptionUtil.sneakyThrow(e);
                }
            }
            initialized = true;
        }
        // The declared return type of createTxnFn() is not visible in this file,
        // so the cast is retained; it performs the same runtime check as before.
        @SuppressWarnings("unchecked")
        TwoPhaseSnapshotCommitUtility.LoggingNonThrowingResource<TXN_ID, RES> created =
                (TwoPhaseSnapshotCommitUtility.LoggingNonThrowingResource<TXN_ID, RES>)
                        createTxnFn().apply(createTxnIdFn.get());
        activeTransaction = created;
        if (usesTransactionLifecycle()) {
            activeTransaction.begin();
        }
        return activeTransaction.wrapped();
    }

    /**
     * Finishes the active transaction, if any, so that the next call to
     * {@link #activeTransaction()} creates a new one.
     * <p>
     * If the transaction lifecycle is used, the transaction is added to the
     * pending list and prepared; otherwise it is released.
     */
    public void finishActiveTransaction() {
        if (activeTransaction == null) {
            return;
        }
        if (usesTransactionLifecycle()) {
            // registered as pending before preparing, so that close() still
            // releases it should endAndPrepare() fail
            pendingTransactions.add(activeTransaction);
            activeTransaction.endAndPrepare();
        } else {
            activeTransaction.release();
        }
        activeTransaction = null;
    }

    /**
     * Disposes of the active transaction, if any.
     * <p>
     * If the transaction lifecycle is used, the active transaction joins the
     * pending list; all pending transactions are then committed immediately
     * unless a snapshot is in progress, in which case they stay pending and
     * are committed by a successful {@link #snapshotCommitFinish(boolean)}.
     * Otherwise the active transaction is released.
     */
    @Override
    public void afterCompleted() {
        if (activeTransaction == null) {
            return;
        }
        if (usesTransactionLifecycle()) {
            // NOTE(review): unlike finishActiveTransaction(), endAndPrepare() is not
            // called here before the commit - confirm commit() alone is sufficient.
            pendingTransactions.add(activeTransaction);
            if (!snapshotInProgress) {
                commitPendingTransactions();
            }
        } else {
            activeTransaction.release();
        }
        activeTransaction = null;
    }

    /**
     * Performs the prepare step of a snapshot.
     * <p>
     * If the transaction lifecycle is used and no ids are left over from a
     * previous unfinished call, the active transaction is finished and the
     * ids of all pending transactions are queued. If the lifecycle is not
     * used, the active transaction (if any) is flushed. The queued ids are
     * then offered to the snapshot one by one as broadcast keys.
     *
     * @return {@code false} if the outbox rejected an id; the remaining ids
     *         stay queued and the method should be called again. {@code true}
     *         once all ids were written.
     */
    @Override
    public boolean snapshotCommitPrepare() {
        if (usesTransactionLifecycle()) {
            // A non-empty queue means the previous call returned false part-way;
            // resume from the queue instead of collecting the ids again.
            if (snapshotQueue.isEmpty()) {
                finishActiveTransaction();
                for (TwoPhaseSnapshotCommitUtility.LoggingNonThrowingResource<TXN_ID, RES> txn : pendingTransactions) {
                    snapshotQueue.add(txn.id());
                }
            }
        } else if (activeTransaction != null) {
            activeTransaction.flush();
        }
        TXN_ID txnId;
        while ((txnId = snapshotQueue.peek()) != null) {
            if (!getOutbox().offerToSnapshot(BroadcastKey.broadcastKey(txnId), false)) {
                return false;
            }
            // remove only after a successful offer, so a rejected id is retried
            snapshotQueue.remove();
        }
        snapshotInProgress = true;
        return true;
    }

    /**
     * Performs the finish step of a snapshot: clears the in-progress flag
     * and, if the transaction lifecycle is used and the snapshot succeeded,
     * commits and releases all pending transactions. After an unsuccessful
     * snapshot the pending transactions are kept.
     *
     * @param success whether the snapshot was successful
     * @return always {@code true}
     */
    @Override
    public boolean snapshotCommitFinish(boolean success) {
        assert snapshotInProgress : "no snapshot in progress";
        snapshotInProgress = false;
        if (usesTransactionLifecycle() && success) {
            commitPendingTransactions();
        }
        return true;
    }

    /** Commits and releases every pending transaction, then empties the list. */
    private void commitPendingTransactions() {
        for (TwoPhaseSnapshotCommitUtility.LoggingNonThrowingResource<TXN_ID, RES> txn : pendingTransactions) {
            txn.commit();
            txn.release();
        }
        pendingTransactions.clear();
    }

    /**
     * Handles one snapshot entry written by {@link #snapshotCommitPrepare()}.
     * <p>
     * The transaction id is read from the broadcast key. It is passed to
     * {@code recoverAndCommitFn} only if its index modulo the total
     * parallelism equals this processor's global index; otherwise the entry
     * is ignored by this instance.
     *
     * @param key   a {@link BroadcastKey} holding the transaction id
     * @param value not used
     */
    @Override
    public void restoreFromSnapshot(@Nonnull Object key, @Nonnull Object value) {
        @SuppressWarnings("unchecked")
        TXN_ID txnId = ((BroadcastKey<TXN_ID>) key).key();
        int assignedProcessor = txnId.index() % procContext().totalParallelism();
        if (assignedProcessor == procContext().globalProcessorIndex()) {
            recoverAndCommitFn().accept(txnId);
        }
    }

    /**
     * Rolls back and releases the active transaction, if any, and releases
     * all pending transactions. Pending transactions are neither committed
     * nor rolled back here.
     */
    @Override
    public void close() {
        if (activeTransaction != null) {
            activeTransaction.rollback();
            activeTransaction.release();
            activeTransaction = null;
        }
        // null when the transaction lifecycle is not used
        if (pendingTransactions != null) {
            for (TwoPhaseSnapshotCommitUtility.LoggingNonThrowingResource<TXN_ID, RES> txn : pendingTransactions) {
                txn.release();
            }
            pendingTransactions.clear();
        }
    }
}

