/*
 * Decompiled with CFR 0.152.
 */
package org.joyqueue.broker.kafka.coordinator.transaction.synchronizer;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.joyqueue.broker.coordinator.session.CoordinatorSession;
import org.joyqueue.broker.coordinator.session.CoordinatorSessionManager;
import org.joyqueue.broker.index.command.ConsumeIndexStoreRequest;
import org.joyqueue.broker.index.command.ConsumeIndexStoreResponse;
import org.joyqueue.broker.index.model.IndexAndMetadata;
import org.joyqueue.broker.kafka.config.KafkaConfig;
import org.joyqueue.broker.kafka.coordinator.transaction.TransactionIdManager;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionMetadata;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionOffset;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionPrepare;
import org.joyqueue.broker.kafka.coordinator.transaction.helper.TransactionHelper;
import org.joyqueue.broker.producer.transaction.command.TransactionCommitRequest;
import org.joyqueue.domain.Broker;
import org.joyqueue.domain.PartitionGroup;
import org.joyqueue.domain.TopicConfig;
import org.joyqueue.domain.TopicName;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.network.transport.command.CommandCallback;
import org.joyqueue.network.transport.command.JoyQueueCommand;
import org.joyqueue.network.transport.command.JoyQueuePayload;
import org.joyqueue.nsr.NameService;
import org.joyqueue.toolkit.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionCommitSynchronizer
extends Service {
    protected static final Logger logger = LoggerFactory.getLogger(TransactionCommitSynchronizer.class);
    private KafkaConfig config;
    private CoordinatorSessionManager sessionManager;
    private TransactionIdManager transactionIdManager;
    private NameService nameService;

    public TransactionCommitSynchronizer(KafkaConfig config, CoordinatorSessionManager sessionManager, TransactionIdManager transactionIdManager, NameService nameService) {
        this.config = config;
        this.sessionManager = sessionManager;
        this.transactionIdManager = transactionIdManager;
        this.nameService = nameService;
    }

    public boolean commitPrepare(TransactionMetadata transactionMetadata, Set<TransactionPrepare> prepareList) throws Exception {
        Map<Broker, List<TransactionPrepare>> brokerPrepareMap = TransactionHelper.splitPrepareByBroker(prepareList);
        final CountDownLatch latch = new CountDownLatch(brokerPrepareMap.size());
        final boolean[] result = new boolean[]{true};
        for (Map.Entry<Broker, List<TransactionPrepare>> entry : brokerPrepareMap.entrySet()) {
            final Broker broker = entry.getKey();
            List<TransactionPrepare> brokerPrepareList = entry.getValue();
            TransactionPrepare brokerPrepare = brokerPrepareList.get(0);
            LinkedList txIds = Lists.newLinkedList();
            for (TransactionPrepare prepare : brokerPrepareList) {
                String txId = this.transactionIdManager.generateId(prepare.getTopic(), prepare.getPartition(), prepare.getApp(), prepare.getTransactionId(), prepare.getProducerId(), prepare.getProducerEpoch());
                txIds.add(txId);
            }
            CoordinatorSession session = this.sessionManager.getOrCreateSession(broker);
            final TransactionCommitRequest transactionCommitRequest = new TransactionCommitRequest(brokerPrepare.getTopic(), brokerPrepare.getApp(), (List)txIds);
            session.async((Command)new JoyQueueCommand((JoyQueuePayload)transactionCommitRequest), new CommandCallback(){

                public void onSuccess(Command request, Command response) {
                    if (response.getHeader().getStatus() != JoyQueueCode.SUCCESS.getCode() && response.getHeader().getStatus() != JoyQueueCode.CN_TRANSACTION_NOT_EXISTS.getCode()) {
                        logger.error("commit transaction error, broker: {}, request: {}", (Object)broker, (Object)transactionCommitRequest);
                        result[0] = false;
                    }
                    latch.countDown();
                }

                public void onException(Command request, Throwable cause) {
                    logger.error("commit transaction error, broker: {}, request: {}", new Object[]{broker, transactionCommitRequest, cause});
                    result[0] = false;
                    latch.countDown();
                }
            });
        }
        if (!latch.await(this.config.getTransactionSyncTimeout(), TimeUnit.MILLISECONDS)) {
            logger.error("commit transaction timeout, metadata: {}, prepare: {}", (Object)transactionMetadata, prepareList);
            return false;
        }
        return result[0];
    }

    public boolean commitOffsets(final TransactionMetadata transactionMetadata, Set<TransactionOffset> offsets) throws Exception {
        Map<Broker, List<TransactionOffset>> brokerOffsetMap = this.splitOffsetsByBroker(offsets);
        final CountDownLatch latch = new CountDownLatch(brokerOffsetMap.size());
        final boolean[] result = new boolean[]{true};
        for (Map.Entry<Broker, List<TransactionOffset>> entry : brokerOffsetMap.entrySet()) {
            final Broker broker = entry.getKey();
            final Map<String, Map<Integer, IndexAndMetadata>> saveOffsetParam = this.buildSaveOffsetParam(entry.getValue());
            try {
                CoordinatorSession session = this.sessionManager.getOrCreateSession(broker);
                ConsumeIndexStoreRequest indexStoreRequest = new ConsumeIndexStoreRequest(transactionMetadata.getApp(), saveOffsetParam);
                JoyQueueCommand request = new JoyQueueCommand((JoyQueuePayload)indexStoreRequest);
                session.async((Command)request, new CommandCallback(){

                    public void onSuccess(Command request, Command response) {
                        ConsumeIndexStoreResponse payload = (ConsumeIndexStoreResponse)response.getPayload();
                        for (Map.Entry topicEntry : payload.getIndexStoreStatus().entrySet()) {
                            String topic = (String)topicEntry.getKey();
                            for (Map.Entry partitionEntry : ((Map)topicEntry.getValue()).entrySet()) {
                                if (((Short)partitionEntry.getValue()).shortValue() == JoyQueueCode.SUCCESS.getCode()) continue;
                                logger.error("commit transaction offset error, broker: {}, topic: {}, partition: {}, code: {}", new Object[]{broker, topic, partitionEntry.getKey(), JoyQueueCode.valueOf((int)((Short)partitionEntry.getValue()).shortValue())});
                            }
                        }
                        latch.countDown();
                    }

                    public void onException(Command request, Throwable cause) {
                        logger.error("commit transaction offset failed, async transport exception, broker: {}, topic: {}, group: {}", new Object[]{broker, saveOffsetParam, transactionMetadata.getApp(), cause});
                        result[0] = false;
                        latch.countDown();
                    }
                });
            }
            catch (Throwable t) {
                logger.error("sync offset failed, async transport exception, topic: {}, group: {}, leader: {id: {}, ip: {}, port: {}}", new Object[]{saveOffsetParam, transactionMetadata.getApp(), broker.getId(), broker.getIp(), broker.getBackEndPort(), t});
                latch.countDown();
            }
        }
        if (!latch.await(this.config.getTransactionSyncTimeout(), TimeUnit.MILLISECONDS)) {
            logger.error("commit transaction timeout, metadata: {}, offsets: {}", (Object)transactionMetadata, offsets);
            return false;
        }
        return result[0];
    }

    protected Map<String, Map<Integer, IndexAndMetadata>> buildSaveOffsetParam(List<TransactionOffset> offsets) {
        HashMap result = Maps.newHashMap();
        for (TransactionOffset offset : offsets) {
            String topic = offset.getTopic();
            Map partitionMetadataMap = (Map)result.get(topic);
            if (partitionMetadataMap == null) {
                partitionMetadataMap = Maps.newHashMap();
                result.put(topic, partitionMetadataMap);
            }
            IndexAndMetadata indexAndMetadata = new IndexAndMetadata(offset.getOffset(), null);
            partitionMetadataMap.put(Integer.valueOf(offset.getPartition()), indexAndMetadata);
        }
        return result;
    }

    protected Map<Broker, List<TransactionOffset>> splitOffsetsByBroker(Set<TransactionOffset> offsets) {
        HashMap result = Maps.newHashMap();
        for (TransactionOffset offset : offsets) {
            Broker broker;
            Integer leader;
            PartitionGroup partitionGroup;
            TopicConfig topic = this.nameService.getTopicConfig(TopicName.parse((String)offset.getTopic()));
            if (topic == null || (partitionGroup = topic.fetchPartitionGroupByPartition(offset.getPartition())) == null || (leader = partitionGroup.getLeader()) == null || leader <= 0 || (broker = this.nameService.getBroker(leader.intValue())) == null) continue;
            List brokerOffsets = (List)result.get(broker);
            if (brokerOffsets == null) {
                brokerOffsets = Lists.newLinkedList();
                result.put(broker, brokerOffsets);
            }
            brokerOffsets.add(offset);
        }
        return result;
    }
}

