package org.apache.flink.runtime.io.network;

import junit.framework.TestCase;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
import org.apache.flink.runtime.iterative.event.TerminationEvent;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/* loaded from: input_file:org/apache/flink/runtime/io/network/TaskEventDispatcherTest.class */
public class TaskEventDispatcherTest extends TestLogger {

    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    /* loaded from: input_file:org/apache/flink/runtime/io/network/TaskEventDispatcherTest$OneShotEventListener.class */
    private static class OneShotEventListener implements EventListener<TaskEvent> {
        private final TaskEvent expected;
        boolean fired = false;

        OneShotEventListener(TaskEvent taskEvent) {
            this.expected = taskEvent;
        }

        public void onEvent(TaskEvent taskEvent) {
            Preconditions.checkState(!this.fired, "Should only fire once");
            this.fired = true;
            Preconditions.checkArgument(taskEvent == this.expected, "Fired on unexpected event: %s (expected: %s)", new Object[]{taskEvent, this.expected});
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/TaskEventDispatcherTest$ZeroShotEventListener.class */
    private static class ZeroShotEventListener implements EventListener<TaskEvent> {
        private ZeroShotEventListener() {
        }

        public void onEvent(TaskEvent taskEvent) {
            throw new IllegalStateException("Should never fire");
        }
    }

    @Test
    public void registerPartitionTwice() throws Exception {
        ResultPartitionID resultPartitionID = new ResultPartitionID();
        TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
        taskEventDispatcher.registerPartition(resultPartitionID);
        this.expectedException.expect(IllegalStateException.class);
        this.expectedException.expectMessage("already registered at task event dispatcher");
        taskEventDispatcher.registerPartition(resultPartitionID);
    }

    @Test
    public void subscribeToEventNotRegistered() throws Exception {
        TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
        this.expectedException.expect(IllegalStateException.class);
        this.expectedException.expectMessage("not registered at task event dispatcher");
        taskEventDispatcher.subscribeToEvent(new ResultPartitionID(), new ZeroShotEventListener(), TaskEvent.class);
    }

    @Test
    public void publishSubscribe() throws Exception {
        ResultPartitionID resultPartitionID = new ResultPartitionID();
        ResultPartitionID resultPartitionID2 = new ResultPartitionID();
        TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
        AllWorkersDoneEvent allWorkersDoneEvent = new AllWorkersDoneEvent();
        TerminationEvent terminationEvent = new TerminationEvent();
        Assert.assertFalse(taskEventDispatcher.publish(resultPartitionID, allWorkersDoneEvent));
        taskEventDispatcher.registerPartition(resultPartitionID);
        taskEventDispatcher.registerPartition(resultPartitionID2);
        TestCase.assertTrue(taskEventDispatcher.publish(resultPartitionID, allWorkersDoneEvent));
        OneShotEventListener oneShotEventListener = new OneShotEventListener(allWorkersDoneEvent);
        ZeroShotEventListener zeroShotEventListener = new ZeroShotEventListener();
        ZeroShotEventListener zeroShotEventListener2 = new ZeroShotEventListener();
        OneShotEventListener oneShotEventListener2 = new OneShotEventListener(terminationEvent);
        taskEventDispatcher.subscribeToEvent(resultPartitionID, oneShotEventListener, AllWorkersDoneEvent.class);
        taskEventDispatcher.subscribeToEvent(resultPartitionID2, zeroShotEventListener, AllWorkersDoneEvent.class);
        taskEventDispatcher.subscribeToEvent(resultPartitionID, zeroShotEventListener2, TaskEvent.class);
        taskEventDispatcher.subscribeToEvent(resultPartitionID, oneShotEventListener2, TerminationEvent.class);
        TestCase.assertTrue(taskEventDispatcher.publish(resultPartitionID, allWorkersDoneEvent));
        TestCase.assertTrue("listener should have fired for AllWorkersDoneEvent", oneShotEventListener.fired);
        Assert.assertFalse("listener should not have fired for AllWorkersDoneEvent", oneShotEventListener2.fired);
        TestCase.assertTrue(taskEventDispatcher.publish(resultPartitionID, terminationEvent));
        TestCase.assertTrue("listener should have fired for TerminationEvent", oneShotEventListener2.fired);
    }

    @Test
    public void unregisterPartition() throws Exception {
        ResultPartitionID resultPartitionID = new ResultPartitionID();
        ResultPartitionID resultPartitionID2 = new ResultPartitionID();
        TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
        AllWorkersDoneEvent allWorkersDoneEvent = new AllWorkersDoneEvent();
        Assert.assertFalse(taskEventDispatcher.publish(resultPartitionID, allWorkersDoneEvent));
        taskEventDispatcher.registerPartition(resultPartitionID);
        taskEventDispatcher.registerPartition(resultPartitionID2);
        OneShotEventListener oneShotEventListener = new OneShotEventListener(allWorkersDoneEvent);
        ZeroShotEventListener zeroShotEventListener = new ZeroShotEventListener();
        OneShotEventListener oneShotEventListener2 = new OneShotEventListener(allWorkersDoneEvent);
        taskEventDispatcher.subscribeToEvent(resultPartitionID, oneShotEventListener, AllWorkersDoneEvent.class);
        taskEventDispatcher.subscribeToEvent(resultPartitionID2, zeroShotEventListener, AllWorkersDoneEvent.class);
        taskEventDispatcher.subscribeToEvent(resultPartitionID, oneShotEventListener2, AllWorkersDoneEvent.class);
        taskEventDispatcher.unregisterPartition(resultPartitionID2);
        TestCase.assertTrue(taskEventDispatcher.publish(resultPartitionID, allWorkersDoneEvent));
        TestCase.assertTrue("listener should have fired for AllWorkersDoneEvent", oneShotEventListener.fired);
        TestCase.assertTrue("listener should have fired for AllWorkersDoneEvent", oneShotEventListener2.fired);
        Assert.assertFalse(taskEventDispatcher.publish(resultPartitionID2, allWorkersDoneEvent));
    }

    @Test
    public void clearAll() throws Exception {
        ResultPartitionID resultPartitionID = new ResultPartitionID();
        TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
        taskEventDispatcher.registerPartition(resultPartitionID);
        taskEventDispatcher.subscribeToEvent(resultPartitionID, new ZeroShotEventListener(), AllWorkersDoneEvent.class);
        taskEventDispatcher.clearAll();
        Assert.assertFalse(taskEventDispatcher.publish(resultPartitionID, new AllWorkersDoneEvent()));
    }
}
