/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network;

import junit.framework.TestCase;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
import org.apache.flink.runtime.iterative.event.TerminationEvent;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/**
 * Tests for {@link TaskEventDispatcher}: partition registration, event subscription,
 * event publishing, partition unregistration and clearing all registrations.
 */
public class TaskEventDispatcherTest
extends TestLogger {
    /** Rule used by the negative tests to declare the exception they expect. */
    @Rule
    public final ExpectedException expectedException = ExpectedException.none();

    /**
     * Registering the same partition a second time is expected to fail with an
     * {@link IllegalStateException}.
     */
    @Test
    public void registerPartitionTwice() throws Exception {
        ResultPartitionID partitionId = new ResultPartitionID();
        TaskEventDispatcher ted = new TaskEventDispatcher();
        ted.registerPartition(partitionId);

        // The expectation is armed only after the first (valid) registration so that
        // a failure there is not mistaken for the expected one.
        expectedException.expect(IllegalStateException.class);
        expectedException.expectMessage("already registered at task event dispatcher");

        ted.registerPartition(partitionId);
    }

    /**
     * Subscribing to events of a partition that was never registered is expected to fail
     * with an {@link IllegalStateException}.
     */
    @Test
    public void subscribeToEventNotRegistered() throws Exception {
        TaskEventDispatcher ted = new TaskEventDispatcher();

        expectedException.expect(IllegalStateException.class);
        expectedException.expectMessage("not registered at task event dispatcher");

        ted.subscribeToEvent(new ResultPartitionID(), new ZeroShotEventListener(), TaskEvent.class);
    }

    /**
     * Exercises {@code publish} and {@code subscribeToEvent} together: publishing to an
     * unregistered partition returns {@code false}, publishing to a registered partition
     * returns {@code true}, and only the listener created for the published event instance
     * is expected to be invoked.
     */
    @Test
    public void publishSubscribe() throws Exception {
        ResultPartitionID partitionId1 = new ResultPartitionID();
        ResultPartitionID partitionId2 = new ResultPartitionID();
        TaskEventDispatcher ted = new TaskEventDispatcher();

        AllWorkersDoneEvent event1 = new AllWorkersDoneEvent();
        TerminationEvent event2 = new TerminationEvent();

        // Nothing is registered yet.
        Assert.assertFalse(ted.publish(partitionId1, event1));

        ted.registerPartition(partitionId1);
        ted.registerPartition(partitionId2);

        // Registered partition, but no listener subscribed yet.
        Assert.assertTrue(ted.publish(partitionId1, event1));

        OneShotEventListener eventListener1a = new OneShotEventListener(event1);
        ZeroShotEventListener eventListener1b = new ZeroShotEventListener();
        ZeroShotEventListener eventListener2 = new ZeroShotEventListener();
        OneShotEventListener eventListener3 = new OneShotEventListener(event2);

        ted.subscribeToEvent(partitionId1, eventListener1a, AllWorkersDoneEvent.class);
        // Subscribed on the other partition: throws if it is ever invoked.
        ted.subscribeToEvent(partitionId2, eventListener1b, AllWorkersDoneEvent.class);
        // Subscribed with the TaskEvent base class: throws if it is ever invoked.
        ted.subscribeToEvent(partitionId1, eventListener2, TaskEvent.class);
        ted.subscribeToEvent(partitionId1, eventListener3, TerminationEvent.class);

        Assert.assertTrue(ted.publish(partitionId1, event1));
        Assert.assertTrue("listener should have fired for AllWorkersDoneEvent", eventListener1a.fired);
        Assert.assertFalse("listener should not have fired for AllWorkersDoneEvent", eventListener3.fired);

        Assert.assertTrue(ted.publish(partitionId1, event2));
        Assert.assertTrue("listener should have fired for TerminationEvent", eventListener3.fired);
    }

    /**
     * After one partition is unregistered, publishing to it returns {@code false} while
     * the listeners of the partition that is still registered keep being notified.
     */
    @Test
    public void unregisterPartition() throws Exception {
        ResultPartitionID partitionId1 = new ResultPartitionID();
        ResultPartitionID partitionId2 = new ResultPartitionID();
        TaskEventDispatcher ted = new TaskEventDispatcher();

        AllWorkersDoneEvent event = new AllWorkersDoneEvent();
        Assert.assertFalse(ted.publish(partitionId1, event));

        ted.registerPartition(partitionId1);
        ted.registerPartition(partitionId2);

        OneShotEventListener eventListener1a = new OneShotEventListener(event);
        ZeroShotEventListener eventListener1b = new ZeroShotEventListener();
        OneShotEventListener eventListener2 = new OneShotEventListener(event);

        ted.subscribeToEvent(partitionId1, eventListener1a, AllWorkersDoneEvent.class);
        ted.subscribeToEvent(partitionId2, eventListener1b, AllWorkersDoneEvent.class);
        ted.subscribeToEvent(partitionId1, eventListener2, AllWorkersDoneEvent.class);

        ted.unregisterPartition(partitionId2);

        // Both listeners subscribed on the remaining partition must be notified.
        Assert.assertTrue(ted.publish(partitionId1, event));
        Assert.assertTrue("listener should have fired for AllWorkersDoneEvent", eventListener1a.fired);
        Assert.assertTrue("listener should have fired for AllWorkersDoneEvent", eventListener2.fired);

        // The unregistered partition no longer accepts events.
        Assert.assertFalse(ted.publish(partitionId2, event));
    }

    /**
     * After {@code clearAll()}, publishing to a previously registered partition returns
     * {@code false}; the subscribed listener throws if it is ever invoked.
     */
    @Test
    public void clearAll() throws Exception {
        ResultPartitionID partitionId = new ResultPartitionID();
        TaskEventDispatcher ted = new TaskEventDispatcher();
        ted.registerPartition(partitionId);

        ZeroShotEventListener eventListener1 = new ZeroShotEventListener();
        ted.subscribeToEvent(partitionId, eventListener1, AllWorkersDoneEvent.class);

        ted.clearAll();

        Assert.assertFalse(ted.publish(partitionId, new AllWorkersDoneEvent()));
    }

    /** Event listener that must never be invoked; any invocation throws. */
    private static class ZeroShotEventListener
    implements EventListener<TaskEvent> {
        /**
         * Always fails, since this listener is not supposed to receive any event.
         *
         * @param actual the event that was delivered
         * @throws IllegalStateException on every call
         */
        @Override
        public void onEvent(TaskEvent actual) {
            throw new IllegalStateException("Should never fire");
        }
    }

    /**
     * Event listener that accepts exactly one invocation, which must carry the very same
     * event instance that was handed to the constructor.
     */
    private static class OneShotEventListener
    implements EventListener<TaskEvent> {
        /** The event instance this listener expects to receive (compared by identity). */
        private final TaskEvent expected;
        /** Set to {@code true} once {@link #onEvent(TaskEvent)} has been called. */
        private boolean fired = false;

        /**
         * @param expected the event instance that {@link #onEvent(TaskEvent)} must receive
         */
        OneShotEventListener(TaskEvent expected) {
            this.expected = expected;
        }

        /**
         * Records the invocation and verifies that it is the first one and that the
         * delivered event is the expected instance.
         *
         * @param actual the event that was delivered
         */
        @Override
        public void onEvent(TaskEvent actual) {
            Preconditions.checkState(!fired, "Should only fire once");
            // The flag is set before the identity check, so a delivery of the wrong
            // event still counts as "fired".
            fired = true;
            Preconditions.checkArgument(
                    actual == expected,
                    "Fired on unexpected event: %s (expected: %s)",
                    actual,
                    expected);
        }
    }
}

