package org.apache.hudi.sink;

import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.util.FlinkWriteClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/sink/CleanFunction.class */
public class CleanFunction<T> extends AbstractRichFunction implements SinkFunction<T>, CheckpointedFunction, CheckpointListener {
    private static final Logger LOG = LoggerFactory.getLogger(CleanFunction.class);
    private final Configuration conf;
    protected HoodieFlinkWriteClient writeClient;
    private NonThrownExecutor executor;
    private volatile boolean isCleaning;

    public CleanFunction(Configuration configuration) {
        this.conf = configuration;
    }

    @Override // org.apache.flink.api.common.functions.AbstractRichFunction, org.apache.flink.api.common.functions.RichFunction
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.writeClient = FlinkWriteClients.createWriteClient(this.conf, getRuntimeContext());
        this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
        String createNewInstantTime = HoodieActiveTimeline.createNewInstantTime();
        LOG.info(String.format("exec clean with instant time %s...", createNewInstantTime));
        this.executor.execute(() -> {
            this.writeClient.clean(createNewInstantTime);
        }, "wait for cleaning finish", new Object[0]);
    }

    @Override // org.apache.flink.api.common.state.CheckpointListener
    public void notifyCheckpointComplete(long j) throws Exception {
        if (this.conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && this.isCleaning) {
            this.executor.execute(() -> {
                try {
                    this.writeClient.waitForCleaningFinish();
                } finally {
                    this.isCleaning = false;
                }
            }, "wait for cleaning finish", new Object[0]);
        }
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        if (!this.conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) || this.isCleaning) {
            return;
        }
        try {
            this.writeClient.startAsyncCleaning();
            this.isCleaning = true;
        } catch (Throwable th) {
            LOG.warn("Error while start async cleaning", th);
        }
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    }

    @Override // org.apache.flink.api.common.functions.AbstractRichFunction, org.apache.flink.api.common.functions.RichFunction
    public void close() throws Exception {
        if (this.executor != null) {
            this.executor.close();
        }
        if (this.writeClient != null) {
            this.writeClient.close();
        }
    }
}
