/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.lifecycle.graph;

import java.util.Collections;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommand;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommandDispatcher;
import org.apache.flink.runtime.operators.lifecycle.event.OperatorFinishedEvent;
import org.apache.flink.runtime.operators.lifecycle.event.OperatorStartedEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestCommandAckEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestEventQueue;
import org.apache.flink.runtime.operators.lifecycle.graph.TestDataElement;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

class TestEventSource
extends RichSourceFunction<TestDataElement>
implements ParallelSourceFunction<TestDataElement> {
    private final String operatorID;
    private final TestCommandDispatcher commandQueue;
    private transient Queue<TestCommand> scheduledCommands;
    private volatile transient boolean isRunning = true;
    private final TestEventQueue eventQueue;

    TestEventSource(String operatorID, TestEventQueue eventQueue, TestCommandDispatcher commandQueue) {
        this.operatorID = operatorID;
        this.eventQueue = eventQueue;
        this.commandQueue = commandQueue;
    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.isRunning = true;
        this.scheduledCommands = new LinkedBlockingQueue<TestCommand>();
        this.commandQueue.subscribe(cmd -> this.scheduledCommands.add(cmd), this.operatorID);
        this.eventQueue.add(new OperatorStartedEvent(this.operatorID, this.getRuntimeContext().getIndexOfThisSubtask(), this.getRuntimeContext().getAttemptNumber()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run(SourceFunction.SourceContext<TestDataElement> ctx) {
        long lastSent = 0L;
        while (this.isRunning) {
            TestCommand cmd;
            TestCommand testCommand = cmd = lastSent == 0L ? null : this.scheduledCommands.poll();
            if (cmd == TestCommand.FINISH_SOURCES) {
                this.ack(cmd);
                this.isRunning = false;
                continue;
            }
            if (cmd == TestCommand.FAIL) {
                this.ack(cmd);
                throw new RuntimeException("requested to fail");
            }
            if (cmd == null) {
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    ctx.collect((Object)new TestDataElement(this.operatorID, this.getRuntimeContext().getIndexOfThisSubtask(), ++lastSent));
                    continue;
                }
            }
            throw new RuntimeException("unknown command " + cmd);
        }
        Object object = ctx.getCheckpointLock();
        synchronized (object) {
            this.eventQueue.add(new OperatorFinishedEvent(this.operatorID, this.getRuntimeContext().getIndexOfThisSubtask(), this.getRuntimeContext().getAttemptNumber(), lastSent, new OperatorFinishedEvent.LastReceivedVertexDataInfo(Collections.emptyMap())));
        }
    }

    private void ack(TestCommand cmd) {
        this.eventQueue.add(new TestCommandAckEvent(this.operatorID, this.getRuntimeContext().getIndexOfThisSubtask(), this.getRuntimeContext().getAttemptNumber(), cmd));
    }

    public void cancel() {
        this.isRunning = false;
    }
}

