/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.processor;

import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.function.ConsumerEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.impl.processor.TwoPhaseSnapshotCommitUtility;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.spi.exception.RetryableHazelcastException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
 * A {@link TwoPhaseSnapshotCommitUtility} that works with a small, fixed-size
 * pool of transactions and cycles through them: one transaction is "active"
 * (handed out by {@link #activeTransaction()}), and at each snapshot the
 * active one is flushed/prepared while the next pool entry becomes active.
 *
 * <p>The requested pool size is validated and reduced by
 * {@code adjustPoolSize} according to the processing guarantee and whether the
 * processor is a source.
 *
 * @param <TXN_ID> type of the transaction identifier
 * @param <RES> type of the transactional resource
 */
public class TransactionPoolSnapshotUtility<TXN_ID extends TwoPhaseSnapshotCommitUtility.TransactionId, RES extends TwoPhaseSnapshotCommitUtility.TransactionalResource<TXN_ID>>
extends TwoPhaseSnapshotCommitUtility<TXN_ID, RES> {
    /**
     * Upper bound, as a multiple of the current total parallelism, of the
     * processor indices probed by {@link #rollbackOtherTransactions()}.
     */
    private static final int TXN_PROBING_FACTOR = 5;

    /** Effective pool size, i.e. the result of {@code adjustPoolSize}. */
    private final int poolSize;
    /** IDs of the pooled transactions; a single {@code null} entry when the pool size is 1. */
    private final List<TXN_ID> transactionIds;
    /** Lazily created in {@link #ensureTransactions()}; {@code null} until first needed. */
    private List<TwoPhaseSnapshotCommitUtility.LoggingNonThrowingResource<TXN_ID, RES>> transactions;
    /** Index into {@link #transactions} of the currently active transaction. */
    private int activeTxnIndex;
    /** Whether the active transaction was handed out since the last successful prepare. */
    private boolean activeTransactionUsed;
    /** ID written to the snapshot in phase 1 and not yet finished in phase 2; {@code null} otherwise. */
    private TXN_ID preparedTxnId;
    /** Transaction selected by the most recent {@link #snapshotCommitPrepare()} call. */
    private TwoPhaseSnapshotCommitUtility.LoggingNonThrowingResource<TXN_ID, RES> transactionToCommit;
    /** Remembers a successful flush across re-invocations of {@link #snapshotCommitPrepare()}. */
    private boolean flushed;
    private boolean processorCompleted;
    private boolean transactionsReleased;

    /**
     * @param outbox passed through to the superclass
     * @param procContext passed through to the superclass; also used for logging
     * @param isSource whether the owning processor is a source
     * @param externalGuarantee the processing guarantee requested for the external system
     * @param poolSize requested pool size; validated and possibly reduced by {@code adjustPoolSize}
     * @param createTxnIdFn creates a transaction ID from (processor index, index within the pool)
     * @param createTxnFn creates the transactional resource for a transaction ID
     * @param recoverAndCommitFn recovers and commits a transaction with the given ID
     * @param recoverAndAbortFn recovers and aborts a transaction with the given ID
     * @throws IllegalArgumentException if {@code poolSize} is out of the allowed range
     */
    public TransactionPoolSnapshotUtility(@Nonnull Outbox outbox, @Nonnull Processor.Context procContext, boolean isSource, @Nonnull ProcessingGuarantee externalGuarantee, int poolSize, @Nonnull BiFunctionEx<Integer, Integer, TXN_ID> createTxnIdFn, @Nonnull FunctionEx<TXN_ID, RES> createTxnFn, @Nonnull ConsumerEx<TXN_ID> recoverAndCommitFn, @Nonnull ConsumerEx<TXN_ID> recoverAndAbortFn) {
        super(outbox, procContext, isSource, externalGuarantee, createTxnFn, recoverAndCommitFn, (ConsumerEx<Integer> & Serializable) processorIndex -> {
            // Abort every pooled transaction ID belonging to the given processor index.
            int adjustedPoolSize = TransactionPoolSnapshotUtility.adjustPoolSize(externalGuarantee, isSource, poolSize);
            for (int i = 0; i < adjustedPoolSize; ++i) {
                TXN_ID txnId = createTxnIdFn.apply(processorIndex, i);
                LoggingUtil.logFine(procContext.logger(), "recover and abort %s", txnId);
                recoverAndAbortFn.accept(txnId);
            }
        });
        this.poolSize = TransactionPoolSnapshotUtility.adjustPoolSize(externalGuarantee, isSource, poolSize);
        LoggingUtil.logFine(procContext.logger(), "Actual pool size used: %d", this.poolSize);
        if (this.poolSize > 1) {
            this.transactionIds = new ArrayList<>(this.poolSize);
            for (int i = 0; i < this.poolSize; ++i) {
                this.transactionIds.add(createTxnIdFn.apply(this.procContext().globalProcessorIndex(), i));
                assert (i == 0 || !this.transactionIds.get(i).equals(this.transactionIds.get(i - 1))) : "two equal IDs generated";
            }
        } else {
            // A single pooled entry is created with a null ID.
            this.transactionIds = Collections.singletonList(null);
        }
    }

    /**
     * Validates the requested pool size and reduces it to what is used for
     * the given guarantee.
     *
     * <ul>
     * <li>Rejects sizes outside {@code 1..3}, and sizes below 2 for
     *     {@code EXACTLY_ONCE}.
     * <li>For an {@code AT_LEAST_ONCE} source the size is capped at 2.
     * <li>For {@code NONE}, and for an {@code AT_LEAST_ONCE} non-source, the
     *     size is 1.
     * </ul>
     *
     * @return the pool size to use
     * @throws IllegalArgumentException if the requested size is not allowed
     */
    private static int adjustPoolSize(@Nonnull ProcessingGuarantee externalGuarantee, boolean isSource, int poolSize) {
        if (externalGuarantee == ProcessingGuarantee.EXACTLY_ONCE && poolSize < 2 || poolSize < 1 || poolSize > 3) {
            throw new IllegalArgumentException("poolSize=" + poolSize);
        }
        if (externalGuarantee == ProcessingGuarantee.AT_LEAST_ONCE && isSource) {
            poolSize = Math.min(2, poolSize);
        }
        if (externalGuarantee == ProcessingGuarantee.NONE || externalGuarantee == ProcessingGuarantee.AT_LEAST_ONCE && !isSource) {
            poolSize = 1;
        }
        return poolSize;
    }

    /**
     * Returns the resource of the currently active transaction, beginning the
     * transaction on first use when the transaction lifecycle is in use.
     *
     * @return the active resource, or {@code null} if the lifecycle is in use
     *         and the pool is too small to hand one out right now (fewer than
     *         2 entries, or fewer than 3 while a prepared transaction is
     *         awaiting {@link #snapshotCommitFinish(boolean)})
     */
    @Override
    @Nullable
    public RES activeTransaction() {
        this.ensureTransactions();
        int requiredPoolSize = this.preparedTxnId != null ? 3 : 2;
        if (this.usesTransactionLifecycle() && this.poolSize < requiredPoolSize) {
            return null;
        }
        TwoPhaseSnapshotCommitUtility.LoggingNonThrowingResource<TXN_ID, RES> activeTransaction = this.transactions.get(this.activeTxnIndex);
        if (!this.activeTransactionUsed && this.usesTransactionLifecycle()) {
            activeTransaction.begin();
        }
        this.activeTransactionUsed = true;
        return activeTransaction.wrapped();
    }

    /**
     * Invokes the superclass's recover-and-abort function for processor
     * indices beyond the current execution: starting at
     * {@code totalParallelism + globalProcessorIndex}, stepping by
     * {@code totalParallelism}, up to (exclusive)
     * {@code totalParallelism * TXN_PROBING_FACTOR}. Does nothing when the
     * transaction lifecycle is not in use.
     */
    private void rollbackOtherTransactions() {
        if (!this.usesTransactionLifecycle()) {
            return;
        }
        int totalParallelism = this.procContext().totalParallelism();
        int firstIndex = totalParallelism + this.procContext().globalProcessorIndex();
        int endIndex = totalParallelism * TXN_PROBING_FACTOR;
        for (int index = firstIndex; index < endIndex; index += totalParallelism) {
            this.recoverAndAbortFn().accept(index);
        }
    }

    /**
     * First phase of the snapshot: flushes the active transaction and, when
     * the transaction lifecycle is in use, writes its ID to the snapshot and
     * prepares it. The next pool entry then becomes active.
     *
     * <p>The method may be re-invoked after it returned {@code false}. No
     * state other than {@code flushed} is changed on those paths, so a retry
     * continues with the same transaction and does not flush it twice.
     *
     * @return {@code true} when the phase is done, {@code false} if the flush
     *         or the offer to the snapshot did not succeed and the call has
     *         to be repeated
     */
    @Override
    public boolean snapshotCommitPrepare() {
        if (this.externalGuarantee() == ProcessingGuarantee.NONE) {
            return true;
        }
        this.ensureTransactions();
        assert (this.preparedTxnId == null) : "preparedTxnId != null";
        this.transactionToCommit = this.transactions.get(this.activeTxnIndex);
        if (!this.activeTransactionUsed) {
            // Nothing to flush or prepare; still move on to the next pool entry.
            this.incrementActiveTxnIndex();
            LoggingUtil.logFine(this.procContext().logger(), "transaction not used, ignoring snapshot, txnId=%s", this.transactionToCommit.id());
            return true;
        }
        // `flushed` stays true across calls so that a retry caused by a failed
        // offerToSnapshot() below does not flush again.
        if (!this.flushed && !(this.flushed = this.transactionToCommit.flush())) {
            this.procContext().logger().fine("flush returned false");
            return false;
        }
        if (this.usesTransactionLifecycle()) {
            TXN_ID txnId = this.transactionToCommit.id();
            if (!this.getOutbox().offerToSnapshot(BroadcastKey.broadcastKey(txnId), false)) {
                // preparedTxnId is deliberately not set yet so that the retry
                // starts from a clean state.
                return false;
            }
            this.preparedTxnId = txnId;
            this.transactionToCommit.endAndPrepare();
        }
        // Success: only now switch to the next transaction and reset the flags.
        this.incrementActiveTxnIndex();
        this.activeTransactionUsed = false;
        this.flushed = false;
        return true;
    }

    /**
     * Second phase of the snapshot: commits the transaction prepared in
     * {@link #snapshotCommitPrepare()}, if there is one.
     *
     * @param success whether the snapshot succeeded
     * @return always {@code true}
     * @throws RetryableHazelcastException if a transaction was prepared and
     *         {@code success} is {@code false}
     */
    @Override
    public boolean snapshotCommitFinish(boolean success) {
        if (!this.usesTransactionLifecycle() || this.preparedTxnId == null) {
            return true;
        }
        this.preparedTxnId = null;
        if (!success) {
            throw new RetryableHazelcastException("the snapshot failed");
        }
        this.transactionToCommit.commit();
        if (this.processorCompleted) {
            // Release was deferred by afterCompleted() until this commit.
            this.doRelease();
        }
        return true;
    }

    /**
     * Marks the processor as completed. The transactions are released right
     * away unless a prepared transaction is still awaiting
     * {@link #snapshotCommitFinish(boolean)}, in which case the release
     * happens there.
     */
    @Override
    public void afterCompleted() {
        this.processorCompleted = true;
        if (this.preparedTxnId == null) {
            this.doRelease();
        }
    }

    /**
     * Handles a transaction ID restored from the snapshot. When the
     * transaction lifecycle is in use and the ID's index maps to this
     * processor ({@code index % totalParallelism == globalProcessorIndex}),
     * the transaction is recovered and committed. If the ID is one of this
     * pool's IDs, the pool entry following it becomes the active one.
     *
     * @param key a {@link BroadcastKey} wrapping the transaction ID
     * @param value not used
     */
    @Override
    public void restoreFromSnapshot(@Nonnull Object key, @Nonnull Object value) {
        assert (!this.activeTransactionUsed) : "transaction already begun";
        @SuppressWarnings("unchecked")
        TXN_ID txnId = ((BroadcastKey<TXN_ID>) key).key();
        if (this.usesTransactionLifecycle() && txnId.index() % this.procContext().totalParallelism() == this.procContext().globalProcessorIndex()) {
            for (int i = 0; i < this.poolSize; ++i) {
                if (txnId.equals(this.transactionIds.get(i))) {
                    this.activeTxnIndex = i;
                    this.incrementActiveTxnIndex();
                    break;
                }
            }
            LoggingUtil.logFine(this.procContext().logger(), "recover and commit %s", txnId);
            this.recoverAndCommitFn().accept(txnId);
        }
    }

    /** Releases the pooled transactions; see {@link #doRelease()}. */
    @Override
    public void close() {
        this.doRelease();
    }

    /**
     * Creates the pooled transactions on first use, after calling
     * {@link #rollbackOtherTransactions()}.
     *
     * @throws IllegalStateException if the transactions were never created
     *         and {@link #doRelease()} has already run
     */
    private void ensureTransactions() {
        if (this.transactions == null) {
            if (this.transactionsReleased) {
                throw new IllegalStateException("transactions already released");
            }
            this.rollbackOtherTransactions();
            this.transactions = Util.toList(this.transactionIds, this.createTxnFn());
        }
    }

    /** Advances the active index to the next pool entry, wrapping around to 0. */
    private void incrementActiveTxnIndex() {
        ++this.activeTxnIndex;
        if (this.activeTxnIndex == this.poolSize) {
            this.activeTxnIndex = 0;
        }
    }

    /**
     * Rolls back the active transaction if it was used (and the transaction
     * lifecycle is in use), then releases every pooled transaction. Only the
     * first call has any effect.
     */
    private void doRelease() {
        if (this.transactionsReleased) {
            return;
        }
        this.transactionsReleased = true;
        if (this.transactions != null) {
            if (this.usesTransactionLifecycle() && this.activeTransactionUsed) {
                this.transactions.get(this.activeTxnIndex).rollback();
            }
            for (TwoPhaseSnapshotCommitUtility.LoggingNonThrowingResource<TXN_ID, RES> txn : this.transactions) {
                txn.release();
            }
        }
    }
}

