package org.apache.hudi.sink.clustering;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.util.ClusteringUtil;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/sink/clustering/ClusteringCommitSink.class */
/**
 * Sink function that buffers {@link ClusteringCommitEvent}s per clustering instant and
 * finalizes the instant once one event has been received for every input group of the
 * clustering plan.
 *
 * <p>When the buffer for an instant is complete, the instant is either rolled back (if any
 * buffered event is marked as failed) or committed as a replace commit. In both cases the
 * buffered events of that instant are dropped afterwards.
 */
public class ClusteringCommitSink extends CleanFunction<ClusteringCommitEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(ClusteringCommitSink.class);

    /** Job configuration used to create the write client and the meta client. */
    private final Configuration conf;

    /** Table handle obtained from the write client in {@link #open(Configuration)}. */
    private transient HoodieFlinkTable<?> table;

    /** Buffered commit events, keyed by clustering instant time. */
    private transient Map<String, List<ClusteringCommitEvent>> commitBuffer;

    /**
     * Creates the sink.
     *
     * @param configuration the configuration handed to the parent function and kept for
     *                      creating clients later on
     */
    public ClusteringCommitSink(Configuration configuration) {
        super(configuration);
        this.conf = configuration;
    }

    /**
     * Initializes the transient state: makes sure a write client exists, creates an empty
     * commit buffer and resolves the table from the write client.
     *
     * @param configuration the configuration passed through to the parent {@code open}
     * @throws Exception if the parent {@code open} fails
     */
    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        if (this.writeClient == null) {
            this.writeClient = FlinkWriteClients.createWriteClient(this.conf, getRuntimeContext());
        }
        this.commitBuffer = new HashMap<>();
        this.table = this.writeClient.getHoodieTable();
    }

    /**
     * Buffers the event under its instant and attempts to finalize that instant.
     *
     * @param clusteringCommitEvent the incoming commit event
     * @param context               the sink context (unused)
     * @throws Exception if resolving the clustering plan or rolling back the instant fails
     */
    @Override
    public void invoke(ClusteringCommitEvent clusteringCommitEvent, SinkFunction.Context context) throws Exception {
        final String instant = clusteringCommitEvent.getInstant();
        // Reuse the list returned by computeIfAbsent instead of looking the key up a second time.
        final List<ClusteringCommitEvent> events =
            this.commitBuffer.computeIfAbsent(instant, key -> new ArrayList<>());
        events.add(clusteringCommitEvent);
        commitIfNecessary(instant, events);
    }

    /**
     * Finalizes the given instant if one event has been buffered for each input group of
     * its clustering plan; otherwise does nothing.
     *
     * <p>If any buffered event is failed, the clustering is rolled back and a rollback error
     * propagates to the caller. Otherwise the instant is committed; any error raised while
     * committing is logged and NOT rethrown. The buffer entry of the instant is removed in
     * every one of these outcomes.
     *
     * @param instant the clustering instant time
     * @param events  the events buffered so far for {@code instant}
     */
    private void commitIfNecessary(String instant, List<ClusteringCommitEvent> events) {
        final HoodieClusteringPlan clusteringPlan = ClusteringUtils
            .getClusteringPlan(
                StreamerUtil.createMetaClient(this.conf),
                HoodieTimeline.getReplaceCommitInflightInstant(instant))
            .get()
            .getRight();

        // Not ready yet: wait until every input group has reported.
        if (clusteringPlan.getInputGroups().size() != events.size()) {
            return;
        }

        if (events.stream().anyMatch(ClusteringCommitEvent::isFailed)) {
            try {
                ClusteringUtil.rollbackClustering(this.table, this.writeClient, instant);
            } finally {
                // Drop the buffered events even when the rollback itself throws.
                reset(instant);
            }
            return;
        }

        try {
            doCommit(instant, clusteringPlan, events);
        } catch (Throwable throwable) {
            // Deliberately swallowed after logging so that a failed commit does not fail the sink.
            LOG.error("Error while committing clustering instant: {}", instant, throwable);
        } finally {
            reset(instant);
        }
    }

    /**
     * Builds the write metadata from the buffered events, completes the clustering table
     * service for the instant and runs an inline clean unless async cleaning is enabled.
     *
     * @param instant        the clustering instant time
     * @param clusteringPlan the plan being committed
     * @param events         the complete set of events for {@code instant}
     * @throws HoodieClusteringException if the events carry no write status at all
     */
    private void doCommit(String instant, HoodieClusteringPlan clusteringPlan, List<ClusteringCommitEvent> events) {
        final List<WriteStatus> statuses = events.stream()
            .map(ClusteringCommitEvent::getWriteStatuses)
            .flatMap(List::stream)
            .collect(Collectors.toList());

        final HoodieWriteMetadata<List<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
        writeMetadata.setWriteStatuses(statuses);
        writeMetadata.setWriteStats(statuses.stream()
            .map(WriteStatus::getStat)
            .collect(Collectors.toList()));
        writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(clusteringPlan, writeMetadata));

        validateWriteResult(clusteringPlan, instant, writeMetadata);

        if (!writeMetadata.getCommitMetadata().isPresent()) {
            writeMetadata.setCommitMetadata(Option.of(CommitUtils.buildMetadata(
                writeMetadata.getWriteStats().get(),
                writeMetadata.getPartitionToReplaceFileIds(),
                Option.empty(),
                WriteOperationType.CLUSTER,
                this.writeClient.getConfig().getSchema(),
                HoodieTimeline.REPLACE_COMMIT_ACTION)));
        }

        // Reload the timeline before completing the table service.
        this.table.getMetaClient().reloadActiveTimeline();
        this.writeClient.completeTableService(
            TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), this.table, instant);

        if (!this.conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
            LOG.info("Running inline clean");
            this.writeClient.clean();
        }
    }

    /**
     * Removes all buffered events of the given instant.
     *
     * @param instant the clustering instant time
     */
    private void reset(String instant) {
        this.commitBuffer.remove(instant);
    }

    /**
     * Verifies that the clustering produced at least one write status.
     *
     * @param clusteringPlan the plan that was executed
     * @param instant        the clustering instant time, used in the error message
     * @param writeMetadata  the metadata holding the collected write statuses
     * @throws HoodieClusteringException if the write status list is empty
     */
    private static void validateWriteResult(HoodieClusteringPlan clusteringPlan, String instant, HoodieWriteMetadata<List<WriteStatus>> writeMetadata) {
        if (writeMetadata.getWriteStatuses().isEmpty()) {
            final int expectedOutputGroups = clusteringPlan.getInputGroups().stream()
                .mapToInt(group -> group.getNumOutputFileGroups())
                .sum();
            throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instant
                + " #groups: " + clusteringPlan.getInputGroups().size()
                + " expected at least " + expectedOutputGroups
                + " write statuses");
        }
    }

    /**
     * Computes, per partition path, the file ids of the plan's file groups that do not
     * appear among the newly written file groups.
     *
     * @param clusteringPlan the plan that was executed
     * @param writeMetadata  the metadata whose write stats identify the written file groups
     * @return a map from partition path to the file ids selected as described above
     */
    private static Map<String, List<String>> getPartitionToReplacedFileIds(HoodieClusteringPlan clusteringPlan, HoodieWriteMetadata<List<WriteStatus>> writeMetadata) {
        final Set<HoodieFileGroupId> writtenFileGroups = writeMetadata.getWriteStats().get().stream()
            .map(stat -> new HoodieFileGroupId(stat.getPartitionPath(), stat.getFileId()))
            .collect(Collectors.toSet());

        return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
            .filter(fileGroupId -> !writtenFileGroups.contains(fileGroupId))
            .collect(Collectors.groupingBy(
                HoodieFileGroupId::getPartitionPath,
                Collectors.mapping(HoodieFileGroupId::getFileId, Collectors.toList())));
    }
}
