package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import javax.annotation.Nullable;
import org.apache.flink.api.common.accumulators.ListAccumulator;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.class */
public class CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase extends CoordinatorEventsExactlyOnceITCase {
    private static final int NUM_EVENTS = 100;
    private static final int DELAY = 10;
    private StreamExecutionEnvironment env;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase$EventReceivingOperator.class */
    private static class EventReceivingOperator<T> extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T>, OperatorEventHandler {
        protected static final String ACCUMULATOR_NAME = "receivedIntegers";
        private static boolean shouldUnblockAllCheckpoint;
        private static boolean shouldUnblockNextCheckpoint;
        protected final ListAccumulator<Integer> accumulator;
        protected ListState<Integer> state;

        private EventReceivingOperator() {
            this.accumulator = new ListAccumulator<>();
        }

        public void open() throws Exception {
            super.open();
            getRuntimeContext().addAccumulator(ACCUMULATOR_NAME, this.accumulator);
        }

        public void processElement(StreamRecord<T> streamRecord) throws Exception {
            throw new UnsupportedOperationException();
        }

        public void handleOperatorEvent(OperatorEvent operatorEvent) {
            if (operatorEvent instanceof CoordinatorEventsExactlyOnceITCase.IntegerEvent) {
                this.accumulator.add(Integer.valueOf(((CoordinatorEventsExactlyOnceITCase.IntegerEvent) operatorEvent).value));
            } else {
                if (!(operatorEvent instanceof CoordinatorEventsExactlyOnceITCase.EndEvent)) {
                    throw new UnsupportedOperationException();
                }
                try {
                    this.state.update(this.accumulator.getLocalValue());
                    boolean unused = ManuallyClosedSourceFunction.shouldCloseSource = true;
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }

        public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
            super.snapshotState(stateSnapshotContext);
            while (!shouldUnblockAllCheckpoint && !shouldUnblockNextCheckpoint) {
                Thread.sleep(100L);
            }
            if (shouldUnblockNextCheckpoint) {
                shouldUnblockNextCheckpoint = false;
            }
            this.state.update(this.accumulator.getLocalValue());
        }

        public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
            super.initializeState(stateInitializationContext);
            this.state = stateInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("accumulatorState", BasicTypeInfo.INT_TYPE_INFO));
            this.accumulator.resetLocal();
            Iterable iterable = (Iterable) this.state.get();
            ListAccumulator<Integer> listAccumulator = this.accumulator;
            listAccumulator.getClass();
            iterable.forEach((v1) -> {
                r1.add(v1);
            });
            sendStartEvent();
        }

        protected void sendStartEvent() throws IOException {
            getContainingTask().getEnvironment().getOperatorCoordinatorEventGateway().sendOperatorEventToCoordinator(getOperatorID(), new SerializedValue(new CoordinatorEventsExactlyOnceITCase.StartEvent(-1)));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase$EventReceivingOperatorFactory.class */
    public static class EventReceivingOperatorFactory<IN, OUT> extends AbstractStreamOperatorFactory<OUT> implements CoordinatedOperatorFactory<OUT>, OneInputStreamOperatorFactory<IN, OUT> {
        protected final String name;
        protected final int numEvents;
        protected final int delay;

        public EventReceivingOperatorFactory(String str, int i, int i2) {
            this.name = str;
            this.numEvents = i;
            this.delay = i2;
        }

        public OperatorCoordinator.Provider getCoordinatorProvider(String str, final OperatorID operatorID) {
            return new OperatorCoordinator.Provider() { // from class: org.apache.flink.streaming.runtime.tasks.CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.EventReceivingOperatorFactory.1
                public OperatorID getOperatorId() {
                    return operatorID;
                }

                public OperatorCoordinator create(OperatorCoordinator.Context context) {
                    return new EventSendingCoordinatorWithGuaranteedCheckpoint(context, EventReceivingOperatorFactory.this.name, EventReceivingOperatorFactory.this.numEvents, EventReceivingOperatorFactory.this.delay);
                }
            };
        }

        public <T extends StreamOperator<OUT>> T createStreamOperator(StreamOperatorParameters<OUT> streamOperatorParameters) {
            EventReceivingOperator eventReceivingOperator = new EventReceivingOperator();
            eventReceivingOperator.setup(streamOperatorParameters.getContainingTask(), streamOperatorParameters.getStreamConfig(), streamOperatorParameters.getOutput());
            streamOperatorParameters.getOperatorEventDispatcher().registerEventHandler(streamOperatorParameters.getStreamConfig().getOperatorID(), eventReceivingOperator);
            return eventReceivingOperator;
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return EventReceivingOperator.class;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase$EventReceivingOperatorFactoryWithGuaranteedConcurrentCheckpoint.class */
    /**
     * Variant of {@link EventReceivingOperatorFactory} whose coordinator is an
     * {@link EventSendingCoordinatorWithGuaranteedConcurrentCheckpoint}.
     */
    private static class EventReceivingOperatorFactoryWithGuaranteedConcurrentCheckpoint<IN, OUT> extends EventReceivingOperatorFactory<IN, OUT> {
        public EventReceivingOperatorFactoryWithGuaranteedConcurrentCheckpoint(String factoryName, int eventCount, int eventDelay) {
            super(factoryName, eventCount, eventDelay);
        }

        /** Supplies a provider that reports {@code operatorID} and builds the concurrent-checkpoint coordinator. */
        @Override
        public OperatorCoordinator.Provider getCoordinatorProvider(String operatorName, final OperatorID operatorID) {
            final OperatorCoordinator.Provider provider = new OperatorCoordinator.Provider() {
                @Override
                public OperatorCoordinator create(OperatorCoordinator.Context coordinatorContext) {
                    // name / numEvents / delay are the enclosing factory's inherited fields.
                    return new EventSendingCoordinatorWithGuaranteedConcurrentCheckpoint(coordinatorContext, name, numEvents, delay);
                }

                @Override
                public OperatorID getOperatorId() {
                    return operatorID;
                }
            };
            return provider;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase$EventReceivingOperatorWithFailure.class */
    /**
     * {@link EventReceivingOperator} that throws once, on the first integer event whose value
     * exceeds a randomly chosen threshold, and records that failure in the test script.
     */
    private static class EventReceivingOperatorWithFailure<T> extends EventReceivingOperator<T> {
        private final String name;

        /** Largest event value accepted before the one-time failure is triggered. */
        private final int maxNumberBeforeFailure;

        /** Set in {@link #setup}; remembers whether the injected failure already happened. */
        private CoordinatorEventsExactlyOnceITCase.TestScript testScript;

        /**
         * @param name operator name used to look up the test script
         * @param numEvents event count from which the failure threshold is derived
         *     ({@code numEvents / 3} plus a random offset below {@code numEvents / 6})
         */
        private EventReceivingOperatorWithFailure(String name, int numEvents) {
            super();
            this.name = name;
            this.maxNumberBeforeFailure = (numEvents / 3) + new Random().nextInt(numEvents / 6);
        }

        /** Sets up the operator, checks it runs as subtask 0 and fetches its test script. */
        @Override
        public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<T>> output) {
            super.setup(streamTask, streamConfig, output);
            Preconditions.checkState(streamTask.getIndexInSubtaskGroup() == 0);
            this.testScript = CoordinatorEventsExactlyOnceITCase.TestScript.getForOperator(this.name + "-subtask0");
        }

        /**
         * Records integer events like the base class, except that the first value above the
         * threshold triggers the one-time failure. On an {@code EndEvent} the accumulator is
         * written to state, and the source is only asked to close if the failure already occurred.
         *
         * @throws UnsupportedOperationException for any other event type
         */
        @Override
        public void handleOperatorEvent(OperatorEvent operatorEvent) {
            if (operatorEvent instanceof CoordinatorEventsExactlyOnceITCase.IntegerEvent) {
                int value = ((CoordinatorEventsExactlyOnceITCase.IntegerEvent) operatorEvent).value;
                if (value <= this.maxNumberBeforeFailure || this.testScript.hasAlreadyFailed()) {
                    this.accumulator.add(value);
                    return;
                }
                this.testScript.recordHasFailed();
                throw new RuntimeException("Intentional test failure on event " + value);
            }
            if (!(operatorEvent instanceof CoordinatorEventsExactlyOnceITCase.EndEvent)) {
                throw new UnsupportedOperationException();
            }
            try {
                this.state.update(this.accumulator.getLocalValue());
                if (this.testScript.hasAlreadyFailed()) {
                    ManuallyClosedSourceFunction.shouldCloseSource = true;
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Sends a {@code StartEvent} carrying the last integer in the accumulator, or {@code -1}
         * when the accumulator is empty.
         *
         * @throws IOException if the event cannot be serialized
         */
        @Override
        protected void sendStartEvent() throws IOException {
            List<Integer> received = this.accumulator.getLocalValue();
            int lastValue = received.isEmpty() ? -1 : received.get(received.size() - 1);
            getContainingTask().getEnvironment().getOperatorCoordinatorEventGateway().sendOperatorEventToCoordinator(getOperatorID(), new SerializedValue<>(new CoordinatorEventsExactlyOnceITCase.StartEvent(lastValue)));
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase$EventReceivingOperatorWithFailureFactory.class */
    /**
     * Variant of {@link EventReceivingOperatorFactory} that creates an
     * {@link EventReceivingOperatorWithFailure} instead of the plain operator.
     */
    private static class EventReceivingOperatorWithFailureFactory<IN, OUT> extends EventReceivingOperatorFactory<IN, OUT> {
        public EventReceivingOperatorWithFailureFactory(String name, int numEvents, int delay) {
            super(name, numEvents, delay);
        }

        /**
         * Creates the failing operator, sets it up with the task/config/output from the parameters
         * and registers it with the operator event dispatcher under its operator id.
         */
        @Override
        @SuppressWarnings("unchecked") // the interface forces a caller-chosen T; the operator is a StreamOperator<OUT>
        public <T extends StreamOperator<OUT>> T createStreamOperator(StreamOperatorParameters<OUT> streamOperatorParameters) {
            EventReceivingOperatorWithFailure<OUT> operator = new EventReceivingOperatorWithFailure<>(this.name, this.numEvents);
            operator.setup(streamOperatorParameters.getContainingTask(), streamOperatorParameters.getStreamConfig(), streamOperatorParameters.getOutput());
            streamOperatorParameters.getOperatorEventDispatcher().registerEventHandler(streamOperatorParameters.getStreamConfig().getOperatorID(), operator);
            return (T) operator;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase$EventSendingCoordinatorWithGuaranteedCheckpoint.class */
    /**
     * Coordinator that holds back event sending around its first checkpoint and, once an event
     * has been sent after that checkpoint, unblocks operator snapshots and fails the job one time.
     */
    private static class EventSendingCoordinatorWithGuaranteedCheckpoint extends CoordinatorEventsExactlyOnceITCase.EventSendingCoordinator {
        /** Events numbered above this value wait for the coordinator's first checkpoint. */
        private final int maxNumberBeforeFirstCheckpoint;
        private boolean isEventSentAfterFirstCheckpoint;
        private boolean isCoordinatorFirstCheckpointCompleted;
        private boolean isJobFirstCheckpointCompleted;

        public EventSendingCoordinatorWithGuaranteedCheckpoint(OperatorCoordinator.Context context, String name, int numEvents, int delay) {
            super(context, name, numEvents, delay);
            maxNumberBeforeFirstCheckpoint = new Random().nextInt(numEvents / 6);
            isEventSentAfterFirstCheckpoint = false;
            isCoordinatorFirstCheckpointCompleted = false;
            isJobFirstCheckpointCompleted = false;
        }

        /**
         * Sends the next event unless it must still wait for the coordinator's or the job's first
         * checkpoint; the first send after the coordinator checkpoint unblocks all snapshots.
         */
        @Override
        public void sendNextEvent() {
            if (!isCoordinatorFirstCheckpointCompleted && nextNumber > maxNumberBeforeFirstCheckpoint) {
                return;
            }
            if (!isJobFirstCheckpointCompleted && nextNumber >= maxNumberBeforeFailure) {
                return;
            }

            super.sendNextEvent();

            if (!isEventSentAfterFirstCheckpoint && isCoordinatorFirstCheckpointCompleted) {
                isEventSentAfterFirstCheckpoint = true;
                EventReceivingOperator.shouldUnblockAllCheckpoint = true;
            }
        }

        /**
         * Marks the coordinator's first checkpoint when one is pending, delegates to the parent,
         * and then fails the job once if a checkpoint is still pending after an event was already
         * sent following the first checkpoint.
         */
        @Override
        public void handleCheckpoint() {
            if (nextToComplete != null) {
                isCoordinatorFirstCheckpointCompleted = true;
            }

            super.handleCheckpoint();

            final boolean failNow = nextToComplete != null && isEventSentAfterFirstCheckpoint && !testScript.hasAlreadyFailed();
            if (failNow) {
                testScript.recordHasFailed();
                context.failJob(new Exception("test failure"));
            }
        }

        /** After a restore, both first-checkpoint flags are set in the mailbox. */
        @Override
        public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception {
            super.resetToCheckpoint(checkpointId, checkpointData);
            runInMailbox(() -> {
                isCoordinatorFirstCheckpointCompleted = true;
                isJobFirstCheckpointCompleted = true;
            });
        }

        /** An abort before the job's first completed checkpoint clears the coordinator's flag. */
        @Override
        public void notifyCheckpointAborted(long checkpointId) {
            super.notifyCheckpointAborted(checkpointId);
            runInMailbox(() -> {
                if (!isJobFirstCheckpointCompleted) {
                    isCoordinatorFirstCheckpointCompleted = false;
                }
            });
        }

        /** Records, in the mailbox, that the job has completed a checkpoint. */
        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            super.notifyCheckpointComplete(checkpointId);
            runInMailbox(() -> {
                isJobFirstCheckpointCompleted = true;
            });
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase$EventSendingCoordinatorWithGuaranteedConcurrentCheckpoint.class */
    /**
     * Coordinator that tracks a first and a second checkpoint (both on the coordinator side and
     * on the job side), gates event sending on the second one, and fails the job one time after
     * an event has been sent following the coordinator's second checkpoint.
     */
    private static class EventSendingCoordinatorWithGuaranteedConcurrentCheckpoint extends CoordinatorEventsExactlyOnceITCase.EventSendingCoordinator {
        /** Set when a checkpoint abort is reported while the test script has not failed yet. */
        private static boolean isCheckpointAbortedBeforeScriptFailure;

        /** Events numbered above this value wait for the coordinator's second checkpoint. */
        private final int maxNumberBeforeSecondCheckpoint;
        private boolean isEventSentAfterSecondCheckpoint;
        private boolean isCoordinatorFirstCheckpointCompleted;
        private boolean isJobFirstCheckpointCompleted;
        private boolean isCoordinatorSecondCheckpointCompleted;
        private boolean isJobSecondCheckpointCompleted;

        public EventSendingCoordinatorWithGuaranteedConcurrentCheckpoint(OperatorCoordinator.Context context, String name, int numEvents, int delay) {
            super(context, name, numEvents, delay);
            maxNumberBeforeSecondCheckpoint = new Random().nextInt(numEvents / 6);
            isEventSentAfterSecondCheckpoint = false;
            isCoordinatorFirstCheckpointCompleted = false;
            isJobFirstCheckpointCompleted = false;
            isCoordinatorSecondCheckpointCompleted = false;
            isJobSecondCheckpointCompleted = false;
        }

        /**
         * Sends the next event unless it must still wait for the coordinator's or the job's second
         * checkpoint. Once an event went out after the coordinator's second checkpoint and the
         * job's first checkpoint has completed, all operator snapshots are unblocked.
         */
        @Override
        public void sendNextEvent() {
            if (!isCoordinatorSecondCheckpointCompleted && nextNumber > maxNumberBeforeSecondCheckpoint) {
                return;
            }
            if (!isJobSecondCheckpointCompleted && nextNumber >= maxNumberBeforeFailure) {
                return;
            }

            super.sendNextEvent();

            if (isCoordinatorSecondCheckpointCompleted && !isEventSentAfterSecondCheckpoint) {
                isEventSentAfterSecondCheckpoint = true;
            }
            if (isEventSentAfterSecondCheckpoint && isJobFirstCheckpointCompleted) {
                EventReceivingOperator.shouldUnblockAllCheckpoint = true;
            }
        }

        /**
         * Advances the coordinator-side checkpoint flags when a checkpoint is pending (the second
         * one also lets a single operator snapshot through), delegates to the parent, and then
         * fails the job once if a checkpoint is still pending after the post-second-checkpoint send.
         */
        @Override
        public void handleCheckpoint() {
            if (nextToComplete != null) {
                if (!isCoordinatorFirstCheckpointCompleted) {
                    isCoordinatorFirstCheckpointCompleted = true;
                } else if (!isCoordinatorSecondCheckpointCompleted) {
                    isCoordinatorSecondCheckpointCompleted = true;
                    EventReceivingOperator.shouldUnblockNextCheckpoint = true;
                }
            }

            super.handleCheckpoint();

            final boolean failNow = nextToComplete != null && isEventSentAfterSecondCheckpoint && !testScript.hasAlreadyFailed();
            if (failNow) {
                testScript.recordHasFailed();
                context.failJob(new Exception("test failure"));
            }
        }

        /** After a restore, both first-checkpoint flags are set in the mailbox. */
        @Override
        public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception {
            super.resetToCheckpoint(checkpointId, checkpointData);
            runInMailbox(() -> {
                isCoordinatorFirstCheckpointCompleted = true;
                isJobFirstCheckpointCompleted = true;
            });
        }

        /** Remembers an abort that arrives before the test script has recorded its failure. */
        @Override
        public void notifyCheckpointAborted(long checkpointId) {
            if (!testScript.hasAlreadyFailed()) {
                isCheckpointAbortedBeforeScriptFailure = true;
            }
        }

        /** Records, in the mailbox, the job's first and then its second completed checkpoint. */
        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            super.notifyCheckpointComplete(checkpointId);
            runInMailbox(() -> {
                if (!isJobFirstCheckpointCompleted) {
                    isJobFirstCheckpointCompleted = true;
                } else if (!isJobSecondCheckpointCompleted) {
                    isJobSecondCheckpointCompleted = true;
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase$ManuallyClosedSourceFunction.class */
    public static class ManuallyClosedSourceFunction<T> implements SourceFunction<T> {
        private static boolean shouldCloseSource;

        private ManuallyClosedSourceFunction() {
        }

        public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
            while (!shouldCloseSource) {
                Thread.sleep(100L);
            }
        }

        public void cancel() {
        }
    }

    /** Creates a fresh environment (parallelism 1, checkpoints every 100 ms) and resets all shared static test state. */
    @Before
    public void setup() {
        final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        environment.enableCheckpointing(100L);
        env = environment;

        // Static flags survive between test methods, so clear each of them.
        ManuallyClosedSourceFunction.shouldCloseSource = false;
        EventReceivingOperator.shouldUnblockAllCheckpoint = false;
        EventReceivingOperator.shouldUnblockNextCheckpoint = false;
        EventSendingCoordinatorWithGuaranteedConcurrentCheckpoint.isCheckpointAbortedBeforeScriptFailure = false;
        CoordinatorEventsExactlyOnceITCase.TestScript.reset();
    }

    /** Runs the job with the plain event-receiving factory and default checkpoint settings. */
    @Test
    public void testCheckpoint() throws Exception {
        final EventReceivingOperatorFactory<Long, Long> factory =
                new EventReceivingOperatorFactory<>("eventReceiving", NUM_EVENTS, DELAY);
        executeAndVerifyResults(env, factory);
    }

    /** Same as the plain checkpoint test, but with unaligned checkpoints enabled. */
    @Test
    public void testUnalignedCheckpoint() throws Exception {
        env.getCheckpointConfig().enableUnalignedCheckpoints();

        final EventReceivingOperatorFactory<Long, Long> factory =
                new EventReceivingOperatorFactory<>("eventReceiving", NUM_EVENTS, DELAY);
        executeAndVerifyResults(env, factory);
    }

    /** Runs the job with the failing operator and checks that the injected failure was recorded. */
    @Test
    public void testCheckpointWithSubtaskFailure() throws Exception {
        final String operatorName = "eventReceivingWithFailure";
        executeAndVerifyResults(env, new EventReceivingOperatorWithFailureFactory<>(operatorName, NUM_EVENTS, DELAY));

        final boolean subtaskFailed =
                CoordinatorEventsExactlyOnceITCase.TestScript.getForOperator(operatorName + "-subtask0").hasAlreadyFailed();
        Assertions.assertThat(subtaskFailed).isTrue();
    }

    /** Failing-operator scenario with unaligned checkpoints enabled. */
    @Test
    public void testUnalignedCheckpointWithSubtaskFailure() throws Exception {
        env.getCheckpointConfig().enableUnalignedCheckpoints();

        final String operatorName = "eventReceivingWithFailure";
        executeAndVerifyResults(env, new EventReceivingOperatorWithFailureFactory<>(operatorName, NUM_EVENTS, DELAY));

        final boolean subtaskFailed =
                CoordinatorEventsExactlyOnceITCase.TestScript.getForOperator(operatorName + "-subtask0").hasAlreadyFailed();
        Assertions.assertThat(subtaskFailed).isTrue();
    }

    /**
     * Allows two concurrent checkpoints, runs the job with the concurrent-checkpoint coordinator
     * and checks that no checkpoint abort was seen before the scripted failure.
     */
    @Test
    public void testConcurrentCheckpoint() throws Exception {
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);

        final EventReceivingOperatorFactory<Long, Long> factory =
                new EventReceivingOperatorFactoryWithGuaranteedConcurrentCheckpoint<>("eventReceiving", NUM_EVENTS, DELAY);
        executeAndVerifyResults(env, factory);

        final boolean abortedTooEarly = EventSendingCoordinatorWithGuaranteedConcurrentCheckpoint.isCheckpointAbortedBeforeScriptFailure;
        Assertions.assertThat(abortedTooEarly).isFalse();
    }

    /**
     * Builds the pipeline "manually closed source -> event-receiving operator -> discarding sink",
     * runs it to completion on the mini cluster and hands the accumulated integers to the
     * inherited {@code checkListContainsSequence} check together with {@code NUM_EVENTS}.
     *
     * @param streamExecutionEnvironment environment the job is built in
     * @param eventReceivingOperatorFactory factory for the operator under test; its {@code name}
     *     is used as the transformation name
     * @throws Exception if building or executing the job fails
     */
    private void executeAndVerifyResults(StreamExecutionEnvironment streamExecutionEnvironment, EventReceivingOperatorFactory<Long, Long> eventReceivingOperatorFactory) throws Exception {
        streamExecutionEnvironment
                .addSource(new ManuallyClosedSourceFunction<>(), TypeInformation.of(Long.class))
                // keep the source in its own task, separate from the operator under test
                .disableChaining()
                .transform(eventReceivingOperatorFactory.name, TypeInformation.of(Long.class), eventReceivingOperatorFactory)
                .addSink(new DiscardingSink<>());

        List<Integer> receivedIntegers = MINI_CLUSTER.getMiniCluster().executeJobBlocking(streamExecutionEnvironment.getStreamGraph().getJobGraph()).getAccumulatorResult(EventReceivingOperator.ACCUMULATOR_NAME);
        checkListContainsSequence(receivedIntegers, NUM_EVENTS);
    }
}
