/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.clustering;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.common.model.ClusteringGroupInfo;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.sink.clustering.ClusteringPlanEvent;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.ClusteringUtil;
import org.apache.hudi.util.FlinkTables;
import org.apache.hudi.util.FlinkWriteClients;

public class ClusteringPlanOperator
extends AbstractStreamOperator<ClusteringPlanEvent>
implements OneInputStreamOperator<Object, ClusteringPlanEvent> {
    private final Configuration conf;
    private transient HoodieFlinkTable table;

    public ClusteringPlanOperator(Configuration conf) {
        this.conf = conf;
    }

    public void open() throws Exception {
        super.open();
        this.table = FlinkTables.createTable(this.conf, (RuntimeContext)this.getRuntimeContext());
        ClusteringUtil.rollbackClustering(this.table, FlinkWriteClients.createWriteClient(this.conf, (RuntimeContext)this.getRuntimeContext()));
    }

    public void processElement(StreamRecord<Object> streamRecord) {
    }

    public void notifyCheckpointComplete(long checkpointId) {
        try {
            this.table.getMetaClient().reloadActiveTimeline();
            this.scheduleClustering(this.table, checkpointId);
        }
        catch (Throwable throwable) {
            LOG.error("Error while scheduling clustering plan for checkpoint: " + checkpointId, throwable);
        }
    }

    private void scheduleClustering(HoodieFlinkTable<?> table, long checkpointId) {
        Option firstRequested = Option.fromJavaOptional(ClusteringUtils.getPendingClusteringInstantTimes((HoodieTableMetaClient)table.getMetaClient()).stream().filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).findFirst());
        if (!firstRequested.isPresent()) {
            LOG.info("No clustering plan for checkpoint " + checkpointId);
            return;
        }
        String clusteringInstantTime = ((HoodieInstant)firstRequested.get()).getTimestamp();
        HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitRequestedInstant((String)clusteringInstantTime);
        Option clusteringPlanOption = ClusteringUtils.getClusteringPlan((HoodieTableMetaClient)table.getMetaClient(), (HoodieInstant)clusteringInstant);
        if (!clusteringPlanOption.isPresent()) {
            LOG.info("No clustering plan scheduled");
            return;
        }
        HoodieClusteringPlan clusteringPlan = (HoodieClusteringPlan)((Pair)clusteringPlanOption.get()).getRight();
        if (clusteringPlan == null || clusteringPlan.getInputGroups() == null || clusteringPlan.getInputGroups().isEmpty()) {
            LOG.info("Empty clustering plan for instant " + clusteringInstantTime);
        } else {
            table.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringInstant, Option.empty());
            table.getMetaClient().reloadActiveTimeline();
            for (HoodieClusteringGroup clusteringGroup : clusteringPlan.getInputGroups()) {
                LOG.info("Execute clustering plan for instant {} as {} file slices", (Object)clusteringInstantTime, (Object)clusteringGroup.getSlices().size());
                this.output.collect((Object)new StreamRecord((Object)new ClusteringPlanEvent(clusteringInstantTime, ClusteringGroupInfo.create((HoodieClusteringGroup)clusteringGroup), clusteringPlan.getStrategy().getStrategyParams())));
            }
        }
    }

    @VisibleForTesting
    public void setOutput(Output<StreamRecord<ClusteringPlanEvent>> output) {
        this.output = output;
    }
}

