package org.apache.flink.table.runtime.operators.join.temporal;

import java.io.IOException;
import java.util.Optional;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.SimpleTimerService;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.table.data.RowData;

/**
 * Abstract {@link TwoInputStreamOperator} that gives subclasses a processing-time based hook for
 * cleaning up their state.
 *
 * <p>State cleaning is enabled only when {@code minRetentionTime > 1}. When it is enabled, each
 * call to {@link #registerProcessingCleanupTimer()} makes sure that a cleanup timer is registered
 * at least {@code minRetentionTime} in the future; a newly registered timer is placed at
 * {@code currentProcessingTime + maxRetentionTime}. The timestamp of the most recently registered
 * timer is remembered in a {@link ValueState}, so at most one cleanup timer is tracked per state
 * scope (presumably per key, since the state is obtained from the runtime context; confirm
 * against the concrete operator setup).
 *
 * <p>When the tracked timer fires, {@link #cleanupState(long)} is invoked and the tracked
 * timestamp is cleared.
 *
 * <p>NOTE: {@link #onProcessingTime(InternalTimer)} is {@code final} and ignores every timer whose
 * timestamp does not match the tracked cleanup timestamp. Subclasses should therefore not rely on
 * receiving their own processing-time timers through this callback.
 */
@Internal
public abstract class BaseTwoInputStreamOperatorWithStateRetention
        extends AbstractStreamOperator<RowData>
        implements TwoInputStreamOperator<RowData, RowData, RowData>,
                Triggerable<Object, VoidNamespace> {

    private static final long serialVersionUID = -5953921797477294258L;

    /** Name of the state holding the timestamp of the latest registered cleanup timer. */
    private static final String CLEANUP_TIMESTAMP = "cleanup-timestamp";

    /** Name under which the internal timer service is requested. */
    private static final String TIMERS_STATE_NAME = "timers";

    private final long minRetentionTime;
    private final long maxRetentionTime;

    /** Whether cleanup timers are registered at all; {@code true} iff {@code minRetentionTime > 1}. */
    protected final boolean stateCleaningEnabled;

    /** Timestamp of the latest registered cleanup timer; only initialized if cleaning is enabled. */
    private transient ValueState<Long> latestRegisteredCleanupTimer;

    private transient SimpleTimerService timerService;

    /**
     * Creates the operator with the given retention interval.
     *
     * <p>Intended to be invoked by subclasses only.
     *
     * @param minRetentionTime minimum distance (in the timer service's processing-time unit)
     *     between "now" and the registered cleanup timer; a value {@code <= 1} disables state
     *     cleaning
     * @param maxRetentionTime offset added to the current processing time when a new cleanup
     *     timer is registered
     */
    public BaseTwoInputStreamOperatorWithStateRetention(
            long minRetentionTime, long maxRetentionTime) {
        this.minRetentionTime = minRetentionTime;
        this.maxRetentionTime = maxRetentionTime;
        this.stateCleaningEnabled = minRetentionTime > 1;
    }

    /**
     * Sets up the timer service and, if state cleaning is enabled, the state that tracks the
     * latest registered cleanup timer.
     *
     * @throws Exception if the timer service or the state cannot be initialized
     */
    @Override
    public void open() throws Exception {
        initializeTimerService();

        if (stateCleaningEnabled) {
            ValueStateDescriptor<Long> cleanupStateDescriptor =
                    new ValueStateDescriptor<>(CLEANUP_TIMESTAMP, Types.LONG);
            latestRegisteredCleanupTimer = getRuntimeContext().getState(cleanupStateDescriptor);
        }
    }

    /** Wraps the operator's internal timer service, registering this operator as the callback. */
    private void initializeTimerService() {
        timerService =
                new SimpleTimerService(
                        getInternalTimerService(
                                TIMERS_STATE_NAME, VoidNamespaceSerializer.INSTANCE, this));
    }

    /**
     * Registers (or pushes back) the cleanup timer if state cleaning is enabled.
     *
     * <p>A new timer is registered when none is tracked yet, or when the tracked one would fire
     * earlier than {@code currentProcessingTime + minRetentionTime}. Otherwise the existing timer
     * is left untouched. Intended to be called by subclasses.
     *
     * @throws IOException if reading or updating the tracked cleanup timestamp fails
     */
    public void registerProcessingCleanupTimer() throws IOException {
        if (!stateCleaningEnabled) {
            return;
        }

        long currentProcessingTime = timerService.currentProcessingTime();
        Long currentCleanupTime = latestRegisteredCleanupTimer.value();

        if (currentCleanupTime == null
                || currentProcessingTime + minRetentionTime > currentCleanupTime) {
            updateCleanupTimer(currentProcessingTime, currentCleanupTime);
        }
    }

    /**
     * Replaces the tracked cleanup timer with one at {@code currentProcessingTime +
     * maxRetentionTime}.
     *
     * @param currentProcessingTime the processing time used as the base for the new timer
     * @param previousCleanupTime timestamp of the currently tracked timer, or {@code null} if
     *     there is none
     * @throws IOException if updating the tracked cleanup timestamp fails
     */
    private void updateCleanupTimer(long currentProcessingTime, Long previousCleanupTime)
            throws IOException {
        // Drop the superseded timer first so that only one cleanup timer stays registered.
        if (previousCleanupTime != null) {
            timerService.deleteProcessingTimeTimer(previousCleanupTime);
        }

        long newCleanupTime = currentProcessingTime + maxRetentionTime;
        timerService.registerProcessingTimeTimer(newCleanupTime);
        latestRegisteredCleanupTimer.update(newCleanupTime);
    }

    /**
     * Removes the tracked cleanup timer, if any, and clears its tracked timestamp. Does nothing
     * when state cleaning is disabled. Intended to be called by subclasses.
     *
     * @throws IOException if reading the tracked cleanup timestamp fails
     */
    public void cleanupLastTimer() throws IOException {
        if (!stateCleaningEnabled) {
            return;
        }

        Long currentCleanupTime = latestRegisteredCleanupTimer.value();
        if (currentCleanupTime != null) {
            latestRegisteredCleanupTimer.clear();
            timerService.deleteProcessingTimeTimer(currentCleanupTime);
        }
    }

    /**
     * Invokes {@link #cleanupState(long)} when the fired timer is the tracked cleanup timer, then
     * clears the tracked timestamp.
     *
     * <p>Timers whose timestamp differs from the tracked one (or that fire while nothing is
     * tracked) are ignored.
     *
     * @param timer the processing-time timer that fired
     * @throws Exception if accessing the tracked cleanup timestamp fails
     */
    @Override
    public final void onProcessingTime(InternalTimer<Object, VoidNamespace> timer)
            throws Exception {
        if (!stateCleaningEnabled) {
            return;
        }

        long timerTime = timer.getTimestamp();
        Long cleanupTime = latestRegisteredCleanupTimer.value();

        // Only react to the timer that is currently tracked; anything else is stale or foreign.
        if (cleanupTime == null || cleanupTime != timerTime) {
            return;
        }

        cleanupState(cleanupTime);
        latestRegisteredCleanupTimer.clear();
    }

    /**
     * Called when the tracked cleanup timer fires. Implementations decide which state to clean
     * up.
     *
     * @param time the timestamp of the cleanup timer that fired
     */
    public abstract void cleanupState(long time);
}
