/*
 * Decompiled with CFR 0.152.
 */
package org.joyqueue.broker.kafka.coordinator.transaction.log;

import com.google.common.collect.Lists;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.joyqueue.broker.buffer.Serializer;
import org.joyqueue.broker.consumer.Consume;
import org.joyqueue.broker.consumer.model.PullResult;
import org.joyqueue.broker.kafka.config.KafkaConfig;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionDomain;
import org.joyqueue.broker.kafka.coordinator.transaction.helper.TransactionSerializer;
import org.joyqueue.broker.producer.Produce;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.message.BrokerMessage;
import org.joyqueue.network.session.Consumer;
import org.joyqueue.network.session.Producer;
import org.joyqueue.toolkit.network.IpUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionLogSegment {
    protected static final Logger logger = LoggerFactory.getLogger(TransactionLogSegment.class);
    private final byte[] LOCAL_IP = IpUtil.getLocalIp().getBytes();
    private KafkaConfig config;
    private String topic;
    private short partition;
    private Produce produce;
    private Consume consume;
    private Consumer consumer;
    private Producer producer;

    public TransactionLogSegment(KafkaConfig config, String topic, short partition, Produce produce, Consume consume, Producer producer, Consumer consumer) {
        this.config = config;
        this.topic = topic;
        this.partition = partition;
        this.produce = produce;
        this.consume = consume;
        this.producer = producer;
        this.consumer = consumer;
    }

    public long getIndex() {
        long ackIndex = this.consume.getAckIndex(this.consumer, this.partition);
        if (ackIndex < 0L) {
            ackIndex = 0L;
        }
        return ackIndex;
    }

    public List<TransactionDomain> read(long index, int count) throws Exception {
        List<ByteBuffer> buffers = this.doRead(this.partition, index, count);
        if (CollectionUtils.isEmpty(buffers)) {
            return Collections.emptyList();
        }
        ArrayList result = Lists.newArrayListWithCapacity((int)buffers.size());
        for (ByteBuffer buffer : buffers) {
            TransactionDomain transactionDomain = TransactionSerializer.deserialize(buffer);
            result.add(transactionDomain);
        }
        return result;
    }

    public void saveIndex(long index) throws Exception {
        this.consume.setAckIndex(this.consumer, this.partition, index);
    }

    protected List<ByteBuffer> doRead(short partition, long index, int count) throws Exception {
        PullResult pullResult = this.consume.getMessage(this.consumer, partition, index, count);
        if (!pullResult.getCode().equals((Object)JoyQueueCode.SUCCESS)) {
            logger.error("read transaction log exception, partition: {}, index: {}, count: {}", new Object[]{partition, index, count, pullResult.getCode()});
            return Collections.emptyList();
        }
        ArrayList buffers = Lists.newArrayListWithCapacity((int)pullResult.getBuffers().size());
        for (ByteBuffer buffer : pullResult.getBuffers()) {
            BrokerMessage brokerMessage = Serializer.readBrokerMessage((ByteBuffer)buffer);
            buffers.add(brokerMessage.getBody());
        }
        return buffers;
    }

    public boolean batchWrite(String app, String transactionId, List<byte[]> bodyList) throws Exception {
        ArrayList messages = Lists.newArrayListWithCapacity((int)bodyList.size());
        for (byte[] body : bodyList) {
            BrokerMessage message = this.convertMessage(body);
            messages.add(message);
        }
        this.produce.putMessage(this.producer, (List)messages, this.config.getTransactionLogWriteQosLevel(), this.config.getTransactionSyncTimeout());
        return true;
    }

    public boolean write(String app, String transactionId, byte[] body) throws Exception {
        BrokerMessage message = this.convertMessage(body);
        this.produce.putMessage(this.producer, (List)Lists.newArrayList((Object[])new BrokerMessage[]{message}), this.config.getTransactionLogWriteQosLevel(), this.config.getTransactionSyncTimeout());
        return true;
    }

    protected BrokerMessage convertMessage(byte[] body) {
        BrokerMessage message = new BrokerMessage();
        message.setTopic(this.topic);
        message.setApp(this.producer.getApp());
        message.setBody(body);
        message.setClientIp(this.LOCAL_IP);
        message.setPartition(this.partition);
        return message;
    }

    public short getPartition() {
        return this.partition;
    }
}

