/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.api.datastream;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.functions.source.legacy.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.Test;

public class FinishedSourcesWatermarkITCase
extends TestLogger {
    private static final AtomicLong CHECKPOINT_10_WATERMARK = new AtomicLong(org.apache.flink.streaming.api.watermark.Watermark.MAX_WATERMARK.getTimestamp());
    private static final AtomicBoolean DOWNSTREAM_CHECKPOINT_10_WATERMARK_ACK = new AtomicBoolean();

    @Test
    public void testTwoConsecutiveFinishedTasksShouldPropagateMaxWatermark() throws Exception {
        Configuration conf = new Configuration();
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment((int)1);
        env.disableOperatorChaining();
        env.enableCheckpointing(200L);
        DataStreamSource runningStreamIn = env.addSource((SourceFunction)new LongRunningSource(), "Long Running Source");
        DataStreamSource emptyStream = env.addSource((SourceFunction)new ShortLivedEmptySource(), "Short Lived Source");
        SingleOutputStreamOperator mappedEmptyStream = emptyStream.map((MapFunction & Serializable)v -> v).name("Empty Stream Map");
        runningStreamIn.connect((DataStream)mappedEmptyStream).process((CoProcessFunction)new NoopCoProcessFunction()).name("Join").addSink((SinkFunction)new SinkWaitingForWatermark());
        env.execute();
    }

    private static class NoopCoProcessFunction
    extends CoProcessFunction<String, String, String> {
        private NoopCoProcessFunction() {
        }

        public void processElement1(String val, CoProcessFunction.Context context, Collector<String> collector) {
        }

        public void processElement2(String val, CoProcessFunction.Context context, Collector<String> collector) {
        }
    }

    private static class ShortLivedEmptySource
    extends RichSourceFunction<String> {
        private ShortLivedEmptySource() {
        }

        public void run(SourceFunction.SourceContext<String> sourceContext) throws Exception {
        }

        public void cancel() {
        }
    }

    private static class LongRunningSource
    extends RichSourceFunction<String>
    implements CheckpointListener {
        private volatile boolean isRunning = true;
        private long lastEmittedWatermark;

        private LongRunningSource() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<String> sourceContext) throws Exception {
            while (this.isRunning && !DOWNSTREAM_CHECKPOINT_10_WATERMARK_ACK.get()) {
                Object object = sourceContext.getCheckpointLock();
                synchronized (object) {
                    this.lastEmittedWatermark = Math.max(System.currentTimeMillis(), this.lastEmittedWatermark);
                    sourceContext.emitWatermark(new org.apache.flink.streaming.api.watermark.Watermark(this.lastEmittedWatermark));
                }
                Thread.sleep(1L);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            if (checkpointId == 5L) {
                throw new RuntimeException("Force recovery");
            }
            if (checkpointId > 10L) {
                CHECKPOINT_10_WATERMARK.set(Math.min(this.lastEmittedWatermark, CHECKPOINT_10_WATERMARK.get()));
            }
        }
    }

    private static class SinkWaitingForWatermark
    implements SinkFunction<String> {
        private SinkWaitingForWatermark() {
        }

        public void writeWatermark(Watermark watermark) {
            if (watermark.getTimestamp() > CHECKPOINT_10_WATERMARK.get()) {
                DOWNSTREAM_CHECKPOINT_10_WATERMARK_ACK.set(true);
            }
        }
    }
}

