package org.apache.hudi.sink;

import java.io.File;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/hudi/sink/TestWriteCopyOnWrite.class */
public class TestWriteCopyOnWrite {
    /** Expected file content per partition path after writing {@code TestData.DATA_SET_INSERT}. */
    protected static final Map<String, String> EXPECTED1 = new HashMap<>();

    /** Expected file content per partition path after additionally writing {@code TestData.DATA_SET_UPDATE_INSERT}. */
    protected static final Map<String, String> EXPECTED2 = new HashMap<>();

    /** Expected file content for the duplicate-record data set when duplicates are dropped. */
    protected static final Map<String, String> EXPECTED3 = new HashMap<>();

    /** Configuration of the current test; re-created before every test method. */
    protected Configuration conf;

    /** Harness around the stream write function; re-created before every test method. */
    protected StreamWriteFunctionWrapper<RowData> funcWrapper;

    /** Per-test temporary directory injected by JUnit; its absolute path is handed to the configuration and the wrapper. */
    @TempDir
    File tempFile;

    /*
     * Decompiler residue (JADX): this was the compiler-generated class
     * org.apache.hudi.sink.TestWriteCopyOnWrite$1, renamed because "1" is not a valid class name;
     * its access modifiers were widened from package-private.
     *
     * It holds a lookup table indexed by HoodieInstant.State ordinal, meant for switch dispatch:
     * REQUESTED maps to 1, COMPLETED maps to 2, every other state keeps the array default of 0.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // One slot per enum constant; slots that are never assigned below stay 0.
        static final /* synthetic */ int[] $SwitchMap$org$apache$hudi$common$table$timeline$HoodieInstant$State = new int[HoodieInstant.State.values().length];

        static {
            try {
                $SwitchMap$org$apache$hudi$common$table$timeline$HoodieInstant$State[HoodieInstant.State.REQUESTED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: a missing enum constant simply leaves its slot unmapped.
            }
            try {
                $SwitchMap$org$apache$hudi$common$table$timeline$HoodieInstant$State[HoodieInstant.State.COMPLETED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored: a missing enum constant simply leaves its slot unmapped.
            }
        }
    }

    /**
     * Prepares a fresh configuration and write-function wrapper rooted at the per-test temp directory.
     *
     * @throws Exception if the configuration or the wrapper cannot be created
     */
    @BeforeEach
    public void before() throws Exception {
        conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
        conf.setString(FlinkOptions.TABLE_TYPE, getTableType().name());
        // Let subclasses adjust the configuration before the wrapper is built from it.
        setUp(conf);
        funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
    }

    /**
     * Extension hook called from {@code before()} after the table type has been set and before
     * the write-function wrapper is created. The base implementation does nothing.
     *
     * @param configuration the configuration the wrapper is about to be built from
     */
    protected void setUp(Configuration configuration) {
    }

    /**
     * Closes the write-function wrapper used by the test.
     *
     * <p>The wrapper is still {@code null} when {@code before()} failed before assigning it;
     * in that case there is nothing to close and the original set-up failure is not joined by
     * an additional {@link NullPointerException}.
     *
     * @throws Exception if closing the wrapper fails
     */
    @AfterEach
    public void after() throws Exception {
        if (funcWrapper != null) {
            funcWrapper.close();
        }
    }

    /**
     * Drives the write function through two checkpoints: the first one carries the inserted data
     * set, the second one is empty. The instant of the first checkpoint must move from pending to
     * completed and must still be the last completed instant after the second checkpoint.
     *
     * @throws Exception if any step of the write pipeline fails
     */
    @Test
    public void testCheckpoint() throws Exception {
        funcWrapper.openFunction();
        for (RowData rowData : TestData.DATA_SET_INSERT) {
            funcWrapper.invoke(rowData);
        }
        // Nothing has been checkpointed yet, so the single event slot must still be empty.
        Assertions.assertTrue(funcWrapper.getEventBuffer().length == 1 && funcWrapper.getEventBuffer()[0] == null, "The coordinator events buffer expect to be empty");

        // First checkpoint: the buffered records are flushed and reported to the coordinator.
        funcWrapper.checkpointFunction(1L);
        String instant = funcWrapper.getWriteClient().getLastPendingInstant(getTableType());
        OperatorEvent firstEvent = funcWrapper.getNextEvent();
        MatcherAssert.assertThat("The operator expect to send an event", firstEvent, CoreMatchers.instanceOf(WriteMetadataEvent.class));
        // Cast only after the instanceOf assertion, as the other tests in this class treat the event.
        WriteMetadataEvent firstMetadata = (WriteMetadataEvent) firstEvent;
        Assertions.assertNotNull(firstMetadata.getWriteStatuses());
        MatcherAssert.assertThat(firstMetadata.getWriteStatuses().size(), CoreMatchers.is(4));
        // Stream straight from the typed getter so the element type (and getPartitionPath) is known.
        String partitions = firstMetadata.getWriteStatuses().stream()
                .map(status -> status.getPartitionPath())
                .sorted(Comparator.naturalOrder())
                .collect(Collectors.joining(","));
        MatcherAssert.assertThat(partitions, CoreMatchers.is("par1,par2,par3,par4"));

        funcWrapper.getCoordinator().handleEventFromOperator(0, firstEvent);
        Assertions.assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
        checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
        funcWrapper.checkpointComplete(1L);
        checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);

        // Second checkpoint: no new data, so the event must carry no write statuses.
        funcWrapper.checkpointFunction(2L);
        Assertions.assertNotEquals(instant, funcWrapper.getWriteClient().getLastPendingInstant(getTableType()));
        OperatorEvent secondEvent = funcWrapper.getNextEvent();
        MatcherAssert.assertThat("The operator expect to send an event", secondEvent, CoreMatchers.instanceOf(WriteMetadataEvent.class));
        List<?> secondStatuses = ((WriteMetadataEvent) secondEvent).getWriteStatuses();
        Assertions.assertNotNull(secondStatuses);
        MatcherAssert.assertThat(secondStatuses.size(), CoreMatchers.is(0));
        funcWrapper.getCoordinator().handleEventFromOperator(0, secondEvent);
        Assertions.assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
        funcWrapper.checkpointComplete(2L);

        // A pending instant must exist, and the first instant is still the last completed one.
        checkInflightInstant(funcWrapper.getWriteClient());
        checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
    }

    /**
     * Aborts the first (empty) checkpoint and verifies that the job is not failed, that the instant
     * stays pending, that nothing was completed, and that a following checkpoint with data raises a
     * {@link HoodieException}.
     *
     * @throws Exception if any step of the write pipeline fails unexpectedly
     */
    @Test
    public void testCheckpointFails() throws Exception {
        funcWrapper.openFunction();
        funcWrapper.checkpointFunction(1L);
        String instant = funcWrapper.getWriteClient().getLastPendingInstant(getTableType());
        Assertions.assertNotNull(instant);

        OperatorEvent event = funcWrapper.getNextEvent();
        MatcherAssert.assertThat("The operator expect to send an event", event, CoreMatchers.instanceOf(WriteMetadataEvent.class));
        // Cast only after the instanceOf assertion, as the other tests in this class treat the event.
        List<?> writeStatuses = ((WriteMetadataEvent) event).getWriteStatuses();
        Assertions.assertNotNull(writeStatuses);
        MatcherAssert.assertThat(writeStatuses.size(), CoreMatchers.is(0));

        // The event is intentionally NOT handed to the coordinator before the checkpoint aborts.
        funcWrapper.checkpointFails(1L);
        Assertions.assertFalse(funcWrapper.getCoordinatorContext().isJobFailed(), "The last checkpoint was aborted, ignore the events");
        checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
        checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, null);

        for (RowData rowData : TestData.DATA_SET_INSERT) {
            funcWrapper.invoke(rowData);
        }
        // NOTE(review): the third argument of assertThrows is the failure message shown when no
        // exception is thrown; it does not check the message of the thrown HoodieException.
        Assertions.assertThrows(HoodieException.class, () -> funcWrapper.checkpointFunction(2L), "Timeout(0ms) while waiting for");
        funcWrapper.checkpointFails(2L);
    }

    /**
     * Fails subtask 0 after a checkpoint and verifies that the pending instant changes and that no
     * instant has been completed.
     *
     * @throws Exception if any step of the write pipeline fails
     */
    @Test
    public void testSubtaskFails() throws Exception {
        funcWrapper.openFunction();
        funcWrapper.checkpointFunction(1L);
        // Drain the event produced by the checkpoint; its content is not inspected here.
        funcWrapper.getNextEvent();

        final String instantBeforeFailure = funcWrapper.getWriteClient().getLastPendingInstant(getTableType());
        Assertions.assertNotNull(instantBeforeFailure);

        funcWrapper.subTaskFails(0);

        final String instantAfterFailure = funcWrapper.getWriteClient().getLastPendingInstant(getTableType());
        Assertions.assertNotEquals(instantAfterFailure, instantBeforeFailure, "The previous instant should be rolled back when starting new instant");
        checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, null);
    }

    /**
     * Inserts the default data set, completes one checkpoint and verifies both the written data and
     * the instant state transitions.
     *
     * @throws Exception if any step of the write pipeline fails
     */
    @Test
    public void testInsert() throws Exception {
        funcWrapper.openFunction();
        for (RowData rowData : TestData.DATA_SET_INSERT) {
            funcWrapper.invoke(rowData);
        }
        // Before the checkpoint no visible data file may exist.
        assertEmptyDataFiles();

        funcWrapper.checkpointFunction(1L);
        final String instant = funcWrapper.getWriteClient().getLastPendingInstant(getTableType());
        final OperatorEvent event = funcWrapper.getNextEvent();
        MatcherAssert.assertThat("The operator expect to send an event", event, CoreMatchers.instanceOf(WriteMetadataEvent.class));
        funcWrapper.getCoordinator().handleEventFromOperator(0, event);
        Assertions.assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
        checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);

        funcWrapper.checkpointComplete(1L);
        checkWrittenData(tempFile, EXPECTED1);
        checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
        // The data is verified a second time after the instant state check, as in the original flow.
        checkWrittenData(tempFile, EXPECTED1);
    }

    /**
     * Writes the duplicate-record data set twice with {@code INSERT_DROP_DUPS} enabled and verifies
     * that the written result equals {@code EXPECTED3} after each checkpoint.
     *
     * @throws Exception if any step of the write pipeline fails
     */
    @Test
    public void testInsertDuplicates() throws Exception {
        // Rebuild the wrapper so that it picks up the changed option.
        conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true);
        funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
        funcWrapper.openFunction();

        // First round.
        for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
            funcWrapper.invoke(rowData);
        }
        assertEmptyDataFiles();
        funcWrapper.checkpointFunction(1L);
        final OperatorEvent firstEvent = funcWrapper.getNextEvent();
        MatcherAssert.assertThat("The operator expect to send an event", firstEvent, CoreMatchers.instanceOf(WriteMetadataEvent.class));
        funcWrapper.getCoordinator().handleEventFromOperator(0, firstEvent);
        Assertions.assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
        funcWrapper.checkpointComplete(1L);
        checkWrittenData(tempFile, EXPECTED3, 1);

        // Second round with the same records: the expected result must not change.
        for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
            funcWrapper.invoke(rowData);
        }
        funcWrapper.checkpointFunction(2L);
        funcWrapper.getCoordinator().handleEventFromOperator(0, funcWrapper.getNextEvent());
        funcWrapper.checkpointComplete(2L);
        checkWrittenData(tempFile, EXPECTED3, 1);
    }

    /**
     * Inserts the default data set, then writes the update/insert data set in a second checkpoint
     * and verifies that the table ends up with {@code EXPECTED2}.
     *
     * @throws Exception if any step of the write pipeline fails
     */
    @Test
    public void testUpsert() throws Exception {
        funcWrapper.openFunction();

        // Checkpoint 1: initial inserts.
        for (RowData rowData : TestData.DATA_SET_INSERT) {
            funcWrapper.invoke(rowData);
        }
        assertEmptyDataFiles();
        funcWrapper.checkpointFunction(1L);
        final OperatorEvent insertEvent = funcWrapper.getNextEvent();
        MatcherAssert.assertThat("The operator expect to send an event", insertEvent, CoreMatchers.instanceOf(WriteMetadataEvent.class));
        funcWrapper.getCoordinator().handleEventFromOperator(0, insertEvent);
        Assertions.assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
        funcWrapper.checkpointComplete(1L);

        // Checkpoint 2: updates plus new records.
        for (RowData rowData : TestData.DATA_SET_UPDATE_INSERT) {
            funcWrapper.invoke(rowData);
        }
        // Until the second checkpoint runs, the files must still match the first commit.
        checkWrittenData(tempFile, EXPECTED1);
        funcWrapper.checkpointFunction(2L);
        final String instant = funcWrapper.getWriteClient().getLastPendingInstant(getTableType());
        final OperatorEvent upsertEvent = funcWrapper.getNextEvent();
        MatcherAssert.assertThat("The operator expect to send an event", upsertEvent, CoreMatchers.instanceOf(WriteMetadataEvent.class));
        funcWrapper.getCoordinator().handleEventFromOperator(0, upsertEvent);
        Assertions.assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
        checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
        funcWrapper.checkpointComplete(2L);
        checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
        checkWrittenData(tempFile, EXPECTED2);
    }

    /**
     * Inserts the default data set, then writes the update/delete data set in a second checkpoint
     * and verifies the result against {@link #getUpsertWithDeleteExpected()}.
     *
     * @throws Exception if any step of the write pipeline fails
     */
    @Test
    public void testUpsertWithDelete() throws Exception {
        funcWrapper.openFunction();

        // Checkpoint 1: initial inserts.
        for (RowData rowData : TestData.DATA_SET_INSERT) {
            funcWrapper.invoke(rowData);
        }
        assertEmptyDataFiles();
        funcWrapper.checkpointFunction(1L);
        final OperatorEvent insertEvent = funcWrapper.getNextEvent();
        MatcherAssert.assertThat("The operator expect to send an event", insertEvent, CoreMatchers.instanceOf(WriteMetadataEvent.class));
        funcWrapper.getCoordinator().handleEventFromOperator(0, insertEvent);
        Assertions.assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
        funcWrapper.checkpointComplete(1L);

        // Checkpoint 2: updates and deletes.
        for (RowData rowData : TestData.DATA_SET_UPDATE_DELETE) {
            funcWrapper.invoke(rowData);
        }
        // Until the second checkpoint runs, the files must still match the first commit.
        checkWrittenData(tempFile, EXPECTED1);
        funcWrapper.checkpointFunction(2L);
        final String instant = funcWrapper.getWriteClient().getLastPendingInstant(getTableType());
        final OperatorEvent deleteEvent = funcWrapper.getNextEvent();
        MatcherAssert.assertThat("The operator expect to send an event", deleteEvent, CoreMatchers.instanceOf(WriteMetadataEvent.class));
        funcWrapper.getCoordinator().handleEventFromOperator(0, deleteEvent);
        Assertions.assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
        checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
        funcWrapper.checkpointComplete(2L);
        checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
        checkWrittenData(tempFile, getUpsertWithDeleteExpected());
    }

    /**
     * Writes the duplicate-record data set with a very small {@code WRITE_BATCH_SIZE} and verifies
     * the buffer content before the checkpoint, the two events emitted by each checkpoint, and the
     * written data after each of two rounds.
     *
     * @throws Exception if any step of the write pipeline fails
     */
    @Test
    public void testInsertWithMiniBatches() throws Exception {
        // Rebuild the wrapper so that it picks up the changed batch size.
        conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0008);
        funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
        funcWrapper.openFunction();

        for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
            funcWrapper.invoke(rowData);
        }
        final Map<String, List<HoodieRecord>> buffered = funcWrapper.getDataBuffer();
        MatcherAssert.assertThat("Should have 1 data bucket", buffered.size(), CoreMatchers.is(1));
        final int firstBucketSize = buffered.values().stream().findFirst().map(List::size).orElse(-1);
        MatcherAssert.assertThat("3 records expect to flush out as a mini-batch", firstBucketSize, CoreMatchers.is(3));

        funcWrapper.checkpointFunction(1L);
        MatcherAssert.assertThat("All data should be flushed out", funcWrapper.getDataBuffer().size(), CoreMatchers.is(0));
        // Two events are pulled for this checkpoint; only the second one is type-checked.
        final OperatorEvent firstEvent = funcWrapper.getNextEvent();
        final OperatorEvent secondEvent = funcWrapper.getNextEvent();
        MatcherAssert.assertThat("The operator expect to send an event", secondEvent, CoreMatchers.instanceOf(WriteMetadataEvent.class));
        funcWrapper.getCoordinator().handleEventFromOperator(0, firstEvent);
        funcWrapper.getCoordinator().handleEventFromOperator(0, secondEvent);
        Assertions.assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
        final String instant = funcWrapper.getWriteClient().getLastPendingInstant(getTableType());
        funcWrapper.checkpointComplete(1L);

        final Map<String, String> expected = getMiniBatchExpected();
        checkWrittenData(tempFile, expected, 1);
        checkInflightInstant(funcWrapper.getWriteClient());
        checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);

        // Second round with the same records: the expected result must not change.
        for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
            funcWrapper.invoke(rowData);
        }
        funcWrapper.checkpointFunction(2L);
        final OperatorEvent thirdEvent = funcWrapper.getNextEvent();
        final OperatorEvent fourthEvent = funcWrapper.getNextEvent();
        funcWrapper.getCoordinator().handleEventFromOperator(0, thirdEvent);
        funcWrapper.getCoordinator().handleEventFromOperator(0, fourthEvent);
        funcWrapper.checkpointComplete(2L);
        checkWrittenData(tempFile, expected, 1);
    }

    /**
     * Writes the same-key data set with a very small {@code WRITE_BATCH_SIZE} and
     * {@code INSERT_DROP_DUPS} enabled, and verifies that only the record
     * {@code id1,par1,id1,Danny,23,4,par1} is present after each of two rounds.
     *
     * @throws Exception if any step of the write pipeline fails
     */
    @Test
    public void testInsertWithDeduplication() throws Exception {
        // Rebuild the wrapper so that it picks up the changed options.
        conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0008);
        conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true);
        funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
        funcWrapper.openFunction();

        for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
            funcWrapper.invoke(rowData);
        }
        Map<String, List<HoodieRecord>> buffered = funcWrapper.getDataBuffer();
        MatcherAssert.assertThat("Should have 1 data bucket", buffered.size(), CoreMatchers.is(1));
        int firstBucketSize = buffered.values().stream().findFirst().map(List::size).orElse(-1);
        MatcherAssert.assertThat("3 records expect to flush out as a mini-batch", firstBucketSize, CoreMatchers.is(3));

        funcWrapper.checkpointFunction(1L);
        MatcherAssert.assertThat("All data should be flushed out", funcWrapper.getDataBuffer().size(), CoreMatchers.is(0));
        // Two events are pulled for this checkpoint; only the second one is type-checked.
        OperatorEvent firstEvent = funcWrapper.getNextEvent();
        OperatorEvent secondEvent = funcWrapper.getNextEvent();
        MatcherAssert.assertThat("The operator expect to send an event", secondEvent, CoreMatchers.instanceOf(WriteMetadataEvent.class));
        funcWrapper.getCoordinator().handleEventFromOperator(0, firstEvent);
        funcWrapper.getCoordinator().handleEventFromOperator(0, secondEvent);
        Assertions.assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
        String instant = funcWrapper.getWriteClient().getLastPendingInstant(getTableType());
        funcWrapper.checkpointComplete(1L);

        // Typed map instead of a raw HashMap, so no unchecked conversion is needed below.
        Map<String, String> expected = new HashMap<>();
        expected.put("par1", "[id1,par1,id1,Danny,23,4,par1]");
        checkWrittenData(tempFile, expected, 1);
        checkInflightInstant(funcWrapper.getWriteClient());
        checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);

        // Second round with the same records: the expected result must not change.
        for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
            funcWrapper.invoke(rowData);
        }
        funcWrapper.checkpointFunction(2L);
        OperatorEvent thirdEvent = funcWrapper.getNextEvent();
        OperatorEvent fourthEvent = funcWrapper.getNextEvent();
        funcWrapper.getCoordinator().handleEventFromOperator(0, thirdEvent);
        funcWrapper.getCoordinator().handleEventFromOperator(0, fourthEvent);
        funcWrapper.checkpointComplete(2L);
        checkWrittenData(tempFile, expected, 1);
    }

    /**
     * Writes the same-key data set twice with the INSERT operation and {@code INSERT_DEDUP}
     * disabled, and verifies that every record is kept: five records after the first round and
     * each of them twice after the second round.
     *
     * @throws Exception if any step of the write pipeline fails
     */
    @Test
    public void testInsertAllowsDuplication() throws Exception {
        // Rebuild the wrapper so that it picks up the changed options.
        conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006);
        conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
        conf.setBoolean(FlinkOptions.INSERT_DEDUP, false);
        funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
        funcWrapper.openFunction();

        for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
            funcWrapper.invoke(rowData);
        }
        funcWrapper.checkpointFunction(1L);
        MatcherAssert.assertThat("All data should be flushed out", funcWrapper.getDataBuffer().size(), CoreMatchers.is(0));
        // Two events are pulled for this checkpoint; only the second one is type-checked.
        OperatorEvent firstEvent = funcWrapper.getNextEvent();
        OperatorEvent secondEvent = funcWrapper.getNextEvent();
        MatcherAssert.assertThat("The operator expect to send an event", secondEvent, CoreMatchers.instanceOf(WriteMetadataEvent.class));
        funcWrapper.getCoordinator().handleEventFromOperator(0, firstEvent);
        funcWrapper.getCoordinator().handleEventFromOperator(0, secondEvent);
        Assertions.assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
        String instant = funcWrapper.getWriteClient().getLastPendingInstant(getTableType());
        funcWrapper.checkpointComplete(1L);

        // Typed map instead of a raw HashMap, so no unchecked conversion is needed below.
        Map<String, String> expected = new HashMap<>();
        expected.put("par1", "[id1,par1,id1,Danny,23,0,par1, id1,par1,id1,Danny,23,1,par1, id1,par1,id1,Danny,23,2,par1, id1,par1,id1,Danny,23,3,par1, id1,par1,id1,Danny,23,4,par1]");
        TestData.checkWrittenAllData(tempFile, expected, 1);
        checkInflightInstant(funcWrapper.getWriteClient());
        checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);

        // Second round with the same records: every record is now expected twice.
        for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
            funcWrapper.invoke(rowData);
        }
        funcWrapper.checkpointFunction(2L);
        OperatorEvent thirdEvent = funcWrapper.getNextEvent();
        OperatorEvent fourthEvent = funcWrapper.getNextEvent();
        funcWrapper.getCoordinator().handleEventFromOperator(0, thirdEvent);
        funcWrapper.getCoordinator().handleEventFromOperator(0, fourthEvent);
        funcWrapper.checkpointComplete(2L);
        expected.put("par1", "[id1,par1,id1,Danny,23,0,par1, id1,par1,id1,Danny,23,0,par1, id1,par1,id1,Danny,23,1,par1, id1,par1,id1,Danny,23,1,par1, id1,par1,id1,Danny,23,2,par1, id1,par1,id1,Danny,23,2,par1, id1,par1,id1,Danny,23,3,par1, id1,par1,id1,Danny,23,3,par1, id1,par1,id1,Danny,23,4,par1, id1,par1,id1,Danny,23,4,par1]");
        TestData.checkWrittenAllData(tempFile, expected, 1);
    }

    /**
     * Writes the duplicate-record data set with {@code WRITE_TASK_MAX_SIZE} set to 200.0008 and
     * verifies the buffer content before the checkpoint, the two events emitted by each
     * checkpoint, and the written data after each of two rounds.
     *
     * @throws Exception if any step of the write pipeline fails
     */
    @Test
    public void testInsertWithSmallBufferSize() throws Exception {
        // Rebuild the wrapper so that it picks up the changed task buffer size.
        conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0008);
        funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
        funcWrapper.openFunction();

        for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
            funcWrapper.invoke(rowData);
        }
        final Map<String, List<HoodieRecord>> buffered = funcWrapper.getDataBuffer();
        MatcherAssert.assertThat("Should have 1 data bucket", buffered.size(), CoreMatchers.is(1));
        final int firstBucketSize = buffered.values().stream().findFirst().map(List::size).orElse(-1);
        MatcherAssert.assertThat("3 records expect to flush out as a mini-batch", firstBucketSize, CoreMatchers.is(3));

        funcWrapper.checkpointFunction(1L);
        MatcherAssert.assertThat("All data should be flushed out", funcWrapper.getDataBuffer().size(), CoreMatchers.is(0));
        // Both events of the first checkpoint are type-checked and forwarded in order.
        for (int eventNo = 0; eventNo < 2; eventNo++) {
            final OperatorEvent event = funcWrapper.getNextEvent();
            MatcherAssert.assertThat("The operator expect to send an event", event, CoreMatchers.instanceOf(WriteMetadataEvent.class));
            funcWrapper.getCoordinator().handleEventFromOperator(0, event);
        }
        Assertions.assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
        final String instant = funcWrapper.getWriteClient().getLastPendingInstant(getTableType());
        funcWrapper.checkpointComplete(1L);

        final Map<String, String> expected = getMiniBatchExpected();
        checkWrittenData(tempFile, expected, 1);
        checkInflightInstant(funcWrapper.getWriteClient());
        checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);

        // Second round with the same records: the expected result must not change.
        for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
            funcWrapper.invoke(rowData);
        }
        funcWrapper.checkpointFunction(2L);
        for (int eventNo = 0; eventNo < 2; eventNo++) {
            funcWrapper.getCoordinator().handleEventFromOperator(0, funcWrapper.getNextEvent());
        }
        funcWrapper.checkpointComplete(2L);
        checkWrittenData(tempFile, expected, 1);
    }

    /**
     * Builds the data expected by the mini-batch and small-buffer tests.
     *
     * @return a new mutable map with the single entry {@code par1}
     */
    protected Map<String, String> getMiniBatchExpected() {
        // Typed map instead of a raw HashMap, so the return needs no unchecked conversion.
        Map<String, String> expected = new HashMap<>();
        expected.put("par1", "[id1,par1,id1,Danny,23,1,par1, id1,par1,id1,Danny,23,1,par1]");
        return expected;
    }

    /**
     * Builds the data expected by {@code testUpsertWithDelete} after its second checkpoint.
     *
     * @return a new mutable map with one entry for each of {@code par1} to {@code par4}
     */
    protected Map<String, String> getUpsertWithDeleteExpected() {
        // Typed map instead of a raw HashMap, so the return needs no unchecked conversion.
        Map<String, String> expected = new HashMap<>();
        expected.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]");
        expected.put("par2", "[id4,par2,id4,Fabian,31,4,par2]");
        expected.put("par3", "[id6,par3,id6,Emma,20,6,par3]");
        expected.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
        return expected;
    }

    /**
     * Returns the data that {@code testIndexStateBootstrap} expects to read after the checkpoint
     * function of the re-created wrapper has run but before that checkpoint is completed.
     * Overridable; this implementation returns {@code EXPECTED2}.
     *
     * @return the expected file content per partition path
     */
    protected Map<String, String> getExpectedBeforeCheckpointComplete() {
        return EXPECTED2;
    }

    /**
     * Commits the default data set, then re-creates the wrapper with
     * {@code INDEX_BOOTSTRAP_ENABLED} and verifies that the index state contains the expected keys
     * and that the update/insert data set is written correctly.
     *
     * @throws Exception if any step of the write pipeline fails
     */
    @Test
    public void testIndexStateBootstrap() throws Exception {
        // Phase 1: write and commit the initial data set.
        funcWrapper.openFunction();
        for (RowData rowData : TestData.DATA_SET_INSERT) {
            funcWrapper.invoke(rowData);
        }
        assertEmptyDataFiles();
        funcWrapper.checkpointFunction(1L);
        final OperatorEvent insertEvent = funcWrapper.getNextEvent();
        MatcherAssert.assertThat("The operator expect to send an event", insertEvent, CoreMatchers.instanceOf(WriteMetadataEvent.class));
        funcWrapper.getCoordinator().handleEventFromOperator(0, insertEvent);
        Assertions.assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
        funcWrapper.checkpointComplete(1L);
        checkWrittenData(tempFile, EXPECTED1);

        // Phase 2: a new wrapper on the same directory, this time with index bootstrap enabled.
        conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
        funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
        funcWrapper.openFunction();
        for (RowData rowData : TestData.DATA_SET_UPDATE_INSERT) {
            funcWrapper.invoke(rowData);
        }
        Assertions.assertTrue(funcWrapper.isAlreadyBootstrap());
        checkIndexLoaded(
                new HoodieKey("id1", "par1"),
                new HoodieKey("id2", "par1"),
                new HoodieKey("id3", "par2"),
                new HoodieKey("id4", "par2"),
                new HoodieKey("id5", "par3"),
                new HoodieKey("id6", "par3"),
                new HoodieKey("id7", "par4"),
                new HoodieKey("id8", "par4"),
                new HoodieKey("id9", "par3"),
                new HoodieKey("id10", "par4"),
                new HoodieKey("id11", "par4"));

        // The checkpoint id restarts at 1 for the new wrapper.
        funcWrapper.checkpointFunction(1L);
        final String instant = funcWrapper.getWriteClient().getLastPendingInstant(getTableType());
        final OperatorEvent upsertEvent = funcWrapper.getNextEvent();
        MatcherAssert.assertThat("The operator expect to send an event", upsertEvent, CoreMatchers.instanceOf(WriteMetadataEvent.class));
        checkWrittenData(tempFile, getExpectedBeforeCheckpointComplete());
        funcWrapper.getCoordinator().handleEventFromOperator(0, upsertEvent);
        Assertions.assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
        checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
        funcWrapper.checkpointComplete(1L);
        checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
        checkWrittenData(tempFile, EXPECTED2);
    }

    /**
     * Verifies the confirming flag of the write function around a checkpoint and that writing
     * after a second, never completed checkpoint raises a {@link HoodieException}.
     *
     * @throws Exception if any step of the write pipeline fails unexpectedly
     */
    @Test
    public void testWriteExactlyOnce() throws Exception {
        // Rebuild the wrapper so that it picks up the short ack timeout and small task buffer.
        conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 3L);
        conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006);
        funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
        funcWrapper.openFunction();

        for (RowData rowData : TestData.DATA_SET_INSERT) {
            funcWrapper.invoke(rowData);
        }
        // Nothing has been checkpointed yet, so the single event slot must still be empty.
        Assertions.assertTrue(funcWrapper.getEventBuffer().length == 1 && funcWrapper.getEventBuffer()[0] == null, "The coordinator events buffer expect to be empty");

        funcWrapper.checkpointFunction(1L);
        Assertions.assertTrue(funcWrapper.isConforming(), "The write function should be waiting for the instant to commit");
        for (int eventNo = 0; eventNo < 2; eventNo++) {
            final OperatorEvent event = funcWrapper.getNextEvent();
            MatcherAssert.assertThat("The operator expect to send an event", event, CoreMatchers.instanceOf(WriteMetadataEvent.class));
            funcWrapper.getCoordinator().handleEventFromOperator(0, event);
        }
        funcWrapper.checkpointComplete(1L);

        for (RowData rowData : TestData.DATA_SET_INSERT) {
            funcWrapper.invoke(rowData);
        }
        Assertions.assertFalse(funcWrapper.isConforming(), "The write function should finish waiting for the instant to commit");

        // Checkpoint 2 is started but never completed; further writes are expected to fail.
        funcWrapper.checkpointFunction(2L);
        Assertions.assertThrows(HoodieException.class, () -> {
            for (RowData rowData : TestData.DATA_SET_INSERT) {
                funcWrapper.invoke(rowData);
            }
        }, "Timeout(500ms) while waiting for instant");
    }

    /**
     * Creates two write clients from the same configuration and verifies that both use the
     * {@code REMOTE_FIRST} view storage type and report the same remote view server port.
     */
    @Test
    public void testReuseEmbeddedServer() {
        final HoodieFlinkWriteClient firstClient = StreamerUtil.createWriteClient(conf, (RuntimeContext) null);
        final FileSystemViewStorageConfig firstViewConfig = firstClient.getConfig().getViewStorageConfig();
        Assertions.assertSame(firstViewConfig.getStorageType(), FileSystemViewStorageType.REMOTE_FIRST);

        final HoodieFlinkWriteClient secondClient = StreamerUtil.createWriteClient(conf, (RuntimeContext) null);
        Assertions.assertSame(secondClient.getConfig().getViewStorageConfig().getStorageType(), FileSystemViewStorageType.REMOTE_FIRST);

        // Same port on both clients.
        Assertions.assertEquals(firstViewConfig.getRemoteViewServerPort(), secondClient.getConfig().getViewStorageConfig().getRemoteViewServerPort());
    }

    /**
     * Asserts that the client reports a pending instant for the table type under test.
     *
     * @param hoodieFlinkWriteClient the write client to query
     */
    private void checkInflightInstant(HoodieFlinkWriteClient hoodieFlinkWriteClient) {
        final String pendingInstant = hoodieFlinkWriteClient.getLastPendingInstant(getTableType());
        Assertions.assertNotNull(pendingInstant);
    }

    /**
     * Asserts that the last instant in the given state equals the expected instant time.
     *
     * <p>For {@code REQUESTED} the last pending instant of the client is compared, for
     * {@code COMPLETED} the last completed instant. The switch works on the enum constant itself
     * rather than on its ordinal through a synthetic lookup table.
     *
     * @param hoodieFlinkWriteClient the write client to query
     * @param state the instant state to look up; only {@code REQUESTED} and {@code COMPLETED} are supported
     * @param str the expected instant time, or {@code null} when no such instant is expected
     * @throws AssertionError if {@code state} is any other state or the instant does not match
     */
    private void checkInstantState(HoodieFlinkWriteClient hoodieFlinkWriteClient, HoodieInstant.State state, String str) {
        final String actualInstant;
        switch (state) {
            case REQUESTED:
                actualInstant = hoodieFlinkWriteClient.getLastPendingInstant(getTableType());
                break;
            case COMPLETED:
                actualInstant = hoodieFlinkWriteClient.getLastCompletedInstant(getTableType());
                break;
            default:
                throw new AssertionError("Unexpected state");
        }
        MatcherAssert.assertThat(actualInstant, CoreMatchers.is(str));
    }

    /**
     * Returns the table type under test. Its name is written to {@code FlinkOptions.TABLE_TYPE}
     * in {@code before()} and the value is passed to the instant lookups of the write client.
     * Overridable; this implementation returns {@code COPY_ON_WRITE}.
     *
     * @return the table type used by the tests of this class
     */
    protected HoodieTableType getTableType() {
        return HoodieTableType.COPY_ON_WRITE;
    }

    /**
     * Verifies the written data by delegating to the three-argument overload with {@code 4} as
     * the last argument.
     *
     * @param file the directory handed to the check (the tests pass the temp directory)
     * @param map the expected content keyed by partition path
     * @throws Exception if the verification fails or the data cannot be read
     */
    protected void checkWrittenData(File file, Map<String, String> map) throws Exception {
        // NOTE(review): 4 presumably is the expected number of partitions (par1..par4) - confirm
        // against TestData.checkWrittenData.
        checkWrittenData(file, map, 4);
    }

    /**
     * Verifies the written data by delegating to {@code TestData.checkWrittenData}. Overridable.
     *
     * @param file the directory handed to the check (the tests pass the temp directory)
     * @param map the expected content keyed by partition path
     * @param i forwarded unchanged; presumably the expected number of partitions - confirm against
     *     {@code TestData.checkWrittenData}
     * @throws Exception if the verification fails or the data cannot be read
     */
    protected void checkWrittenData(File file, Map<String, String> map, int i) throws Exception {
        TestData.checkWrittenData(file, map, i);
    }

    /**
     * Asserts that the temp directory can be listed and contains no entry whose name does not
     * start with a dot.
     */
    protected void assertEmptyDataFiles() {
        // Entries starting with "." are filtered out and therefore not counted.
        final File[] visibleEntries = tempFile.listFiles(candidate -> !candidate.getName().startsWith("."));
        Assertions.assertNotNull(visibleEntries);
        MatcherAssert.assertThat(visibleEntries.length, CoreMatchers.is(0));
    }

    /**
     * Asserts that each of the given keys is reported as present in the state of the write
     * function wrapper.
     *
     * @param hoodieKeyArr the keys expected in the index state
     */
    private void checkIndexLoaded(HoodieKey... hoodieKeyArr) {
        for (int idx = 0; idx < hoodieKeyArr.length; idx++) {
            final HoodieKey expectedKey = hoodieKeyArr[idx];
            final boolean present = funcWrapper.isKeyInState(expectedKey);
            Assertions.assertTrue(present, "Key: " + expectedKey + " assumes to be in the index state");
        }
    }

    // Populates the shared expectation maps. Keys are partition paths, values are the expected
    // records of that partition rendered as one bracketed, comma-separated string.
    static {
        // EXPECTED1: compared against the table after TestData.DATA_SET_INSERT has been committed.
        EXPECTED1.put("par1", "[id1,par1,id1,Danny,23,1,par1, id2,par1,id2,Stephen,33,2,par1]");
        EXPECTED1.put("par2", "[id3,par2,id3,Julian,53,3,par2, id4,par2,id4,Fabian,31,4,par2]");
        EXPECTED1.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3]");
        EXPECTED1.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
        // EXPECTED2: compared against the table after TestData.DATA_SET_UPDATE_INSERT has been
        // committed on top of the initial inserts.
        EXPECTED2.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]");
        EXPECTED2.put("par2", "[id3,par2,id3,Julian,54,3,par2, id4,par2,id4,Fabian,32,4,par2]");
        EXPECTED2.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3, id9,par3,id9,Jane,19,6,par3]");
        EXPECTED2.put("par4", "[id10,par4,id10,Ella,38,7,par4, id11,par4,id11,Phoebe,52,8,par4, id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
        // EXPECTED3: compared against the table in testInsertDuplicates (single partition par1).
        EXPECTED3.put("par1", "[id1,par1,id1,Danny,23,1,par1]");
    }
}
