/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.source;

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.source.StreamReadMonitoringFunction;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class TestStreamReadMonitoringFunction {
    private static final long WAIT_TIME_MILLIS = 5000L;
    private Configuration conf;
    @TempDir
    File tempFile;

    @BeforeEach
    public void before() throws Exception {
        String basePath = this.tempFile.getAbsolutePath();
        this.conf = TestConfigurations.getDefaultConf(basePath);
        this.conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
        this.conf.setInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2);
        StreamerUtil.initTableIfNotExists((Configuration)this.conf);
    }

    @Test
    public void testConsumeFromLatestCommit() throws Exception {
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, this.conf);
        StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(this.conf);
        try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = this.createHarness(function);){
            harness.setup();
            harness.open();
            CountDownLatch latch = new CountDownLatch(4);
            CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
            this.runAsync(sourceContext, function);
            Assertions.assertTrue((boolean)latch.await(5000L, TimeUnit.MILLISECONDS), (String)"Should finish splits generation");
            MatcherAssert.assertThat((String)"Should produce the expected splits", (Object)sourceContext.getPartitionPaths(), (Matcher)CoreMatchers.is((Object)"par1,par2,par3,par4"));
            Assertions.assertTrue((boolean)sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()), (String)"All the instants should have range limit");
            String latestCommit = TestUtils.getLastCompleteInstant(this.tempFile.getAbsolutePath());
            Assertions.assertTrue((boolean)sourceContext.splits.stream().allMatch(split -> split.getLatestCommit().equals(latestCommit)), (String)"All the splits should be with latestCommit instant time");
            function.close();
        }
    }

    @Test
    public void testConsumeFromLastCommit() throws Exception {
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(this.conf);
        try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = this.createHarness(function);){
            harness.setup();
            harness.open();
            CountDownLatch latch = new CountDownLatch(4);
            CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
            this.runAsync(sourceContext, function);
            Assertions.assertTrue((boolean)latch.await(5000L, TimeUnit.MILLISECONDS), (String)"Should finish splits generation");
            MatcherAssert.assertThat((String)"Should produce the expected splits", (Object)sourceContext.getPartitionPaths(), (Matcher)CoreMatchers.is((Object)"par1,par2,par3,par4"));
            Assertions.assertTrue((boolean)sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()), (String)"All instants should have range limit");
            Thread.sleep(1000L);
            latch = new CountDownLatch(4);
            sourceContext.reset(latch);
            TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, this.conf);
            Assertions.assertTrue((boolean)latch.await(5000L, TimeUnit.MILLISECONDS), (String)"Should finish splits generation");
            MatcherAssert.assertThat((String)"Should produce the expected splits", (Object)sourceContext.getPartitionPaths(), (Matcher)CoreMatchers.is((Object)"par1,par2,par3,par4"));
            Assertions.assertTrue((boolean)sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()), (String)"All the instants should have range limit");
            function.close();
        }
    }

    @Test
    public void testConsumeForSpeedLimitWhenEmptyCommitExists() throws Exception {
        Configuration conf = new Configuration(this.conf);
        conf.set(FlinkOptions.TABLE_TYPE, (Object)FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
        conf.setBoolean(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), true);
        TestData.writeData(Collections.EMPTY_LIST, conf);
        TestData.writeData(Collections.EMPTY_LIST, conf);
        TestData.writeData(Collections.EMPTY_LIST, conf);
        TestData.writeData(Collections.EMPTY_LIST, conf);
        HoodieTableMetaClient metaClient = HoodieTestUtils.init((String)((String)conf.get(FlinkOptions.PATH)), (HoodieTableType)HoodieTableType.COPY_ON_WRITE);
        HoodieTimeline commitsTimeline = metaClient.reloadActiveTimeline().filter(hoodieInstant -> hoodieInstant.getAction().equals("commit"));
        HoodieInstant firstInstant = (HoodieInstant)commitsTimeline.firstInstant().get();
        conf.set(FlinkOptions.READ_AS_STREAMING, (Object)true);
        conf.set(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING, (Object)true);
        conf.set(FlinkOptions.READ_STREAMING_SKIP_COMPACT, (Object)true);
        conf.set(FlinkOptions.READ_COMMITS_LIMIT, (Object)2);
        conf.set(FlinkOptions.READ_START_COMMIT, (Object)String.valueOf(Long.valueOf(firstInstant.requestedTime()) - 100L));
        StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
        try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = this.createHarness(function);){
            harness.setup();
            harness.open();
            CountDownLatch latch = new CountDownLatch(0);
            CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
            function.monitorDirAndForwardSplits((SourceFunction.SourceContext)sourceContext);
            Assertions.assertEquals((int)0, (int)sourceContext.splits.size(), (String)"There should be no inputSplits");
            Assertions.assertNotNull((Object)function.getIssuedOffset());
            function.close();
        }
    }

    @Test
    public void testConsumeFromSpecifiedCommit() throws Exception {
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, this.conf);
        String specifiedCommit = TestUtils.getLastCompleteInstant(this.tempFile.getAbsolutePath());
        this.conf.setString(FlinkOptions.READ_START_COMMIT, specifiedCommit);
        StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(this.conf);
        try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = this.createHarness(function);){
            harness.setup();
            harness.open();
            CountDownLatch latch = new CountDownLatch(4);
            CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
            this.runAsync(sourceContext, function);
            Assertions.assertTrue((boolean)latch.await(5000L, TimeUnit.MILLISECONDS), (String)"Should finish splits generation");
            MatcherAssert.assertThat((String)"Should produce the expected splits", (Object)sourceContext.getPartitionPaths(), (Matcher)CoreMatchers.is((Object)"par1,par2,par3,par4"));
            Assertions.assertTrue((boolean)sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()), (String)"All the instants should have range limit");
            Assertions.assertTrue((boolean)sourceContext.splits.stream().allMatch(split -> split.getLatestCommit().equals(specifiedCommit)), (String)"All the splits should be with specified instant time");
            function.close();
        }
    }

    @Test
    public void testConsumeFromEarliestCommit() throws Exception {
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, this.conf);
        String specifiedCommit = TestUtils.getLastCompleteInstant(this.tempFile.getAbsolutePath());
        this.conf.setString(FlinkOptions.READ_START_COMMIT, "earliest");
        StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(this.conf);
        try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = this.createHarness(function);){
            harness.setup();
            harness.open();
            CountDownLatch latch = new CountDownLatch(4);
            CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
            this.runAsync(sourceContext, function);
            Assertions.assertTrue((boolean)latch.await(5000L, TimeUnit.MILLISECONDS), (String)"Should finish splits generation");
            MatcherAssert.assertThat((String)"Should produce the expected splits", (Object)sourceContext.getPartitionPaths(), (Matcher)CoreMatchers.is((Object)"par1,par2,par3,par4"));
            Assertions.assertTrue((boolean)sourceContext.splits.stream().noneMatch(split -> split.getInstantRange().isPresent()), (String)"No instants should have range limit");
            Assertions.assertTrue((boolean)sourceContext.splits.stream().allMatch(split -> split.getLatestCommit().equals(specifiedCommit)), (String)"All the splits should be with specified instant time");
            function.close();
        }
    }

    @Test
    public void testConsumingHollowInstants() throws Exception {
        this.conf.setString("hoodie.parquet.small.file.limit", "0");
        for (int i = 0; i < 8; i += 2) {
            List<RowData> dataset = TestData.dataSetInsert(i + 1, i + 2);
            TestData.writeData(dataset, this.conf);
        }
        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient((Configuration)this.conf);
        List oriInstants = metaClient.getCommitsTimeline().filterCompletedInstants().getInstants();
        MatcherAssert.assertThat((Object)oriInstants.size(), (Matcher)CoreMatchers.is((Object)4));
        ArrayList<HoodieCommitMetadata> metadataList = new ArrayList<HoodieCommitMetadata>();
        for (int i = 1; i <= 2; ++i) {
            HoodieInstant instant = (HoodieInstant)oriInstants.get(i);
            metadataList.add(TestUtils.deleteInstantFile(metaClient, instant));
        }
        List instants = metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants();
        MatcherAssert.assertThat((Object)instants.size(), (Matcher)CoreMatchers.is((Object)2));
        String c2 = ((HoodieInstant)oriInstants.get(1)).requestedTime();
        String c3 = ((HoodieInstant)oriInstants.get(2)).requestedTime();
        String c4 = ((HoodieInstant)instants.get(1)).requestedTime();
        this.conf.setString(FlinkOptions.READ_START_COMMIT, "earliest");
        StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(this.conf);
        try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = this.createHarness(function);){
            harness.setup();
            harness.open();
            CountDownLatch latch = new CountDownLatch(2);
            CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
            this.runAsync(sourceContext, function);
            Assertions.assertTrue((boolean)latch.await(5000L, TimeUnit.MILLISECONDS), (String)"Should finish splits generation");
            MatcherAssert.assertThat((String)"Should produce the expected splits", (Object)sourceContext.getPartitionPaths(), (Matcher)CoreMatchers.is((Object)"par1"));
            Assertions.assertTrue((boolean)sourceContext.splits.stream().noneMatch(split -> split.getInstantRange().isPresent()), (String)"No instants should have range limit");
            Assertions.assertTrue((boolean)sourceContext.splits.stream().anyMatch(split -> split.getLatestCommit().equals(c4)), (String)"At least one input split's latest commit time should be equal to the specified instant time.");
            latch = new CountDownLatch(1);
            sourceContext.reset(latch);
            TestUtils.saveInstantAsComplete(metaClient, (HoodieInstant)oriInstants.get(1), (HoodieCommitMetadata)metadataList.get(0));
            Assertions.assertTrue((boolean)latch.await(5000L, TimeUnit.MILLISECONDS), (String)"Should finish splits generation");
            MatcherAssert.assertThat((String)"Should produce the expected splits", (Object)sourceContext.getPartitionPaths(), (Matcher)CoreMatchers.is((Object)"par1"));
            Assertions.assertTrue((boolean)sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()), (String)"All instants should have range limit");
            Assertions.assertTrue((boolean)sourceContext.splits.stream().allMatch(split -> TestStreamReadMonitoringFunction.isPointInstantRange((InstantRange)split.getInstantRange().get(), c2)), (String)"All the splits should have point instant range");
            Assertions.assertTrue((boolean)sourceContext.splits.stream().anyMatch(split -> split.getLatestCommit().equals(c2)), (String)"At least one input split's latest commit time should be equal to the specified instant time.");
            latch = new CountDownLatch(1);
            sourceContext.reset(latch);
            TestUtils.saveInstantAsComplete(metaClient, (HoodieInstant)oriInstants.get(2), (HoodieCommitMetadata)metadataList.get(1));
            Assertions.assertTrue((boolean)latch.await(5000L, TimeUnit.MILLISECONDS), (String)"Should finish splits generation");
            MatcherAssert.assertThat((String)"Should produce the expected splits", (Object)sourceContext.getPartitionPaths(), (Matcher)CoreMatchers.is((Object)"par1"));
            Assertions.assertTrue((boolean)sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()), (String)"All instants should have range limit");
            Assertions.assertTrue((boolean)sourceContext.splits.stream().allMatch(split -> TestStreamReadMonitoringFunction.isPointInstantRange((InstantRange)split.getInstantRange().get(), c3)), (String)"All the splits should have point instant range");
            Assertions.assertTrue((boolean)sourceContext.splits.stream().anyMatch(split -> split.getLatestCommit().equals(c3)), (String)"At least one input split's latest commit time should be equal to the specified instant time.");
            function.close();
        }
    }

    @Test
    public void testCheckpointRestore() throws Exception {
        OperatorSubtaskState state;
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(this.conf);
        try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = this.createHarness(function);){
            harness.setup();
            harness.open();
            CountDownLatch latch = new CountDownLatch(4);
            CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
            this.runAsync(sourceContext, function);
            Assertions.assertTrue((boolean)latch.await(5000L, TimeUnit.MILLISECONDS), (String)"Should finish splits generation");
            Thread.sleep(1000L);
            state = harness.snapshot(1L, 1L);
            function.close();
            Assertions.assertTrue((boolean)latch.await(5000L, TimeUnit.MILLISECONDS), (String)"Should finish splits generation");
            MatcherAssert.assertThat((String)"Should produce the expected splits", (Object)sourceContext.getPartitionPaths(), (Matcher)CoreMatchers.is((Object)"par1,par2,par3,par4"));
            Assertions.assertTrue((boolean)sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()), (String)"All instants should have range limit");
        }
        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, this.conf);
        StreamReadMonitoringFunction function2 = TestUtils.getMonitorFunc(this.conf);
        try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = this.createHarness(function2);){
            harness.setup();
            harness.initializeState(state);
            harness.open();
            CountDownLatch latch = new CountDownLatch(4);
            CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
            this.runAsync(sourceContext, function2);
            function.close();
            Assertions.assertTrue((boolean)latch.await(5000L, TimeUnit.MILLISECONDS), (String)"Should finish splits generation");
            MatcherAssert.assertThat((String)"Should produce the expected splits", (Object)sourceContext.getPartitionPaths(), (Matcher)CoreMatchers.is((Object)"par1,par2,par3,par4"));
            Assertions.assertTrue((boolean)sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()), (String)"All the instants should have range limit");
        }
    }

    @Test
    public void testStopWithSavepointAndRestore() throws Exception {
        OperatorSubtaskState state;
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        this.conf.setString(FlinkOptions.READ_START_COMMIT, "earliest");
        StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(this.conf);
        try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = this.createHarness(function);){
            harness.setup();
            harness.open();
            CountDownLatch latch = new CountDownLatch(4);
            CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
            this.runAsync(sourceContext, function);
            Assertions.assertTrue((boolean)latch.await(5000L, TimeUnit.MILLISECONDS), (String)"Should finish splits generation");
            Thread.sleep(1000L);
            function.cancel();
            state = harness.snapshot(1L, 1L);
            function.close();
            Assertions.assertTrue((boolean)latch.await(5000L, TimeUnit.MILLISECONDS), (String)"Should finish splits generation");
            MatcherAssert.assertThat((String)"Should produce the expected splits", (Object)sourceContext.getPartitionPaths(), (Matcher)CoreMatchers.is((Object)"par1,par2,par3,par4"));
            Assertions.assertTrue((boolean)sourceContext.splits.stream().noneMatch(split -> split.getInstantRange().isPresent()), (String)"All instants should have range limit");
        }
        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, this.conf);
        StreamReadMonitoringFunction function2 = TestUtils.getMonitorFunc(this.conf);
        try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = this.createHarness(function2);){
            harness.setup();
            harness.initializeState(state);
            harness.open();
            CountDownLatch latch = new CountDownLatch(4);
            CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
            this.runAsync(sourceContext, function2);
            function.close();
            Assertions.assertTrue((boolean)latch.await(5000L, TimeUnit.MILLISECONDS), (String)"Should finish splits generation");
            MatcherAssert.assertThat((String)"Should produce the expected splits", (Object)sourceContext.getPartitionPaths(), (Matcher)CoreMatchers.is((Object)"par1,par2,par3,par4"));
            Assertions.assertTrue((boolean)sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()), (String)"All the instants should have range limit");
        }
    }

    private static boolean isPointInstantRange(InstantRange instantRange, String timestamp) {
        return instantRange != null && Objects.equals(timestamp, instantRange.getStartInstant().get()) && Objects.equals(timestamp, instantRange.getEndInstant().get());
    }

    private AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> createHarness(StreamReadMonitoringFunction function) throws Exception {
        StreamSource streamSource = new StreamSource((SourceFunction)function);
        return new AbstractStreamOperatorTestHarness((StreamOperator)streamSource, 1, 1, 0);
    }

    private void runAsync(CollectingSourceContext sourceContext, StreamReadMonitoringFunction function) {
        Thread task = new Thread(() -> {
            try {
                function.run((SourceFunction.SourceContext)sourceContext);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        task.start();
    }

    private static class CollectingSourceContext
    implements SourceFunction.SourceContext<MergeOnReadInputSplit> {
        private final List<MergeOnReadInputSplit> splits = new ArrayList<MergeOnReadInputSplit>();
        private final Object checkpointLock = new Object();
        private volatile CountDownLatch latch;

        CollectingSourceContext(CountDownLatch latch) {
            this.latch = latch;
        }

        public void collect(MergeOnReadInputSplit element) {
            this.splits.add(element);
            this.latch.countDown();
        }

        public void collectWithTimestamp(MergeOnReadInputSplit element, long timestamp) {
            this.collect(element);
        }

        public void emitWatermark(Watermark mark) {
        }

        public void markAsTemporarilyIdle() {
        }

        public Object getCheckpointLock() {
            return this.checkpointLock;
        }

        public void close() {
        }

        public void reset(CountDownLatch latch) {
            this.latch = latch;
            this.splits.clear();
        }

        public String getPartitionPaths() {
            return this.splits.stream().map(TestUtils::getSplitPartitionPath).distinct().sorted(Comparator.naturalOrder()).collect(Collectors.joining(","));
        }
    }
}

