package org.apache.iceberg.flink.maintenance.operator;

import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.class */
public class DeleteFilesProcessor extends AbstractStreamOperator<Void> implements OneInputStreamOperator<String, Void> {
    private static final Logger LOG = LoggerFactory.getLogger(DeleteFilesProcessor.class);
    private final String taskIndex;
    private final String taskName;
    private final SupportsBulkOperations io;
    private final String tableName;
    private final Set<String> filesToDelete = Sets.newHashSet();
    private final int batchSize;
    private transient Counter failedCounter;
    private transient Counter succeededCounter;

    public DeleteFilesProcessor(int i, String str, Table table, int i2) {
        Preconditions.checkNotNull(str, "Task name should no be null");
        Preconditions.checkNotNull(table, "Table should no be null");
        FileIO io = table.io();
        Preconditions.checkArgument(io instanceof SupportsBulkOperations, "%s doesn't support bulk delete", io.getClass().getSimpleName());
        this.taskIndex = String.valueOf(i);
        this.taskName = str;
        this.io = (SupportsBulkOperations) io;
        this.tableName = table.name();
        this.batchSize = i2;
    }

    public void open() throws Exception {
        this.failedCounter = getRuntimeContext().getMetricGroup().addGroup(TableMaintenanceMetrics.GROUP_KEY).addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, this.tableName).addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, this.taskName).addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, this.taskIndex).counter(TableMaintenanceMetrics.DELETE_FILE_FAILED_COUNTER);
        this.succeededCounter = getRuntimeContext().getMetricGroup().addGroup(TableMaintenanceMetrics.GROUP_KEY).addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, this.tableName).addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, this.taskName).addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, this.taskIndex).counter(TableMaintenanceMetrics.DELETE_FILE_SUCCEEDED_COUNTER);
    }

    public void processElement(StreamRecord<String> streamRecord) throws Exception {
        if (streamRecord.isRecord()) {
            this.filesToDelete.add((String) streamRecord.getValue());
        }
        if (this.filesToDelete.size() >= this.batchSize) {
            deleteFiles();
        }
    }

    public void processWatermark(Watermark watermark) {
        deleteFiles();
    }

    public void prepareSnapshotPreBarrier(long j) {
        deleteFiles();
    }

    private void deleteFiles() {
        try {
            this.io.deleteFiles(this.filesToDelete);
            LOG.info("Deleted {} files from table {} using bulk deletes", Integer.valueOf(this.filesToDelete.size()), this.tableName);
            this.succeededCounter.inc(this.filesToDelete.size());
            this.filesToDelete.clear();
        } catch (BulkDeletionFailureException e) {
            int size = this.filesToDelete.size() - e.numberFailedObjects();
            LOG.warn("Deleted only {} of {} files from table {} using bulk deletes", new Object[]{Integer.valueOf(size), Integer.valueOf(this.filesToDelete.size()), this.tableName, e});
            this.succeededCounter.inc(size);
            this.failedCounter.inc(e.numberFailedObjects());
        }
    }
}
