/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.cluster;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
import org.apache.hudi.sink.clustering.ClusteringCommitSink;
import org.apache.hudi.sink.clustering.ClusteringOperator;
import org.apache.hudi.sink.clustering.ClusteringPlanSourceFunction;
import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
import org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith(value={FlinkMiniCluster.class})
public class ITTestHoodieFlinkClustering {
    private static final Map<String, String> EXPECTED = new HashMap<String, String>();
    @TempDir
    File tempFile;

    @Test
    public void testHoodieFlinkClustering() throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironmentImpl tableEnv = TableEnvironmentImpl.create((EnvironmentSettings)settings);
        tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
        options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");
        String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
        tableEnv.executeSql(hoodieTableDDL);
        tableEnv.executeSql("insert into t1 values\n('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')").await();
        TimeUnit.SECONDS.sleep(3L);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkClusteringConfig cfg = new FlinkClusteringConfig();
        cfg.path = this.tempFile.getAbsolutePath();
        cfg.targetPartitions = 4;
        Configuration conf = FlinkClusteringConfig.toFlinkConfig((FlinkClusteringConfig)cfg);
        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient((Configuration)conf);
        conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
        conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
        conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
        long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
        conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
        conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
        CompactionUtil.setAvroSchema((Configuration)conf, (HoodieTableMetaClient)metaClient);
        String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
        HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient((Configuration)conf);
        HoodieFlinkTable table = writeClient.getHoodieTable();
        boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
        Assertions.assertTrue((boolean)scheduled, (String)"The clustering plan should be scheduled");
        table.getMetaClient().reloadActiveTimeline();
        HoodieTimeline timeline = table.getActiveTimeline().filterPendingReplaceTimeline().filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
        Option clusteringPlanOption = ClusteringUtils.getClusteringPlan((HoodieTableMetaClient)table.getMetaClient(), (HoodieInstant)((HoodieInstant)timeline.lastInstant().get()));
        HoodieClusteringPlan clusteringPlan = (HoodieClusteringPlan)((Pair)clusteringPlanOption.get()).getRight();
        HoodieInstant instant2 = HoodieTimeline.getReplaceCommitRequestedInstant((String)clusteringInstantTime);
        table.getActiveTimeline().transitionReplaceRequestedToInflight(instant2, Option.empty());
        Schema tableAvroSchema = StreamerUtil.getTableAvroSchema((HoodieTableMetaClient)table.getMetaClient(), (boolean)false);
        DataType rowDataType = AvroSchemaConverter.convertToDataType((Schema)tableAvroSchema);
        RowType rowType = (RowType)rowDataType.getLogicalType();
        SingleOutputStreamOperator dataStream = env.addSource((SourceFunction)new ClusteringPlanSourceFunction(clusteringInstantTime, clusteringPlan)).name("clustering_source").uid("uid_clustering_source").rebalance().transform("clustering_task", TypeInformation.of(ClusteringCommitEvent.class), (OneInputStreamOperator)new ClusteringOperator(conf, rowType)).setParallelism(clusteringPlan.getInputGroups().size());
        ExecNodeUtil.setManagedMemoryWeight((Transformation)dataStream.getTransformation(), (long)((long)conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L));
        dataStream.addSink((SinkFunction)new ClusteringCommitSink(conf)).name("clustering_commit").uid("uid_clustering_commit").setParallelism(1);
        env.execute("flink_hudi_clustering");
        TestData.checkWrittenData(this.tempFile, EXPECTED, 4);
    }

    @Test
    public void testHoodieFlinkClusteringService() throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironmentImpl tableEnv = TableEnvironmentImpl.create((EnvironmentSettings)settings);
        tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
        options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");
        String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
        tableEnv.executeSql(hoodieTableDDL);
        tableEnv.executeSql("insert into t1 values\n('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')").await();
        TimeUnit.SECONDS.sleep(3L);
        FlinkClusteringConfig cfg = new FlinkClusteringConfig();
        cfg.path = this.tempFile.getAbsolutePath();
        cfg.minClusteringIntervalSeconds = 3;
        cfg.schedule = true;
        Configuration conf = FlinkClusteringConfig.toFlinkConfig((FlinkClusteringConfig)cfg);
        HoodieFlinkClusteringJob.AsyncClusteringService asyncClusteringService = new HoodieFlinkClusteringJob.AsyncClusteringService(cfg, conf);
        asyncClusteringService.start(null);
        TimeUnit.SECONDS.sleep(5L);
        asyncClusteringService.shutDown();
        TestData.checkWrittenData(this.tempFile, EXPECTED, 4);
    }

    @Test
    public void testHoodieFlinkClusteringSchedule() throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironmentImpl tableEnv = TableEnvironmentImpl.create((EnvironmentSettings)settings);
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(FlinkOptions.PATH.key(), this.tempFile.getAbsolutePath());
        options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
        options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");
        String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
        tableEnv.executeSql(hoodieTableDDL);
        tableEnv.executeSql("insert into t1 values\n('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')").await();
        TimeUnit.SECONDS.sleep(3L);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkClusteringConfig cfg = new FlinkClusteringConfig();
        cfg.path = this.tempFile.getAbsolutePath();
        Configuration conf = FlinkClusteringConfig.toFlinkConfig((FlinkClusteringConfig)cfg);
        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient((Configuration)conf);
        conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
        conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
        conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
        long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
        conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
        conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
        conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 2);
        conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, false);
        conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
        CompactionUtil.setAvroSchema((Configuration)conf, (HoodieTableMetaClient)metaClient);
        String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
        HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient((Configuration)conf);
        boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
        Assertions.assertFalse((boolean)scheduled, (String)"1 delta commit, the clustering plan should not be scheduled");
        tableEnv.executeSql("insert into t1 values\n('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')").await();
        TimeUnit.SECONDS.sleep(3L);
        clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
        scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
        Assertions.assertTrue((boolean)scheduled, (String)"2 delta commits, the clustering plan should be scheduled");
    }

    static {
        EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1000,par1, id2,par1,id2,Stephen,33,2000,par1]");
        EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3000,par2, id4,par2,id4,Fabian,31,4000,par2]");
        EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5000,par3, id6,par3,id6,Emma,20,6000,par3]");
        EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7000,par4, id8,par4,id8,Han,56,8000,par4]");
    }
}

