/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.engine.server.task.flow;

import com.hazelcast.collection.IQueue;
import com.hazelcast.core.HazelcastInstance;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.transform.Collector;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.flow.AbstractFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.flow.OneOutputFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.record.Barrier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ShuffleSourceFlowLifeCycle<T>
extends AbstractFlowLifeCycle
implements OneOutputFlowLifeCycle<Record<?>> {
    private static final Logger log = LoggerFactory.getLogger(ShuffleSourceFlowLifeCycle.class);
    private final ShuffleAction shuffleAction;
    private final int shuffleBatchSize;
    private final IQueue<Record<?>>[] shuffles;
    private Map<Integer, List<Record<?>>> unsentBufferMap = new HashMap();
    private final Map<Integer, Barrier> alignedBarriers = new HashMap<Integer, Barrier>();
    private long currentCheckpointId = Long.MAX_VALUE;
    private int alignedBarriersCounter = 0;

    public ShuffleSourceFlowLifeCycle(SeaTunnelTask runningTask, int taskIndex, ShuffleAction shuffleAction, HazelcastInstance hazelcastInstance, CompletableFuture<Void> completableFuture) {
        super(runningTask, completableFuture);
        int pipelineId = runningTask.getTaskLocation().getPipelineId();
        this.shuffleAction = shuffleAction;
        this.shuffles = shuffleAction.getConfig().getShuffleStrategy().getShuffles(hazelcastInstance, pipelineId, taskIndex);
        this.shuffleBatchSize = shuffleAction.getConfig().getBatchSize();
    }

    @Override
    public void collect(Collector<Record<?>> collector) throws Exception {
        int emptyShuffleQueueCount = 0;
        block0: for (int i = 0; i < this.shuffles.length; ++i) {
            IQueue<Record<?>> shuffleQueue = this.shuffles[i];
            List unsentBuffer = this.unsentBufferMap.computeIfAbsent(i, k -> new LinkedList());
            if (shuffleQueue.size() == 0) {
                ++emptyShuffleQueueCount;
                continue;
            }
            if (this.alignedBarriers.get(i) != null && this.alignedBarriers.get(i).getId() == this.currentCheckpointId) continue;
            LinkedList<Record> shuffleBatch = new LinkedList<Record>();
            if (this.alignedBarriersCounter > 0) {
                shuffleBatch.add((Record)shuffleQueue.take());
            } else if (!unsentBuffer.isEmpty()) {
                shuffleBatch.addAll(unsentBuffer);
                unsentBuffer.clear();
            }
            shuffleQueue.drainTo(shuffleBatch, this.shuffleBatchSize);
            for (int recordIndex = 0; recordIndex < shuffleBatch.size(); ++recordIndex) {
                Record record = (Record)shuffleBatch.get(recordIndex);
                if (record.getData() instanceof Barrier) {
                    long startTime = System.currentTimeMillis();
                    Barrier barrier = (Barrier)record.getData();
                    this.alignedBarriers.put(i, barrier);
                    ++this.alignedBarriersCounter;
                    this.currentCheckpointId = barrier.getId();
                    if (this.alignedBarriersCounter == this.shuffles.length) {
                        if (barrier.prepareClose(this.runningTask.getTaskLocation())) {
                            this.prepareClose = true;
                        }
                        if (barrier.snapshot()) {
                            this.runningTask.addState(barrier, ActionStateKey.of((Action)this.shuffleAction), Collections.emptyList());
                        }
                        this.runningTask.ack(barrier);
                        collector.collect((Object)record);
                        log.debug("trigger barrier [{}] finished, cost: {}ms. taskLocation: [{}]", new Object[]{barrier.getId(), System.currentTimeMillis() - startTime, this.runningTask.getTaskLocation()});
                        this.alignedBarriersCounter = 0;
                        this.alignedBarriers.clear();
                    }
                    if (recordIndex + 1 >= shuffleBatch.size()) continue block0;
                    unsentBuffer.addAll(shuffleBatch.subList(recordIndex + 1, shuffleBatch.size()));
                    continue block0;
                }
                if (this.prepareClose.booleanValue()) {
                    return;
                }
                collector.collect((Object)record);
            }
        }
        if (emptyShuffleQueueCount == this.shuffles.length) {
            Thread.sleep(100L);
        }
    }

    @Override
    public void close() throws IOException {
        super.close();
        for (IQueue<Record<?>> shuffleQueue : this.shuffles) {
            log.info("destroy shuffle queue: {}", (Object)shuffleQueue.getName());
            shuffleQueue.destroy();
        }
    }
}

