/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.bucket;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.FileCreateUtilsLegacy;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@ExtendWith(value={FlinkMiniCluster.class})
public class ITTestBucketStreamWrite {
    private static final Map<String, String> EXPECTED = new HashMap<String, String>();
    @TempDir
    File tempFile;

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testBucketStreamWriteAfterRollbackFirstFileGroupCreation(boolean isCow) throws Exception {
        String tablePath = this.tempFile.getAbsolutePath();
        ITTestBucketStreamWrite.doWrite(tablePath, isCow);
        ITTestBucketStreamWrite.doDeleteCommit(tablePath, isCow);
        ITTestBucketStreamWrite.doWrite(tablePath, isCow);
        ITTestBucketStreamWrite.doWrite(tablePath, isCow);
        if (isCow) {
            TestData.checkWrittenData(this.tempFile, EXPECTED, 4);
        } else {
            HoodieStorage storage = HoodieTestUtils.getStorage((String)this.tempFile.getAbsolutePath());
            TestData.checkWrittenDataMOR(storage, this.tempFile, EXPECTED, 4);
        }
    }

    private static void doDeleteCommit(String tablePath, boolean isCow) throws Exception {
        HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient((String)tablePath);
        HoodieTimeline activeCompletedTimeline = metaClient.getActiveTimeline().filterCompletedInstants();
        Assertions.assertEquals((int)1, (int)activeCompletedTimeline.getInstants().size());
        HoodieInstant instant = (HoodieInstant)activeCompletedTimeline.getInstants().get(0);
        String commitInstant = instant.requestedTime();
        String filename = HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR.getFileName((HoodieInstant)activeCompletedTimeline.getInstants().get(0));
        HoodieCommitMetadata commitMetadata = (HoodieCommitMetadata)metaClient.getCommitMetadataSerDe().deserialize(instant, (byte[])metaClient.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
        HoodieStorage storage = metaClient.getStorage();
        StoragePath path = new StoragePath(metaClient.getTimelinePath(), filename);
        storage.deleteDirectory(path);
        commitMetadata.getFileIdAndRelativePaths().forEach((fileId, relativePath) -> {
            String[] partitionFileNameSplit = relativePath.split("/");
            String partition = partitionFileNameSplit[0];
            String fileName = partitionFileNameSplit[1];
            try {
                String markerFileName = FileCreateUtilsLegacy.markerFileName((String)fileName, (IOType)IOType.CREATE);
                FileCreateUtilsLegacy.createMarkerFile((String)tablePath, (String)partition, (String)commitInstant, (String)markerFileName);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
    }

    private static String getWriteToken(String relativeFilePath) {
        Pattern writeTokenPattern = Pattern.compile("_((\\d+)-(\\d+)-(\\d+))_");
        Matcher matcher = writeTokenPattern.matcher(relativeFilePath);
        if (!matcher.find()) {
            throw new RuntimeException("Invalid relative file path: " + relativeFilePath);
        }
        return matcher.group(1);
    }

    private static void doWrite(String path, boolean isCow) throws InterruptedException, ExecutionException {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironmentImpl tableEnv = TableEnvironmentImpl.create((EnvironmentSettings)settings);
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(FlinkOptions.PATH.key(), path);
        options.put(FlinkOptions.TABLE_TYPE.key(), isCow ? FlinkOptions.TABLE_TYPE_COPY_ON_WRITE : FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
        options.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());
        options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), "1");
        String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
        tableEnv.executeSql(hoodieTableDDL);
        tableEnv.executeSql("insert into t1 values\n('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')").await();
        TimeUnit.SECONDS.sleep(3L);
    }

    static {
        EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1000,par1, id2,par1,id2,Stephen,33,2000,par1]");
        EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3000,par2, id4,par2,id4,Fabian,31,4000,par2]");
        EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5000,par3, id6,par3,id6,Emma,20,6000,par3]");
        EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7000,par4, id8,par4,id8,Han,56,8000,par4]");
    }
}

