package org.apache.flink.test.streaming.api;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.UUID;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/streaming/api/FileReadingWatermarkITCase.class */
public class FileReadingWatermarkITCase {
    private static final String NUM_WATERMARKS_ACC_NAME = "numWatermarks";
    private static final int FILE_SIZE_LINES = 100000;
    private static final int WATERMARK_INTERVAL_MILLIS = 10;
    private static final int MIN_EXPECTED_WATERMARKS = 5;

    @Test
    public void testWatermarkEmissionWithChaining() throws Exception {
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1);
        createLocalEnvironment.getConfig().setAutoWatermarkInterval(10L);
        Preconditions.checkState(createLocalEnvironment.isChainingEnabled());
        createLocalEnvironment.readTextFile(getSourceFile().getAbsolutePath()).assignTimestampsAndWatermarks(getExtractorAssigner()).addSink(getWatermarkCounter());
        int intValue = ((Integer) createLocalEnvironment.execute().getAccumulatorResult(NUM_WATERMARKS_ACC_NAME)).intValue();
        Assert.assertTrue("too few watermarks emitted: " + intValue, intValue >= MIN_EXPECTED_WATERMARKS);
    }

    private File getSourceFile() throws IOException {
        File createTempFile = File.createTempFile(UUID.randomUUID().toString(), null);
        PrintWriter printWriter = new PrintWriter(createTempFile);
        Throwable th = null;
        for (int i = 0; i < FILE_SIZE_LINES; i++) {
            try {
                try {
                    printWriter.println(i);
                } finally {
                }
            } catch (Throwable th2) {
                if (printWriter != null) {
                    if (th != null) {
                        try {
                            printWriter.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        printWriter.close();
                    }
                }
                throw th2;
            }
        }
        if (printWriter != null) {
            if (0 != 0) {
                try {
                    printWriter.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                printWriter.close();
            }
        }
        createTempFile.deleteOnExit();
        return createTempFile;
    }

    private static BoundedOutOfOrdernessTimestampExtractor<String> getExtractorAssigner() {
        return new BoundedOutOfOrdernessTimestampExtractor<String>(Time.hours(1L)) { // from class: org.apache.flink.test.streaming.api.FileReadingWatermarkITCase.1
            private final long started = System.currentTimeMillis();

            public long extractTimestamp(String str) {
                return this.started + Long.parseLong(str);
            }
        };
    }

    private static SinkFunction<String> getWatermarkCounter() {
        return new RichSinkFunction<String>() { // from class: org.apache.flink.test.streaming.api.FileReadingWatermarkITCase.2
            private final IntCounter numWatermarks = new IntCounter();
            private long lastWatermark = -1;

            public void open(Configuration configuration) throws Exception {
                super.open(configuration);
                getRuntimeContext().addAccumulator(FileReadingWatermarkITCase.NUM_WATERMARKS_ACC_NAME, this.numWatermarks);
            }

            public void close() throws Exception {
                super.close();
            }

            public void invoke(String str, SinkFunction.Context context) {
                if (context.currentWatermark() != this.lastWatermark) {
                    this.lastWatermark = context.currentWatermark();
                    this.numWatermarks.add(1);
                }
            }
        };
    }
}
