/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.lifecycle.event;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import org.apache.flink.runtime.operators.lifecycle.event.TestEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestEventQueue;

/**
 * In-memory {@link TestEventQueue} that records every added {@link TestEvent} and forwards it to
 * the currently registered listeners.
 *
 * <p>Both the event history and the listener registry are held in {@link CopyOnWriteArrayList}s.
 */
class TestEventQueueImpl implements TestEventQueue {
    /** Every event passed to {@link #add(TestEvent)}, in insertion order. */
    private final List<TestEvent> events = new CopyOnWriteArrayList<>();

    /** Callbacks invoked for each event passed to {@link #add(TestEvent)}. */
    private final List<Consumer<TestEvent>> listeners = new CopyOnWriteArrayList<>();

    TestEventQueueImpl() {
    }

    /**
     * Records the event and then hands it to every registered listener.
     *
     * <p>Listeners are invoked synchronously on the calling thread, after the event has been
     * appended to the history.
     *
     * @param e the event to record and publish
     */
    @Override
    public void add(TestEvent e) {
        events.add(e);
        listeners.forEach(listener -> listener.accept(e));
    }

    /**
     * Feeds events to {@code handler}, one at a time, until it returns any action other than
     * {@code CONTINUE}.
     *
     * <p>A temporary listener buffers incoming events in a private queue, so only events added
     * after this method has registered that listener are delivered; earlier events are not
     * replayed. The call blocks while the buffer is empty. The temporary listener is always
     * removed again, including when the handler throws or the wait is interrupted.
     *
     * @param handler callback that receives each event and decides whether to keep going
     * @throws Exception whatever {@code handler} throws, or {@link InterruptedException} if the
     *     thread is interrupted while waiting for the next event
     */
    @Override
    public void withHandler(TestEventQueue.TestEventHandler handler) throws Exception {
        // Typed queue (not a raw type) so that take() yields TestEvent without an unchecked cast.
        LinkedBlockingQueue<TestEvent> pending = new LinkedBlockingQueue<>();
        Consumer<TestEvent> listener = pending::add;
        addListener(listener);
        try {
            TestEventQueue.TestEventHandler.TestEventNextAction nextAction =
                    TestEventQueue.TestEventHandler.TestEventNextAction.CONTINUE;
            while (nextAction == TestEventQueue.TestEventHandler.TestEventNextAction.CONTINUE) {
                // take() blocks until add(...) pushes the next event through the listener.
                nextAction = handler.handle(pending.take());
            }
        } finally {
            // Unregister in all cases so the buffer does not keep receiving events.
            removeListener(listener);
        }
    }

    /**
     * Unregisters a listener so that it no longer receives events.
     *
     * @param listener the listener to remove; if it is not registered the call has no effect
     */
    @Override
    public void removeListener(Consumer<TestEvent> listener) {
        listeners.remove(listener);
    }

    /**
     * Registers a listener that will be called for every subsequently added event.
     *
     * @param listener the callback to register
     */
    @Override
    public void addListener(Consumer<TestEvent> listener) {
        listeners.add(listener);
    }

    /**
     * Returns a read-only view of all events recorded so far.
     *
     * <p>The result wraps the live history rather than copying it, so events added later become
     * visible through the returned list.
     *
     * @return an unmodifiable view of the recorded events, in insertion order
     */
    @Override
    public List<TestEvent> getAll() {
        return Collections.unmodifiableList(events);
    }
}

