/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.util.Optional;
import java.util.concurrent.Semaphore;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.TimerException;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;
import org.apache.flink.util.ExceptionUtils;
import org.junit.Assert;
import org.junit.Test;

public class StreamTaskTimerITCase
extends AbstractTestBaseJUnit4 {
    @Test
    public void testOperatorChainedToSource() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource source = env.addSource((SourceFunction)new InfiniteTestSource());
        source.transform("Custom Operator", (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO, (OneInputStreamOperator)new TimerOperator());
        try {
            env.execute("Timer test");
        }
        catch (JobExecutionException e) {
            this.verifyJobExecutionException(e);
        }
    }

    private void verifyJobExecutionException(JobExecutionException e) throws JobExecutionException {
        Optional optionalTimerException = ExceptionUtils.findThrowable((Throwable)e, TimerException.class);
        Assert.assertTrue((boolean)optionalTimerException.isPresent());
        TimerException te = (TimerException)optionalTimerException.get();
        if (te.getCause() instanceof RuntimeException) {
            RuntimeException re = (RuntimeException)te.getCause();
            if (!re.getMessage().equals("TEST SUCCESS")) {
                throw e;
            }
        } else {
            throw e;
        }
    }

    @Test
    public void testOneInputOperatorWithoutChaining() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource source = env.addSource((SourceFunction)new InfiniteTestSource());
        source.transform("Custom Operator", (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO, (OneInputStreamOperator)new TimerOperator());
        try {
            env.execute("Timer test");
        }
        catch (JobExecutionException e) {
            this.verifyJobExecutionException(e);
        }
    }

    @Test
    public void testTwoInputOperatorWithoutChaining() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource source = env.addSource((SourceFunction)new InfiniteTestSource());
        source.connect((DataStream)source).transform("Custom Operator", (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO, (TwoInputStreamOperator)new TwoInputTimerOperator());
        try {
            env.execute("Timer test");
        }
        catch (JobExecutionException e) {
            this.verifyJobExecutionException(e);
        }
    }

    private static class InfiniteTestSource
    implements SourceFunction<String> {
        private static final long serialVersionUID = 1L;
        private volatile boolean running = true;

        private InfiniteTestSource() {
        }

        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            while (this.running) {
                ctx.collect((Object)"hello");
            }
        }

        public void cancel() {
            this.running = false;
        }
    }

    private static class TwoInputTimerOperator
    extends AbstractStreamOperator<String>
    implements TwoInputStreamOperator<String, String, String>,
    ProcessingTimeService.ProcessingTimeCallback {
        private static final long serialVersionUID = 1L;
        int numTimers = 0;
        int numElements = 0;
        private boolean first = true;
        private Semaphore semaphore = new Semaphore(1);

        public void processElement1(StreamRecord<String> element) throws Exception {
            if (!this.semaphore.tryAcquire()) {
                Assert.fail((String)"Concurrent invocation of operator functions.");
            }
            if (this.first) {
                this.getProcessingTimeService().registerTimer(System.currentTimeMillis() + 100L, (ProcessingTimeService.ProcessingTimeCallback)this);
                this.first = false;
            }
            ++this.numElements;
            this.semaphore.release();
        }

        public void processElement2(StreamRecord<String> element) throws Exception {
            if (!this.semaphore.tryAcquire()) {
                Assert.fail((String)"Concurrent invocation of operator functions.");
            }
            if (this.first) {
                this.getProcessingTimeService().registerTimer(System.currentTimeMillis() + 100L, (ProcessingTimeService.ProcessingTimeCallback)this);
                this.first = false;
            }
            ++this.numElements;
            this.semaphore.release();
        }

        public void onProcessingTime(long time) throws Exception {
            if (!this.semaphore.tryAcquire()) {
                Assert.fail((String)"Concurrent invocation of operator functions.");
            }
            try {
                ++this.numTimers;
                this.throwIfDone();
                this.getProcessingTimeService().registerTimer(System.currentTimeMillis() + 1L, (ProcessingTimeService.ProcessingTimeCallback)this);
            }
            finally {
                this.semaphore.release();
            }
        }

        private void throwIfDone() {
            if (this.numTimers > 1000 && this.numElements > 10000) {
                throw new RuntimeException("TEST SUCCESS");
            }
        }

        public void processWatermark1(Watermark mark) throws Exception {
        }

        public void processWatermark2(Watermark mark) throws Exception {
        }
    }

    private static class TimerOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String>,
    ProcessingTimeService.ProcessingTimeCallback {
        private static final long serialVersionUID = 1L;
        int numTimers = 0;
        int numElements = 0;
        private boolean first = true;
        private Semaphore semaphore = new Semaphore(1);

        public void processElement(StreamRecord<String> element) throws Exception {
            if (!this.semaphore.tryAcquire()) {
                Assert.fail((String)"Concurrent invocation of operator functions.");
            }
            if (this.first) {
                this.getProcessingTimeService().registerTimer(System.currentTimeMillis() + 100L, (ProcessingTimeService.ProcessingTimeCallback)this);
                this.first = false;
            }
            ++this.numElements;
            this.semaphore.release();
        }

        public void onProcessingTime(long time) throws Exception {
            if (!this.semaphore.tryAcquire()) {
                Assert.fail((String)"Concurrent invocation of operator functions.");
            }
            try {
                ++this.numTimers;
                this.throwIfDone();
                this.getProcessingTimeService().registerTimer(System.currentTimeMillis() + 1L, (ProcessingTimeService.ProcessingTimeCallback)this);
            }
            finally {
                this.semaphore.release();
            }
        }

        private void throwIfDone() {
            if (this.numTimers > 1000 && this.numElements > 10000) {
                throw new RuntimeException("TEST SUCCESS");
            }
        }

        public void processWatermark(Watermark mark) throws Exception {
            if (!this.semaphore.tryAcquire()) {
                Assert.fail((String)"Concurrent invocation of operator functions.");
            }
            this.semaphore.release();
        }
    }
}

