package org.apache.iceberg.flink.maintenance.operator;

import java.io.IOException;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.api.Trigger;
import org.apache.iceberg.flink.maintenance.api.TriggerLockFactory;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/iceberg/flink/maintenance/operator/TriggerManager.class */
public class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger> implements CheckpointedFunction {
    private static final Logger LOG = LoggerFactory.getLogger(TriggerManager.class);
    private final String tableName;
    private final TriggerLockFactory lockFactory;
    private final List<String> maintenanceTaskNames;
    private final List<TriggerEvaluator> evaluators;
    private final long minFireDelayMs;
    private final long lockCheckDelayMs;
    private transient Counter rateLimiterTriggeredCounter;
    private transient Counter concurrentRunThrottledCounter;
    private transient Counter nothingToTriggerCounter;
    private transient List<Counter> triggerCounters;
    private transient ValueState<Long> nextEvaluationTimeState;
    private transient ListState<TableChange> accumulatedChangesState;
    private transient ListState<Long> lastTriggerTimesState;
    private transient Long nextEvaluationTime;
    private transient List<TableChange> accumulatedChanges;
    private transient List<Long> lastTriggerTimes;
    private transient TriggerLockFactory.Lock lock;
    private transient TriggerLockFactory.Lock recoveryLock;
    private transient boolean shouldRestoreTasks = false;
    private transient boolean inited = false;
    private transient int startsFrom = 0;
    private transient boolean triggered = false;

    public TriggerManager(TableLoader tableLoader, TriggerLockFactory triggerLockFactory, List<String> list, List<TriggerEvaluator> list2, long j, long j2) {
        Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
        Preconditions.checkNotNull(triggerLockFactory, "Lock factory should no be null");
        Preconditions.checkArgument((list == null || list.isEmpty()) ? false : true, "Invalid maintenance task names: null or empty");
        Preconditions.checkArgument((list2 == null || list2.isEmpty()) ? false : true, "Invalid evaluators: null or empty");
        Preconditions.checkArgument(list.size() == list2.size(), "Provide a name and evaluator for all of the maintenance tasks");
        Preconditions.checkArgument(j > 0, "Minimum fire delay should be at least 1.");
        Preconditions.checkArgument(j2 > 0, "Minimum lock delay rate should be at least 1 ms.");
        tableLoader.open();
        this.tableName = tableLoader.loadTable().name();
        this.lockFactory = triggerLockFactory;
        this.maintenanceTaskNames = list;
        this.evaluators = list2;
        this.minFireDelayMs = j;
        this.lockCheckDelayMs = j2;
    }

    public void open(Configuration configuration) throws Exception {
        this.rateLimiterTriggeredCounter = getRuntimeContext().getMetricGroup().addGroup(TableMaintenanceMetrics.GROUP_KEY).addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, this.tableName).counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED);
        this.concurrentRunThrottledCounter = getRuntimeContext().getMetricGroup().addGroup(TableMaintenanceMetrics.GROUP_KEY).addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, this.tableName).counter(TableMaintenanceMetrics.CONCURRENT_RUN_THROTTLED);
        this.nothingToTriggerCounter = getRuntimeContext().getMetricGroup().addGroup(TableMaintenanceMetrics.GROUP_KEY).addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, this.tableName).counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER);
        this.triggerCounters = Lists.newArrayListWithExpectedSize(this.maintenanceTaskNames.size());
        for (int i = 0; i < this.maintenanceTaskNames.size(); i++) {
            this.triggerCounters.add(getRuntimeContext().getMetricGroup().addGroup(TableMaintenanceMetrics.GROUP_KEY).addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, this.tableName).addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, this.maintenanceTaskNames.get(i)).addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, String.valueOf(i)).counter(TableMaintenanceMetrics.TRIGGERED));
        }
        this.nextEvaluationTimeState = getRuntimeContext().getState(new ValueStateDescriptor("triggerManagerNextTriggerTime", Types.LONG));
        this.accumulatedChangesState = getRuntimeContext().getListState(new ListStateDescriptor("triggerManagerAccumulatedChange", TypeInformation.of(TableChange.class)));
        this.lastTriggerTimesState = getRuntimeContext().getListState(new ListStateDescriptor("triggerManagerLastTriggerTime", Types.LONG));
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        if (!this.inited) {
            LOG.info("Not initialized, state is not stored");
            return;
        }
        this.nextEvaluationTimeState.update(this.nextEvaluationTime);
        this.accumulatedChangesState.update(this.accumulatedChanges);
        this.lastTriggerTimesState.update(this.lastTriggerTimes);
        LOG.info("Storing state: nextEvaluationTime {}, accumulatedChanges {}, lastTriggerTimes {}", new Object[]{this.nextEvaluationTime, this.accumulatedChanges, this.lastTriggerTimes});
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        LOG.info("Initializing state restored: {}", Boolean.valueOf(functionInitializationContext.isRestored()));
        this.lockFactory.open();
        this.lock = this.lockFactory.createLock();
        this.recoveryLock = this.lockFactory.createRecoveryLock();
        if (functionInitializationContext.isRestored()) {
            this.shouldRestoreTasks = true;
        }
    }

    public void processElement(TableChange tableChange, KeyedProcessFunction<Boolean, TableChange, Trigger>.Context context, Collector<Trigger> collector) throws Exception {
        init(collector, context.timerService());
        this.accumulatedChanges.forEach(tableChange2 -> {
            tableChange2.merge(tableChange);
        });
        long currentProcessingTime = context.timerService().currentProcessingTime();
        if (this.nextEvaluationTime == null) {
            checkAndFire(currentProcessingTime, context.timerService(), collector);
        } else {
            LOG.info("Trigger manager rate limiter triggered current: {}, next: {}, accumulated changes: {}", new Object[]{Long.valueOf(currentProcessingTime), this.nextEvaluationTime, this.accumulatedChanges});
            this.rateLimiterTriggeredCounter.inc();
        }
    }

    public void onTimer(long j, KeyedProcessFunction<Boolean, TableChange, Trigger>.OnTimerContext onTimerContext, Collector<Trigger> collector) throws Exception {
        init(collector, onTimerContext.timerService());
        this.nextEvaluationTime = null;
        checkAndFire(onTimerContext.timerService().currentProcessingTime(), onTimerContext.timerService(), collector);
    }

    public void close() throws IOException {
        this.lockFactory.close();
    }

    private void checkAndFire(long j, TimerService timerService, Collector<Trigger> collector) {
        if (this.shouldRestoreTasks) {
            if (this.recoveryLock.isHeld()) {
                LOG.debug("The recovery lock is still held at {}", Long.valueOf(j));
                schedule(timerService, j + this.lockCheckDelayMs);
                return;
            } else {
                LOG.info("The recovery is finished at {}", Long.valueOf(j));
                this.shouldRestoreTasks = false;
            }
        }
        Integer nextTrigger = nextTrigger(this.evaluators, this.accumulatedChanges, this.lastTriggerTimes, j, this.startsFrom);
        if (nextTrigger == null) {
            if (this.triggered) {
                LOG.debug("Execution check finished");
            } else {
                this.nothingToTriggerCounter.inc();
                LOG.debug("Nothing to execute at {} for collected: {}", Long.valueOf(j), this.accumulatedChanges);
            }
            this.startsFrom = 0;
            this.triggered = false;
            return;
        }
        if (this.lock.tryLock()) {
            TableChange tableChange = this.accumulatedChanges.get(nextTrigger.intValue());
            collector.collect(Trigger.create(j, nextTrigger.intValue()));
            LOG.debug("Fired event with time: {}, collected: {} for {}", new Object[]{Long.valueOf(j), tableChange, this.tableName});
            this.triggerCounters.get(nextTrigger.intValue()).inc();
            this.accumulatedChanges.set(nextTrigger.intValue(), TableChange.empty());
            this.lastTriggerTimes.set(nextTrigger.intValue(), Long.valueOf(j));
            schedule(timerService, j + this.minFireDelayMs);
            this.startsFrom = (nextTrigger.intValue() + 1) % this.evaluators.size();
            this.triggered = true;
        } else {
            LOG.info("Failed to acquire lock. Delaying task to {}", Long.valueOf(j + this.lockCheckDelayMs));
            this.startsFrom = nextTrigger.intValue();
            this.concurrentRunThrottledCounter.inc();
            schedule(timerService, j + this.lockCheckDelayMs);
        }
        timerService.registerProcessingTimeTimer(this.nextEvaluationTime.longValue());
    }

    private void schedule(TimerService timerService, long j) {
        this.nextEvaluationTime = Long.valueOf(j);
        timerService.registerProcessingTimeTimer(j);
    }

    private static Integer nextTrigger(List<TriggerEvaluator> list, List<TableChange> list2, List<Long> list3, long j, int i) {
        int i2 = i;
        while (!list.get(i2).check(list2.get(i2), list3.get(i2).longValue(), j)) {
            i2 = (i2 + 1) % list.size();
            if (i2 == i) {
                return null;
            }
        }
        return Integer.valueOf(i2);
    }

    private void init(Collector<Trigger> collector, TimerService timerService) throws Exception {
        if (this.inited) {
            return;
        }
        long currentProcessingTime = timerService.currentProcessingTime();
        this.nextEvaluationTime = (Long) this.nextEvaluationTimeState.value();
        this.accumulatedChanges = Lists.newArrayList((Iterable) this.accumulatedChangesState.get());
        this.lastTriggerTimes = Lists.newArrayList((Iterable) this.lastTriggerTimesState.get());
        if (this.accumulatedChanges.isEmpty()) {
            for (int i = 0; i < this.evaluators.size(); i++) {
                this.accumulatedChanges.add(TableChange.empty());
                this.lastTriggerTimes.add(Long.valueOf(currentProcessingTime));
            }
        }
        if (this.shouldRestoreTasks) {
            this.recoveryLock.tryLock();
            collector.collect(Trigger.recovery(currentProcessingTime));
            if (this.nextEvaluationTime == null) {
                schedule(timerService, currentProcessingTime + this.minFireDelayMs);
            }
        }
        this.inited = true;
    }

    public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedProcessFunction.Context context, Collector collector) throws Exception {
        processElement((TableChange) obj, (KeyedProcessFunction<Boolean, TableChange, Trigger>.Context) context, (Collector<Trigger>) collector);
    }
}
