package org.apache.hudi.sink.compact;

import com.beust.jcommander.JCommander;
import java.util.ResourceBundle;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/sink/compact/HoodieFlinkCompactor.class */
/**
 * Command-line entry point that runs a single pending Hudi compaction as a Flink job.
 *
 * <p>The program parses its arguments into a {@link FlinkCompactionConfig}, optionally schedules a
 * new compaction plan, selects one pending compaction instant, and submits a Flink pipeline named
 * {@code flink_hudi_compaction} that executes the plan and commits the result.
 */
public class HoodieFlinkCompactor {
    protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCompactor.class);

    /**
     * Parses the command line, prepares the Flink configuration and runs the compaction.
     *
     * <p>When {@code --help} is set or no arguments are given, the usage text is printed and the JVM
     * exits with status 1.
     *
     * @param strArr command-line arguments understood by {@link FlinkCompactionConfig}
     * @throws Exception if preparing, submitting or running the compaction job fails
     */
    public static void main(String[] strArr) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkCompactionConfig flinkCompactionConfig = new FlinkCompactionConfig();
        JCommander jCommander = new JCommander(flinkCompactionConfig, (ResourceBundle) null, strArr);
        if (flinkCompactionConfig.help || strArr.length == 0) {
            jCommander.usage();
            System.exit(1);
        }

        Configuration flinkConfig = FlinkCompactionConfig.toFlinkConfig(flinkCompactionConfig);
        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(flinkConfig);
        // The table name is taken from the existing table's config rather than from the command line.
        flinkConfig.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
        CompactionUtil.setAvroSchema(flinkConfig, metaClient);

        // try-with-resources guarantees the write client is closed on every exit path: the early
        // "nothing to do" returns, a failure, and normal completion after the job has been executed.
        // The pipeline operators are built from flinkConfig / the plan, not from this client instance.
        // NOTE(review): the client is left as a raw type, as in the original; parameterizing it needs
        // the client and table type arguments to agree for rollbackInflightCompaction - confirm before
        // tightening.
        try (HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(flinkConfig, null)) {
            runCompaction(executionEnvironment, flinkCompactionConfig, flinkConfig, metaClient, writeClient);
        }
    }

    /**
     * Selects one pending compaction instant and, if it has a usable plan, executes it as a Flink job.
     *
     * <p>Returns without submitting a job when no plan could be scheduled, when no pending compaction
     * exists, when the plan has no operations, or when the requested instant is missing from the
     * pending compaction timeline (in which case the instant is cleaned instead).
     *
     * @param executionEnvironment  Flink environment the pipeline is built on and executed with
     * @param flinkCompactionConfig parsed command-line options ({@code schedule}, {@code compactionSeq})
     * @param flinkConfig           Flink configuration handed to the compaction operators
     * @param metaClient            meta client used to compute the compaction instant time when scheduling
     * @param writeClient           write client used for scheduling and rollback; owned by the caller
     * @throws Exception if the Flink job fails
     */
    private static void runCompaction(
            StreamExecutionEnvironment executionEnvironment,
            FlinkCompactionConfig flinkCompactionConfig,
            Configuration flinkConfig,
            HoodieTableMetaClient metaClient,
            HoodieFlinkWriteClient writeClient) throws Exception {
        HoodieFlinkTable<?> hoodieTable = writeClient.getHoodieTable();

        if (flinkCompactionConfig.schedule) {
            String scheduleTime = CompactionUtil.getCompactionInstantTime(metaClient);
            boolean scheduled = writeClient.scheduleCompactionAtInstant(scheduleTime, Option.empty());
            if (!scheduled) {
                LOG.info("No compaction plan for this job ");
                return;
            }
            // Refresh so the timeline read below includes the plan that was just scheduled.
            hoodieTable.getMetaClient().reloadActiveTimeline();
        }

        // Pick the newest or the oldest pending compaction, depending on the configured sequence.
        HoodieTimeline pendingCompactions = hoodieTable.getActiveTimeline().filterPendingCompactionTimeline();
        Option<HoodieInstant> chosenInstant = CompactionUtil.isLIFO(flinkCompactionConfig.compactionSeq)
                ? pendingCompactions.lastInstant()
                : pendingCompactions.firstInstant();
        if (!chosenInstant.isPresent()) {
            LOG.info("No compaction plan scheduled, turns on the compaction plan schedule with --schedule option");
            return;
        }
        String timestamp = chosenInstant.get().getTimestamp();

        // An instant that is already inflight is rolled back first, then the timeline is refreshed.
        HoodieInstant compactionInflightInstant = HoodieTimeline.getCompactionInflightInstant(timestamp);
        if (pendingCompactions.containsInstant(compactionInflightInstant)) {
            LOG.info("Rollback inflight compaction instant: [" + timestamp + "]");
            writeClient.rollbackInflightCompaction(compactionInflightInstant, hoodieTable);
            hoodieTable.getMetaClient().reloadActiveTimeline();
        }

        HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(hoodieTable.getMetaClient(), timestamp);
        if (compactionPlan == null || compactionPlan.getOperations() == null || compactionPlan.getOperations().isEmpty()) {
            LOG.info("No compaction plan for instant " + timestamp);
            return;
        }

        HoodieInstant compactionRequestedInstant = HoodieTimeline.getCompactionRequestedInstant(timestamp);
        boolean requestedInstantPending = hoodieTable.getActiveTimeline()
                .filterPendingCompactionTimeline()
                .containsInstant(compactionRequestedInstant);
        if (!requestedInstantPending) {
            // A plan was loaded but its requested instant is not pending: clean it up and do not compact.
            LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\nClean the compaction plan in auxiliary path and cancels the compaction");
            CompactionUtil.cleanInstant(hoodieTable.getMetaClient(), compactionRequestedInstant);
            return;
        }

        // A configured value of -1 means "one compaction task per operation in the plan".
        int configuredTasks = flinkConfig.getInteger(FlinkOptions.COMPACTION_TASKS);
        int parallelism = configuredTasks == -1 ? compactionPlan.getOperations().size() : configuredTasks;

        // The instant is marked inflight before the job that performs the compaction is submitted.
        hoodieTable.getActiveTimeline().transitionCompactionRequestedToInflight(compactionRequestedInstant);

        executionEnvironment
                .addSource(new CompactionPlanSourceFunction(compactionPlan, timestamp))
                .name("compaction_source")
                .uid("uid_compaction_source")
                .rebalance()
                .transform(
                        "compact_task",
                        TypeInformation.of(CompactionCommitEvent.class),
                        new ProcessOperator<>(new CompactFunction(flinkConfig)))
                .setParallelism(parallelism)
                .addSink(new CompactionCommitSink(flinkConfig))
                .name("clean_commits")
                .uid("uid_clean_commits")
                .setParallelism(1);
        executionEnvironment.execute("flink_hudi_compaction");
    }
}
