package org.apache.hudi.sink.compact;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
import org.apache.hudi.util.StreamerUtil;

/* loaded from: input_file:org/apache/hudi/sink/compact/CompactFunction.class */
public class CompactFunction extends KeyedProcessFunction<Long, CompactionPlanEvent, CompactionCommitEvent> {
    private final Configuration conf;
    private transient HoodieFlinkWriteClient writeClient;
    private int taskID;

    public CompactFunction(Configuration configuration) {
        this.conf = configuration;
    }

    public void open(Configuration configuration) throws Exception {
        this.taskID = getRuntimeContext().getIndexOfThisSubtask();
        initWriteClient();
    }

    public void processElement(CompactionPlanEvent compactionPlanEvent, KeyedProcessFunction<Long, CompactionPlanEvent, CompactionCommitEvent>.Context context, Collector<CompactionCommitEvent> collector) throws Exception {
        String compactionInstantTime = compactionPlanEvent.getCompactionInstantTime();
        collector.collect(new CompactionCommitEvent(compactionInstantTime, new HoodieFlinkMergeOnReadTableCompactor().compact(new HoodieFlinkCopyOnWriteTable(this.writeClient.getConfig(), this.writeClient.getEngineContext(), this.writeClient.getHoodieTable().getMetaClient()), this.writeClient.getHoodieTable().getMetaClient(), this.writeClient.getConfig(), compactionPlanEvent.getOperation(), compactionInstantTime), this.taskID));
    }

    private void initWriteClient() {
        this.writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(new SerializableConfiguration(StreamerUtil.getHadoopConf()), new FlinkTaskContextSupplier(getRuntimeContext())), StreamerUtil.getHoodieClientConfig(this.conf));
    }

    public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedProcessFunction.Context context, Collector collector) throws Exception {
        processElement((CompactionPlanEvent) obj, (KeyedProcessFunction<Long, CompactionPlanEvent, CompactionCommitEvent>.Context) context, (Collector<CompactionCommitEvent>) collector);
    }
}
