/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kinesis.util;

import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Merges records from several bounded per-producer queues and passes them to
 * {@link #emit(TimestampedValue, RecordQueue)} in approximate timestamp order.
 *
 * <p>Producers obtain a queue through {@link #getQueue(int)} and add records with
 * {@link RecordQueue#put(Object)}. The emit loop ({@link #run()} / {@link #emitRecords()})
 * repeatedly picks the queue with the smallest head timestamp and emits from it, but holds back
 * records whose timestamp is greater than {@code maxEmitTimestamp}, i.e. the last watermark passed
 * to {@link #setCurrentWatermark(long)} plus the configured max lookahead.
 *
 * <p>Within this class, {@code heads} is only touched by {@link #emitRecords()}; it is a plain
 * {@link PriorityQueue} and must not be accessed from producer threads.
 *
 * @param <T> record type; its timestamp drives the emit order
 */
@Internal
public abstract class RecordEmitter<T extends TimestampedValue> implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(RecordEmitter.class);

    /**
     * Default per-queue capacity. Not referenced inside this class; presumably intended as the
     * argument for {@link #RecordEmitter(int)} - confirm against callers.
     */
    public static final int DEFAULT_QUEUE_CAPACITY = 100;

    /** Capacity of every per-producer queue, and the fairness cap for one emit pass. */
    private final int queueCapacity;

    /** All queues ever handed out, keyed by producer index. */
    private final ConcurrentHashMap<Integer, AsyncRecordQueue<T>> queues =
            new ConcurrentHashMap<>();

    /**
     * Queues that are currently not in {@link #heads}: newly created ones and ones that were
     * drained. Used as a set; the mapped value is never read.
     */
    private final ConcurrentHashMap<AsyncRecordQueue<T>, Boolean> emptyQueues =
            new ConcurrentHashMap<>();

    /** Queues with pending records, ordered by {@code headTimestamp}. */
    private final PriorityQueue<AsyncRecordQueue<T>> heads =
            new PriorityQueue<>(this::compareHeadElement);

    private volatile boolean running = true;

    /** Records with a timestamp above this value are held back. */
    private volatile long maxEmitTimestamp = Long.MAX_VALUE;

    private long maxLookaheadMillis = 60000L;

    /** Upper bound, in milliseconds, for a single wait on {@link #condition}. */
    private long idleSleepMillis = 100L;

    /** Monitor used to wake the emit loop when records arrive or the watermark changes. */
    private final Object condition = new Object();

    /**
     * @param queueCapacity capacity of each queue returned by {@link #getQueue(int)}
     */
    public RecordEmitter(int queueCapacity) {
        this.queueCapacity = queueCapacity;
    }

    /** Orders queues by ascending {@code headTimestamp}. */
    private int compareHeadElement(AsyncRecordQueue<?> left, AsyncRecordQueue<?> right) {
        return Long.compare(left.headTimestamp, right.headTimestamp);
    }

    /**
     * Returns the queue for the given producer, creating and registering it on first use.
     *
     * @param producerIndex identifier of the producer
     * @return the queue associated with {@code producerIndex}; the same instance on every call
     */
    public RecordQueue<T> getQueue(int producerIndex) {
        return queues.computeIfAbsent(
                producerIndex,
                key -> {
                    AsyncRecordQueue<T> created = new AsyncRecordQueue<>(producerIndex);
                    // A new queue starts out empty; the emit loop picks it up from emptyQueues
                    // once it holds a record.
                    emptyQueues.put(created, false);
                    return created;
                });
    }

    /**
     * Sets how far ahead of the watermark records may be emitted. Takes effect with the next call
     * to {@link #setCurrentWatermark(long)}.
     *
     * @param maxLookaheadMillis lookahead in milliseconds
     */
    public void setMaxLookaheadMillis(long maxLookaheadMillis) {
        this.maxLookaheadMillis = maxLookaheadMillis;
        LOG.info("[setMaxLookaheadMillis] Max lookahead millis set to {}", maxLookaheadMillis);
    }

    /**
     * Updates the emit limit to {@code watermark + maxLookaheadMillis} and wakes the emit loop.
     *
     * <p>The sum saturates at {@link Long#MAX_VALUE} / {@link Long#MIN_VALUE} instead of wrapping
     * around, so a very large watermark cannot turn into a negative limit that blocks all
     * emission.
     *
     * @param watermark the current watermark
     */
    public void setCurrentWatermark(long watermark) {
        long limit = saturatedAdd(watermark, maxLookaheadMillis);
        maxEmitTimestamp = limit;
        synchronized (condition) {
            condition.notify();
        }
        LOG.info(
                "[setCurrentWatermark] Current watermark set to {}, maxEmitTimestamp = {}",
                watermark,
                limit);
    }

    /** Adds two longs, clamping to the {@code long} range on overflow. */
    private static long saturatedAdd(long a, long b) {
        long sum = a + b;
        // Overflow happened iff both operands have the same sign and the sum's sign differs.
        if (((a ^ sum) & (b ^ sum)) < 0) {
            return a < 0 ? Long.MIN_VALUE : Long.MAX_VALUE;
        }
        return sum;
    }

    /** Logs the configured lookahead and runs {@link #emitRecords()} until stopped. */
    @Override
    public void run() {
        LOG.info("Starting emitter with maxLookaheadMillis: {}", maxLookaheadMillis);
        emitRecords();
    }

    /**
     * Requests the emit loop to terminate. This only clears the running flag; the loop observes it
     * after its current wait or emit pass completes.
     */
    public void stop() {
        running = false;
    }

    /**
     * Emit loop: repeatedly selects the queue with the smallest head timestamp and emits from it
     * until another queue should take over, the emit limit is exceeded, or the queue is drained.
     * Returns once {@link #stop()} has been called.
     */
    protected void emitRecords() {
        AsyncRecordQueue<T> min = heads.poll();
        runLoop:
        while (running) {
            // Find a queue to emit from.
            while (min == null) {
                // Move new or previously drained queues that now hold records back into heads.
                if (!emptyQueues.isEmpty()) {
                    for (AsyncRecordQueue<T> queueHead : emptyQueues.keySet()) {
                        if (!queueHead.queue.isEmpty()) {
                            emptyQueues.remove(queueHead);
                            queueHead.headTimestamp = queueHead.queue.peek().getTimestamp();
                            heads.offer(queueHead);
                        }
                    }
                }
                min = heads.poll();
                if (min == null) {
                    synchronized (condition) {
                        try {
                            // Wait for a producer to add a record (or time out and re-scan).
                            condition.wait(idleSleepMillis);
                        } catch (InterruptedException ignored) {
                            // Interruption is treated as a wake-up: the outer loop re-checks
                            // 'running'. The interrupt flag is intentionally not restored, as
                            // that would make every following wait() throw immediately.
                            continue runLoop;
                        }
                    }
                }
                if (!running) {
                    break runLoop;
                }
            }

            // Hold back while the smallest pending timestamp is beyond the emit limit.
            while (min.headTimestamp > maxEmitTimestamp) {
                synchronized (condition) {
                    try {
                        condition.wait(idleSleepMillis);
                    } catch (InterruptedException ignored) {
                        // See above: re-check 'running' and resume.
                        continue runLoop;
                    }
                    if (min.headTimestamp > maxEmitTimestamp && !emptyQueues.isEmpty()) {
                        // Still blocked; put min back and check whether a queue from
                        // emptyQueues has received records that can be emitted instead.
                        heads.offer(min);
                        min = null;
                        continue runLoop;
                    }
                }
                if (!running) {
                    break runLoop;
                }
            }

            // Emit from min until one of the switch conditions below is met.
            AsyncRecordQueue<T> nextQueue = heads.poll();
            T record;
            int emitCount = 0;
            while ((record = min.queue.poll()) != null) {
                emit(record, min);
                // Track the last emitted timestamp, even when the queue becomes empty.
                min.headTimestamp = record.getTimestamp();
                // Stop when the next queue is now older, the emit limit is exceeded, or the
                // fairness cap is hit while there are queues in emptyQueues left to re-check.
                if ((nextQueue != null && min.headTimestamp > nextQueue.headTimestamp)
                        || min.headTimestamp > maxEmitTimestamp
                        || (emitCount++ > queueCapacity && !emptyQueues.isEmpty())) {
                    break;
                }
            }

            if (record == null) {
                // Drained: park the queue until it receives records again.
                emptyQueues.put(min, true);
            } else if (nextQueue != null && nextQueue.headTimestamp > min.headTimestamp) {
                // Emission stopped for a reason other than ordering, so min is still the
                // oldest queue; return nextQueue to the heap and continue with min.
                heads.offer(nextQueue);
                nextQueue = min;
            } else {
                heads.offer(min);
            }
            min = nextQueue;
        }
    }

    /**
     * Emits a single record. Called from the emit loop only.
     *
     * @param record the record to emit
     * @param source the queue the record was taken from
     */
    protected abstract void emit(T record, RecordQueue<T> source);

    /**
     * Builds a multi-line summary: the number of queues and of entries in {@code emptyQueues},
     * followed by one line per queue with its id, head timestamp and current size.
     *
     * @return the summary text
     */
    public String printInfo() {
        StringBuilder sb = new StringBuilder();
        sb.append(String.format("queues: %d, empty: %d", queues.size(), emptyQueues.size()));
        for (Map.Entry<Integer, AsyncRecordQueue<T>> entry : queues.entrySet()) {
            AsyncRecordQueue<T> q = entry.getValue();
            sb.append(
                    String.format(
                            "\n%d timestamp: %s size: %d",
                            q.queueId, q.headTimestamp, q.queue.size()));
        }
        return sb.toString();
    }

    /**
     * Bounded queue for one producer. {@link #put(Object)} blocks while the queue is full and
     * notifies the emit loop after each insertion.
     */
    private class AsyncRecordQueue<E> implements RecordQueue<E> {
        private final ArrayBlockingQueue<E> queue;
        private final int queueId;

        /**
         * Timestamp used to order this queue in {@code heads}: the head record's timestamp when
         * the queue is (re-)added, then the timestamp of the last record emitted from it.
         */
        long headTimestamp;

        private AsyncRecordQueue(int queueId) {
            this.queue = new ArrayBlockingQueue<>(queueCapacity);
            this.queueId = queueId;
            this.headTimestamp = Long.MAX_VALUE;
        }

        @Override
        public void put(E record) throws InterruptedException {
            queue.put(record);
            synchronized (condition) {
                condition.notify();
            }
        }

        @Override
        public int getSize() {
            return queue.size();
        }

        @Override
        public E peek() {
            return queue.peek();
        }
    }

    /**
     * Producer-facing view of a record queue.
     *
     * @param <T> record type
     */
    public interface RecordQueue<T> {
        /**
         * Adds a record to the queue.
         *
         * @param record the record to add
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        void put(T record) throws InterruptedException;

        /** @return the number of records currently in the queue */
        int getSize();

        /** @return the head of the queue without removing it, or {@code null} if it is empty */
        T peek();
    }
}

