/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.source;

import java.io.File;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.source.StreamReadMonitoringFunction;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests for {@link StreamReadMonitoringFunction}.
 *
 * <p>Every test initialises a MERGE_ON_READ table under a JUnit-managed temporary directory,
 * writes one or two batches of test data, runs the monitoring function on a background thread
 * and inspects the {@link MergeOnReadInputSplit}s handed to a {@link CollectingSourceContext}.
 */
public class TestStreamReadMonitoringFunction {
    /** Upper bound, in milliseconds, a test waits for the expected number of splits. */
    private static final long WAIT_TIME_MILLIS = 5000L;

    /** Number of splits awaited per round; one for each entry of {@link #EXPECTED_PARTITION_PATHS}. */
    private static final int EXPECTED_SPLIT_COUNT = 4;

    /** Sorted, comma-joined partition paths every round of splits is expected to cover. */
    private static final String EXPECTED_PARTITION_PATHS = "par1,par2,par3,par4";

    private Configuration conf;

    @TempDir
    File tempFile;

    /**
     * Builds the default test configuration rooted at the temporary directory, switches the
     * table type to MERGE_ON_READ, sets the streaming check interval to 2 and creates the
     * table if it does not exist yet.
     */
    @BeforeEach
    public void before() throws Exception {
        String basePath = this.tempFile.getAbsolutePath();
        this.conf = TestConfigurations.getDefaultConf(basePath);
        this.conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
        this.conf.setInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2);
        StreamerUtil.initTableIfNotExists(this.conf);
    }

    /**
     * With no start commit configured and two batches written, every emitted split must carry an
     * instant range and report the last completed instant as its latest commit.
     */
    @Test
    public void testConsumeFromLatestCommit() throws Exception {
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, this.conf);
        StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(this.conf);
        try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
            harness.setup();
            harness.open();

            CountDownLatch latch = new CountDownLatch(EXPECTED_SPLIT_COUNT);
            CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
            runAsync(sourceContext, function);

            assertSplitsGenerated(latch);
            assertExpectedPartitionPaths(sourceContext);
            assertAllSplitsHaveInstantRange(sourceContext, "All the instants should have range limit");

            String latestCommit = TestUtils.getLastCompleteInstant(this.tempFile.getAbsolutePath());
            Assertions.assertTrue(
                sourceContext.getSplits().stream().allMatch(split -> split.getLatestCommit().equals(latestCommit)),
                "All the splits should be with latestCommit instant time");

            // Stop the source loop.
            function.close();
        }
    }

    /**
     * Runs two rounds against the same running function: the first after a single batch, the
     * second after another batch is written while the function is still monitoring. Both rounds
     * must cover the expected partitions with range-limited splits.
     */
    @Test
    public void testConsumeFromLastCommit() throws Exception {
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(this.conf);
        try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
            harness.setup();
            harness.open();

            CountDownLatch latch = new CountDownLatch(EXPECTED_SPLIT_COUNT);
            CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
            runAsync(sourceContext, function);

            assertSplitsGenerated(latch);
            assertExpectedPartitionPaths(sourceContext);
            assertAllSplitsHaveInstantRange(sourceContext, "All instants should have range limit");

            Thread.sleep(1000L);

            // Start a fresh round before the next batch is written so only new splits are seen.
            latch = new CountDownLatch(EXPECTED_SPLIT_COUNT);
            sourceContext.reset(latch);
            TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, this.conf);

            assertSplitsGenerated(latch);
            assertExpectedPartitionPaths(sourceContext);
            assertAllSplitsHaveInstantRange(sourceContext, "All the instants should have range limit");

            // Stop the source loop.
            function.close();
        }
    }

    /**
     * With {@code READ_START_COMMIT} set to the last completed instant, every emitted split must
     * carry an instant range and report that instant as its latest commit.
     */
    @Test
    public void testConsumeFromSpecifiedCommit() throws Exception {
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, this.conf);
        String specifiedCommit = TestUtils.getLastCompleteInstant(this.tempFile.getAbsolutePath());
        this.conf.setString(FlinkOptions.READ_START_COMMIT, specifiedCommit);
        StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(this.conf);
        try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
            harness.setup();
            harness.open();

            CountDownLatch latch = new CountDownLatch(EXPECTED_SPLIT_COUNT);
            CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
            runAsync(sourceContext, function);

            assertSplitsGenerated(latch);
            assertExpectedPartitionPaths(sourceContext);
            assertAllSplitsHaveInstantRange(sourceContext, "All the instants should have range limit");
            Assertions.assertTrue(
                sourceContext.getSplits().stream().allMatch(split -> split.getLatestCommit().equals(specifiedCommit)),
                "All the splits should be with specified instant time");

            // Stop the source loop.
            function.close();
        }
    }

    /**
     * With {@code READ_START_COMMIT} set to {@code "earliest"}, no emitted split may carry an
     * instant range, and every split must report the last completed instant as its latest commit.
     */
    @Test
    public void testConsumeFromEarliestCommit() throws Exception {
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, this.conf);
        String specifiedCommit = TestUtils.getLastCompleteInstant(this.tempFile.getAbsolutePath());
        this.conf.setString(FlinkOptions.READ_START_COMMIT, "earliest");
        StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(this.conf);
        try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
            harness.setup();
            harness.open();

            CountDownLatch latch = new CountDownLatch(EXPECTED_SPLIT_COUNT);
            CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
            runAsync(sourceContext, function);

            assertSplitsGenerated(latch);
            assertExpectedPartitionPaths(sourceContext);
            Assertions.assertTrue(
                sourceContext.getSplits().stream().noneMatch(split -> split.getInstantRange().isPresent()),
                "No instants should have range limit");
            Assertions.assertTrue(
                sourceContext.getSplits().stream().allMatch(split -> split.getLatestCommit().equals(specifiedCommit)),
                "All the splits should be with specified instant time");

            // Stop the source loop.
            function.close();
        }
    }

    /**
     * Snapshots the operator state of a first function after it has emitted its splits, writes a
     * second batch, then restores a second function from that snapshot and checks that it emits
     * range-limited splits for the expected partitions.
     */
    @Test
    public void testCheckpointRestore() throws Exception {
        OperatorSubtaskState state;
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(this.conf);
        try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
            harness.setup();
            harness.open();

            CountDownLatch latch = new CountDownLatch(EXPECTED_SPLIT_COUNT);
            CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
            runAsync(sourceContext, function);

            assertSplitsGenerated(latch);
            Thread.sleep(1000L);

            state = harness.snapshot(1L, 1L);

            // Stop the first source loop before inspecting what it produced.
            function.close();

            assertExpectedPartitionPaths(sourceContext);
            assertAllSplitsHaveInstantRange(sourceContext, "All instants should have range limit");
        }

        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, this.conf);
        StreamReadMonitoringFunction function2 = TestUtils.getMonitorFunc(this.conf);
        try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function2)) {
            harness.setup();
            harness.initializeState(state);
            harness.open();

            CountDownLatch latch = new CountDownLatch(EXPECTED_SPLIT_COUNT);
            CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
            runAsync(sourceContext, function2);

            assertSplitsGenerated(latch);
            assertExpectedPartitionPaths(sourceContext);
            assertAllSplitsHaveInstantRange(sourceContext, "All the instants should have range limit");

            // Stop the restored function (not the first one, which is already closed). This is
            // done only after the assertions so the source is not stopped before it emits.
            function2.close();
        }
    }

    /** Asserts that {@code latch} reaches zero within {@link #WAIT_TIME_MILLIS}. */
    private static void assertSplitsGenerated(CountDownLatch latch) throws InterruptedException {
        Assertions.assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
    }

    /** Asserts that the collected splits cover exactly {@link #EXPECTED_PARTITION_PATHS}. */
    private static void assertExpectedPartitionPaths(CollectingSourceContext sourceContext) {
        MatcherAssert.assertThat(
            "Should produce the expected splits",
            sourceContext.getPartitionPaths(),
            CoreMatchers.is(EXPECTED_PARTITION_PATHS));
    }

    /** Asserts that every collected split has an instant range, failing with {@code message}. */
    private static void assertAllSplitsHaveInstantRange(CollectingSourceContext sourceContext, String message) {
        Assertions.assertTrue(
            sourceContext.getSplits().stream().allMatch(split -> split.getInstantRange().isPresent()),
            message);
    }

    /**
     * Wraps {@code function} in a {@link StreamSource} and a test harness created with the
     * arguments {@code (1, 1, 0)}.
     */
    private AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> createHarness(StreamReadMonitoringFunction function) throws Exception {
        StreamSource<MergeOnReadInputSplit, StreamReadMonitoringFunction> streamSource = new StreamSource<>(function);
        return new AbstractStreamOperatorTestHarness<>(streamSource, 1, 1, 0);
    }

    /**
     * Starts {@code function.run(sourceContext)} on a new background thread and returns
     * immediately. A checked exception from {@code run} is rethrown on that thread wrapped in a
     * {@link RuntimeException}.
     */
    private void runAsync(CollectingSourceContext sourceContext, StreamReadMonitoringFunction function) {
        Thread task = new Thread(() -> {
            try {
                function.run(sourceContext);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, "stream-read-monitoring-test");
        // Daemon so a source loop that is never stopped (e.g. after a failed assertion)
        // cannot keep the test JVM alive.
        task.setDaemon(true);
        task.start();
    }

    /**
     * Source context that records every collected split and counts down a latch per split.
     *
     * <p>{@link #collect} is invoked from the thread started by {@code runAsync} while the test
     * thread reads and resets the collected splits, so all access to the list and the latch is
     * guarded by the list's monitor.
     */
    private static class CollectingSourceContext
    implements SourceFunction.SourceContext<MergeOnReadInputSplit> {
        private final List<MergeOnReadInputSplit> splits = new ArrayList<>();
        private final Object checkpointLock = new Object();
        private volatile CountDownLatch latch;

        CollectingSourceContext(CountDownLatch latch) {
            this.latch = latch;
        }

        /** Records {@code element} and counts the current latch down by one. */
        @Override
        public void collect(MergeOnReadInputSplit element) {
            synchronized (this.splits) {
                this.splits.add(element);
                this.latch.countDown();
            }
        }

        /** Same as {@link #collect}; the timestamp is ignored. */
        @Override
        public void collectWithTimestamp(MergeOnReadInputSplit element, long timestamp) {
            this.collect(element);
        }

        /** No-op. */
        @Override
        public void emitWatermark(Watermark mark) {
        }

        /** No-op. */
        @Override
        public void markAsTemporarilyIdle() {
        }

        @Override
        public Object getCheckpointLock() {
            return this.checkpointLock;
        }

        /** No-op. */
        @Override
        public void close() {
        }

        /** Discards all collected splits and installs {@code latch} for the next round. */
        public void reset(CountDownLatch latch) {
            synchronized (this.splits) {
                this.latch = latch;
                this.splits.clear();
            }
        }

        /** Returns a snapshot copy of the splits collected so far. */
        List<MergeOnReadInputSplit> getSplits() {
            synchronized (this.splits) {
                return new ArrayList<>(this.splits);
            }
        }

        /** Returns the partition paths of the collected splits, sorted and joined with commas. */
        public String getPartitionPaths() {
            return getSplits().stream()
                .map(TestUtils::getSplitPartitionPath)
                .sorted(Comparator.naturalOrder())
                .collect(Collectors.joining(","));
        }
    }
}

