/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqAdminUtil;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.source.ConsumerMetadata;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.source.RocketMqSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.source.RocketMqSourceState;
import org.apache.seatunnel.shade.com.google.common.collect.Maps;
import org.apache.seatunnel.shade.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocketMqSourceSplitEnumerator
implements SourceSplitEnumerator<RocketMqSourceSplit, RocketMqSourceState> {
    private static final Logger log = LoggerFactory.getLogger(RocketMqSourceSplitEnumerator.class);
    private static final long DEFAULT_DISCOVERY_INTERVAL_MILLIS = 60000L;
    private final Map<MessageQueue, RocketMqSourceSplit> assignedSplit;
    private final ConsumerMetadata metadata;
    private final SourceSplitEnumerator.Context<RocketMqSourceSplit> context;
    private final Map<MessageQueue, RocketMqSourceSplit> pendingSplit;
    private ScheduledExecutorService executor;
    private ScheduledFuture scheduledFuture;
    private long discoveryIntervalMillis;

    public RocketMqSourceSplitEnumerator(ConsumerMetadata metadata, SourceSplitEnumerator.Context<RocketMqSourceSplit> context) {
        this.metadata = metadata;
        this.context = context;
        this.assignedSplit = new HashMap<MessageQueue, RocketMqSourceSplit>();
        this.pendingSplit = new HashMap<MessageQueue, RocketMqSourceSplit>();
        System.setProperty("rocketmq.client.logUseSlf4j", "true");
    }

    public RocketMqSourceSplitEnumerator(ConsumerMetadata metadata, SourceSplitEnumerator.Context<RocketMqSourceSplit> context, long discoveryIntervalMillis) {
        this(metadata, context);
        this.discoveryIntervalMillis = discoveryIntervalMillis;
    }

    private static int getSplitOwner(MessageQueue messageQueue, int numReaders) {
        int startIndex = (messageQueue.getQueueId() * 31 & Integer.MAX_VALUE) % numReaders;
        return (startIndex + messageQueue.getQueueId()) % numReaders;
    }

    public void open() {
        long l = this.discoveryIntervalMillis = this.discoveryIntervalMillis > 0L ? this.discoveryIntervalMillis : 60000L;
        if (this.discoveryIntervalMillis > 0L) {
            this.executor = Executors.newScheduledThreadPool(1, runnable -> {
                Thread thread = new Thread(runnable);
                thread.setDaemon(true);
                thread.setName("RocketMq-messageQueue-dynamic-discovery");
                return thread;
            });
            this.scheduledFuture = this.executor.scheduleWithFixedDelay(() -> {
                try {
                    this.discoverySplits();
                }
                catch (Exception e) {
                    log.error("Dynamic discovery failure:", (Throwable)e);
                }
            }, this.discoveryIntervalMillis, this.discoveryIntervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    public void run() throws Exception {
        this.fetchPendingPartitionSplit();
        this.setPartitionStartOffset();
        this.assignSplit();
    }

    public void close() throws IOException {
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(false);
            if (this.executor != null) {
                this.executor.shutdownNow();
            }
        }
    }

    public void addSplitsBack(List<RocketMqSourceSplit> splits, int subtaskId) {
        if (!splits.isEmpty()) {
            this.pendingSplit.putAll(this.convertToNextSplit(splits));
            this.assignSplit();
        }
    }

    private Map<MessageQueue, ? extends RocketMqSourceSplit> convertToNextSplit(List<RocketMqSourceSplit> splits) {
        try {
            Map<MessageQueue, Long> listOffsets = this.listOffsets(splits.stream().map(RocketMqSourceSplit::getMessageQueue).collect(Collectors.toList()), ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            splits.forEach(split -> {
                split.setStartOffset(Math.min(split.getEndOffset() + 1L, (Long)listOffsets.get(split.getMessageQueue())));
                split.setEndOffset((Long)listOffsets.get(split.getMessageQueue()));
            });
            return splits.stream().collect(Collectors.toMap(split -> split.getMessageQueue(), split -> split));
        }
        catch (Exception e) {
            throw new RocketMqConnectorException((SeaTunnelErrorCode)RocketMqConnectorErrorCode.ADD_SPLIT_BACK_TO_ENUMERATOR_FAILED, e);
        }
    }

    public int currentUnassignedSplitSize() {
        return this.pendingSplit.size();
    }

    public void handleSplitRequest(int subtaskId) {
    }

    public void registerReader(int subtaskId) {
        if (!this.pendingSplit.isEmpty()) {
            this.assignSplit();
        }
    }

    public RocketMqSourceState snapshotState(long checkpointId) throws Exception {
        return new RocketMqSourceState(this.assignedSplit.values().stream().collect(Collectors.toSet()));
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }

    private void discoverySplits() {
        this.fetchPendingPartitionSplit();
        this.assignSplit();
    }

    private void fetchPendingPartitionSplit() {
        this.getTopicInfo().forEach(split -> {
            if (!this.assignedSplit.containsKey(split.getMessageQueue()) && !this.pendingSplit.containsKey(split.getMessageQueue())) {
                this.pendingSplit.put(split.getMessageQueue(), (RocketMqSourceSplit)split);
            }
        });
    }

    private Set<RocketMqSourceSplit> getTopicInfo() {
        log.info("Configured topics: {}", this.metadata.getTopics());
        List<Map<MessageQueue, TopicOffset>> offsetTopics = RocketMqAdminUtil.offsetTopics(this.metadata.getBaseConfig(), this.metadata.getTopics());
        Set sourceSplits = Sets.newConcurrentHashSet();
        offsetTopics.forEach(messageQueueOffsets -> messageQueueOffsets.forEach((messageQueue, topicOffset) -> sourceSplits.add(new RocketMqSourceSplit((MessageQueue)messageQueue, topicOffset.getMinOffset(), topicOffset.getMaxOffset()))));
        return sourceSplits;
    }

    private void setPartitionStartOffset() throws MQClientException {
        Set<MessageQueue> topicPartitions = this.pendingSplit.keySet();
        Map<MessageQueue, Long> topicPartitionOffsets = null;
        switch (this.metadata.getStartMode()) {
            case CONSUME_FROM_FIRST_OFFSET: {
                topicPartitionOffsets = this.listOffsets(topicPartitions, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                break;
            }
            case CONSUME_FROM_LAST_OFFSET: {
                topicPartitionOffsets = this.listOffsets(topicPartitions, ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
                break;
            }
            case CONSUME_FROM_TIMESTAMP: {
                topicPartitionOffsets = this.listOffsets(topicPartitions, ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
                break;
            }
            case CONSUME_FROM_GROUP_OFFSETS: {
                topicPartitionOffsets = this.listConsumerGroupOffsets(topicPartitions);
                if (!topicPartitionOffsets.isEmpty()) break;
                topicPartitionOffsets = this.listOffsets(topicPartitions, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                break;
            }
            case CONSUME_FROM_SPECIFIC_OFFSETS: {
                topicPartitionOffsets = this.metadata.getSpecificStartOffsets();
                this.setMessageQueueBroker(topicPartitions, topicPartitionOffsets);
                break;
            }
            default: {
                throw new RocketMqConnectorException((SeaTunnelErrorCode)RocketMqConnectorErrorCode.UNSUPPORTED_START_MODE_ERROR, this.metadata.getStartMode().name());
            }
        }
        topicPartitionOffsets.entrySet().forEach(entry -> {
            if (this.pendingSplit.containsKey(entry.getKey())) {
                this.pendingSplit.get(entry.getKey()).setStartOffset((Long)entry.getValue());
            }
        });
    }

    private void setMessageQueueBroker(Collection<MessageQueue> topicPartitions, Map<MessageQueue, Long> topicPartitionOffsets) {
        Map<String, String> flatTopicPartitions = topicPartitions.stream().collect(Collectors.toMap(messageQueue -> messageQueue.getTopic() + "-" + messageQueue.getQueueId(), MessageQueue::getBrokerName));
        for (MessageQueue messageQueue2 : topicPartitionOffsets.keySet()) {
            String key = messageQueue2.getTopic() + "-" + messageQueue2.getQueueId();
            if (!flatTopicPartitions.containsKey(key)) continue;
            messageQueue2.setBrokerName(flatTopicPartitions.get(key));
        }
    }

    private Map<MessageQueue, Long> listOffsets(Collection<MessageQueue> messageQueues, ConsumeFromWhere consumeFromWhere) {
        ConcurrentMap results = Maps.newConcurrentMap();
        Map<MessageQueue, TopicOffset> messageQueueOffsets = RocketMqAdminUtil.flatOffsetTopics(this.metadata.getBaseConfig(), this.metadata.getTopics());
        switch (consumeFromWhere) {
            case CONSUME_FROM_FIRST_OFFSET: {
                messageQueues.forEach(messageQueue -> {
                    TopicOffset topicOffset = (TopicOffset)messageQueueOffsets.get(messageQueue);
                    results.put(messageQueue, topicOffset.getMinOffset());
                });
                break;
            }
            case CONSUME_FROM_LAST_OFFSET: {
                messageQueues.forEach(messageQueue -> {
                    TopicOffset topicOffset = (TopicOffset)messageQueueOffsets.get(messageQueue);
                    results.put(messageQueue, topicOffset.getMaxOffset());
                });
                break;
            }
            case CONSUME_FROM_TIMESTAMP: {
                results.putAll(RocketMqAdminUtil.searchOffsetsByTimestamp(this.metadata.getBaseConfig(), messageQueues, this.metadata.getStartOffsetsTimestamp()));
                break;
            }
        }
        return results;
    }

    public Map<MessageQueue, Long> listConsumerGroupOffsets(Collection<MessageQueue> messageQueues) {
        return RocketMqAdminUtil.currentOffsets(this.metadata.getBaseConfig(), this.metadata.getTopics(), new HashSet<MessageQueue>(messageQueues));
    }

    private synchronized void assignSplit() {
        HashMap<Integer, List> readySplit = new HashMap<Integer, List>(16);
        for (int taskID = 0; taskID < this.context.currentParallelism(); ++taskID) {
            readySplit.computeIfAbsent(taskID, id -> new ArrayList());
        }
        this.pendingSplit.entrySet().forEach(s -> {
            if (!this.assignedSplit.containsKey(s.getKey())) {
                ((List)readySplit.get(RocketMqSourceSplitEnumerator.getSplitOwner((MessageQueue)s.getKey(), this.context.currentParallelism()))).add(s.getValue());
            }
        });
        readySplit.forEach((arg_0, arg_1) -> this.context.assignSplit(arg_0, arg_1));
        this.assignedSplit.putAll(this.pendingSplit);
        this.pendingSplit.clear();
    }
}

