/*
 * Decompiled with CFR 0.152.
 */
package org.joyqueue.broker.kafka.coordinator.transaction.completion;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
import org.joyqueue.broker.kafka.config.KafkaConfig;
import org.joyqueue.broker.kafka.coordinator.Coordinator;
import org.joyqueue.broker.kafka.coordinator.transaction.TransactionMetadataManager;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionDomain;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionMarker;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionMetadata;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionOffset;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionPrepare;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionState;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.UnCompletedTransactionMetadata;
import org.joyqueue.broker.kafka.coordinator.transaction.log.TransactionLogSegment;
import org.joyqueue.broker.kafka.coordinator.transaction.synchronizer.TransactionSynchronizer;
import org.joyqueue.toolkit.time.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionSegmentCompletionHandler {
    protected static final Logger logger = LoggerFactory.getLogger(TransactionSegmentCompletionHandler.class);
    private KafkaConfig config;
    private Coordinator coordinator;
    private TransactionMetadataManager transactionMetadataManager;
    private TransactionLogSegment transactionLogSegment;
    private TransactionSynchronizer transactionSynchronizer;
    private final Map<String, UnCompletedTransactionMetadata> unCompletedTransactionMap = Maps.newLinkedHashMap();
    private final Map<Long, UnCompletedTransactionMetadata> unCompletedTransactionSortedMap = Maps.newLinkedHashMap();
    private long lastTime = 0L;
    private long startIndex = 0L;
    private long currentIndex = 0L;
    private long committedIndex = 0L;

    public TransactionSegmentCompletionHandler(KafkaConfig config, Coordinator coordinator, TransactionMetadataManager transactionMetadataManager, TransactionLogSegment transactionLogSegment, TransactionSynchronizer transactionSynchronizer) {
        this.config = config;
        this.coordinator = coordinator;
        this.transactionMetadataManager = transactionMetadataManager;
        this.transactionLogSegment = transactionLogSegment;
        this.transactionSynchronizer = transactionSynchronizer;
        this.startIndex = this.currentIndex = transactionLogSegment.getIndex();
        this.committedIndex = this.currentIndex;
    }

    public void handle() {
        try {
            this.handleUnCompleteTransactions();
            this.commitUnCompleteTransactionIndex();
        }
        catch (Exception e) {
            logger.error("transaction compensate exception", (Throwable)e);
        }
    }

    protected List<TransactionDomain> readTransactions() throws Exception {
        List<TransactionDomain> transactionDomains = this.transactionLogSegment.read(this.currentIndex, this.config.getTransactionLogScanSize());
        if (CollectionUtils.isEmpty(transactionDomains)) {
            return Collections.emptyList();
        }
        return transactionDomains;
    }

    protected List<TransactionDomain> prepareTransactionDomains(List<TransactionDomain> transactionDomains) {
        if (CollectionUtils.isEmpty(transactionDomains)) {
            this.lastTime = SystemClock.now();
            return Collections.emptyList();
        }
        LinkedList result = Lists.newLinkedList();
        for (int i = 0; i < transactionDomains.size(); ++i) {
            long currentIndex = this.currentIndex + (long)i;
            TransactionDomain transactionDomain = transactionDomains.get(i);
            UnCompletedTransactionMetadata unCompletedTransaction = null;
            if (transactionDomain instanceof TransactionPrepare) {
                unCompletedTransaction = this.handlePrepare((TransactionPrepare)transactionDomain, currentIndex);
            } else if (transactionDomain instanceof TransactionOffset) {
                unCompletedTransaction = this.handleOffset((TransactionOffset)transactionDomain, currentIndex);
            } else if (transactionDomain instanceof TransactionMarker) {
                unCompletedTransaction = this.handleMarker((TransactionMarker)transactionDomain, currentIndex);
            } else {
                logger.warn("unsupported transaction domain, type: {}", (Object)transactionDomain);
            }
            if (unCompletedTransaction == null) {
                result.add(null);
                continue;
            }
            this.lastTime = unCompletedTransaction.getCreateTime();
            this.unCompletedTransactionSortedMap.put(currentIndex, unCompletedTransaction);
            result.add(transactionDomain);
        }
        return result;
    }

    protected UnCompletedTransactionMetadata handlePrepare(TransactionPrepare transactionPrepare, long index) {
        String key = this.generateKey(transactionPrepare.getApp(), transactionPrepare.getTransactionId(), transactionPrepare.getProducerId(), transactionPrepare.getProducerEpoch(), transactionPrepare.getEpoch());
        UnCompletedTransactionMetadata unCompletedTransaction = this.unCompletedTransactionMap.get(key);
        if (unCompletedTransaction == null) {
            unCompletedTransaction = new UnCompletedTransactionMetadata();
            unCompletedTransaction.setId(transactionPrepare.getTransactionId());
            unCompletedTransaction.setStartIndex(index);
            unCompletedTransaction.setApp(transactionPrepare.getApp());
            unCompletedTransaction.setProducerId(transactionPrepare.getProducerId());
            unCompletedTransaction.setProducerEpoch(transactionPrepare.getProducerEpoch());
            unCompletedTransaction.setEpoch(transactionPrepare.getEpoch());
            unCompletedTransaction.setTimeout(transactionPrepare.getTimeout());
            unCompletedTransaction.setCreateTime(transactionPrepare.getCreateTime());
            this.unCompletedTransactionMap.put(key, unCompletedTransaction);
        }
        unCompletedTransaction.setLastTime(transactionPrepare.getCreateTime());
        unCompletedTransaction.setEndIndex(index);
        unCompletedTransaction.addPrepare(transactionPrepare);
        unCompletedTransaction.setState(TransactionState.ONGOING);
        return unCompletedTransaction;
    }

    protected UnCompletedTransactionMetadata handleOffset(TransactionOffset transactionOffset, long index) {
        String key = this.generateKey(transactionOffset.getApp(), transactionOffset.getTransactionId(), transactionOffset.getProducerId(), transactionOffset.getProducerEpoch(), transactionOffset.getEpoch());
        UnCompletedTransactionMetadata unCompletedTransaction = this.unCompletedTransactionMap.get(key);
        if (unCompletedTransaction == null) {
            return null;
        }
        unCompletedTransaction.setEndIndex(index);
        unCompletedTransaction.addOffset(transactionOffset);
        unCompletedTransaction.setLastTime(transactionOffset.getCreateTime());
        return unCompletedTransaction;
    }

    protected UnCompletedTransactionMetadata handleMarker(TransactionMarker transactionMarker, long index) {
        String key = this.generateKey(transactionMarker.getApp(), transactionMarker.getTransactionId(), transactionMarker.getProducerId(), transactionMarker.getProducerEpoch(), transactionMarker.getEpoch());
        UnCompletedTransactionMetadata unCompletedTransaction = this.unCompletedTransactionMap.get(key);
        if (unCompletedTransaction == null || unCompletedTransaction.getState().equals((Object)transactionMarker.getState())) {
            return null;
        }
        unCompletedTransaction.setEndIndex(index);
        unCompletedTransaction.setState(transactionMarker.getState());
        unCompletedTransaction.setLastTime(transactionMarker.getCreateTime());
        return unCompletedTransaction;
    }

    protected void handleUnCompleteTransactions() throws Exception {
        List<TransactionDomain> transactionDomains = this.readTransactions();
        this.prepareTransactionDomains(transactionDomains);
        this.handleUnCompleteTransactions(transactionDomains);
    }

    protected void handleUnCompleteTransactions(List<TransactionDomain> transactionDomains) {
        for (Map.Entry<Long, UnCompletedTransactionMetadata> entry : this.unCompletedTransactionSortedMap.entrySet()) {
            long currentIndex = entry.getKey();
            UnCompletedTransactionMetadata unCompletedTransaction = entry.getValue();
            if (unCompletedTransaction != null) {
                logger.debug("read transaction, currentIndex: {}, isCompleted: {}, epoch: {}, state: {}, metadata: {}", new Object[]{currentIndex, unCompletedTransaction.isCompleted(), unCompletedTransaction.getProducerEpoch(), unCompletedTransaction.getState(), unCompletedTransaction});
            }
            if (unCompletedTransaction == null || unCompletedTransaction.isCompleted() || !unCompletedTransaction.isExpired(this.lastTime, unCompletedTransaction.getTimeout())) continue;
            this.handleTimeoutTransaction(unCompletedTransaction);
        }
        if (CollectionUtils.isNotEmpty(transactionDomains)) {
            this.currentIndex += (long)transactionDomains.size();
            logger.debug("left transaction index: {}", (Object)this.currentIndex);
        }
    }

    protected void commitUnCompleteTransactionIndex() {
        long commitIndex = -1L;
        for (Map.Entry<Long, UnCompletedTransactionMetadata> entry : this.unCompletedTransactionSortedMap.entrySet()) {
            long currentIndex = entry.getKey();
            UnCompletedTransactionMetadata unCompletedTransaction = entry.getValue();
            if (unCompletedTransaction == null) continue;
            if (!unCompletedTransaction.isCompleted()) break;
            if (currentIndex != unCompletedTransaction.getEndIndex()) continue;
            boolean isCommit = true;
            for (long j = Math.max((long)((int)commitIndex), this.committedIndex); j < unCompletedTransaction.getEndIndex(); ++j) {
                UnCompletedTransactionMetadata checkUnCompletedTransaction = this.unCompletedTransactionSortedMap.get(j);
                if (checkUnCompletedTransaction == null || checkUnCompletedTransaction.isCompleted()) continue;
                isCommit = false;
                break;
            }
            if (!isCommit) break;
            commitIndex = currentIndex + 1L;
        }
        if (commitIndex < 0L) {
            return;
        }
        try {
            logger.info("commit transaction index {}", (Object)commitIndex);
            this.transactionLogSegment.saveIndex(commitIndex);
        }
        catch (Exception e) {
            logger.error("commit transaction index exception", (Throwable)e);
        }
        logger.debug("remove transaction cache, commitIndex: {}, committedIndex: {}", (Object)commitIndex, (Object)this.committedIndex);
        for (long i = this.committedIndex; i < commitIndex; ++i) {
            logger.debug("remove transaction cache {}", (Object)i);
            UnCompletedTransactionMetadata unCompletedTransaction = this.unCompletedTransactionSortedMap.remove(i);
            if (unCompletedTransaction == null) continue;
            logger.debug("remove transaction cache, txId: {}", (Object)this.generateKey(unCompletedTransaction.getApp(), unCompletedTransaction.getId(), unCompletedTransaction.getProducerId(), unCompletedTransaction.getProducerEpoch(), unCompletedTransaction.getEpoch()));
            this.unCompletedTransactionMap.remove(this.generateKey(unCompletedTransaction.getApp(), unCompletedTransaction.getId(), unCompletedTransaction.getProducerId(), unCompletedTransaction.getProducerEpoch(), unCompletedTransaction.getEpoch()));
        }
        logger.info("remove transaction cache complete, sortedMapSize: {}, mapSize: {}", (Object)this.unCompletedTransactionSortedMap.size(), (Object)this.unCompletedTransactionMap.size());
        this.committedIndex = commitIndex;
    }

    protected void handleRetryTransaction(UnCompletedTransactionMetadata unCompletedTransaction) {
        logger.debug("retry transaction, txId: {}, metadata: {}", (Object)unCompletedTransaction.getId(), (Object)unCompletedTransaction);
        if (unCompletedTransaction.getState().equals((Object)TransactionState.PREPARE_COMMIT)) {
            if (this.tryCommit(unCompletedTransaction)) {
                unCompletedTransaction.transitionStateTo(TransactionState.COMPLETE_COMMIT);
            } else {
                unCompletedTransaction.incrReties();
            }
        } else if (unCompletedTransaction.getState().equals((Object)TransactionState.PREPARE_ABORT)) {
            if (this.tryAbort(unCompletedTransaction)) {
                unCompletedTransaction.transitionStateTo(TransactionState.COMPLETE_ABORT);
            } else {
                unCompletedTransaction.incrReties();
            }
        }
        if (unCompletedTransaction.isCompleted()) {
            logger.info("retry transaction success, metadata: {}", (Object)unCompletedTransaction);
        } else if (unCompletedTransaction.getReties() >= this.config.getTransactionLogRetries()) {
            logger.warn("retry transaction failed, metadata: {}", (Object)unCompletedTransaction);
            unCompletedTransaction.transitionStateTo(TransactionState.DEAD);
        }
        this.syncTransactionState(unCompletedTransaction);
    }

    protected void handleTimeoutTransaction(UnCompletedTransactionMetadata unCompletedTransaction) {
        logger.warn("transaction timeout, txId: {}, metadata: {}", (Object)unCompletedTransaction.getId(), (Object)unCompletedTransaction);
        this.tryAbort(unCompletedTransaction);
        unCompletedTransaction.transitionStateTo(TransactionState.DEAD);
        this.syncTransactionState(unCompletedTransaction);
    }

    protected void syncTransactionState(UnCompletedTransactionMetadata unCompletedTransaction) {
        TransactionMetadata transaction = this.transactionMetadataManager.tryGetTransaction(unCompletedTransaction.getId());
        if (transaction == null) {
            return;
        }
        transaction.setState(unCompletedTransaction.getState());
    }

    protected boolean tryAbort(UnCompletedTransactionMetadata unCompletedTransaction) {
        try {
            return this.transactionSynchronizer.tryAbort(unCompletedTransaction, unCompletedTransaction.getPrepare());
        }
        catch (Exception e) {
            logger.error("tryAbort transaction exception, metadata: {}", (Object)unCompletedTransaction);
            return true;
        }
    }

    protected boolean tryCommit(UnCompletedTransactionMetadata unCompletedTransaction) {
        try {
            return this.transactionSynchronizer.tryCommit(unCompletedTransaction, unCompletedTransaction.getPrepare(), unCompletedTransaction.getOffsets());
        }
        catch (Exception e) {
            logger.error("tryCommit transaction exception, metadata: {}", (Object)unCompletedTransaction);
            return false;
        }
    }

    protected String generateKey(String app, String transactionId, long producerId, short producerEpoch, short epoch) {
        return String.format("%s_%s_%s_%s_%s", app, transactionId, producerId, producerEpoch, epoch);
    }
}

