/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.clustering;

import com.beust.jcommander.JCommander;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.async.HoodieAsyncTableService;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
import org.apache.hudi.sink.clustering.ClusteringCommitSink;
import org.apache.hudi.sink.clustering.ClusteringOperator;
import org.apache.hudi.sink.clustering.ClusteringPlanSourceFunction;
import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.ClusteringUtil;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command-line entry point that runs Hudi clustering as a Flink job.
 *
 * <p>The job either executes a single clustering round and returns, or (in service mode)
 * starts {@link AsyncClusteringService} and keeps running clustering rounds until the
 * service is shut down.
 */
public class HoodieFlinkClusteringJob {
    protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkClusteringJob.class);

    /** Service that executes the clustering rounds, either once or in a loop. */
    private final AsyncClusteringService clusteringScheduleService;

    /**
     * Creates the job around an already constructed clustering service.
     *
     * @param service the service used to run clustering rounds
     */
    public HoodieFlinkClusteringJob(AsyncClusteringService service) {
        this.clusteringScheduleService = service;
    }

    /**
     * Parses the command-line arguments, builds the clustering service and starts the job
     * in the mode selected by {@code cfg.serviceMode}.
     *
     * @param args command-line arguments understood by {@link FlinkClusteringConfig}
     * @throws Exception if the service cannot be created or clustering fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkClusteringConfig cfg = getFlinkClusteringConfig(args);
        Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
        AsyncClusteringService service = new AsyncClusteringService(cfg, conf, env);
        new HoodieFlinkClusteringJob(service).start(cfg.serviceMode);
    }

    /**
     * Runs clustering either continuously or for exactly one round.
     *
     * <p>The two modes are mutually exclusive: in service mode the method returns (or throws)
     * once {@code waitForShutdown()} completes and never runs an additional single round,
     * because the service loop has already released its resources by then.
     *
     * @param serviceMode {@code true} to start the background service and wait for its
     *                    shutdown, {@code false} to run one clustering round synchronously
     * @throws HoodieException if waiting for the service shutdown fails (service mode)
     * @throws Exception       if the single clustering round fails (single-round mode)
     */
    public void start(boolean serviceMode) throws Exception {
        if (serviceMode) {
            this.clusteringScheduleService.start(null);
            try {
                this.clusteringScheduleService.waitForShutdown();
            } catch (Exception e) {
                throw new HoodieException(e.getMessage(), e);
            } finally {
                LOG.info("Shut down hoodie flink clustering");
            }
        } else {
            LOG.info("Hoodie Flink Clustering running only single round");
            try {
                this.clusteringScheduleService.cluster();
            } catch (Exception e) {
                LOG.error("Got error running clustering once. Shutting down", e);
                throw e;
            } finally {
                LOG.info("Shut down hoodie flink clustering");
            }
        }
    }

    /**
     * Parses the command-line arguments into a {@link FlinkClusteringConfig}.
     *
     * <p>Prints the usage and terminates the JVM with exit code 1 when help was requested
     * or when no arguments were supplied.
     *
     * @param args raw command-line arguments
     * @return the populated configuration
     */
    public static FlinkClusteringConfig getFlinkClusteringConfig(String[] args) {
        FlinkClusteringConfig cfg = new FlinkClusteringConfig();
        JCommander cmd = new JCommander(cfg, null, args);
        if (cfg.help || args.length == 0) {
            cmd.usage();
            System.exit(1);
        }
        return cfg;
    }

    /**
     * Table service that executes pending clustering plans through a Flink pipeline.
     *
     * <p>{@link #startService()} runs {@code cluster()} in a loop on a single-threaded
     * executor; the enclosing job may also invoke {@code cluster()} directly for a single round.
     */
    public static class AsyncClusteringService
    extends HoodieAsyncTableService {
        private static final long serialVersionUID = 1L;

        /** Parsed command-line options of the clustering job. */
        private final FlinkClusteringConfig cfg;
        /** Flink configuration; enriched with table properties in the constructor. */
        private final Configuration conf;
        /** Meta client created from {@link #conf}. */
        private final HoodieTableMetaClient metaClient;
        /** Write client used to schedule clustering and to look up pending rollbacks. */
        private final HoodieFlinkWriteClient<?> writeClient;
        /** Table handle obtained from the write client. */
        private final HoodieFlinkTable<?> table;
        /** Execution environment on which the clustering pipeline is built and executed. */
        private final StreamExecutionEnvironment env;
        /** Single-threaded executor that hosts the service loop. */
        private final ExecutorService executor;

        /**
         * Creates the service and copies the table name, table type, record key field,
         * partition path field and Avro schema of the existing table into {@code conf}.
         *
         * @param cfg  parsed clustering options
         * @param conf Flink configuration; mutated by this constructor
         * @param env  execution environment used to run the clustering pipeline
         * @throws Exception if the meta client or write client cannot be created
         */
        public AsyncClusteringService(FlinkClusteringConfig cfg, Configuration conf, StreamExecutionEnvironment env) throws Exception {
            this.cfg = cfg;
            this.conf = conf;
            this.env = env;
            this.executor = Executors.newFixedThreadPool(1);
            this.metaClient = StreamerUtil.createMetaClient(conf);
            // Take the table identity from the existing table rather than from the command line.
            conf.setString(FlinkOptions.TABLE_NAME, this.metaClient.getTableConfig().getTableName());
            conf.setString(FlinkOptions.TABLE_TYPE, this.metaClient.getTableConfig().getTableType().name());
            conf.setString(FlinkOptions.RECORD_KEY_FIELD, this.metaClient.getTableConfig().getRecordKeyFieldProp());
            conf.setString(FlinkOptions.PARTITION_PATH_FIELD, this.metaClient.getTableConfig().getPartitionFieldProp());
            CompactionUtil.setAvroSchema(conf, this.metaClient);
            this.writeClient = StreamerUtil.createWriteClient(conf);
            this.writeConfig = this.writeClient.getConfig();
            this.table = this.writeClient.getHoodieTable();
        }

        /**
         * Submits the service loop to the executor.
         *
         * <p>The loop runs a clustering round, sleeps for
         * {@code cfg.minClusteringIntervalSeconds} seconds and repeats until shutdown is
         * requested. Any exception ends the loop and is rethrown as a {@link HoodieException};
         * in every case {@link #shutdownAsyncService(boolean)} is invoked before the task ends.
         *
         * @return the future of the loop (completing with {@code true} on a regular shutdown)
         *         paired with the executor running it
         */
        @Override
        protected Pair<CompletableFuture, ExecutorService> startService() {
            return Pair.of(CompletableFuture.supplyAsync(() -> {
                boolean error = false;
                try {
                    while (!this.isShutdownRequested()) {
                        try {
                            this.cluster();
                            // Multiply as long so that large intervals cannot overflow int.
                            Thread.sleep(this.cfg.minClusteringIntervalSeconds * 1000L);
                        } catch (Exception e) {
                            LOG.error("Shutting down clustering service due to exception", e);
                            error = true;
                            throw new HoodieException(e.getMessage(), e);
                        }
                    }
                } finally {
                    this.shutdownAsyncService(error);
                }
                return true;
            }, this.executor), this.executor);
        }

        /**
         * Executes one clustering round.
         *
         * <p>Steps, in order: optionally schedule a new clustering plan (when
         * {@code cfg.schedule} is set), pick one pending instant in REQUESTED state, roll back
         * a matching inflight instant if the timeline contains one, load and validate the
         * clustering plan, transition the instant to inflight, and finally build and execute
         * the Flink pipeline (source, clustering operator, commit sink). The method returns
         * early, after logging, whenever there is nothing to cluster.
         *
         * @throws Exception if scheduling, timeline operations or the Flink job fail
         */
        private void cluster() throws Exception {
            this.table.getMetaClient().reloadActiveTimeline();

            if (this.cfg.schedule) {
                ClusteringUtil.validateClusteringScheduling(this.conf);
                String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
                boolean scheduled = this.writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
                if (!scheduled) {
                    LOG.info("No clustering plan for this job");
                    return;
                }
                // Make the freshly scheduled plan visible on the timeline.
                this.table.getMetaClient().reloadActiveTimeline();
            }

            List<HoodieInstant> instants = ClusteringUtils.getPendingClusteringInstantTimes(this.table.getMetaClient())
                .stream()
                .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED)
                .collect(Collectors.toList());
            if (instants.isEmpty()) {
                LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option");
                return;
            }

            // LIFO picks the last pending instant of the list, otherwise the first one.
            HoodieInstant clusteringInstant = CompactionUtil.isLIFO(this.cfg.clusteringSeq)
                ? instants.get(instants.size() - 1)
                : instants.get(0);

            HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant.getTimestamp());
            if (this.table.getMetaClient().getActiveTimeline().containsInstant(inflightInstant)) {
                LOG.info("Rollback inflight clustering instant: [{}]", clusteringInstant);
                this.table.rollbackInflightClustering(inflightInstant,
                    commitToRollback -> this.writeClient.getPendingRollbackInfo(this.table.getMetaClient(), commitToRollback, false));
                this.table.getMetaClient().reloadActiveTimeline();
            }

            Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption =
                ClusteringUtils.getClusteringPlan(this.table.getMetaClient(), clusteringInstant);
            if (!clusteringPlanOption.isPresent()) {
                LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option");
                return;
            }

            HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();
            if (clusteringPlan == null || clusteringPlan.getInputGroups() == null || clusteringPlan.getInputGroups().isEmpty()) {
                LOG.info("No clustering plan for instant {}", clusteringInstant.getTimestamp());
                return;
            }

            HoodieInstant requestedInstant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstant.getTimestamp());
            HoodieTimeline pendingClusteringTimeline = this.table.getActiveTimeline().filterPendingReplaceTimeline();
            if (!pendingClusteringTimeline.containsInstant(requestedInstant)) {
                LOG.warn("The clustering plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\nClean the clustering plan in auxiliary path and cancels the clustering");
                CompactionUtil.cleanInstant(this.table.getMetaClient(), requestedInstant);
                return;
            }

            // -1 means "not configured": use one task per input group of the plan.
            int configuredTasks = this.conf.getInteger(FlinkOptions.CLUSTERING_TASKS);
            int clusteringParallelism = configuredTasks == -1 ? clusteringPlan.getInputGroups().size() : configuredTasks;

            this.table.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant, Option.empty());

            Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(this.table.getMetaClient(), false);
            DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
            RowType rowType = (RowType) rowDataType.getLogicalType();

            // Use the checkpoint timeout as the commit-ack timeout of the write path.
            long ckpTimeout = this.env.getCheckpointConfig().getCheckpointTimeout();
            this.conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);

            SingleOutputStreamOperator<ClusteringCommitEvent> dataStream = this.env
                .addSource(new ClusteringPlanSourceFunction(clusteringInstant.getTimestamp(), clusteringPlan))
                .name("clustering_source")
                .uid("uid_clustering_source")
                .rebalance()
                .transform("clustering_task", TypeInformation.of(ClusteringCommitEvent.class), new ClusteringOperator(this.conf, rowType))
                .setParallelism(clusteringParallelism);

            // WRITE_SORT_MEMORY is multiplied by 1024 * 1024 before being used as the managed memory weight.
            ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
                (long) this.conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);

            dataStream.addSink(new ClusteringCommitSink(this.conf))
                .name("clustering_commit")
                .uid("uid_clustering_commit")
                .setParallelism(1);

            this.env.execute("flink_hudi_clustering_" + clusteringInstant.getTimestamp());
        }

        /**
         * Shuts down the executor and closes the write client.
         *
         * @param error whether the shutdown was triggered by a failure; only used for logging
         */
        public void shutdownAsyncService(boolean error) {
            LOG.info("Gracefully shutting down clustering job. Error ?{}", error);
            this.executor.shutdown();
            this.writeClient.close();
        }

        /** Shuts the service down without flagging an error. */
        @VisibleForTesting
        public void shutDown() {
            this.shutdownAsyncService(false);
        }
    }
}

