/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.utils.TestWriteBase;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Test cases that drive the write pipeline through {@link TestWriteBase.TestHarness}
 * with the table type returned by {@link #getTableType()} ({@code COPY_ON_WRITE} here).
 *
 * <p>Subclasses can reuse the test cases for another table type by overriding
 * {@link #getTableType()}, {@link #setUp(Configuration)} and the expected-data hooks
 * ({@link #getMiniBatchExpected()}, {@link #getUpsertWithDeleteExpected()},
 * {@link #getExpectedBeforeCheckpointComplete()}).
 */
public class TestWriteCopyOnWrite extends TestWriteBase {
    /** Configuration rebuilt before every test; individual tests mutate it before preparing a pipeline. */
    protected Configuration conf;

    /** Per-test temporary directory injected by JUnit; its absolute path is handed to the default configuration. */
    @TempDir
    File tempFile;

    /**
     * Builds the default configuration on top of the temporary directory, applies the
     * table type under test and then lets subclasses adjust it via {@link #setUp(Configuration)}.
     */
    @BeforeEach
    public void before() {
        conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
        conf.setString(FlinkOptions.TABLE_TYPE, getTableType().name());
        setUp(conf);
    }

    /**
     * Extension hook invoked at the end of {@link #before()}; does nothing by default.
     *
     * @param conf the freshly created configuration of the current test
     */
    protected void setUp(Configuration conf) {
    }

    /**
     * Consumes the insert data set, then runs checkpoint 1 (expecting an event for
     * {@code par1..par4}) and checkpoint 2 (expecting an empty event).
     */
    @Test
    public void testCheckpoint() throws Exception {
        preparePipeline()
            .consume(TestData.DATA_SET_INSERT)
            .emptyEventBuffer()
            .checkpoint(1L)
            .assertNextEvent(4, "par1,par2,par3,par4")
            .checkpointComplete(1L)
            .checkpoint(2L)
            .assertEmptyEvent()
            .emptyCheckpoint(2L)
            .end();
    }

    /** Exercises two failing checkpoints with the commit-ack timeout option set to {@code 1}. */
    @Test
    public void testCheckpointFails() throws Exception {
        conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 1L);
        preparePipeline(conf)
            .checkpoint(1L)
            .assertEmptyEvent()
            .checkpointFails(1L)
            .consume(TestData.DATA_SET_INSERT)
            .checkpointFails(2L)
            .end();
    }

    /** Fails subtask 0 after an empty checkpoint and asserts that no instant was completed. */
    @Test
    public void testSubtaskFails() throws Exception {
        preparePipeline()
            .checkpoint(1L)
            .assertEmptyEvent()
            .subTaskFails(0)
            .noCompleteInstant()
            .end();
    }

    /** Inserts the base data set and checks the written data against {@code EXPECTED1} after checkpoint 1 completes. */
    @Test
    public void testInsert() throws Exception {
        preparePipeline()
            .consume(TestData.DATA_SET_INSERT)
            .assertEmptyDataFiles()
            .checkpoint(1L)
            .assertNextEvent()
            .checkpointComplete(1L)
            .checkWrittenData(EXPECTED1)
            .end();
    }

    /**
     * Writes the duplicate data set twice with {@code PRE_COMBINE} enabled and expects
     * {@code EXPECTED3} after each completed checkpoint.
     */
    @Test
    public void testInsertDuplicates() throws Exception {
        conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
        preparePipeline(conf)
            .consume(TestData.DATA_SET_INSERT_DUPLICATES)
            .assertEmptyDataFiles()
            .checkpoint(1L)
            .assertNextEvent()
            .checkpointComplete(1L)
            .checkWrittenData(EXPECTED3, 1)
            // second round with the same input
            .consume(TestData.DATA_SET_INSERT_DUPLICATES)
            .checkpoint(2L)
            .assertNextEvent()
            .checkpointComplete(2L)
            .checkWrittenData(EXPECTED3, 1)
            .end();
    }

    /**
     * Inserts the base data set, then consumes the update/insert data set; the written data
     * is expected to stay at {@code EXPECTED1} until checkpoint 2 completes, and to be
     * {@code EXPECTED2} afterwards.
     */
    @Test
    public void testUpsert() throws Exception {
        preparePipeline()
            .consume(TestData.DATA_SET_INSERT)
            .assertEmptyDataFiles()
            .checkpoint(1L)
            .assertNextEvent()
            .checkpointComplete(1L)
            .consume(TestData.DATA_SET_UPDATE_INSERT)
            // checked before checkpoint 2 is taken
            .checkWrittenData(EXPECTED1)
            .checkpoint(2L)
            .assertNextEvent()
            .checkpointComplete(2L)
            .checkWrittenData(EXPECTED2)
            .end();
    }

    /**
     * Same flow as {@link #testUpsert()} but with the update/delete data set; the final state
     * is compared with {@link #getUpsertWithDeleteExpected()}.
     */
    @Test
    public void testUpsertWithDelete() throws Exception {
        preparePipeline()
            .consume(TestData.DATA_SET_INSERT)
            .assertEmptyDataFiles()
            .checkpoint(1L)
            .assertNextEvent()
            .checkpointComplete(1L)
            .consume(TestData.DATA_SET_UPDATE_DELETE)
            // checked before checkpoint 2 is taken
            .checkWrittenData(EXPECTED1)
            .checkpoint(2L)
            .assertNextEvent()
            .checkpointComplete(2L)
            .checkWrittenData(getUpsertWithDeleteExpected())
            .end();
    }

    /**
     * Writes the duplicate data set twice with a very small {@code WRITE_BATCH_SIZE} and
     * compares partition {@code par1} with {@link #getMiniBatchExpected()} after each checkpoint.
     */
    @Test
    public void testInsertWithMiniBatches() throws Exception {
        // NOTE(review): presumably a size in MB small enough to force mid-stream flushes — confirm against FlinkOptions.
        conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0008);
        Map<String, String> expected = getMiniBatchExpected();
        preparePipeline(conf)
            .consume(TestData.DATA_SET_INSERT_DUPLICATES)
            .assertDataBuffer(1, 2)
            .checkpoint(1L)
            .allDataFlushed()
            .handleEvents(2)
            .checkpointComplete(1L)
            .checkWrittenData(expected, 1)
            // second round with the same input
            .consume(TestData.DATA_SET_INSERT_DUPLICATES)
            .checkpoint(2L)
            .handleEvents(2)
            .checkpointComplete(2L)
            .checkWrittenData(expected, 1)
            .end();
    }

    /**
     * Writes the same-key data set twice with a very small {@code WRITE_BATCH_SIZE} and
     * {@code PRE_COMBINE} enabled; a single {@code id1} record is expected in {@code par1}
     * after each checkpoint.
     */
    @Test
    public void testInsertWithDeduplication() throws Exception {
        // NOTE(review): presumably a size in MB small enough to force mid-stream flushes — confirm against FlinkOptions.
        conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0008);
        conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
        Map<String, String> expected = new HashMap<>();
        expected.put("par1", "[id1,par1,id1,Danny,23,4,par1]");
        preparePipeline(conf)
            .consume(TestData.DATA_SET_INSERT_SAME_KEY)
            .assertDataBuffer(1, 2)
            .checkpoint(1L)
            .allDataFlushed()
            .handleEvents(2)
            .checkpointComplete(1L)
            .checkWrittenData(expected, 1)
            // second round with the same input
            .consume(TestData.DATA_SET_INSERT_SAME_KEY)
            .checkpoint(2L)
            .handleEvents(2)
            .checkpointComplete(2L)
            .checkWrittenData(expected, 1)
            .end();
    }

    /**
     * Writes the same-key data set twice through the insert pipeline and expects
     * {@code EXPECTED4} after the first checkpoint and {@code EXPECTED5} after the second.
     */
    @Test
    public void testInsertAppendMode() throws Exception {
        prepareInsertPipeline()
            .consume(TestData.DATA_SET_INSERT_SAME_KEY)
            .checkpoint(1L)
            .assertNextEvent()
            .checkpointComplete(1L)
            .checkWrittenAllData(EXPECTED4, 1)
            // second round with the same input
            .consume(TestData.DATA_SET_INSERT_SAME_KEY)
            .checkpoint(2L)
            .assertNextEvent()
            .checkpointComplete(2L)
            .checkWrittenDataCOW(EXPECTED5)
            .end();
    }

    /**
     * Writes the same-key data set twice with the {@code insert} operation,
     * {@code INSERT_CLUSTER} enabled and a reduced {@code WRITE_TASK_MAX_SIZE}.
     */
    @Test
    public void testInsertClustering() throws Exception {
        conf.setString(FlinkOptions.OPERATION, "insert");
        conf.setBoolean(FlinkOptions.INSERT_CLUSTER, true);
        // NOTE(review): presumably leaves only a few hundred bytes of write buffer — confirm against FlinkOptions.
        conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0008);
        TestWriteBase.TestHarness.instance()
            .preparePipeline(tempFile, conf)
            .consume(TestData.DATA_SET_INSERT_SAME_KEY)
            .assertDataBuffer(1, 2)
            .checkpoint(1L)
            .allDataFlushed()
            .handleEvents(2)
            .checkpointComplete(1L)
            .checkWrittenData(EXPECTED4, 1)
            // second round with the same input
            .consume(TestData.DATA_SET_INSERT_SAME_KEY)
            .checkpoint(2L)
            .handleEvents(2)
            .checkpointComplete(2L)
            .checkWrittenDataCOW(EXPECTED5)
            .end();
    }

    /**
     * Writes the same-key data set twice through the insert pipeline with clustering
     * scheduling and async clustering enabled and a clustering delta of one commit.
     */
    @Test
    public void testInsertAsyncClustering() throws Exception {
        conf.setString(FlinkOptions.OPERATION, "insert");
        conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
        conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true);
        conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
        prepareInsertPipeline(conf)
            .consume(TestData.DATA_SET_INSERT_SAME_KEY)
            .checkpoint(1L)
            .handleEvents(1)
            .checkpointComplete(1L)
            .checkWrittenData(EXPECTED4, 1)
            // second round with the same input
            .consume(TestData.DATA_SET_INSERT_SAME_KEY)
            .checkpoint(2L)
            .handleEvents(1)
            .checkpointComplete(2L)
            .checkWrittenDataCOW(EXPECTED5)
            .end();
    }

    /**
     * Writes the duplicate data set twice with a reduced {@code WRITE_TASK_MAX_SIZE} and
     * compares partition {@code par1} with {@link #getMiniBatchExpected()} after each checkpoint.
     */
    @Test
    public void testInsertWithSmallBufferSize() throws Exception {
        // NOTE(review): presumably leaves only a few hundred bytes of write buffer — confirm against FlinkOptions.
        conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0008);
        Map<String, String> expected = getMiniBatchExpected();
        preparePipeline(conf)
            .consume(TestData.DATA_SET_INSERT_DUPLICATES)
            .assertDataBuffer(1, 2)
            .checkpoint(1L)
            .allDataFlushed()
            .handleEvents(2)
            .checkpointComplete(1L)
            .checkWrittenData(expected, 1)
            // second round with the same input
            .consume(TestData.DATA_SET_INSERT_DUPLICATES)
            .checkpoint(2L)
            .handleEvents(2)
            .checkpointComplete(2L)
            .checkWrittenData(expected, 1)
            .end();
    }

    /**
     * Expected content of partition {@code par1} for the mini-batch and small-buffer tests:
     * three copies of the {@code id1} record.
     *
     * @return a new mutable map from partition name to the expected rendered rows
     */
    protected Map<String, String> getMiniBatchExpected() {
        Map<String, String> expected = new HashMap<>();
        expected.put("par1", "[id1,par1,id1,Danny,23,1,par1, id1,par1,id1,Danny,23,1,par1, id1,par1,id1,Danny,23,1,par1]");
        return expected;
    }

    /**
     * Expected per-partition content checked at the end of {@link #testUpsertWithDelete()}.
     *
     * @return a new mutable map from partition name to the expected rendered rows
     */
    protected Map<String, String> getUpsertWithDeleteExpected() {
        Map<String, String> expected = new HashMap<>();
        expected.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]");
        expected.put("par2", "[id4,par2,id4,Fabian,31,4,par2]");
        expected.put("par3", "[id6,par3,id6,Emma,20,6,par3]");
        expected.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
        return expected;
    }

    /**
     * Expected data that {@link #validateIndexLoaded()} checks after the write event is
     * asserted but before the checkpoint is completed.
     *
     * @return {@code EXPECTED2} by default
     */
    protected Map<String, String> getExpectedBeforeCheckpointComplete() {
        return EXPECTED2;
    }

    /**
     * Writes the base data set with one pipeline, then enables {@code INDEX_BOOTSTRAP_ENABLED}
     * and runs {@link #validateIndexLoaded()} with a second pipeline on the same directory.
     */
    @Test
    public void testIndexStateBootstrap() throws Exception {
        preparePipeline()
            .consume(TestData.DATA_SET_INSERT)
            .assertEmptyDataFiles()
            .checkpoint(1L)
            .assertNextEvent()
            .checkpointComplete(1L)
            .checkWrittenData(EXPECTED1, 4)
            .end();

        conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
        validateIndexLoaded();
    }

    /**
     * Consumes the update/insert data set with a new pipeline, asserts that the index holds
     * the keys {@code id1..id11}, and checks the written data before and after checkpoint 1
     * completes.
     *
     * @throws Exception if any pipeline step fails
     */
    protected void validateIndexLoaded() throws Exception {
        preparePipeline(conf)
            .consume(TestData.DATA_SET_UPDATE_INSERT)
            .checkIndexLoaded(
                new HoodieKey("id1", "par1"),
                new HoodieKey("id2", "par1"),
                new HoodieKey("id3", "par2"),
                new HoodieKey("id4", "par2"),
                new HoodieKey("id5", "par3"),
                new HoodieKey("id6", "par3"),
                new HoodieKey("id7", "par4"),
                new HoodieKey("id8", "par4"),
                new HoodieKey("id9", "par3"),
                new HoodieKey("id10", "par4"),
                new HoodieKey("id11", "par4"))
            .checkpoint(1L)
            .assertBootstrapped()
            .assertNextEvent()
            .checkWrittenData(getExpectedBeforeCheckpointComplete())
            .checkpointComplete(1L)
            .checkWrittenData(EXPECTED2)
            .end();
    }

    /**
     * Completes checkpoint 1, consumes more data, takes checkpoint 2 without completing it and
     * asserts that the next consume fails with the instant-initialization timeout message.
     */
    @Test
    public void testWriteExactlyOnce() throws Exception {
        conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 1L);
        // NOTE(review): presumably leaves only a few hundred bytes of write buffer — confirm against FlinkOptions.
        conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006);
        preparePipeline(conf)
            .consume(TestData.DATA_SET_INSERT)
            .emptyEventBuffer()
            .checkpoint(1L)
            .assertConfirming()
            .handleEvents(4)
            .checkpointComplete(1L)
            .consume(TestData.DATA_SET_INSERT)
            .assertNotConfirming()
            .checkpoint(2L)
            .assertConsumeThrows(TestData.DATA_SET_INSERT, "Timeout(1000ms) while waiting for instant initialize")
            .end();
    }

    /**
     * Creates two write clients from the same configuration and asserts that both report
     * {@code REMOTE_FIRST} view storage, that they share the same remote view server port,
     * and that the configured remote timeout of 500 seconds is picked up.
     *
     * @throws IOException if a write client cannot be created
     */
    @Test
    public void testReuseEmbeddedServer() throws IOException {
        conf.setInteger("hoodie.filesystem.view.remote.timeout.secs", 500);
        HoodieFlinkWriteClient firstClient = null;
        HoodieFlinkWriteClient secondClient = null;
        try {
            firstClient = StreamerUtil.createWriteClient(conf);
            FileSystemViewStorageConfig firstViewConfig = firstClient.getConfig().getViewStorageConfig();
            // JUnit assertions take (expected, actual); keeping that order makes failure messages accurate.
            Assertions.assertSame(FileSystemViewStorageType.REMOTE_FIRST, firstViewConfig.getStorageType());

            // A second client built from the same configuration must see the same server settings.
            secondClient = StreamerUtil.createWriteClient(conf);
            FileSystemViewStorageConfig secondViewConfig = secondClient.getConfig().getViewStorageConfig();
            Assertions.assertSame(FileSystemViewStorageType.REMOTE_FIRST, secondViewConfig.getStorageType());
            Assertions.assertEquals(firstViewConfig.getRemoteViewServerPort(), secondViewConfig.getRemoteViewServerPort());
            Assertions.assertEquals(500, firstViewConfig.getRemoteTimelineClientTimeoutSecs());
        } finally {
            // Release both clients even when an assertion fails; the nested finally guarantees
            // the first client is closed if closing the second one throws.
            try {
                if (secondClient != null) {
                    secondClient.close();
                }
            } finally {
                if (firstClient != null) {
                    firstClient.close();
                }
            }
        }
    }

    /** Prepares a pipeline with the current test configuration. */
    private TestWriteBase.TestHarness preparePipeline() throws Exception {
        return preparePipeline(conf);
    }

    /**
     * Prepares a pipeline on the temporary directory with the given configuration.
     *
     * @param conf the configuration to run the pipeline with
     * @return the prepared test harness
     * @throws Exception if the harness fails to prepare the pipeline
     */
    protected TestWriteBase.TestHarness preparePipeline(Configuration conf) throws Exception {
        return TestWriteBase.TestHarness.instance().preparePipeline(tempFile, conf);
    }

    /**
     * Prepares an insert pipeline with the current test configuration.
     *
     * @return the prepared test harness
     * @throws Exception if the harness fails to prepare the pipeline
     */
    protected TestWriteBase.TestHarness prepareInsertPipeline() throws Exception {
        return prepareInsertPipeline(conf);
    }

    /**
     * Prepares a pipeline on the temporary directory with the given configuration, passing
     * {@code true} as the harness's third argument (the insert-pipeline variant used by the
     * append-mode and async-clustering tests).
     *
     * @param conf the configuration to run the pipeline with
     * @return the prepared test harness
     * @throws Exception if the harness fails to prepare the pipeline
     */
    protected TestWriteBase.TestHarness prepareInsertPipeline(Configuration conf) throws Exception {
        return TestWriteBase.TestHarness.instance().preparePipeline(tempFile, conf, true);
    }

    /**
     * Table type written into the configuration by {@link #before()}.
     *
     * @return {@link HoodieTableType#COPY_ON_WRITE}
     */
    protected HoodieTableType getTableType() {
        return HoodieTableType.COPY_ON_WRITE;
    }
}

