package org.apache.hudi.sink.compact;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/sink/compact/CompactionCommitSink.class */
/**
 * Sink that buffers {@link CompactionCommitEvent}s per compaction instant and commits the
 * compaction once the number of buffered events for an instant equals the number of
 * operations in that instant's compaction plan.
 *
 * <p>The buffer is {@code transient} and is (re)created in {@link #open(Configuration)}.
 */
public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(CompactionCommitSink.class);

    /** Configuration handed to the constructor; used to create the write client and to read the clean option. */
    private final Configuration conf;

    /** Events received so far, keyed by the instant returned from {@code CompactionCommitEvent#getInstant()}. */
    private transient Map<String, List<CompactionCommitEvent>> commitBuffer;

    /**
     * Creates the sink.
     *
     * @param configuration configuration passed to the superclass and kept for later use
     */
    public CompactionCommitSink(Configuration configuration) {
        super(configuration);
        this.conf = configuration;
    }

    /**
     * Opens the superclass, makes sure a write client exists and creates an empty event buffer.
     *
     * @param configuration configuration forwarded to {@code super.open}
     * @throws Exception if the superclass or the write client creation fails
     */
    @Override // org.apache.hudi.sink.CleanFunction
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        // Only create a client when the superclass did not leave one behind.
        if (this.writeClient == null) {
            this.writeClient = StreamerUtil.createWriteClient(this.conf, getRuntimeContext());
        }
        this.commitBuffer = new HashMap<>();
    }

    /**
     * Buffers the event under its instant and commits that instant if all of its events have arrived.
     *
     * @param compactionCommitEvent the event to buffer
     * @param context               sink context (not used by this method)
     * @throws Exception if reading the compaction plan or committing fails
     */
    public void invoke(CompactionCommitEvent compactionCommitEvent, SinkFunction.Context context) throws Exception {
        String instant = compactionCommitEvent.getInstant();
        // computeIfAbsent already returns the (possibly new) list, so no second lookup is needed.
        List<CompactionCommitEvent> events = this.commitBuffer.computeIfAbsent(instant, key -> new ArrayList<>());
        events.add(compactionCommitEvent);
        commitIfNecessary(instant, events);
    }

    /**
     * Commits the compaction for {@code instant} when the number of buffered events equals the
     * number of operations in the compaction plan; otherwise does nothing.
     *
     * @param instant the compaction instant the events belong to
     * @param events  all events buffered so far for {@code instant}
     * @throws IOException if the compaction plan cannot be read or the commit fails
     */
    // The element type returned by CompactionCommitEvent#getWriteStatuses() is not imported in
    // this file, so the flattened status list is kept raw and handed to the write client as-is.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void commitIfNecessary(String instant, List<CompactionCommitEvent> events) throws IOException {
        int plannedOperations = CompactionUtils
            .getCompactionPlan(this.writeClient.getHoodieTable().getMetaClient(), instant)
            .getOperations()
            .size();
        if (plannedOperations != events.size()) {
            // Still waiting for more events of this instant.
            return;
        }

        List writeStatuses = events.stream()
            .map(CompactionCommitEvent::getWriteStatuses)
            .flatMap(statuses -> statuses.stream())
            .collect(Collectors.toList());

        if (this.writeClient.getConfig().shouldAutoCommit()) {
            // Derive the stats from the typed event stream: streaming over the raw list would
            // erase the element type and make getStat() unavailable.
            List<HoodieWriteStat> writeStats = events.stream()
                .map(CompactionCommitEvent::getWriteStatuses)
                .flatMap(statuses -> statuses.stream())
                .map(status -> status.getStat())
                .collect(Collectors.toList());

            HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(true);
            for (HoodieWriteStat writeStat : writeStats) {
                commitMetadata.addWriteStat(writeStat.getPartitionPath(), writeStat);
            }
            commitMetadata.addMetadata("schema", this.writeClient.getConfig().getSchema());
            this.writeClient.completeCompaction(
                commitMetadata, writeStatuses, this.writeClient.getHoodieTable(), instant);
        }

        this.writeClient.commitCompaction(instant, writeStatuses, Option.empty());

        // Clean inline only when the CLEAN_ASYNC_ENABLED option is off.
        if (!this.conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
            this.writeClient.clean();
        }

        reset(instant);
    }

    /**
     * Drops the buffered events of a committed instant.
     *
     * @param instant the instant whose buffer entry is removed
     */
    private void reset(String instant) {
        this.commitBuffer.remove(instant);
    }
}
