package org.apache.iceberg.flink.maintenance.operator;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/iceberg/flink/maintenance/operator/LockRemover.class */
public class LockRemover extends AbstractStreamOperator<Void> implements OneInputStreamOperator<TaskResult, Void> {
    private static final Logger LOG = LoggerFactory.getLogger(LockRemover.class);
    private final TriggerLockFactory lockFactory;
    private final List<String> maintenanceTaskNames;
    private transient List<Counter> succeededTaskResultCounters;
    private transient List<Counter> failedTaskResultCounters;
    private transient List<AtomicLong> taskLastRunDurationMs;
    private transient TriggerLockFactory.Lock lock;
    private transient TriggerLockFactory.Lock recoveryLock;
    private transient long lastProcessedTaskStartEpoch = 0;

    public LockRemover(TriggerLockFactory triggerLockFactory, List<String> list) {
        Preconditions.checkNotNull(triggerLockFactory, "Lock factory should no be null");
        Preconditions.checkArgument((list == null || list.isEmpty()) ? false : true, "Invalid maintenance task names: null or empty");
        this.lockFactory = triggerLockFactory;
        this.maintenanceTaskNames = list;
    }

    public void open() throws Exception {
        super.open();
        this.succeededTaskResultCounters = Lists.newArrayListWithExpectedSize(this.maintenanceTaskNames.size());
        this.failedTaskResultCounters = Lists.newArrayListWithExpectedSize(this.maintenanceTaskNames.size());
        this.taskLastRunDurationMs = Lists.newArrayListWithExpectedSize(this.maintenanceTaskNames.size());
        for (String str : this.maintenanceTaskNames) {
            this.succeededTaskResultCounters.add(getRuntimeContext().getMetricGroup().addGroup("maintenanceTask", str).counter(TableMaintenanceMetrics.SUCCEEDED_TASK_COUNTER));
            this.failedTaskResultCounters.add(getRuntimeContext().getMetricGroup().addGroup("maintenanceTask", str).counter(TableMaintenanceMetrics.FAILED_TASK_COUNTER));
            AtomicLong atomicLong = new AtomicLong(0L);
            this.taskLastRunDurationMs.add(atomicLong);
            MetricGroup addGroup = getRuntimeContext().getMetricGroup().addGroup("maintenanceTask", str);
            Objects.requireNonNull(atomicLong);
            addGroup.gauge(TableMaintenanceMetrics.LAST_RUN_DURATION_MS, atomicLong::get);
        }
        this.lock = this.lockFactory.createLock();
        this.recoveryLock = this.lockFactory.createRecoveryLock();
    }

    public void processElement(StreamRecord<TaskResult> streamRecord) {
        TaskResult taskResult = (TaskResult) streamRecord.getValue();
        LOG.info("Processing result {} for task {}", taskResult, this.maintenanceTaskNames.get(taskResult.taskIndex()));
        long currentTimeMillis = System.currentTimeMillis() - taskResult.startEpoch();
        this.lock.unlock();
        this.lastProcessedTaskStartEpoch = taskResult.startEpoch();
        this.taskLastRunDurationMs.get(taskResult.taskIndex()).set(currentTimeMillis);
        if (taskResult.success()) {
            this.succeededTaskResultCounters.get(taskResult.taskIndex()).inc();
        } else {
            this.failedTaskResultCounters.get(taskResult.taskIndex()).inc();
        }
    }

    public void processWatermark(Watermark watermark) {
        if (watermark.getTimestamp() > this.lastProcessedTaskStartEpoch) {
            this.lock.unlock();
            this.recoveryLock.unlock();
        }
    }
}
