package org.apache.flink.test.streaming.runtime;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.Semaphore;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.TimerException;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.class */
public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
    private final TimeCharacteristic timeCharacteristic;

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase$InfiniteTestSource.class */
    /**
     * Test source that keeps emitting the string {@code "hello"} until it is cancelled.
     *
     * <p>The source never finishes on its own; the loop in {@link #run} only ends after
     * {@link #cancel()} has cleared the {@code running} flag.
     */
    private static class InfiniteTestSource implements SourceFunction<String> {
        private static final long serialVersionUID = 1L;

        /** Loop flag; volatile so that the write in {@link #cancel()} is seen by the loop in {@link #run}. */
        private volatile boolean running = true;

        /**
         * Emits {@code "hello"} in a tight loop for as long as the source has not been cancelled.
         *
         * @param ctx context used to emit the records
         */
        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            while (running) {
                ctx.collect("hello");
            }
        }

        /** Clears the loop flag so that the emit loop in {@link #run} terminates. */
        public void cancel() {
            running = false;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase$TimerOperator.class */
    /**
     * One-input test operator that checks element, watermark and processing-time callbacks are
     * never executed concurrently.
     *
     * <p>Every callback must obtain the single permit of {@code semaphore} without blocking; if
     * the permit is already taken the callback fails the test with
     * "Concurrent invocation of operator functions.". The first element registers a
     * processing-time timer, and each timer callback registers the next one. Once more than 1000
     * timers have fired and more than 10000 elements have been counted, the timer callback throws
     * a {@link RuntimeException} with the message {@code "TEST SUCCESS"}, which the tests in this
     * file look for in the job failure.
     */
    private static class TimerOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String>, ProcessingTimeCallback {
        private static final long serialVersionUID = 1;

        /** Number of timer callbacks seen so far. */
        private int numTimers = 0;

        /** Number of elements seen so far. */
        private int numElements = 0;

        /** True until the first element has registered the initial timer. */
        private boolean first = true;

        /** Single-permit guard used to detect overlapping callback invocations. */
        private final Semaphore semaphore = new Semaphore(1);

        /**
         * @param chainingStrategy chaining strategy to set on this operator
         */
        public TimerOperator(ChainingStrategy chainingStrategy) {
            setChainingStrategy(chainingStrategy);
        }

        /**
         * Counts the element and, for the very first element only, registers the initial timer
         * 100 ms after the current wall-clock time.
         *
         * @param streamRecord the incoming record (its value is not inspected)
         */
        public void processElement(StreamRecord<String> streamRecord) throws Exception {
            assertNotConcurrent();
            try {
                if (first) {
                    getProcessingTimeService().registerTimer(System.currentTimeMillis() + 100, this);
                    first = false;
                }
                numElements++;
            } finally {
                // Release even if registerTimer throws, so a later callback does not
                // misreport the leaked permit as a concurrent invocation.
                semaphore.release();
            }
        }

        /**
         * Counts the timer, ends the test once enough timers and elements were seen, and
         * otherwise registers a follow-up timer 1 ms after the current wall-clock time.
         *
         * @param timestamp the timestamp passed to the callback (unused)
         * @throws RuntimeException with message {@code "TEST SUCCESS"} once both thresholds are exceeded
         */
        public void onProcessingTime(long timestamp) throws Exception {
            assertNotConcurrent();
            try {
                numTimers++;
                throwIfDone();
                getProcessingTimeService().registerTimer(System.currentTimeMillis() + 1, this);
            } finally {
                semaphore.release();
            }
        }

        /** Throws the "TEST SUCCESS" marker exception once both counters exceed their thresholds. */
        private void throwIfDone() {
            if (numTimers > 1000 && numElements > 10000) {
                throw new RuntimeException("TEST SUCCESS");
            }
        }

        /**
         * Only performs the concurrency check; the watermark itself is ignored.
         *
         * @param watermark the incoming watermark (unused)
         */
        public void processWatermark(Watermark watermark) throws Exception {
            assertNotConcurrent();
            semaphore.release();
        }

        /**
         * Takes the single permit without blocking, failing the test if another callback
         * currently holds it. On success the caller owns the permit and must release it.
         */
        private void assertNotConcurrent() {
            if (!semaphore.tryAcquire()) {
                Assert.fail("Concurrent invocation of operator functions.");
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase$TwoInputTimerOperator.class */
    /**
     * Two-input test operator that checks element and processing-time callbacks are never
     * executed concurrently.
     *
     * <p>Elements from both inputs are handled identically: each callback must obtain the single
     * permit of {@code semaphore} without blocking, otherwise the test fails with
     * "Concurrent invocation of operator functions.". The first element (from either input)
     * registers a processing-time timer, and each timer callback registers the next one. Once
     * more than 1000 timers have fired and more than 10000 elements have been counted, the timer
     * callback throws a {@link RuntimeException} with the message {@code "TEST SUCCESS"}, which
     * the tests in this file look for in the job failure. Watermarks on both inputs are ignored.
     */
    private static class TwoInputTimerOperator extends AbstractStreamOperator<String> implements TwoInputStreamOperator<String, String, String>, ProcessingTimeCallback {
        private static final long serialVersionUID = 1;

        /** Number of timer callbacks seen so far. */
        private int numTimers = 0;

        /** Number of elements seen so far, summed over both inputs. */
        private int numElements = 0;

        /** True until the first element has registered the initial timer. */
        private boolean first = true;

        /** Single-permit guard used to detect overlapping callback invocations. */
        private final Semaphore semaphore = new Semaphore(1);

        /**
         * @param chainingStrategy chaining strategy to set on this operator
         */
        public TwoInputTimerOperator(ChainingStrategy chainingStrategy) {
            setChainingStrategy(chainingStrategy);
        }

        /**
         * Handles a record from the first input.
         *
         * @param streamRecord the incoming record (its value is not inspected)
         */
        public void processElement1(StreamRecord<String> streamRecord) throws Exception {
            countElement();
        }

        /**
         * Handles a record from the second input.
         *
         * @param streamRecord the incoming record (its value is not inspected)
         */
        public void processElement2(StreamRecord<String> streamRecord) throws Exception {
            countElement();
        }

        /**
         * Shared element handling for both inputs: performs the concurrency check, registers the
         * initial timer (100 ms after the current wall-clock time) on the very first element, and
         * increments the element counter.
         */
        private void countElement() {
            assertNotConcurrent();
            try {
                if (first) {
                    getProcessingTimeService().registerTimer(System.currentTimeMillis() + 100, this);
                    first = false;
                }
                numElements++;
            } finally {
                // Release even if registerTimer throws, so a later callback does not
                // misreport the leaked permit as a concurrent invocation.
                semaphore.release();
            }
        }

        /**
         * Counts the timer, ends the test once enough timers and elements were seen, and
         * otherwise registers a follow-up timer 1 ms after the current wall-clock time.
         *
         * @param timestamp the timestamp passed to the callback (unused)
         * @throws RuntimeException with message {@code "TEST SUCCESS"} once both thresholds are exceeded
         */
        public void onProcessingTime(long timestamp) throws Exception {
            assertNotConcurrent();
            try {
                numTimers++;
                throwIfDone();
                getProcessingTimeService().registerTimer(System.currentTimeMillis() + 1, this);
            } finally {
                semaphore.release();
            }
        }

        /** Throws the "TEST SUCCESS" marker exception once both counters exceed their thresholds. */
        private void throwIfDone() {
            if (numTimers > 1000 && numElements > 10000) {
                throw new RuntimeException("TEST SUCCESS");
            }
        }

        /** Intentionally a no-op: watermarks from the first input are ignored. */
        public void processWatermark1(Watermark watermark) throws Exception {
        }

        /** Intentionally a no-op: watermarks from the second input are ignored. */
        public void processWatermark2(Watermark watermark) throws Exception {
        }

        /**
         * Takes the single permit without blocking, failing the test if another callback
         * currently holds it. On success the caller owns the permit and must release it.
         */
        private void assertNotConcurrent() {
            if (!semaphore.tryAcquire()) {
                Assert.fail("Concurrent invocation of operator functions.");
            }
        }
    }

    /**
     * Creates the test instance for one parameter set supplied by {@link #executionModes()}.
     *
     * @param timeCharacteristic the time characteristic each test sets on its execution environment
     */
    public StreamTaskTimerITCase(TimeCharacteristic timeCharacteristic) {
        this.timeCharacteristic = timeCharacteristic;
    }

    /**
     * Runs {@link TimerOperator} with {@link ChainingStrategy#ALWAYS} directly after the infinite
     * source and expects the job to fail with the operator's "TEST SUCCESS" marker.
     *
     * <p>The test passes only if {@code execute} throws a {@link JobExecutionException} whose
     * cause is a {@link TimerException} that in turn wraps a {@link RuntimeException} with the
     * message {@code "TEST SUCCESS"}. Any other {@link JobExecutionException} is rethrown
     * unchanged; a job that returns normally fails the final assertion.
     */
    @Test
    public void testOperatorChainedToSource() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(this.timeCharacteristic);
        env.setParallelism(1);
        env.addSource(new InfiniteTestSource()).transform("Custom Operator", BasicTypeInfo.STRING_TYPE_INFO, new TimerOperator(ChainingStrategy.ALWAYS));

        boolean testSuccess = false;
        try {
            env.execute("Timer test");
        } catch (JobExecutionException e) {
            Throwable timerFailure = e.getCause();
            if (!(timerFailure instanceof TimerException)) {
                throw e;
            }
            Throwable root = timerFailure.getCause();
            // Constant-first equals: a RuntimeException without a message must be rethrown, not cause an NPE.
            if (!(root instanceof RuntimeException) || !"TEST SUCCESS".equals(root.getMessage())) {
                throw e;
            }
            testSuccess = true;
        }
        Assert.assertTrue("Job finished without the expected \"TEST SUCCESS\" timer failure", testSuccess);
    }

    /**
     * Runs {@link TimerOperator} with {@link ChainingStrategy#NEVER} after the infinite source
     * and expects the job to fail with the operator's "TEST SUCCESS" marker.
     *
     * <p>The test passes only if {@code execute} throws a {@link JobExecutionException} whose
     * cause is a {@link TimerException} that in turn wraps a {@link RuntimeException} with the
     * message {@code "TEST SUCCESS"}. Any other {@link JobExecutionException} is rethrown
     * unchanged; a job that returns normally fails the final assertion.
     */
    @Test
    public void testOneInputOperatorWithoutChaining() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(this.timeCharacteristic);
        env.setParallelism(1);
        env.addSource(new InfiniteTestSource()).transform("Custom Operator", BasicTypeInfo.STRING_TYPE_INFO, new TimerOperator(ChainingStrategy.NEVER));

        boolean testSuccess = false;
        try {
            env.execute("Timer test");
        } catch (JobExecutionException e) {
            Throwable timerFailure = e.getCause();
            if (!(timerFailure instanceof TimerException)) {
                throw e;
            }
            Throwable root = timerFailure.getCause();
            // Constant-first equals: a RuntimeException without a message must be rethrown, not cause an NPE.
            if (!(root instanceof RuntimeException) || !"TEST SUCCESS".equals(root.getMessage())) {
                throw e;
            }
            testSuccess = true;
        }
        Assert.assertTrue("Job finished without the expected \"TEST SUCCESS\" timer failure", testSuccess);
    }

    /**
     * Connects the infinite source with itself, runs {@link TwoInputTimerOperator} with
     * {@link ChainingStrategy#NEVER} on the connected streams, and expects the job to fail with
     * the operator's "TEST SUCCESS" marker.
     *
     * <p>The test passes only if {@code execute} throws a {@link JobExecutionException} whose
     * cause is a {@link TimerException} that in turn wraps a {@link RuntimeException} with the
     * message {@code "TEST SUCCESS"}. Any other {@link JobExecutionException} is rethrown
     * unchanged; a job that returns normally fails the final assertion.
     */
    @Test
    public void testTwoInputOperatorWithoutChaining() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(this.timeCharacteristic);
        env.setParallelism(1);
        // Parameterized type instead of the raw DataStreamSource keeps the pipeline type-checked.
        DataStreamSource<String> source = env.addSource(new InfiniteTestSource());
        source.connect(source).transform("Custom Operator", BasicTypeInfo.STRING_TYPE_INFO, new TwoInputTimerOperator(ChainingStrategy.NEVER));

        boolean testSuccess = false;
        try {
            env.execute("Timer test");
        } catch (JobExecutionException e) {
            Throwable timerFailure = e.getCause();
            if (!(timerFailure instanceof TimerException)) {
                throw e;
            }
            Throwable root = timerFailure.getCause();
            // Constant-first equals: a RuntimeException without a message must be rethrown, not cause an NPE.
            if (!(root instanceof RuntimeException) || !"TEST SUCCESS".equals(root.getMessage())) {
                throw e;
            }
            testSuccess = true;
        }
        Assert.assertTrue("Job finished without the expected \"TEST SUCCESS\" timer failure", testSuccess);
    }

    /**
     * Supplies the parameter sets for this parameterized test: one single-element array per time
     * characteristic, in the order ProcessingTime, IngestionTime, EventTime.
     *
     * @return the list of constructor argument arrays
     */
    @Parameterized.Parameters(name = "Time Characteristic = {0}")
    public static Collection<Object[]> executionModes() {
        Object[] processingTime = {TimeCharacteristic.ProcessingTime};
        Object[] ingestionTime = {TimeCharacteristic.IngestionTime};
        Object[] eventTime = {TimeCharacteristic.EventTime};
        return Arrays.asList(processingTime, ingestionTime, eventTime);
    }
}
