/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.utils;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.meta.CkpMetadata;
import org.apache.hudi.sink.meta.CkpMetadataFactory;
import org.apache.hudi.sink.utils.TestFunctionWrapper;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;

public class TestWriteBase {
    protected static final Map<String, String> EXPECTED1 = new HashMap<String, String>();
    protected static final Map<String, String> EXPECTED2 = new HashMap<String, String>();
    protected static final Map<String, String> EXPECTED3 = new HashMap<String, String>();
    protected static final Map<String, String> EXPECTED4 = new HashMap<String, String>();
    protected static final Map<String, List<String>> EXPECTED5 = new HashMap<String, List<String>>();
    protected Configuration conf;
    @TempDir
    protected File tempFile;

    @BeforeEach
    public void before() {
        this.conf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        this.conf.setString(FlinkOptions.TABLE_TYPE, this.getTableType().name());
        this.setUp(this.conf);
    }

    protected void setUp(Configuration conf) {
    }

    @AfterEach
    public void after() {
        this.conf = null;
    }

    protected TestHarness preparePipeline() throws Exception {
        return this.preparePipeline(this.conf);
    }

    protected TestHarness preparePipeline(Configuration conf) throws Exception {
        return TestHarness.instance().preparePipeline(this.tempFile, conf);
    }

    protected HoodieTableType getTableType() {
        return HoodieTableType.COPY_ON_WRITE;
    }

    static {
        EXPECTED1.put("par1", "[id1,par1,id1,Danny,23,1,par1, id2,par1,id2,Stephen,33,2,par1]");
        EXPECTED1.put("par2", "[id3,par2,id3,Julian,53,3,par2, id4,par2,id4,Fabian,31,4,par2]");
        EXPECTED1.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3]");
        EXPECTED1.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
        EXPECTED2.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]");
        EXPECTED2.put("par2", "[id3,par2,id3,Julian,54,3,par2, id4,par2,id4,Fabian,32,4,par2]");
        EXPECTED2.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3, id9,par3,id9,Jane,19,6,par3]");
        EXPECTED2.put("par4", "[id10,par4,id10,Ella,38,7,par4, id11,par4,id11,Phoebe,52,8,par4, id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
        EXPECTED3.put("par1", "[id1,par1,id1,Danny,23,1,par1]");
        EXPECTED4.put("par1", "[id1,par1,id1,Danny,23,0,par1, id1,par1,id1,Danny,23,1,par1, id1,par1,id1,Danny,23,2,par1, id1,par1,id1,Danny,23,3,par1, id1,par1,id1,Danny,23,4,par1]");
        EXPECTED5.put("par1", Arrays.asList("id1,par1,id1,Danny,23,0,par1", "id1,par1,id1,Danny,23,0,par1", "id1,par1,id1,Danny,23,1,par1", "id1,par1,id1,Danny,23,1,par1", "id1,par1,id1,Danny,23,2,par1", "id1,par1,id1,Danny,23,2,par1", "id1,par1,id1,Danny,23,3,par1", "id1,par1,id1,Danny,23,3,par1", "id1,par1,id1,Danny,23,4,par1", "id1,par1,id1,Danny,23,4,par1"));
    }

    public static class TestHarness {
        private File baseFile;
        private String basePath;
        private Configuration conf;
        private TestFunctionWrapper<RowData> pipeline;
        private CkpMetadata ckpMetadata;
        private String lastPending;
        private String lastComplete;

        public static TestHarness instance() {
            return new TestHarness();
        }

        public TestHarness preparePipeline(File basePath, Configuration conf) throws Exception {
            this.baseFile = basePath;
            this.basePath = this.baseFile.getAbsolutePath();
            this.conf = conf;
            this.pipeline = TestData.getWritePipeline(this.basePath, conf);
            this.pipeline.openFunction();
            HoodieWriteConfig writeConfig = this.pipeline.getCoordinator().getWriteClient().getConfig();
            this.ckpMetadata = CkpMetadataFactory.getCkpMetadata((HoodieWriteConfig)writeConfig, (Configuration)conf);
            return this;
        }

        public TestHarness consume(List<RowData> inputs) throws Exception {
            for (RowData rowData : inputs) {
                this.pipeline.invoke(rowData);
            }
            return this;
        }

        public TestHarness assertConsumeThrows(List<RowData> inputs, String message) {
            Assertions.assertThrows(HoodieException.class, () -> this.consume(inputs), (String)message);
            return this;
        }

        public TestHarness emptyEventBuffer() {
            Assertions.assertTrue((this.pipeline.getEventBuffer().length == 1 && this.pipeline.getEventBuffer()[0] == null ? 1 : 0) != 0, (String)"The coordinator events buffer expect to be empty");
            return this;
        }

        public TestHarness assertNextEvent() {
            OperatorEvent nextEvent = this.pipeline.getNextEvent();
            MatcherAssert.assertThat((String)"The operator expect to send an event", (Object)nextEvent, (Matcher)CoreMatchers.instanceOf(WriteMetadataEvent.class));
            this.pipeline.getCoordinator().handleEventFromOperator(0, nextEvent);
            if (!((WriteMetadataEvent)nextEvent).isBootstrap()) {
                Assertions.assertNotNull((Object)this.pipeline.getEventBuffer()[0], (String)"The coordinator missed the event");
            } else {
                Assertions.assertNull((Object)this.pipeline.getEventBuffer()[0], (String)"The coordinator should reset event buffer because of the instant initialization");
            }
            return this;
        }

        public TestHarness assertNextEvent(int numWriteStatus, String partitions) {
            OperatorEvent nextEvent = this.pipeline.getNextEvent();
            MatcherAssert.assertThat((String)"The operator expect to send an event", (Object)nextEvent, (Matcher)CoreMatchers.instanceOf(WriteMetadataEvent.class));
            List writeStatuses = ((WriteMetadataEvent)nextEvent).getWriteStatuses();
            Assertions.assertNotNull((Object)writeStatuses);
            MatcherAssert.assertThat((Object)writeStatuses.size(), (Matcher)CoreMatchers.is((Object)numWriteStatus));
            MatcherAssert.assertThat((Object)writeStatuses.stream().map(WriteStatus::getPartitionPath).sorted(Comparator.naturalOrder()).collect(Collectors.joining(",")), (Matcher)CoreMatchers.is((Object)partitions));
            this.pipeline.getCoordinator().handleEventFromOperator(0, nextEvent);
            Assertions.assertNotNull((Object)this.pipeline.getEventBuffer()[0], (String)"The coordinator missed the event");
            return this;
        }

        public TestHarness assertEmptyEvent() {
            OperatorEvent nextEvent = this.pipeline.getNextEvent();
            MatcherAssert.assertThat((String)"The operator expect to send an event", (Object)nextEvent, (Matcher)CoreMatchers.instanceOf(WriteMetadataEvent.class));
            List writeStatuses = ((WriteMetadataEvent)nextEvent).getWriteStatuses();
            Assertions.assertNotNull((Object)writeStatuses);
            MatcherAssert.assertThat((Object)writeStatuses.size(), (Matcher)CoreMatchers.is((Object)0));
            this.pipeline.getCoordinator().handleEventFromOperator(0, nextEvent);
            if (!((WriteMetadataEvent)nextEvent).isBootstrap()) {
                Assertions.assertNotNull((Object)this.pipeline.getEventBuffer()[0], (String)"The coordinator missed the event");
            } else {
                Assertions.assertNull((Object)this.pipeline.getEventBuffer()[0], (String)"The coordinator should reset event buffer because of the instant initialization");
            }
            return this;
        }

        public TestHarness assertNoEvent() {
            try {
                this.pipeline.getNextEvent();
                throw new AssertionError((Object)"The write task should not send the event");
            }
            catch (Exception exception) {
                return this;
            }
        }

        public TestHarness assertDataBuffer(int numBuckets, int numRecords) {
            Map<String, List<HoodieRecord>> dataBuffer = this.pipeline.getDataBuffer();
            MatcherAssert.assertThat((String)("Should have " + numBuckets + " data bucket"), (Object)dataBuffer.size(), (Matcher)CoreMatchers.is((Object)numBuckets));
            MatcherAssert.assertThat((String)(numRecords + " records expect to flush out as a mini-batch"), (Object)dataBuffer.values().stream().findFirst().map(List::size).orElse(-1), (Matcher)CoreMatchers.is((Object)numRecords));
            return this;
        }

        public TestHarness checkpoint(long checkpointId) throws Exception {
            this.pipeline.checkpointFunction(checkpointId);
            return this;
        }

        public TestHarness stopTimelineServer() {
            HoodieFlinkWriteClient client = this.pipeline.getCoordinator().getWriteClient();
            client.getTimelineServer().ifPresent(embeddedTimelineService -> embeddedTimelineService.stopForBasePath(client.getConfig().getBasePath()));
            return this;
        }

        public TestHarness allDataFlushed() {
            Map<String, List<HoodieRecord>> dataBuffer = this.pipeline.getDataBuffer();
            MatcherAssert.assertThat((String)"All data should be flushed out", (Object)dataBuffer.size(), (Matcher)CoreMatchers.is((Object)0));
            return this;
        }

        public TestHarness handleEvents(int numEvents) {
            for (int i = 0; i < numEvents; ++i) {
                OperatorEvent event = this.pipeline.getNextEvent();
                MatcherAssert.assertThat((String)"The operator expect to send an event", (Object)event, (Matcher)CoreMatchers.instanceOf(WriteMetadataEvent.class));
                this.pipeline.getCoordinator().handleEventFromOperator(0, event);
            }
            Assertions.assertNotNull((Object)this.pipeline.getEventBuffer()[0], (String)"The coordinator missed the event");
            return this;
        }

        public TestHarness checkpointComplete(long checkpointId) {
            this.lastPending = this.lastPendingInstant();
            this.pipeline.checkpointComplete(checkpointId);
            String newInflight = this.checkInflightInstant();
            this.checkInstantState(HoodieInstant.State.COMPLETED, this.lastPending);
            this.lastComplete = this.lastPending;
            this.lastPending = newInflight;
            return this;
        }

        public void endInputThrows(Class<?> cause, String msg) {
            this.pipeline.endInput();
            OperatorEvent nextEvent = this.pipeline.getNextEvent();
            this.pipeline.getCoordinator().handleEventFromOperator(0, nextEvent);
            Assertions.assertTrue((boolean)this.pipeline.getCoordinatorContext().isJobFailed(), (String)"Job should have been failed");
            Throwable throwable = this.pipeline.getCoordinatorContext().getJobFailureReason().getCause();
            MatcherAssert.assertThat((Object)throwable, (Matcher)CoreMatchers.instanceOf(cause));
            MatcherAssert.assertThat((Object)throwable.getMessage(), (Matcher)CoreMatchers.containsString((String)msg));
        }

        public TestHarness endInput() {
            this.pipeline.endInput();
            OperatorEvent nextEvent = this.pipeline.getNextEvent();
            this.pipeline.getCoordinator().handleEventFromOperator(0, nextEvent);
            return this;
        }

        public TestHarness checkpointCompleteThrows(long checkpointId, Class<?> cause, String msg) {
            this.pipeline.checkpointComplete(checkpointId);
            Assertions.assertTrue((boolean)this.pipeline.getCoordinatorContext().isJobFailed(), (String)"Job should have been failed");
            Throwable throwable = this.pipeline.getCoordinatorContext().getJobFailureReason().getCause();
            MatcherAssert.assertThat((Object)throwable, (Matcher)CoreMatchers.instanceOf(cause));
            MatcherAssert.assertThat((Object)throwable.getMessage(), (Matcher)CoreMatchers.containsString((String)msg));
            return this;
        }

        public TestHarness emptyCheckpoint(long checkpointId) {
            String lastPending = this.lastPendingInstant();
            this.pipeline.checkpointComplete(checkpointId);
            if (!OptionsResolver.allowCommitOnEmptyBatch((Configuration)this.conf)) {
                Assertions.assertEquals((Object)this.lastPending, (Object)lastPending);
                this.checkInstantState(HoodieInstant.State.COMPLETED, this.lastComplete);
            } else {
                String newInflight = this.checkInflightInstant();
                this.checkInstantState(HoodieInstant.State.COMPLETED, lastPending);
                this.lastComplete = lastPending;
                this.lastPending = newInflight;
            }
            return this;
        }

        public TestHarness checkpointFails(long checkpointId) {
            this.pipeline.checkpointFails(checkpointId);
            Assertions.assertFalse((boolean)this.pipeline.getCoordinatorContext().isJobFailed(), (String)"The last checkpoint was aborted, ignore the events");
            this.checkInstantState(HoodieInstant.State.COMPLETED, null);
            return this;
        }

        public TestHarness checkpointThrows(long checkpointId, String message) {
            Assertions.assertThrows(HoodieException.class, () -> this.checkpoint(checkpointId), (String)message);
            return this;
        }

        public TestHarness subTaskFails(int taskId) throws Exception {
            String instant1 = this.lastPendingInstant();
            this.subTaskFails(taskId, 0);
            this.assertEmptyEvent();
            String instant2 = this.lastPendingInstant();
            Assertions.assertNotEquals((Object)instant2, (Object)instant1, (String)"The previous instant should be rolled back when starting new instant");
            return this;
        }

        public TestHarness subTaskFails(int taskId, int attemptNumber) throws Exception {
            this.pipeline.subTaskFails(taskId, attemptNumber);
            return this;
        }

        public TestHarness jobFailover() throws Exception {
            this.pipeline.jobFailover();
            return this;
        }

        public TestHarness noCompleteInstant() {
            this.checkInstantState(HoodieInstant.State.COMPLETED, null);
            return this;
        }

        public TestHarness assertEmptyDataFiles() {
            Assertions.assertFalse((boolean)this.fileExists(), (String)"No data files should have been created");
            return this;
        }

        private boolean fileExists() {
            ArrayList<File> dirsToCheck = new ArrayList<File>();
            dirsToCheck.add(this.baseFile);
            while (!dirsToCheck.isEmpty()) {
                File dir = (File)dirsToCheck.remove(0);
                for (File file : Objects.requireNonNull(dir.listFiles())) {
                    if (file.getName().startsWith(".")) continue;
                    if (file.isDirectory()) {
                        dirsToCheck.add(file);
                        continue;
                    }
                    return true;
                }
            }
            return false;
        }

        public TestHarness checkWrittenData(Map<String, String> expected) throws Exception {
            this.checkWrittenData(expected, 4);
            return this;
        }

        public TestHarness checkWrittenData(Map<String, String> expected, int partitions) throws Exception {
            if (OptionsResolver.isCowTable((Configuration)this.conf)) {
                TestData.checkWrittenData(this.baseFile, expected, partitions);
            } else {
                this.checkWrittenDataMor(this.baseFile, expected, partitions);
            }
            return this;
        }

        private void checkWrittenDataMor(File baseFile, Map<String, String> expected, int partitions) throws Exception {
            HoodieStorage storage = HoodieStorageUtils.getStorage((String)this.basePath, (StorageConfiguration)HoodieTestUtils.getDefaultStorageConf());
            TestData.checkWrittenDataMOR(storage, baseFile, expected, partitions);
        }

        public TestHarness checkWrittenDataCOW(Map<String, List<String>> expected) throws IOException {
            TestData.checkWrittenDataCOW(this.baseFile, expected);
            return this;
        }

        public TestHarness checkWrittenAllData(Map<String, String> expected, int partitions) throws IOException {
            TestData.checkWrittenAllData(this.baseFile, expected, partitions);
            return this;
        }

        public TestHarness checkIndexLoaded(HoodieKey ... keys) {
            for (HoodieKey key : keys) {
                Assertions.assertTrue((boolean)this.pipeline.isKeyInState(key), (String)("Key: " + key + " assumes to be in the index state"));
            }
            return this;
        }

        public TestHarness assertBootstrapped() throws Exception {
            Assertions.assertTrue((boolean)this.pipeline.isAlreadyBootstrap());
            return this;
        }

        public TestHarness assertConfirming() {
            Assertions.assertTrue((boolean)this.pipeline.isConforming(), (String)"The write function should be waiting for the instant to commit");
            return this;
        }

        public TestHarness assertNotConfirming() {
            Assertions.assertFalse((boolean)this.pipeline.isConforming(), (String)"The write function should finish waiting for the instant to commit");
            return this;
        }

        public TestHarness rollbackLastCompleteInstantToInflight() throws Exception {
            HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient((Configuration)this.conf);
            Option lastCompletedInstant = metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
            TimelineUtils.deleteInstantFile((HoodieStorage)metaClient.getStorage(), (StoragePath)metaClient.getTimelinePath(), (HoodieInstant)((HoodieInstant)lastCompletedInstant.get()), (InstantFileNameGenerator)metaClient.getInstantFileNameGenerator());
            OutputStream outputStream = metaClient.getStorage().create(new StoragePath(HoodieTableMetaClient.getHeartbeatFolderPath((String)this.basePath) + "/" + this.lastComplete), true);
            outputStream.close();
            this.lastPending = this.lastComplete;
            this.lastComplete = this.lastCompleteInstant();
            return this;
        }

        public TestHarness checkLastPendingInstantCompleted() {
            this.checkInstantState(HoodieInstant.State.COMPLETED, this.lastPending);
            this.lastComplete = this.lastPending;
            this.lastPending = this.lastPendingInstant();
            return this;
        }

        public TestHarness restartCoordinator() throws Exception {
            this.pipeline.restartCoordinator();
            return this;
        }

        public void end() throws Exception {
            this.pipeline.close();
            this.pipeline = null;
        }

        private String lastPendingInstant() {
            return this.ckpMetadata.lastPendingInstant();
        }

        private String checkInflightInstant() {
            String instant = this.ckpMetadata.lastPendingInstant();
            Assertions.assertNotNull((Object)instant);
            return instant;
        }

        private void checkInstantState(HoodieInstant.State state, String instantStr) {
            String instant;
            switch (state) {
                case REQUESTED: {
                    instant = this.lastPendingInstant();
                    break;
                }
                case COMPLETED: {
                    instant = this.lastCompleteInstant();
                    break;
                }
                default: {
                    throw new AssertionError((Object)"Unexpected state");
                }
            }
            MatcherAssert.assertThat((Object)instant, (Matcher)CoreMatchers.is((Object)instantStr));
        }

        protected String lastCompleteInstant() {
            if (OptionsResolver.isMultiWriter((Configuration)this.conf)) {
                return this.ckpMetadata.lastCompleteInstant();
            }
            return OptionsResolver.isMorTable((Configuration)this.conf) ? TestUtils.getLastDeltaCompleteInstant(this.basePath) : TestUtils.getLastCompleteInstant(this.basePath, "commit");
        }

        public TestHarness checkCompletedInstantCount(int count) {
            boolean isMor = OptionsResolver.isMorTable((Configuration)this.conf);
            Assertions.assertEquals((int)count, (int)TestUtils.getCompletedInstantCount(this.basePath, isMor ? "deltacommit" : "commit"));
            return this;
        }
    }
}

