/*
 * Decompiled with CFR 0.152.
 */
package org.joyqueue.broker.kafka.coordinator.transaction.log;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.joyqueue.broker.cluster.ClusterManager;
import org.joyqueue.broker.consumer.Consume;
import org.joyqueue.broker.kafka.config.KafkaConfig;
import org.joyqueue.broker.kafka.coordinator.Coordinator;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionDomain;
import org.joyqueue.broker.kafka.coordinator.transaction.helper.TransactionSerializer;
import org.joyqueue.broker.kafka.coordinator.transaction.log.TransactionLogSegment;
import org.joyqueue.broker.producer.Produce;
import org.joyqueue.domain.PartitionGroup;
import org.joyqueue.domain.TopicConfig;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.network.session.Consumer;
import org.joyqueue.network.session.Producer;
import org.joyqueue.toolkit.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Facade over the per-partition {@link TransactionLogSegment}s of the coordinator's
 * transaction topic.
 *
 * <p>Writes are routed to a partition whose partition group is led by the local broker
 * (see {@link #resolvePartition(String, String)}); the segment for that partition is
 * created lazily and cached in {@link #segmentMap}.
 *
 * <p>The internal {@link Consumer} and {@link Producer} sessions handed to each segment are
 * only assigned in {@link #validate()}. A segment created before {@code validate()} has run
 * would therefore be constructed with {@code null} sessions.
 * NOTE(review): presumably {@code Service} invokes {@code validate()} during start-up,
 * before any write is issued - confirm against {@code Service}.
 */
public class TransactionLog
extends Service {
    protected static final Logger logger = LoggerFactory.getLogger(TransactionLog.class);
    private final KafkaConfig config;
    private final Produce produce;
    private final Consume consume;
    private final Coordinator coordinator;
    private final ClusterManager clusterManager;
    /** Internal consumer session; assigned in {@link #validate()}. */
    private Consumer consumer;
    /** Internal producer session; assigned in {@link #validate()}. */
    private Producer producer;
    /** Lazily populated cache of segments, keyed by partition id. */
    private final ConcurrentMap<Short, TransactionLogSegment> segmentMap = Maps.newConcurrentMap();

    /**
     * Stores the collaborators; no sessions or segments are created here.
     *
     * @param config         Kafka configuration, source of the transaction-log app name
     * @param produce        produce service passed to every segment
     * @param consume        consume service passed to every segment
     * @param coordinator    source of the transaction topic name and its topic config
     * @param clusterManager source of the local broker id
     */
    public TransactionLog(KafkaConfig config, Produce produce, Consume consume, Coordinator coordinator, ClusterManager clusterManager) {
        this.config = config;
        this.produce = produce;
        this.consume = consume;
        this.coordinator = coordinator;
        this.clusterManager = clusterManager;
    }

    /**
     * Creates the internal consumer and producer sessions used by all segments.
     *
     * @throws Exception declared for compatibility with the existing signature
     */
    protected void validate() throws Exception {
        this.consumer = this.initConsumer();
        this.producer = this.initProducer();
    }

    /**
     * Builds the internal consumer session on the transaction topic. The configured
     * transaction-log app is passed as both the first and the third constructor argument.
     *
     * @return a new {@link Consumer} of type {@code INTERNAL}
     */
    protected Consumer initConsumer() {
        String logApp = this.config.getTransactionLogApp();
        String topic = this.coordinator.getTransactionTopic().getFullName();
        return new Consumer(logApp, topic, logApp, Consumer.ConsumeType.INTERNAL);
    }

    /**
     * Builds the internal producer session on the transaction topic. The configured
     * transaction-log app is passed as both the first and the third constructor argument.
     *
     * @return a new {@link Producer} of type {@code INTERNAL}
     */
    protected Producer initProducer() {
        String logApp = this.config.getTransactionLogApp();
        String topic = this.coordinator.getTransactionTopic().getFullName();
        return new Producer(logApp, topic, logApp, Producer.ProducerType.INTERNAL);
    }

    /**
     * Serializes one transaction record and writes it to the resolved segment.
     *
     * @param app               application the transaction belongs to
     * @param transactionId     transaction identifier
     * @param transactionDomain record to serialize and write
     * @return the result reported by {@link TransactionLogSegment#write}
     * @throws JoyQueueException with code {@code SE_WRITE_FAILED} when no segment can be resolved
     * @throws Exception         anything raised by serialization or by the segment write
     */
    public boolean write(String app, String transactionId, TransactionDomain transactionDomain) throws Exception {
        TransactionLogSegment segment = this.requireSegment(app, transactionId);
        byte[] body = TransactionSerializer.serialize(transactionDomain);
        return segment.write(app, transactionId, body);
    }

    /**
     * Serializes a set of transaction records and writes them to the resolved segment in a
     * single batch call. Bodies are added in the iteration order of {@code transactionDomains}.
     *
     * @param app                application the transaction belongs to
     * @param transactionId      transaction identifier
     * @param transactionDomains records to serialize and write
     * @return the result reported by {@link TransactionLogSegment#batchWrite}
     * @throws JoyQueueException with code {@code SE_WRITE_FAILED} when no segment can be resolved
     * @throws Exception         anything raised by serialization or by the segment write
     */
    public boolean batchWrite(String app, String transactionId, Set<? extends TransactionDomain> transactionDomains) throws Exception {
        TransactionLogSegment segment = this.requireSegment(app, transactionId);
        List<byte[]> bodyList = Lists.newArrayListWithCapacity(transactionDomains.size());
        for (TransactionDomain transactionDomain : transactionDomains) {
            bodyList.add(TransactionSerializer.serialize(transactionDomain));
        }
        return segment.batchWrite(app, transactionId, bodyList);
    }

    /**
     * @return a new list holding the segments cached at the time of the call
     */
    public List<TransactionLogSegment> getSegments() {
        return Lists.newArrayList(this.segmentMap.values());
    }

    /**
     * @param partition partition id
     * @return the cached segment for {@code partition}, or {@code null} if none has been created
     */
    public TransactionLogSegment getSegment(short partition) {
        return this.segmentMap.get(partition);
    }

    /**
     * Drops the cached segment for a partition.
     *
     * @param partition partition id
     * @return the removed segment, or {@code null} if none was cached
     */
    public TransactionLogSegment removeSegment(short partition) {
        return this.segmentMap.remove(partition);
    }

    /**
     * Collects the partitions of every transaction-topic partition group whose leader is the
     * local broker.
     *
     * @return the matching partition ids; empty when there are none or when the coordinator
     *         reports no topic config
     */
    public List<Short> getPartitions() {
        List<Short> result = Lists.newArrayList();
        TopicConfig topicConfig = this.coordinator.getTransactionTopicConfig();
        // NOTE(review): guards against the coordinator returning null (e.g. topic not yet
        // known) - confirm whether that can actually happen.
        if (topicConfig == null) {
            return result;
        }
        for (PartitionGroup partitionGroup : topicConfig.getPartitionGroups().values()) {
            if (this.isLedByLocalBroker(partitionGroup)) {
                result.addAll(partitionGroup.getPartitions());
            }
        }
        return result;
    }

    /**
     * Resolves the segment a transaction should be written to.
     *
     * @param app           application the transaction belongs to
     * @param transactionId transaction identifier
     * @return the (possibly newly created) segment, or {@code null} when no partition is
     *         led by the local broker
     */
    protected TransactionLogSegment resolveSegment(String app, String transactionId) {
        short partition = this.resolvePartition(app, transactionId);
        if (partition < 0) {
            return null;
        }
        return this.getOrCreateSegment(partition);
    }

    /**
     * Returns the cached segment for {@code partition}, creating and caching one if absent.
     * When two callers race, the instance that won {@code putIfAbsent} is returned to both
     * and the loser's freshly built segment is discarded.
     *
     * @param partition partition id
     * @return the segment registered in {@link #segmentMap} for {@code partition}
     */
    protected TransactionLogSegment getOrCreateSegment(short partition) {
        TransactionLogSegment segment = this.segmentMap.get(partition);
        if (segment != null) {
            return segment;
        }
        TransactionLogSegment created = new TransactionLogSegment(this.config, this.coordinator.getTransactionTopic().getFullName(), partition, this.produce, this.consume, this.producer, this.consumer);
        TransactionLogSegment raced = this.segmentMap.putIfAbsent(partition, created);
        return raced != null ? raced : created;
    }

    /**
     * Picks the partition to write to: the first partition of the first transaction-topic
     * partition group that is led by the local broker and has at least one partition.
     * {@code app} and {@code transactionId} are currently not used for the choice.
     *
     * @param app           application the transaction belongs to (unused)
     * @param transactionId transaction identifier (unused)
     * @return the chosen partition id, or {@code -1} when none qualifies
     */
    protected short resolvePartition(String app, String transactionId) {
        TopicConfig topicConfig = this.coordinator.getTransactionTopicConfig();
        // NOTE(review): guards against the coordinator returning null (e.g. topic not yet
        // known) - confirm whether that can actually happen.
        if (topicConfig == null) {
            return -1;
        }
        for (PartitionGroup partitionGroup : topicConfig.getPartitionGroups().values()) {
            if (!this.isLedByLocalBroker(partitionGroup)) {
                continue;
            }
            // A leader group without partitions has nothing to offer; calling next() on its
            // empty iterator would throw NoSuchElementException.
            if (partitionGroup.getPartitions().isEmpty()) {
                continue;
            }
            return partitionGroup.getPartitions().iterator().next();
        }
        return -1;
    }

    /** Returns the segment for the transaction or throws the SE_WRITE_FAILED exception. */
    private TransactionLogSegment requireSegment(String app, String transactionId) throws JoyQueueException {
        TransactionLogSegment segment = this.resolveSegment(app, transactionId);
        if (segment == null) {
            throw new JoyQueueException(String.format("logSegment not exist, app: %s, transactionId: %s", app, transactionId), JoyQueueCode.SE_WRITE_FAILED.getCode());
        }
        return segment;
    }

    /** True when the group has a positive leader id equal to the local broker id. */
    private boolean isLedByLocalBroker(PartitionGroup partitionGroup) {
        Integer leader = partitionGroup.getLeader();
        return leader != null && leader > 0 && leader.equals(this.clusterManager.getBrokerId());
    }
}

