/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.utils;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.InsertFunctionWrapper;
import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
import org.apache.hudi.sink.utils.TestFunctionWrapper;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;

public class TestWriteBase {
    protected static final Map<String, String> EXPECTED1 = new HashMap<String, String>();
    protected static final Map<String, String> EXPECTED2 = new HashMap<String, String>();
    protected static final Map<String, String> EXPECTED3 = new HashMap<String, String>();
    protected static final Map<String, String> EXPECTED4 = new HashMap<String, String>();
    protected static final Map<String, List<String>> EXPECTED5 = new HashMap<String, List<String>>();

    static {
        EXPECTED1.put("par1", "[id1,par1,id1,Danny,23,1,par1, id2,par1,id2,Stephen,33,2,par1]");
        EXPECTED1.put("par2", "[id3,par2,id3,Julian,53,3,par2, id4,par2,id4,Fabian,31,4,par2]");
        EXPECTED1.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3]");
        EXPECTED1.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
        EXPECTED2.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]");
        EXPECTED2.put("par2", "[id3,par2,id3,Julian,54,3,par2, id4,par2,id4,Fabian,32,4,par2]");
        EXPECTED2.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3, id9,par3,id9,Jane,19,6,par3]");
        EXPECTED2.put("par4", "[id10,par4,id10,Ella,38,7,par4, id11,par4,id11,Phoebe,52,8,par4, id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
        EXPECTED3.put("par1", "[id1,par1,id1,Danny,23,1,par1]");
        EXPECTED4.put("par1", "[id1,par1,id1,Danny,23,0,par1, id1,par1,id1,Danny,23,1,par1, id1,par1,id1,Danny,23,2,par1, id1,par1,id1,Danny,23,3,par1, id1,par1,id1,Danny,23,4,par1]");
        EXPECTED5.put("par1", Arrays.asList("id1,par1,id1,Danny,23,0,par1", "id1,par1,id1,Danny,23,0,par1", "id1,par1,id1,Danny,23,1,par1", "id1,par1,id1,Danny,23,1,par1", "id1,par1,id1,Danny,23,2,par1", "id1,par1,id1,Danny,23,2,par1", "id1,par1,id1,Danny,23,3,par1", "id1,par1,id1,Danny,23,3,par1", "id1,par1,id1,Danny,23,4,par1", "id1,par1,id1,Danny,23,4,par1"));
    }

    public static class TestHarness {
        private File baseFile;
        private String basePath;
        private Configuration conf;
        private TestFunctionWrapper<RowData> pipeline;
        private String lastPending;
        private String lastComplete;

        public static TestHarness instance() {
            return new TestHarness();
        }

        public TestHarness preparePipeline(File basePath, Configuration conf) throws Exception {
            this.preparePipeline(basePath, conf, false);
            return this;
        }

        public TestHarness preparePipeline(File basePath, Configuration conf, boolean append) throws Exception {
            this.baseFile = basePath;
            this.basePath = this.baseFile.getAbsolutePath();
            this.conf = conf;
            this.pipeline = append ? new InsertFunctionWrapper(this.basePath, conf) : new StreamWriteFunctionWrapper(this.basePath, conf);
            this.pipeline.openFunction();
            return this;
        }

        public TestHarness consume(List<RowData> inputs) throws Exception {
            for (RowData rowData : inputs) {
                this.pipeline.invoke(rowData);
            }
            return this;
        }

        public TestHarness assertConsumeThrows(List<RowData> inputs, String message) {
            Assertions.assertThrows(HoodieException.class, () -> this.consume(inputs), (String)message);
            return this;
        }

        public TestHarness emptyEventBuffer() {
            Assertions.assertTrue((this.pipeline.getEventBuffer().length == 1 && this.pipeline.getEventBuffer()[0] == null ? 1 : 0) != 0, (String)"The coordinator events buffer expect to be empty");
            return this;
        }

        public TestHarness assertNextEvent() {
            OperatorEvent nextEvent = this.pipeline.getNextEvent();
            MatcherAssert.assertThat((String)"The operator expect to send an event", (Object)nextEvent, (Matcher)CoreMatchers.instanceOf(WriteMetadataEvent.class));
            this.pipeline.getCoordinator().handleEventFromOperator(0, nextEvent);
            Assertions.assertNotNull((Object)this.pipeline.getEventBuffer()[0], (String)"The coordinator missed the event");
            return this;
        }

        public TestHarness assertNextEvent(int numWriteStatus, String partitions) {
            OperatorEvent nextEvent = this.pipeline.getNextEvent();
            MatcherAssert.assertThat((String)"The operator expect to send an event", (Object)nextEvent, (Matcher)CoreMatchers.instanceOf(WriteMetadataEvent.class));
            List writeStatuses = ((WriteMetadataEvent)nextEvent).getWriteStatuses();
            Assertions.assertNotNull((Object)writeStatuses);
            MatcherAssert.assertThat((Object)writeStatuses.size(), (Matcher)CoreMatchers.is((Object)numWriteStatus));
            MatcherAssert.assertThat((Object)writeStatuses.stream().map(WriteStatus::getPartitionPath).sorted(Comparator.naturalOrder()).collect(Collectors.joining(",")), (Matcher)CoreMatchers.is((Object)partitions));
            this.pipeline.getCoordinator().handleEventFromOperator(0, nextEvent);
            Assertions.assertNotNull((Object)this.pipeline.getEventBuffer()[0], (String)"The coordinator missed the event");
            return this;
        }

        public TestHarness assertEmptyEvent() {
            OperatorEvent nextEvent = this.pipeline.getNextEvent();
            MatcherAssert.assertThat((String)"The operator expect to send an event", (Object)nextEvent, (Matcher)CoreMatchers.instanceOf(WriteMetadataEvent.class));
            List writeStatuses = ((WriteMetadataEvent)nextEvent).getWriteStatuses();
            Assertions.assertNotNull((Object)writeStatuses);
            MatcherAssert.assertThat((Object)writeStatuses.size(), (Matcher)CoreMatchers.is((Object)0));
            this.pipeline.getCoordinator().handleEventFromOperator(0, nextEvent);
            Assertions.assertNotNull((Object)this.pipeline.getEventBuffer()[0], (String)"The coordinator missed the event");
            return this;
        }

        public TestHarness assertDataBuffer(int numBuckets, int numRecords) {
            Map<String, List<HoodieRecord>> dataBuffer = this.pipeline.getDataBuffer();
            MatcherAssert.assertThat((String)("Should have " + numBuckets + " data bucket"), (Object)dataBuffer.size(), (Matcher)CoreMatchers.is((Object)numBuckets));
            MatcherAssert.assertThat((String)(numRecords + " records expect to flush out as a mini-batch"), (Object)dataBuffer.values().stream().findFirst().map(List::size).orElse(-1), (Matcher)CoreMatchers.is((Object)numRecords));
            return this;
        }

        public TestHarness checkpoint(long checkpointId) throws Exception {
            this.pipeline.checkpointFunction(checkpointId);
            return this;
        }

        public TestHarness allDataFlushed() {
            Map<String, List<HoodieRecord>> dataBuffer = this.pipeline.getDataBuffer();
            MatcherAssert.assertThat((String)"All data should be flushed out", (Object)dataBuffer.size(), (Matcher)CoreMatchers.is((Object)0));
            return this;
        }

        public TestHarness handleEvents(int numEvents) {
            for (int i = 0; i < numEvents; ++i) {
                OperatorEvent event = this.pipeline.getNextEvent();
                MatcherAssert.assertThat((String)"The operator expect to send an event", (Object)event, (Matcher)CoreMatchers.instanceOf(WriteMetadataEvent.class));
                this.pipeline.getCoordinator().handleEventFromOperator(0, event);
            }
            Assertions.assertNotNull((Object)this.pipeline.getEventBuffer()[0], (String)"The coordinator missed the event");
            return this;
        }

        public TestHarness checkpointComplete(long checkpointId) {
            this.lastPending = this.lastPendingInstant();
            this.pipeline.checkpointComplete(checkpointId);
            this.checkInflightInstant();
            this.checkInstantState(HoodieInstant.State.COMPLETED, this.lastPending);
            this.lastComplete = this.lastPending;
            this.lastPending = this.lastPendingInstant();
            return this;
        }

        public TestHarness emptyCheckpoint(long checkpointId) {
            String lastPending = this.lastPendingInstant();
            this.pipeline.checkpointComplete(checkpointId);
            Assertions.assertEquals((Object)this.lastPending, (Object)lastPending);
            this.checkInstantState(HoodieInstant.State.COMPLETED, this.lastComplete);
            return this;
        }

        public TestHarness checkpointFails(long checkpointId) {
            this.pipeline.checkpointFails(checkpointId);
            Assertions.assertFalse((boolean)this.pipeline.getCoordinatorContext().isJobFailed(), (String)"The last checkpoint was aborted, ignore the events");
            this.checkInstantState(HoodieInstant.State.COMPLETED, null);
            return this;
        }

        public TestHarness checkpointThrows(long checkpointId, String message) {
            Assertions.assertThrows(HoodieException.class, () -> this.checkpoint(checkpointId), (String)message);
            return this;
        }

        public TestHarness subTaskFails(int taskId) throws Exception {
            String instant1 = this.lastPendingInstant();
            this.pipeline.subTaskFails(taskId);
            String instant2 = this.lastPendingInstant();
            Assertions.assertNotEquals((Object)instant2, (Object)instant1, (String)"The previous instant should be rolled back when starting new instant");
            return this;
        }

        public TestHarness noCompleteInstant() {
            this.checkInstantState(HoodieInstant.State.COMPLETED, null);
            return this;
        }

        public TestHarness assertEmptyDataFiles() {
            File[] dataFiles = this.baseFile.listFiles(file -> !file.getName().startsWith("."));
            Assertions.assertNotNull((Object)dataFiles);
            MatcherAssert.assertThat((Object)dataFiles.length, (Matcher)CoreMatchers.is((Object)0));
            return this;
        }

        public TestHarness checkWrittenData(Map<String, String> expected) throws Exception {
            this.checkWrittenData(expected, 4);
            return this;
        }

        public TestHarness checkWrittenData(Map<String, String> expected, int partitions) throws Exception {
            if (OptionsResolver.isCowTable((Configuration)this.conf)) {
                TestData.checkWrittenData(this.baseFile, expected, partitions);
            } else {
                this.checkWrittenDataMor(this.baseFile, expected, partitions);
            }
            return this;
        }

        private void checkWrittenDataMor(File baseFile, Map<String, String> expected, int partitions) throws Exception {
            FileSystem fs = FSUtils.getFs((String)this.basePath, (org.apache.hadoop.conf.Configuration)new org.apache.hadoop.conf.Configuration());
            TestData.checkWrittenDataMOR(fs, baseFile, expected, partitions);
        }

        public TestHarness checkWrittenDataCOW(Map<String, List<String>> expected) throws IOException {
            TestData.checkWrittenDataCOW(this.baseFile, expected);
            return this;
        }

        public TestHarness checkWrittenAllData(Map<String, String> expected, int partitions) throws IOException {
            TestData.checkWrittenAllData(this.baseFile, expected, partitions);
            return this;
        }

        public TestHarness checkIndexLoaded(HoodieKey ... keys) {
            for (HoodieKey key : keys) {
                Assertions.assertTrue((boolean)this.pipeline.isKeyInState(key), (String)("Key: " + key + " assumes to be in the index state"));
            }
            return this;
        }

        public TestHarness assertBootstrapped() throws Exception {
            Assertions.assertTrue((boolean)this.pipeline.isAlreadyBootstrap());
            return this;
        }

        public TestHarness assertConfirming() {
            Assertions.assertTrue((boolean)this.pipeline.isConforming(), (String)"The write function should be waiting for the instant to commit");
            return this;
        }

        public TestHarness assertNotConfirming() {
            Assertions.assertFalse((boolean)this.pipeline.isConforming(), (String)"The write function should finish waiting for the instant to commit");
            return this;
        }

        public void end() throws Exception {
            this.pipeline.close();
        }

        private String lastPendingInstant() {
            return TestUtils.getLastPendingInstant(this.basePath);
        }

        private void checkInflightInstant() {
            String instant = TestUtils.getLastPendingInstant(this.basePath);
            Assertions.assertNotNull((Object)instant);
        }

        private void checkInstantState(HoodieInstant.State state, String instantStr) {
            String instant;
            switch (state) {
                case REQUESTED: {
                    instant = this.lastPendingInstant();
                    break;
                }
                case COMPLETED: {
                    instant = this.lastCompleteInstant();
                    break;
                }
                default: {
                    throw new AssertionError((Object)"Unexpected state");
                }
            }
            MatcherAssert.assertThat((Object)instant, (Matcher)CoreMatchers.is((Object)instantStr));
        }

        protected String lastCompleteInstant() {
            return OptionsResolver.isMorTable((Configuration)this.conf) ? TestUtils.getLastDeltaCompleteInstant(this.basePath) : TestUtils.getLastCompleteInstant(this.basePath, "commit");
        }
    }
}

