package org.apache.hudi.sink.compact;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;

/* loaded from: input_file:org/apache/hudi/sink/compact/CompactionPlanOperator.class */
public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPlanEvent> implements OneInputStreamOperator<Object, CompactionPlanEvent> {
    private final Configuration conf;
    private transient HoodieFlinkWriteClient writeClient;
    private String compactionInstantTime;

    public CompactionPlanOperator(Configuration configuration) {
        this.conf = configuration;
    }

    public void open() throws Exception {
        super.open();
        this.writeClient = StreamerUtil.createWriteClient(this.conf, getRuntimeContext());
    }

    public void processElement(StreamRecord<Object> streamRecord) {
    }

    public void notifyCheckpointComplete(long j) {
        try {
            HoodieFlinkTable<?> hoodieTable = this.writeClient.getHoodieTable();
            CompactionUtil.rollbackCompaction(hoodieTable, this.writeClient, this.conf);
            scheduleCompaction(hoodieTable, j);
        } catch (Throwable th) {
            LOG.error("Error while scheduling compaction at instant: " + this.compactionInstantTime, th);
        }
    }

    private void scheduleCompaction(HoodieFlinkTable<?> hoodieFlinkTable, long j) throws IOException {
        Option<HoodieInstant> lastInstant = hoodieFlinkTable.getActiveTimeline().filterPendingCompactionTimeline().filter(hoodieInstant -> {
            return hoodieInstant.getState() == HoodieInstant.State.REQUESTED;
        }).lastInstant();
        if (!lastInstant.isPresent()) {
            LOG.info("No compaction plan for checkpoint " + j);
            return;
        }
        String timestamp = lastInstant.get().getTimestamp();
        if (this.compactionInstantTime != null && Objects.equals(this.compactionInstantTime, timestamp)) {
            LOG.info("Duplicate scheduling for compaction instant: " + timestamp + ", ignore");
            return;
        }
        HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(hoodieFlinkTable.getMetaClient(), timestamp);
        if (compactionPlan == null || compactionPlan.getOperations() == null || compactionPlan.getOperations().isEmpty()) {
            LOG.info("No compaction plan for checkpoint " + j + " and instant " + timestamp);
            return;
        }
        this.compactionInstantTime = timestamp;
        hoodieFlinkTable.getActiveTimeline().transitionCompactionRequestedToInflight(HoodieTimeline.getCompactionRequestedInstant(timestamp));
        hoodieFlinkTable.getMetaClient().reloadActiveTimeline();
        List list = (List) compactionPlan.getOperations().stream().map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
        LOG.info("CompactionPlanOperator compacting " + list + " files");
        Iterator it = list.iterator();
        while (it.hasNext()) {
            this.output.collect(new StreamRecord(new CompactionPlanEvent(timestamp, (CompactionOperation) it.next())));
        }
    }

    @VisibleForTesting
    public void setOutput(Output<StreamRecord<CompactionPlanEvent>> output) {
        this.output = output;
    }
}
