/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.source.ConsumerMetadata;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.source.RocketMqConsumerThread;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.source.RocketMqSourceSplit;
import org.apache.seatunnel.shade.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocketMqSourceReader
implements SourceReader<SeaTunnelRow, RocketMqSourceSplit> {
    private static final Logger log = LoggerFactory.getLogger(RocketMqSourceReader.class);
    private static final long THREAD_WAIT_TIME = 500L;
    private final SourceReader.Context context;
    private final ConsumerMetadata metadata;
    private final Set<RocketMqSourceSplit> sourceSplits;
    private final Map<Long, Map<MessageQueue, Long>> checkpointOffsets;
    private final Map<MessageQueue, RocketMqConsumerThread> consumerThreads;
    private final ExecutorService executorService;
    private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
    private final LinkedBlockingQueue<RocketMqSourceSplit> pendingPartitionsQueue;
    private volatile boolean running = false;

    public RocketMqSourceReader(ConsumerMetadata metadata, DeserializationSchema<SeaTunnelRow> deserializationSchema, SourceReader.Context context) {
        this.metadata = metadata;
        this.context = context;
        this.sourceSplits = new HashSet<RocketMqSourceSplit>();
        this.deserializationSchema = deserializationSchema;
        this.consumerThreads = new ConcurrentHashMap<MessageQueue, RocketMqConsumerThread>();
        this.checkpointOffsets = new ConcurrentHashMap<Long, Map<MessageQueue, Long>>();
        this.executorService = Executors.newCachedThreadPool(r -> new Thread(r, "RocketMq Source Data Consumer"));
        this.pendingPartitionsQueue = new LinkedBlockingQueue();
        System.setProperty("rocketmq.client.logUseSlf4j", "true");
    }

    public void open() throws Exception {
    }

    public void close() throws IOException {
        if (this.executorService != null) {
            this.executorService.shutdownNow();
        }
    }

    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
        if (!this.running) {
            Thread.sleep(500L);
            return;
        }
        while (!this.pendingPartitionsQueue.isEmpty()) {
            this.sourceSplits.add(this.pendingPartitionsQueue.poll());
        }
        this.sourceSplits.forEach(sourceSplit -> this.consumerThreads.computeIfAbsent(sourceSplit.getMessageQueue(), s -> {
            RocketMqConsumerThread thread = new RocketMqConsumerThread(this.metadata);
            this.executorService.submit(thread);
            return thread;
        }));
        this.sourceSplits.forEach(sourceSplit -> {
            CompletableFuture completableFuture = new CompletableFuture();
            try {
                RocketMqConsumerThread rocketMqConsumerThread = this.consumerThreads.get(sourceSplit.getMessageQueue());
                rocketMqConsumerThread.getTasks().put(consumer -> {
                    try {
                        rocketMqConsumerThread.assign((RocketMqSourceSplit)sourceSplit);
                        MessageQueue assignedMessageQueue = sourceSplit.getMessageQueue();
                        List<MessageExt> records = consumer.poll(this.metadata.getBaseConfig().getPollTimeoutMillis());
                        if (records.isEmpty()) {
                            log.warn("Rocketmq consumer can not pull data, split {}, start offset {}, end offset {}", new Object[]{sourceSplit.getMessageQueue(), sourceSplit.getStartOffset(), sourceSplit.getEndOffset()});
                        }
                        List messages = records.stream().filter(record -> this.isQueueMatch(assignedMessageQueue, (MessageExt)record)).collect(Collectors.toList());
                        long lastOffset = -1L;
                        for (MessageExt record2 : messages) {
                            boolean shouldProcess;
                            boolean bl = shouldProcess = this.metadata.getTags() == null || this.metadata.getTags().isEmpty() || this.metadata.getTags().contains(record2.getTags());
                            if (shouldProcess) {
                                this.deserializationSchema.deserialize(record2.getBody(), output);
                                lastOffset = record2.getQueueOffset();
                            }
                            if (!Boundedness.BOUNDED.equals((Object)this.context.getBoundedness()) || record2.getQueueOffset() < sourceSplit.getEndOffset()) continue;
                            break;
                        }
                        if (lastOffset >= 0L) {
                            sourceSplit.setStartOffset(lastOffset + 1L);
                            rocketMqConsumerThread.markLastPolledOffset(lastOffset);
                        }
                        if (lastOffset >= sourceSplit.getEndOffset()) {
                            sourceSplit.setEndOffset(lastOffset);
                        }
                    }
                    catch (Throwable e) {
                        completableFuture.completeExceptionally(e);
                    }
                    completableFuture.complete(null);
                });
            }
            catch (InterruptedException e) {
                throw new RocketMqConnectorException((SeaTunnelErrorCode)RocketMqConnectorErrorCode.CONSUME_DATA_FAILED, e);
            }
            completableFuture.join();
        });
        if (Boundedness.BOUNDED.equals((Object)this.context.getBoundedness())) {
            this.context.signalNoMoreElement();
        }
    }

    private boolean isQueueMatch(MessageQueue assignedMessageQueue, MessageExt record) {
        return Objects.equals(assignedMessageQueue.getTopic(), record.getTopic()) && Objects.equals(assignedMessageQueue.getBrokerName(), record.getBrokerName()) && Objects.equals(assignedMessageQueue.getQueueId(), record.getQueueId());
    }

    public List<RocketMqSourceSplit> snapshotState(long checkpointId) throws Exception {
        List<RocketMqSourceSplit> pendingSplit = this.sourceSplits.stream().map(RocketMqSourceSplit::copy).collect(Collectors.toList());
        Map offsets = this.checkpointOffsets.computeIfAbsent(checkpointId, id -> Maps.newConcurrentMap());
        for (RocketMqSourceSplit split : pendingSplit) {
            offsets.put(split.getMessageQueue(), split.getStartOffset());
        }
        return pendingSplit;
    }

    public void addSplits(List<RocketMqSourceSplit> splits) {
        this.running = true;
        splits.forEach(s -> {
            try {
                this.pendingPartitionsQueue.put((RocketMqSourceSplit)s);
            }
            catch (InterruptedException e) {
                throw new RocketMqConnectorException((SeaTunnelErrorCode)RocketMqConnectorErrorCode.ADD_SPLIT_CHECKPOINT_FAILED, e);
            }
        });
    }

    public void handleNoMoreSplits() {
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (!this.checkpointOffsets.containsKey(checkpointId)) {
            log.warn("checkpoint {} do not exist or have already been committed.", (Object)checkpointId);
        } else {
            Map<MessageQueue, Long> messageQueueOffset = this.checkpointOffsets.remove(checkpointId);
            for (Map.Entry<MessageQueue, Long> entry : messageQueueOffset.entrySet()) {
                MessageQueue messageQueue = entry.getKey();
                Long offset = entry.getValue();
                try {
                    RocketMqConsumerThread rocketMqConsumerThread;
                    if (messageQueue == null || offset == null || (rocketMqConsumerThread = this.consumerThreads.get(messageQueue)) == null) continue;
                    rocketMqConsumerThread.getTasks().put(consumer -> {
                        if (this.metadata.isEnabledCommitCheckpoint()) {
                            consumer.getOffsetStore().updateOffset(messageQueue, offset, false);
                            consumer.getOffsetStore().persist(messageQueue);
                        }
                    });
                }
                catch (InterruptedException e) {
                    log.error("commit offset failed", (Throwable)e);
                }
            }
        }
    }
}

