/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.api;

import java.io.Serializable;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.test.util.InfiniteIntegerInputFormat;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileReadingWatermarkITCase
extends TestLogger {
    private static final Logger LOG = LoggerFactory.getLogger(FileReadingWatermarkITCase.class);
    private static final int WATERMARK_INTERVAL_MILLIS = 1000;
    private static final int EXPECTED_WATERMARKS = 5;
    @Rule
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();
    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();

    @Test
    public void testWatermarkEmissionWithChaining() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment((int)1);
        env.getConfig().setAutoWatermarkInterval(1000L);
        SharedReference latch = this.sharedObjects.add((Object)new CountDownLatch(5));
        Preconditions.checkState((boolean)env.isChainingEnabled());
        env.createInput((InputFormat)new InfiniteIntegerInputFormat(true)).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner((TimestampAssignerSupplier & Serializable)context -> FileReadingWatermarkITCase.getExtractorAssigner())).addSink(FileReadingWatermarkITCase.getWatermarkCounter((SharedReference<CountDownLatch>)latch));
        env.executeAsync();
        ((CountDownLatch)latch.get()).await();
    }

    private static TimestampAssigner<Integer> getExtractorAssigner() {
        return new TimestampAssigner<Integer>(){
            private long counter = 1L;

            public long extractTimestamp(Integer element, long recordTimestamp) {
                return this.counter++;
            }
        };
    }

    private static SinkFunction<Integer> getWatermarkCounter(final SharedReference<CountDownLatch> latch) {
        return new RichSinkFunction<Integer>(){

            public void invoke(Integer value, SinkFunction.Context context) {
                try {
                    Thread.sleep(1000L);
                    LOG.info("Sink received record");
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

            public void writeWatermark(Watermark watermark) {
                LOG.info("Sink received watermark {}", (Object)watermark);
                ((CountDownLatch)latch.get()).countDown();
            }
        };
    }
}

