package org.apache.iceberg.flink.maintenance.operator;

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.api.TaskResult;
import org.apache.iceberg.flink.maintenance.api.Trigger;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.class */
public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResult> {
    private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsProcessor.class);
    public static final OutputTag<String> DELETE_STREAM = new OutputTag<>("expire-snapshots-file-deletes-stream", Types.STRING);
    private final TableLoader tableLoader;
    private final Long maxSnapshotAgeMs;
    private final Integer numSnapshots;
    private final Integer plannerPoolSize;
    private transient ExecutorService plannerPool;
    private transient Table table;

    public ExpireSnapshotsProcessor(TableLoader tableLoader, Long l, Integer num, Integer num2) {
        Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
        this.tableLoader = tableLoader;
        this.maxSnapshotAgeMs = l;
        this.numSnapshots = num;
        this.plannerPoolSize = num2;
    }

    public void open(Configuration configuration) throws Exception {
        this.tableLoader.open();
        this.table = this.tableLoader.loadTable();
        this.plannerPool = this.plannerPoolSize != null ? ThreadPools.newWorkerPool(this.table.name() + "-table--planner", this.plannerPoolSize.intValue()) : ThreadPools.getWorkerPool();
    }

    public void processElement(Trigger trigger, ProcessFunction<Trigger, TaskResult>.Context context, Collector<TaskResult> collector) throws Exception {
        try {
            this.table.refresh();
            ExpireSnapshots expireSnapshots = this.table.expireSnapshots();
            if (this.maxSnapshotAgeMs != null) {
                expireSnapshots = expireSnapshots.expireOlderThan(context.timestamp().longValue() - this.maxSnapshotAgeMs.longValue());
            }
            if (this.numSnapshots != null) {
                expireSnapshots = expireSnapshots.retainLast(this.numSnapshots.intValue());
            }
            AtomicLong atomicLong = new AtomicLong(0L);
            expireSnapshots.planWith(this.plannerPool).deleteWith(str -> {
                context.output(DELETE_STREAM, str);
                atomicLong.incrementAndGet();
            }).cleanExpiredFiles(true).commit();
            LOG.info("Successfully finished expiring snapshots for {} at {}. Scheduled {} files for delete.", new Object[]{this.table, context.timestamp(), Long.valueOf(atomicLong.get())});
            collector.collect(new TaskResult(trigger.taskId().intValue(), trigger.timestamp(), true, Collections.emptyList()));
        } catch (Exception e) {
            LOG.error("Failed to expiring snapshots for {} at {}", new Object[]{this.table, context.timestamp(), e});
            collector.collect(new TaskResult(trigger.taskId().intValue(), trigger.timestamp(), false, Lists.newArrayList(e)));
        }
    }

    public void close() throws Exception {
        super.close();
        this.tableLoader.close();
        if (this.plannerPoolSize != null) {
            this.plannerPool.shutdown();
        }
    }

    public /* bridge */ /* synthetic */ void processElement(Object obj, ProcessFunction.Context context, Collector collector) throws Exception {
        processElement((Trigger) obj, (ProcessFunction<Trigger, TaskResult>.Context) context, (Collector<TaskResult>) collector);
    }
}
