package org.apache.flink.test.streaming.runtime;

import java.util.ArrayList;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/streaming/runtime/BufferTimeoutITCase.class */
public class BufferTimeoutITCase extends AbstractTestBaseJUnit4 {

    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();

    @Test
    public void testDisablingBufferTimeout() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.setBufferTimeout(-1L);
        final SharedReference add = this.sharedObjects.add(new ArrayList());
        executionEnvironment.addSource(new SourceFunction<Integer>() { // from class: org.apache.flink.test.streaming.runtime.BufferTimeoutITCase.2
            public void run(SourceFunction.SourceContext<Integer> sourceContext) throws Exception {
                sourceContext.collect(1);
                Thread.sleep(Long.MAX_VALUE);
            }

            public void cancel() {
            }
        }).slotSharingGroup("source").addSink(new SinkFunction<Integer>() { // from class: org.apache.flink.test.streaming.runtime.BufferTimeoutITCase.1
            public void invoke(Integer num, SinkFunction.Context context) {
                ((ArrayList) add.get()).add(num);
            }
        }).slotSharingGroup("sink");
        CommonTestUtils.waitForAllTaskRunning(MINI_CLUSTER_RESOURCE.getMiniCluster(), executionEnvironment.executeAsync().getJobID(), false);
        Assert.assertTrue("OutputFlusher thread is unexpectedly running", Thread.getAllStackTraces().keySet().stream().noneMatch(thread -> {
            return thread.getName().startsWith("OutputFlusher");
        }));
    }
}
