package org.apache.flink.runtime.operators.coordination;

import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.operators.coordination.TestEventSender;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.class */
/**
 * Unit tests for {@link OperatorEventValve}.
 *
 * <p>Each test wires the valve to a {@link TestEventSender}, which records the events handed to
 * it in its {@code events} list, and then checks which events were forwarded and what state the
 * futures returned by {@code sendEvent} are in.
 */
public class OperatorEventValveTest {

    /** An event sent through a valve that was never shut is forwarded right away. */
    @Test
    public void eventsPassThroughOpenValve() throws Exception {
        final TestEventSender sender = new TestEventSender();
        final OperatorEventValve valve = new OperatorEventValve(sender);

        final TestOperatorEvent event = new TestOperatorEvent();
        final CompletableFuture<?> result = valve.sendEvent(new SerializedValue<>(event), 11);

        Assert.assertThat(
                sender.events,
                Matchers.contains(new TestEventSender.EventWithSubtask(event, 11)));
        // NOTE(review): isDone() is also true for an exceptionally completed future; if
        // successful delivery is the intent, consider additionally asserting
        // !isCompletedExceptionally().
        Assert.assertTrue(result.isDone());
    }

    /** Shutting a valve that was never marked for a checkpoint is expected to be rejected. */
    @Test(expected = IllegalStateException.class)
    public void errorShuttingUnmarkedValve() throws Exception {
        final OperatorEventValve valve = new OperatorEventValve(new TestEventSender());

        valve.shutValve(123L);
    }

    /** Shutting the valve for a checkpoint id other than the marked one is expected to fail. */
    @Test(expected = IllegalStateException.class)
    public void errorShuttingValveForOtherMark() throws Exception {
        final OperatorEventValve valve = new OperatorEventValve(new TestEventSender());

        valve.markForCheckpoint(100L);
        valve.shutValve(123L);
    }

    /** While the valve is shut, nothing reaches the sender and the future stays pending. */
    @Test
    public void eventsBlockedByClosedValve() throws Exception {
        final TestEventSender sender = new TestEventSender();
        final OperatorEventValve valve = new OperatorEventValve(sender);
        valve.markForCheckpoint(1L);
        valve.shutValve(1L);

        final CompletableFuture<?> result =
                valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 1);

        Assert.assertTrue(sender.events.isEmpty());
        Assert.assertFalse(result.isDone());
    }

    /** Events sent while the valve was shut are delivered once the valve is opened again. */
    @Test
    public void eventsReleasedAfterOpeningValve() throws Exception {
        final TestEventSender sender = new TestEventSender();
        final OperatorEventValve valve = new OperatorEventValve(sender);
        valve.markForCheckpoint(17L);
        valve.shutValve(17L);

        final TestOperatorEvent firstEvent = new TestOperatorEvent();
        final TestOperatorEvent secondEvent = new TestOperatorEvent();
        final CompletableFuture<?> firstResult =
                valve.sendEvent(new SerializedValue<>(firstEvent), 3);
        final CompletableFuture<?> secondResult =
                valve.sendEvent(new SerializedValue<>(secondEvent), 0);

        valve.openValveAndUnmarkCheckpoint();

        // The release order is deliberately not asserted here.
        Assert.assertThat(
                sender.events,
                Matchers.containsInAnyOrder(
                        new TestEventSender.EventWithSubtask(firstEvent, 3),
                        new TestEventSender.EventWithSubtask(secondEvent, 0)));
        Assert.assertTrue(firstResult.isDone());
        Assert.assertTrue(secondResult.isDone());
    }

    /**
     * With a sender constructed with a {@link FlinkException}, the future of an event that was
     * held back by the shut valve completes exceptionally after the valve is opened.
     */
    @Test
    public void releasedEventsForwardSendFailures() throws Exception {
        final TestEventSender failingSender = new TestEventSender(new FlinkException("test"));
        final OperatorEventValve valve = new OperatorEventValve(failingSender);
        valve.markForCheckpoint(17L);
        valve.shutValve(17L);

        final CompletableFuture<?> result =
                valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 10);
        valve.openValveAndUnmarkCheckpoint();

        Assert.assertTrue(result.isCompletedExceptionally());
    }

    /** After {@code reset()}, opening the valve does not deliver any previously sent event. */
    @Test
    public void resetDropsAllEvents() throws Exception {
        final TestEventSender sender = new TestEventSender();
        final OperatorEventValve valve = new OperatorEventValve(sender);
        valve.markForCheckpoint(17L);
        valve.shutValve(17L);

        valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 0);
        valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 1);

        valve.reset();
        valve.openValveAndUnmarkCheckpoint();

        Assert.assertTrue(sender.events.isEmpty());
    }

    /**
     * After {@code resetForTask(1)}, only the event for subtask 0 is delivered when the valve is
     * opened; the future of the event for subtask 1 is completed exceptionally.
     */
    @Test
    public void resetForTaskDropsSelectiveEvents() throws Exception {
        final TestEventSender sender = new TestEventSender();
        final OperatorEventValve valve = new OperatorEventValve(sender);
        valve.markForCheckpoint(17L);
        valve.shutValve(17L);

        final TestOperatorEvent keptEvent = new TestOperatorEvent();
        final TestOperatorEvent droppedEvent = new TestOperatorEvent();
        final CompletableFuture<?> keptResult =
                valve.sendEvent(new SerializedValue<>(keptEvent), 0);
        final CompletableFuture<?> droppedResult =
                valve.sendEvent(new SerializedValue<>(droppedEvent), 1);

        valve.resetForTask(1);
        valve.openValveAndUnmarkCheckpoint();

        Assert.assertThat(
                sender.events,
                Matchers.contains(new TestEventSender.EventWithSubtask(keptEvent, 0)));
        Assert.assertTrue(keptResult.isDone());
        Assert.assertTrue(droppedResult.isCompletedExceptionally());
    }

    /** {@code reset()} leaves a previously shut valve in the not-shut state. */
    @Test
    public void resetOpensValve() throws Exception {
        final OperatorEventValve valve = new OperatorEventValve(new TestEventSender());
        valve.markForCheckpoint(17L);
        valve.shutValve(17L);

        valve.reset();

        Assert.assertFalse(valve.isShut());
    }
}
