/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.PartialUpdateAvroPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.TestWriteCopyOnWrite;
import org.apache.hudi.sink.utils.TestWriteBase;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class TestWriteMergeOnReadWithCompact
extends TestWriteCopyOnWrite {
    @Override
    protected void setUp(Configuration conf) {
        conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
    }

    @Override
    @Test
    public void testPartialFailover() {
    }

    @Override
    @Test
    public void testInsertAppendMode() {
    }

    @Override
    public void testInsertClustering() {
    }

    @Override
    @Test
    public void testInsertAsyncClustering() {
    }

    @Override
    protected Map<String, String> getExpectedBeforeCheckpointComplete() {
        return EXPECTED1;
    }

    @Override
    protected Map<String, String> getMiniBatchExpected() {
        HashMap<String, String> expected = new HashMap<String, String>();
        expected.put("par1", "[id1,par1,id1,Danny,23,1,par1]");
        return expected;
    }

    @Test
    public void testNonBlockingConcurrencyControlWithPartialUpdatePayload() throws Exception {
        this.conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL.name());
        this.conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
        this.conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, PartialUpdateAvroPayload.class.getName());
        this.conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
        this.conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
        this.conf.setString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "false");
        List<BinaryRowData> dataset1 = Collections.singletonList(TestData.insertRow(StringData.fromString((String)"id1"), StringData.fromString((String)"Danny"), null, TimestampData.fromEpochMillis((long)1L), StringData.fromString((String)"par1")));
        TestWriteBase.TestHarness pipeline1 = this.preparePipeline(this.conf).consume(dataset1).assertEmptyDataFiles();
        Configuration conf2 = this.conf.clone();
        conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
        List<BinaryRowData> dataset2 = Collections.singletonList(TestData.insertRow(StringData.fromString((String)"id1"), null, 23, TimestampData.fromEpochMillis((long)2L), StringData.fromString((String)"par1")));
        TestWriteBase.TestHarness pipeline2 = this.preparePipeline(conf2).consume(dataset2).assertEmptyDataFiles();
        pipeline1.checkpoint(1L).assertNextEvent().checkpointComplete(1L);
        pipeline2.checkpoint(1L).assertNextEvent().checkpointComplete(1L);
        Map<String, String> tmpSnapshotResult = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
        pipeline2.checkWrittenData(tmpSnapshotResult, 1);
        pipeline1.assertEmptyDataFiles();
        try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient((Configuration)this.conf, (RuntimeContext)TestUtils.getMockRuntimeContext());){
            Option scheduleInstant = writeClient.scheduleCompaction(Option.empty());
            Assertions.assertNotNull((Object)scheduleInstant.get());
        }
        List<BinaryRowData> dataset3 = Collections.singletonList(TestData.insertRow(StringData.fromString((String)"id3"), StringData.fromString((String)"Julian"), 53, TimestampData.fromEpochMillis((long)4L), StringData.fromString((String)"par1")));
        pipeline1.consume(dataset3).checkpoint(2L).assertNextEvent().checkpointComplete(2L);
        Map<String, String> finalSnapshotResult = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1, id3,par1,id3,Julian,53,4,par1]");
        pipeline1.checkWrittenData(finalSnapshotResult, 1);
        Map<String, String> readOptimizedResult = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
        TestData.checkWrittenData(this.tempFile, readOptimizedResult, 1);
        pipeline1.end();
        pipeline2.end();
    }

    @Test
    public void testNonBlockingConcurrencyControlWithInflightInstant() throws Exception {
        this.conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL.name());
        this.conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
        this.conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
        this.conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
        List<BinaryRowData> dataset1 = Collections.singletonList(TestData.insertRow(StringData.fromString((String)"id1"), StringData.fromString((String)"Danny"), 23, TimestampData.fromEpochMillis((long)1L), StringData.fromString((String)"par1")));
        TestWriteBase.TestHarness pipeline1 = this.preparePipeline(this.conf).consume(dataset1).assertEmptyDataFiles();
        Configuration conf2 = this.conf.clone();
        conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
        List<BinaryRowData> dataset2 = Collections.singletonList(TestData.insertRow(StringData.fromString((String)"id2"), StringData.fromString((String)"Stephen"), 34, TimestampData.fromEpochMillis((long)2L), StringData.fromString((String)"par1")));
        TestWriteBase.TestHarness pipeline2 = this.preparePipeline(conf2).consume(dataset2).assertEmptyDataFiles();
        pipeline1.checkpoint(1L).assertNextEvent().checkpointComplete(1L);
        pipeline2.checkpoint(1L).assertNextEvent();
        try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient((Configuration)this.conf, (RuntimeContext)TestUtils.getMockRuntimeContext());){
            Option scheduleInstant = writeClient.scheduleCompaction(Option.empty());
            Assertions.assertNotNull((Object)scheduleInstant.get());
        }
        List<BinaryRowData> dataset3 = Collections.singletonList(TestData.insertRow(StringData.fromString((String)"id3"), StringData.fromString((String)"Julian"), 53, TimestampData.fromEpochMillis((long)4L), StringData.fromString((String)"par1")));
        pipeline1.consume(dataset3).checkpoint(2L).assertNextEvent().checkpointComplete(2L);
        Map<String, String> finalSnapshotResult = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,1,par1, id3,par1,id3,Julian,53,4,par1]");
        pipeline1.checkWrittenData(finalSnapshotResult, 1);
        Map<String, String> readOptimizedResult = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,1,par1]");
        TestData.checkWrittenData(this.tempFile, readOptimizedResult, 1);
        pipeline1.end();
        pipeline2.end();
    }

    @Test
    public void testBulkInsertWithNonBlockingConcurrencyControl() throws Exception {
        this.conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL.name());
        this.conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
        this.conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, PartialUpdateAvroPayload.class.getName());
        this.conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
        this.conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
        List<BinaryRowData> dataset1 = Collections.singletonList(TestData.insertRow(StringData.fromString((String)"id1"), StringData.fromString((String)"Danny"), null, TimestampData.fromEpochMillis((long)1L), StringData.fromString((String)"par1")));
        TestWriteBase.TestHarness pipeline1 = this.preparePipeline(this.conf).consume(dataset1).assertEmptyDataFiles();
        Configuration conf2 = this.conf.clone();
        conf2.setString(FlinkOptions.OPERATION, "BULK_INSERT");
        conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
        List<BinaryRowData> dataset2 = Collections.singletonList(TestData.insertRow(StringData.fromString((String)"id1"), null, 23, TimestampData.fromEpochMillis((long)2L), StringData.fromString((String)"par1")));
        TestWriteBase.TestHarness pipeline2 = this.preparePipeline(conf2).consume(dataset2);
        pipeline1.checkpoint(1L).assertNextEvent().checkpointComplete(1L);
        pipeline2.endInputThrows(HoodieWriteConflictException.class, "Cannot resolve conflicts");
        pipeline1.end();
        pipeline2.end();
    }

    @Test
    public void testBulkInsertInSequenceWithNonBlockingConcurrencyControl() throws Exception {
        this.conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL.name());
        this.conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
        this.conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, PartialUpdateAvroPayload.class.getName());
        this.conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
        this.conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
        this.conf.setString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "false");
        Configuration conf1 = this.conf.clone();
        conf1.setString(FlinkOptions.OPERATION, "BULK_INSERT");
        List<BinaryRowData> dataset1 = Collections.singletonList(TestData.insertRow(StringData.fromString((String)"id1"), StringData.fromString((String)"Danny"), null, TimestampData.fromEpochMillis((long)1L), StringData.fromString((String)"par1")));
        TestWriteBase.TestHarness pipeline1 = this.preparePipeline(conf1).consume(dataset1);
        Configuration conf2 = this.conf.clone();
        conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
        List<BinaryRowData> dataset2 = Collections.singletonList(TestData.insertRow(StringData.fromString((String)"id1"), null, 23, TimestampData.fromEpochMillis((long)2L), StringData.fromString((String)"par1")));
        TestWriteBase.TestHarness pipeline2 = this.preparePipeline(conf2).consume(dataset2);
        pipeline1.endInput();
        pipeline2.checkpoint(1L).assertNextEvent().checkpointComplete(1L);
        Map<String, String> tmpSnapshotResult = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
        pipeline2.checkWrittenData(tmpSnapshotResult, 1);
        try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient((Configuration)this.conf, (RuntimeContext)TestUtils.getMockRuntimeContext());){
            Option scheduleInstant = writeClient.scheduleCompaction(Option.empty());
            Assertions.assertNotNull((Object)scheduleInstant.get());
        }
        List<BinaryRowData> dataset3 = Collections.singletonList(TestData.insertRow(StringData.fromString((String)"id3"), StringData.fromString((String)"Julian"), 53, TimestampData.fromEpochMillis((long)4L), StringData.fromString((String)"par1")));
        pipeline2.consume(dataset3).checkpoint(2L).assertNextEvent().checkpointComplete(2L);
        Map<String, String> finalSnapshotResult = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1, id3,par1,id3,Julian,53,4,par1]");
        pipeline2.checkWrittenData(finalSnapshotResult, 1);
        Map<String, String> readOptimizedResult = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
        TestData.checkWrittenData(this.tempFile, readOptimizedResult, 1);
        pipeline1.end();
        pipeline2.end();
    }

    @Override
    protected HoodieTableType getTableType() {
        return HoodieTableType.MERGE_ON_READ;
    }
}

