/*
 * Decompiled with CFR 0.152.
 */
package org.joyqueue.broker.kafka.coordinator.transaction.synchronizer;

import java.util.Set;
import org.apache.commons.collections.CollectionUtils;
import org.joyqueue.broker.coordinator.session.CoordinatorSessionManager;
import org.joyqueue.broker.kafka.config.KafkaConfig;
import org.joyqueue.broker.kafka.coordinator.transaction.TransactionIdManager;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionMarker;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionMetadata;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionOffset;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionPrepare;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionState;
import org.joyqueue.broker.kafka.coordinator.transaction.log.TransactionLog;
import org.joyqueue.broker.kafka.coordinator.transaction.synchronizer.TransactionAbortSynchronizer;
import org.joyqueue.broker.kafka.coordinator.transaction.synchronizer.TransactionCommitSynchronizer;
import org.joyqueue.nsr.NameService;
import org.joyqueue.toolkit.service.Service;
import org.joyqueue.toolkit.time.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionSynchronizer
extends Service {
    protected static final Logger logger = LoggerFactory.getLogger(TransactionSynchronizer.class);
    private KafkaConfig config;
    private TransactionIdManager transactionIdManager;
    private TransactionLog transactionLog;
    private CoordinatorSessionManager sessionManager;
    private NameService nameService;
    private TransactionCommitSynchronizer transactionCommitSynchronizer;
    private TransactionAbortSynchronizer transactionAbortSynchronizer;

    public TransactionSynchronizer(KafkaConfig config, TransactionIdManager transactionIdManager, TransactionLog transactionLog, CoordinatorSessionManager sessionManager, NameService nameService) {
        this.config = config;
        this.transactionIdManager = transactionIdManager;
        this.transactionLog = transactionLog;
        this.sessionManager = sessionManager;
        this.nameService = nameService;
    }

    protected void validate() throws Exception {
        this.transactionCommitSynchronizer = new TransactionCommitSynchronizer(this.config, this.sessionManager, this.transactionIdManager, this.nameService);
        this.transactionAbortSynchronizer = new TransactionAbortSynchronizer(this.config, this.sessionManager, this.transactionIdManager);
    }

    protected void doStart() throws Exception {
        this.transactionCommitSynchronizer.start();
        this.transactionAbortSynchronizer.start();
    }

    protected void doStop() {
        if (this.transactionCommitSynchronizer != null) {
            this.transactionCommitSynchronizer.stop();
        }
        if (this.transactionAbortSynchronizer != null) {
            this.transactionAbortSynchronizer.stop();
        }
    }

    public boolean prepare(TransactionMetadata transactionMetadata, Set<TransactionPrepare> prepare) throws Exception {
        return this.transactionLog.batchWrite(transactionMetadata.getApp(), transactionMetadata.getId(), prepare);
    }

    public boolean prepareCommit(TransactionMetadata transactionMetadata, Set<TransactionPrepare> prepare) throws Exception {
        return this.writeMarker(transactionMetadata, TransactionState.PREPARE_COMMIT);
    }

    public boolean commit(TransactionMetadata transactionMetadata, Set<TransactionPrepare> prepare, Set<TransactionOffset> offsets) throws Exception {
        if (!this.tryCommit(transactionMetadata, prepare, offsets)) {
            return false;
        }
        return this.writeMarker(transactionMetadata, TransactionState.COMPLETE_COMMIT);
    }

    public boolean tryCommit(TransactionMetadata transactionMetadata, Set<TransactionPrepare> prepare, Set<TransactionOffset> offsets) throws Exception {
        boolean isSuccess = true;
        if (CollectionUtils.isNotEmpty(prepare)) {
            isSuccess = this.transactionCommitSynchronizer.commitPrepare(transactionMetadata, prepare);
        }
        if (isSuccess && CollectionUtils.isNotEmpty(offsets)) {
            isSuccess = this.transactionCommitSynchronizer.commitOffsets(transactionMetadata, offsets);
        }
        return isSuccess;
    }

    public boolean prepareAbort(TransactionMetadata transactionMetadata, Set<TransactionPrepare> prepare) throws Exception {
        return this.writeMarker(transactionMetadata, TransactionState.PREPARE_ABORT);
    }

    public boolean abort(TransactionMetadata transactionMetadata, Set<TransactionPrepare> prepare) throws Exception {
        return this.tryAbort(transactionMetadata, prepare) && this.writeMarker(transactionMetadata, TransactionState.COMPLETE_ABORT);
    }

    public boolean tryAbort(TransactionMetadata transactionMetadata, Set<TransactionPrepare> prepare) throws Exception {
        return this.transactionAbortSynchronizer.abort(transactionMetadata, prepare);
    }

    public boolean commitOffset(TransactionMetadata transactionMetadata, Set<TransactionOffset> offsets) throws Exception {
        return this.transactionLog.batchWrite(transactionMetadata.getApp(), transactionMetadata.getId(), offsets);
    }

    protected boolean writeMarker(TransactionMetadata transactionMetadata, TransactionState transactionState) throws Exception {
        TransactionMarker marker = this.convertMarker(transactionMetadata, transactionState);
        return this.transactionLog.write(transactionMetadata.getApp(), transactionMetadata.getId(), marker);
    }

    protected TransactionMarker convertMarker(TransactionMetadata transactionMetadata, TransactionState transactionState) {
        return new TransactionMarker(transactionMetadata.getApp(), transactionMetadata.getId(), transactionMetadata.getProducerId(), transactionMetadata.getProducerEpoch(), transactionMetadata.getEpoch(), transactionState, transactionMetadata.getTimeout(), SystemClock.now());
    }
}

