package org.joyqueue.broker.kafka.coordinator.transaction.completion;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
import org.joyqueue.broker.kafka.config.KafkaConfig;
import org.joyqueue.broker.kafka.coordinator.Coordinator;
import org.joyqueue.broker.kafka.coordinator.transaction.TransactionMetadataManager;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionDomain;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionMarker;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionMetadata;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionOffset;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionPrepare;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionState;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.UnCompletedTransactionMetadata;
import org.joyqueue.broker.kafka.coordinator.transaction.log.TransactionLogSegment;
import org.joyqueue.broker.kafka.coordinator.transaction.synchronizer.TransactionSynchronizer;
import org.joyqueue.toolkit.time.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/kafka/coordinator/transaction/completion/TransactionSegmentCompletionHandler.class */
/**
 * Scans one {@link TransactionLogSegment}, tracks the transactions found in it, aborts the ones
 * that have expired, and persists the index up to which every tracked transaction is completed.
 *
 * <p>Each call to {@link #handle()} reads the next batch of log records starting at the current
 * scan index, folds them into per-transaction {@link UnCompletedTransactionMetadata}, handles
 * expired transactions and finally tries to advance the committed index.
 *
 * <p>The class keeps plain (unsynchronized) mutable state; NOTE(review): it looks like it is meant
 * to be driven by a single thread per segment - confirm against the caller.
 */
public class TransactionSegmentCompletionHandler {
    protected static final Logger logger = LoggerFactory.getLogger(TransactionSegmentCompletionHandler.class);

    private final KafkaConfig config;
    private final Coordinator coordinator;
    private final TransactionMetadataManager transactionMetadataManager;
    private final TransactionLogSegment transactionLogSegment;
    private final TransactionSynchronizer transactionSynchronizer;

    /** Tracked transactions keyed by {@link #generateKey(String, String, long, short, short)}. */
    private final Map<String, UnCompletedTransactionMetadata> unCompletedTransactionMap = Maps.newLinkedHashMap();

    /**
     * Tracked transactions keyed by the log index of each record that touched them. Entries are
     * inserted in scan order, so iteration order is ascending by index.
     */
    private final Map<Long, UnCompletedTransactionMetadata> unCompletedTransactionSortedMap = Maps.newLinkedHashMap();

    /**
     * Reference time used for expiry checks: the create time of the transaction touched by the
     * most recently processed record, or the current clock time when the last read was empty.
     */
    private long lastTime = 0;

    /** Index the segment reported when this handler was created. */
    private final long startIndex;

    /** Index of the next record to read from the segment. */
    private long currentIndex;

    /** Lower bound of the indexes still held in the caches; advanced after each commit. */
    private long committedIndex;

    /**
     * Creates a handler that starts scanning at the index currently stored in the segment.
     *
     * @param kafkaConfig                supplies the scan batch size and the retry limit
     * @param coordinator                coordinator reference (stored, not used by this class itself)
     * @param transactionMetadataManager used to propagate state changes to live transaction metadata
     * @param transactionLogSegment      the log segment to read records from and save the index to
     * @param transactionSynchronizer    performs the actual commit/abort of a transaction
     */
    public TransactionSegmentCompletionHandler(KafkaConfig kafkaConfig, Coordinator coordinator, TransactionMetadataManager transactionMetadataManager, TransactionLogSegment transactionLogSegment, TransactionSynchronizer transactionSynchronizer) {
        this.config = kafkaConfig;
        this.coordinator = coordinator;
        this.transactionMetadataManager = transactionMetadataManager;
        this.transactionLogSegment = transactionLogSegment;
        this.transactionSynchronizer = transactionSynchronizer;

        long segmentIndex = transactionLogSegment.getIndex();
        this.startIndex = segmentIndex;
        this.currentIndex = segmentIndex;
        this.committedIndex = segmentIndex;
    }

    /**
     * Runs one scan/commit round. Any exception is logged and swallowed so that a failing round
     * does not propagate to the caller.
     */
    public void handle() {
        try {
            handleUnCompleteTransactions();
            commitUnCompleteTransactionIndex();
        } catch (Exception e) {
            logger.error("transaction compensate exception", e);
        }
    }

    /**
     * Reads the next batch of records from the segment, starting at the current scan index.
     *
     * @return the records read, never {@code null}; empty when the segment returned nothing
     * @throws Exception if reading the segment fails
     */
    protected List<TransactionDomain> readTransactions() throws Exception {
        List<TransactionDomain> batch = this.transactionLogSegment.read(this.currentIndex, this.config.getTransactionLogScanSize());
        if (CollectionUtils.isEmpty(batch)) {
            return Collections.emptyList();
        }
        return batch;
    }

    /**
     * Folds a batch of log records into the tracked-transaction caches.
     *
     * <p>The i-th record of the batch is treated as living at log index {@code currentIndex + i}.
     * For an empty batch only {@code lastTime} is refreshed from the system clock.
     *
     * @param list the batch returned by {@link #readTransactions()}
     * @return a list of the same size as {@code list}; position i holds the record itself when it
     *         was attached to a tracked transaction and {@code null} otherwise
     */
    protected List<TransactionDomain> prepareTransactionDomains(List<TransactionDomain> list) {
        if (CollectionUtils.isEmpty(list)) {
            this.lastTime = SystemClock.now();
            return Collections.emptyList();
        }

        List<TransactionDomain> prepared = new LinkedList<TransactionDomain>();
        for (int i = 0; i < list.size(); i++) {
            long index = this.currentIndex + i;
            TransactionDomain domain = list.get(i);

            UnCompletedTransactionMetadata metadata = null;
            if (domain instanceof TransactionPrepare) {
                metadata = handlePrepare((TransactionPrepare) domain, index);
            } else if (domain instanceof TransactionOffset) {
                metadata = handleOffset((TransactionOffset) domain, index);
            } else if (domain instanceof TransactionMarker) {
                metadata = handleMarker((TransactionMarker) domain, index);
            } else {
                logger.warn("unsupported transaction domain, type: {}", domain);
            }

            if (metadata == null) {
                // Keep positions aligned with the input batch.
                prepared.add(null);
                continue;
            }
            this.lastTime = metadata.getCreateTime();
            this.unCompletedTransactionSortedMap.put(Long.valueOf(index), metadata);
            prepared.add(domain);
        }
        return prepared;
    }

    /**
     * Attaches a prepare record to its transaction, creating the tracked metadata on first sight,
     * and moves the transaction to {@link TransactionState#ONGOING}.
     *
     * @param transactionPrepare the prepare record
     * @param index              log index of the record
     * @return the tracked metadata for the record's transaction, never {@code null}
     */
    protected UnCompletedTransactionMetadata handlePrepare(TransactionPrepare transactionPrepare, long index) {
        String key = generateKey(transactionPrepare.getApp(), transactionPrepare.getTransactionId(), transactionPrepare.getProducerId(), transactionPrepare.getProducerEpoch(), transactionPrepare.getEpoch());
        UnCompletedTransactionMetadata metadata = this.unCompletedTransactionMap.get(key);
        if (metadata == null) {
            metadata = new UnCompletedTransactionMetadata();
            metadata.setId(transactionPrepare.getTransactionId());
            metadata.setStartIndex(index);
            metadata.setApp(transactionPrepare.getApp());
            metadata.setProducerId(transactionPrepare.getProducerId());
            metadata.setProducerEpoch(transactionPrepare.getProducerEpoch());
            metadata.setEpoch(transactionPrepare.getEpoch());
            metadata.setTimeout(transactionPrepare.getTimeout());
            metadata.setCreateTime(transactionPrepare.getCreateTime());
            this.unCompletedTransactionMap.put(key, metadata);
        }
        metadata.setLastTime(transactionPrepare.getCreateTime());
        metadata.setEndIndex(index);
        metadata.addPrepare(transactionPrepare);
        metadata.setState(TransactionState.ONGOING);
        return metadata;
    }

    /**
     * Attaches an offset record to its already tracked transaction.
     *
     * @param transactionOffset the offset record
     * @param index             log index of the record
     * @return the tracked metadata, or {@code null} when the transaction is not being tracked
     */
    protected UnCompletedTransactionMetadata handleOffset(TransactionOffset transactionOffset, long index) {
        String key = generateKey(transactionOffset.getApp(), transactionOffset.getTransactionId(), transactionOffset.getProducerId(), transactionOffset.getProducerEpoch(), transactionOffset.getEpoch());
        UnCompletedTransactionMetadata metadata = this.unCompletedTransactionMap.get(key);
        if (metadata == null) {
            return null;
        }
        metadata.setEndIndex(index);
        metadata.addOffset(transactionOffset);
        metadata.setLastTime(transactionOffset.getCreateTime());
        return metadata;
    }

    /**
     * Applies a marker record's state to its already tracked transaction.
     *
     * @param transactionMarker the marker record
     * @param index             log index of the record
     * @return the tracked metadata, or {@code null} when the transaction is not being tracked or
     *         is already in the marker's state
     */
    protected UnCompletedTransactionMetadata handleMarker(TransactionMarker transactionMarker, long index) {
        String key = generateKey(transactionMarker.getApp(), transactionMarker.getTransactionId(), transactionMarker.getProducerId(), transactionMarker.getProducerEpoch(), transactionMarker.getEpoch());
        UnCompletedTransactionMetadata metadata = this.unCompletedTransactionMap.get(key);
        if (metadata == null || metadata.getState().equals(transactionMarker.getState())) {
            return null;
        }
        metadata.setEndIndex(index);
        metadata.setState(transactionMarker.getState());
        metadata.setLastTime(transactionMarker.getCreateTime());
        return metadata;
    }

    /**
     * Reads the next batch, folds it into the caches and processes all tracked transactions.
     *
     * @throws Exception if reading the segment fails
     */
    protected void handleUnCompleteTransactions() throws Exception {
        List<TransactionDomain> batch = readTransactions();
        prepareTransactionDomains(batch);
        handleUnCompleteTransactions(batch);
    }

    /**
     * Walks every tracked transaction, times out the ones that are neither completed nor within
     * their timeout relative to {@code lastTime}, then advances the scan index by the batch size.
     *
     * @param list the batch that was just read; only its size is used here
     */
    protected void handleUnCompleteTransactions(List<TransactionDomain> list) {
        for (Map.Entry<Long, UnCompletedTransactionMetadata> entry : this.unCompletedTransactionSortedMap.entrySet()) {
            UnCompletedTransactionMetadata metadata = entry.getValue();
            if (metadata == null) {
                continue;
            }
            logger.debug("read transaction, currentIndex: {}, isCompleted: {}, epoch: {}, state: {}, metadata: {}",
                    entry.getKey(), metadata.isCompleted(), metadata.getProducerEpoch(), metadata.getState(), metadata);
            if (!metadata.isCompleted() && metadata.isExpired(this.lastTime, metadata.getTimeout())) {
                handleTimeoutTransaction(metadata);
            }
        }

        if (CollectionUtils.isNotEmpty(list)) {
            this.currentIndex += list.size();
            logger.debug("left transaction index: {}", this.currentIndex);
        }
    }

    /**
     * Advances the committed index as far as the tracked transactions allow, persists it to the
     * segment and evicts the cache entries below it.
     *
     * <p>The scan stops at the first tracked transaction that is not completed. A completed
     * transaction moves the commit index to just past its end index, provided no record between
     * the current lower bound and that end index belongs to an uncompleted transaction.
     *
     * <p>A failure to persist the index is logged and does not prevent cache eviction.
     */
    protected void commitUnCompleteTransactionIndex() {
        long commitIndex = -1;
        for (Map.Entry<Long, UnCompletedTransactionMetadata> entry : this.unCompletedTransactionSortedMap.entrySet()) {
            long index = entry.getKey().longValue();
            UnCompletedTransactionMetadata metadata = entry.getValue();
            if (metadata == null) {
                continue;
            }
            if (!metadata.isCompleted()) {
                break;
            }
            // Only the last record of a transaction can move the commit index.
            if (index != metadata.getEndIndex()) {
                continue;
            }
            // Compare as long: a narrowing cast here would discard the high bits of the index and
            // restart the gap scan from committedIndex once indexes exceed Integer.MAX_VALUE.
            long scanFrom = Math.max(commitIndex, this.committedIndex);
            if (!isRangeCompleted(scanFrom, metadata.getEndIndex())) {
                break;
            }
            commitIndex = index + 1;
        }

        if (commitIndex < 0) {
            return;
        }

        try {
            logger.info("commit transaction index {}", commitIndex);
            this.transactionLogSegment.saveIndex(commitIndex);
        } catch (Exception e) {
            logger.error("commit transaction index exception", e);
        }

        logger.debug("remove transaction cache, commitIndex: {}, committedIndex: {}", commitIndex, this.committedIndex);
        for (long index = this.committedIndex; index < commitIndex; index++) {
            logger.debug("remove transaction cache {}", index);
            UnCompletedTransactionMetadata removed = this.unCompletedTransactionSortedMap.remove(Long.valueOf(index));
            if (removed != null) {
                String key = generateKey(removed.getApp(), removed.getId(), removed.getProducerId(), removed.getProducerEpoch(), removed.getEpoch());
                logger.debug("remove transaction cache, txId: {}", key);
                this.unCompletedTransactionMap.remove(key);
            }
        }
        logger.info("remove transaction cache complete, sortedMapSize: {}, mapSize: {}",
                this.unCompletedTransactionSortedMap.size(), this.unCompletedTransactionMap.size());
        this.committedIndex = commitIndex;
    }

    /**
     * Checks that no cached record in {@code [fromIndex, toIndex)} belongs to an uncompleted
     * transaction. Indexes without a cache entry are treated as completed.
     */
    private boolean isRangeCompleted(long fromIndex, long toIndex) {
        for (long index = fromIndex; index < toIndex; index++) {
            UnCompletedTransactionMetadata metadata = this.unCompletedTransactionSortedMap.get(Long.valueOf(index));
            if (metadata != null && !metadata.isCompleted()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Retries the pending commit or abort of a transaction that is in a PREPARE_* state.
     *
     * <p>On success the state moves to the matching COMPLETE_* state; on failure the retry
     * counter is incremented, and once it reaches the configured limit the transaction is marked
     * {@link TransactionState#DEAD}. The resulting state is then synced to the live metadata.
     *
     * @param unCompletedTransactionMetadata the transaction to retry
     */
    protected void handleRetryTransaction(UnCompletedTransactionMetadata unCompletedTransactionMetadata) {
        logger.debug("retry transaction, txId: {}, metadata: {}", unCompletedTransactionMetadata.getId(), unCompletedTransactionMetadata);

        TransactionState state = unCompletedTransactionMetadata.getState();
        if (state.equals(TransactionState.PREPARE_COMMIT)) {
            if (tryCommit(unCompletedTransactionMetadata)) {
                unCompletedTransactionMetadata.transitionStateTo(TransactionState.COMPLETE_COMMIT);
            } else {
                unCompletedTransactionMetadata.incrReties();
            }
        } else if (state.equals(TransactionState.PREPARE_ABORT)) {
            if (tryAbort(unCompletedTransactionMetadata)) {
                unCompletedTransactionMetadata.transitionStateTo(TransactionState.COMPLETE_ABORT);
            } else {
                unCompletedTransactionMetadata.incrReties();
            }
        }

        if (unCompletedTransactionMetadata.isCompleted()) {
            logger.info("retry transaction success, metadata: {}", unCompletedTransactionMetadata);
        } else if (unCompletedTransactionMetadata.getReties() >= this.config.getTransactionLogRetries()) {
            logger.warn("retry transaction failed, metadata: {}", unCompletedTransactionMetadata);
            unCompletedTransactionMetadata.transitionStateTo(TransactionState.DEAD);
        }
        syncTransactionState(unCompletedTransactionMetadata);
    }

    /**
     * Handles an expired transaction: attempts an abort (the outcome is ignored), marks the
     * transaction {@link TransactionState#DEAD} and syncs that state to the live metadata.
     *
     * @param unCompletedTransactionMetadata the expired transaction
     */
    protected void handleTimeoutTransaction(UnCompletedTransactionMetadata unCompletedTransactionMetadata) {
        logger.warn("transaction timeout, txId: {}, metadata: {}", unCompletedTransactionMetadata.getId(), unCompletedTransactionMetadata);
        tryAbort(unCompletedTransactionMetadata);
        unCompletedTransactionMetadata.transitionStateTo(TransactionState.DEAD);
        syncTransactionState(unCompletedTransactionMetadata);
    }

    /**
     * Copies the tracked state onto the live {@link TransactionMetadata} with the same id, if the
     * metadata manager currently has one.
     *
     * @param unCompletedTransactionMetadata the tracked transaction whose state is propagated
     */
    protected void syncTransactionState(UnCompletedTransactionMetadata unCompletedTransactionMetadata) {
        TransactionMetadata liveMetadata = this.transactionMetadataManager.tryGetTransaction(unCompletedTransactionMetadata.getId());
        if (liveMetadata != null) {
            liveMetadata.setState(unCompletedTransactionMetadata.getState());
        }
    }

    /**
     * Asks the synchronizer to abort the transaction.
     *
     * @param unCompletedTransactionMetadata the transaction to abort
     * @return the synchronizer's result; {@code true} when the synchronizer throws
     */
    protected boolean tryAbort(UnCompletedTransactionMetadata unCompletedTransactionMetadata) {
        try {
            return this.transactionSynchronizer.tryAbort(unCompletedTransactionMetadata, unCompletedTransactionMetadata.getPrepare());
        } catch (Exception e) {
            // Trailing throwable is logged with its stack trace by SLF4J.
            logger.error("tryAbort transaction exception, metadata: {}", unCompletedTransactionMetadata, e);
            // NOTE(review): a failed abort is reported as success, unlike tryCommit - kept as is,
            // confirm this asymmetry is intended.
            return true;
        }
    }

    /**
     * Asks the synchronizer to commit the transaction together with its offsets.
     *
     * @param unCompletedTransactionMetadata the transaction to commit
     * @return the synchronizer's result; {@code false} when the synchronizer throws
     */
    protected boolean tryCommit(UnCompletedTransactionMetadata unCompletedTransactionMetadata) {
        try {
            return this.transactionSynchronizer.tryCommit(unCompletedTransactionMetadata, unCompletedTransactionMetadata.getPrepare(), unCompletedTransactionMetadata.getOffsets());
        } catch (Exception e) {
            // Trailing throwable is logged with its stack trace by SLF4J.
            logger.error("tryCommit transaction exception, metadata: {}", unCompletedTransactionMetadata, e);
            return false;
        }
    }

    /**
     * Builds the cache key of a transaction by joining its identifying fields with underscores.
     *
     * @param app           application name
     * @param transactionId transaction id
     * @param producerId    producer id
     * @param producerEpoch producer epoch
     * @param epoch         transaction epoch
     * @return {@code app_transactionId_producerId_producerEpoch_epoch}
     */
    protected String generateKey(String app, String transactionId, long producerId, short producerEpoch, short epoch) {
        return String.format("%s_%s_%s_%s_%s", app, transactionId, producerId, producerEpoch, epoch);
    }
}
