package org.apache.flink.runtime.state.ttl;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.RunnableFuture;
import java.util.function.Consumer;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.util.StateMigrationException;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/runtime/state/ttl/TtlStateTestBase.class */
public abstract class TtlStateTestBase {
    protected static final long TTL = 100;
    private static final int INC_CLEANUP_ALL_KEYS = 970;
    protected MockTtlTimeProvider timeProvider;
    protected StateBackendTestContext sbetc;
    protected static final String UNEXPIRED_AVAIL = "Unexpired state should be available";
    protected static final String UPDATED_UNEXPIRED_AVAIL = "Unexpired state should be available after update";
    protected static final String EXPIRED_UNAVAIL = "Expired state should be unavailable";
    private static final String EXPIRED_AVAIL = "Expired state should be available";
    private StateTtlConfig ttlConfig;

    @Parameterized.Parameter
    public TtlStateTestContextBase<?, ?, ?> ctx;

    @Before
    public void setup() {
        this.timeProvider = new MockTtlTimeProvider();
        this.sbetc = createStateBackendTestContext(this.timeProvider);
    }

    protected abstract StateBackendTestContext createStateBackendTestContext(TtlTimeProvider ttlTimeProvider);

    @Parameterized.Parameters(name = "{0}")
    public static List<TtlStateTestContextBase<?, ?, ?>> testContexts() {
        return Arrays.asList(new TtlValueStateTestContext(), new TtlFixedLenElemListStateTestContext(), new TtlNonFixedLenElemListStateTestContext(), new TtlMapStateAllEntriesTestContext(), new TtlMapStatePerElementTestContext(), new TtlMapStatePerNullElementTestContext(), new TtlAggregatingStateTestContext(), new TtlReducingStateTestContext(), new TtlFoldingStateTestContext());
    }

    public boolean fullSnapshot() {
        return true;
    }

    protected <S extends InternalKvState<?, String, ?>, UV> TtlStateTestContextBase<S, UV, ?> ctx() {
        return (TtlStateTestContextBase<S, UV, ?>) this.ctx;
    }

    private <UV> TtlMergingStateTestContext<?, UV, ?> mctx() {
        return (TtlMergingStateTestContext) this.ctx;
    }

    private void initTest() throws Exception {
        initTest(StateTtlConfig.UpdateType.OnCreateAndWrite, StateTtlConfig.StateVisibility.NeverReturnExpired);
    }

    private void initTest(StateTtlConfig.UpdateType updateType, StateTtlConfig.StateVisibility stateVisibility) throws Exception {
        initTest(updateType, stateVisibility, TTL);
    }

    private void initTest(StateTtlConfig.UpdateType updateType, StateTtlConfig.StateVisibility stateVisibility, long j) throws Exception {
        initTest(getConfBuilder(j).setUpdateType(updateType).setStateVisibility(stateVisibility).build());
    }

    protected static StateTtlConfig.Builder getConfBuilder(long j) {
        return StateTtlConfig.newBuilder(Time.milliseconds(j));
    }

    protected <S extends State> StateDescriptor<S, Object> initTest(StateTtlConfig stateTtlConfig) throws Exception {
        this.ttlConfig = stateTtlConfig;
        this.sbetc.createAndRestoreKeyedStateBackend(null);
        this.sbetc.setCurrentKey("defaultKey");
        StateDescriptor<S, Object> createState = createState();
        ctx().initTestValues();
        return createState;
    }

    private <S extends State> StateDescriptor<S, Object> createState() throws Exception {
        StateDescriptor<S, Object> createStateDescriptor = ctx().createStateDescriptor();
        createStateDescriptor.enableTimeToLive(this.ttlConfig);
        ctx().ttlState = (S) this.sbetc.createState(createStateDescriptor, "defaultNamespace");
        return createStateDescriptor;
    }

    private void takeAndRestoreSnapshot() throws Exception {
        restoreSnapshot(this.sbetc.takeSnapshot(), 10);
    }

    protected void takeAndRestoreSnapshot(int i) throws Exception {
        restoreSnapshot(this.sbetc.takeSnapshot(), i);
    }

    private void restoreSnapshot(KeyedStateHandle keyedStateHandle, int i) throws Exception {
        this.sbetc.createAndRestoreKeyedStateBackend(i, keyedStateHandle);
        this.sbetc.setCurrentKey("defaultKey");
        createState();
    }

    protected boolean incrementalCleanupSupported() {
        return false;
    }

    @Test
    public void testNonExistentValue() throws Exception {
        initTest();
        Assert.assertEquals("Non-existing state should be empty", ctx().emptyValue, ctx().get());
    }

    @Test
    public void testExactExpirationOnWrite() throws Exception {
        initTest(StateTtlConfig.UpdateType.OnCreateAndWrite, StateTtlConfig.StateVisibility.NeverReturnExpired);
        takeAndRestoreSnapshot();
        this.timeProvider.time = 0L;
        ctx().update(ctx().updateEmpty);
        takeAndRestoreSnapshot();
        this.timeProvider.time = 20L;
        Assert.assertEquals(UNEXPIRED_AVAIL, ctx().getUpdateEmpty, ctx().get());
        takeAndRestoreSnapshot();
        this.timeProvider.time = 50L;
        ctx().update(ctx().updateUnexpired);
        takeAndRestoreSnapshot();
        this.timeProvider.time = 120L;
        Assert.assertEquals(UPDATED_UNEXPIRED_AVAIL, ctx().getUnexpired, ctx().get());
        takeAndRestoreSnapshot();
        this.timeProvider.time = 170L;
        ctx().update(ctx().updateExpired);
        takeAndRestoreSnapshot();
        this.timeProvider.time = 220L;
        Assert.assertEquals(UPDATED_UNEXPIRED_AVAIL, ctx().getUpdateExpired, ctx().get());
        takeAndRestoreSnapshot();
        this.timeProvider.time = 300L;
        Assert.assertEquals(EXPIRED_UNAVAIL, ctx().emptyValue, ctx().get());
        Assert.assertEquals("Original state should be cleared on access", ctx().emptyValue, ctx().getOriginal());
    }

    @Test
    public void testRelaxedExpirationOnWrite() throws Exception {
        initTest(StateTtlConfig.UpdateType.OnCreateAndWrite, StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp);
        this.timeProvider.time = 0L;
        ctx().update(ctx().updateEmpty);
        takeAndRestoreSnapshot();
        this.timeProvider.time = 120L;
        Assert.assertEquals(EXPIRED_AVAIL, ctx().getUpdateEmpty, ctx().get());
        Assert.assertEquals("Original state should be cleared on access", ctx().emptyValue, ctx().getOriginal());
        Assert.assertEquals("Expired state should be cleared on access", ctx().emptyValue, ctx().get());
    }

    @Test
    public void testExactExpirationOnRead() throws Exception {
        initTest(StateTtlConfig.UpdateType.OnReadAndWrite, StateTtlConfig.StateVisibility.NeverReturnExpired);
        this.timeProvider.time = 0L;
        ctx().update(ctx().updateEmpty);
        takeAndRestoreSnapshot();
        this.timeProvider.time = 50L;
        Assert.assertEquals(UNEXPIRED_AVAIL, ctx().getUpdateEmpty, ctx().get());
        takeAndRestoreSnapshot();
        this.timeProvider.time = 120L;
        Assert.assertEquals("Unexpired state should be available after read", ctx().getUpdateEmpty, ctx().get());
        takeAndRestoreSnapshot();
        this.timeProvider.time = 250L;
        Assert.assertEquals(EXPIRED_UNAVAIL, ctx().emptyValue, ctx().get());
        Assert.assertEquals("Original state should be cleared on access", ctx().emptyValue, ctx().getOriginal());
    }

    @Test
    public void testRelaxedExpirationOnRead() throws Exception {
        initTest(StateTtlConfig.UpdateType.OnReadAndWrite, StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp);
        this.timeProvider.time = 0L;
        ctx().update(ctx().updateEmpty);
        takeAndRestoreSnapshot();
        this.timeProvider.time = 50L;
        Assert.assertEquals(UNEXPIRED_AVAIL, ctx().getUpdateEmpty, ctx().get());
        takeAndRestoreSnapshot();
        this.timeProvider.time = 170L;
        Assert.assertEquals(EXPIRED_AVAIL, ctx().getUpdateEmpty, ctx().get());
        Assert.assertEquals("Expired state should be cleared on access", ctx().emptyValue, ctx().get());
    }

    @Test
    public void testExpirationTimestampOverflow() throws Exception {
        initTest(StateTtlConfig.UpdateType.OnCreateAndWrite, StateTtlConfig.StateVisibility.NeverReturnExpired, Long.MAX_VALUE);
        this.timeProvider.time = 10L;
        ctx().update(ctx().updateEmpty);
        takeAndRestoreSnapshot();
        this.timeProvider.time = 50L;
        Assert.assertEquals(UNEXPIRED_AVAIL, ctx().getUpdateEmpty, ctx().get());
    }

    @Test
    public void testMergeNamespaces() throws Exception {
        Assume.assumeThat(this.ctx, CoreMatchers.instanceOf(TtlMergingStateTestContext.class));
        initTest();
        this.timeProvider.time = 0L;
        mctx().applyStateUpdates(mctx().generateExpiredUpdatesToMerge());
        takeAndRestoreSnapshot();
        this.timeProvider.time = 120L;
        List generateUnexpiredUpdatesToMerge = mctx().generateUnexpiredUpdatesToMerge();
        mctx().applyStateUpdates(generateUnexpiredUpdatesToMerge);
        takeAndRestoreSnapshot();
        this.timeProvider.time = 150L;
        List generateFinalUpdatesToMerge = mctx().generateFinalUpdatesToMerge();
        mctx().applyStateUpdates(generateFinalUpdatesToMerge);
        takeAndRestoreSnapshot();
        this.timeProvider.time = 230L;
        mctx().ttlState.mergeNamespaces("targetNamespace", TtlMergingStateTestContext.NAMESPACES);
        mctx().ttlState.setCurrentNamespace("targetNamespace");
        Assert.assertEquals("Unexpected result of merge operation", mctx().getMergeResult(generateUnexpiredUpdatesToMerge, generateFinalUpdatesToMerge), mctx().get());
    }

    @Test
    public void testMultipleKeys() throws Exception {
        initTest();
        testMultipleStateIds(str -> {
            this.sbetc.setCurrentKey(str);
        }, false);
    }

    @Test
    public void testMultipleKeysWithSnapshotCleanup() throws Exception {
        Assume.assumeTrue("full snapshot strategy", fullSnapshot());
        initTest(getConfBuilder(TTL).cleanupFullSnapshot().build());
        testMultipleStateIds(str -> {
            this.sbetc.setCurrentKey(str);
        }, true);
    }

    @Test
    public void testMultipleNamespaces() throws Exception {
        initTest();
        testMultipleStateIds(str -> {
            ctx().ttlState.setCurrentNamespace(str);
        }, false);
    }

    @Test
    public void testMultipleNamespacesWithSnapshotCleanup() throws Exception {
        Assume.assumeTrue("full snapshot strategy", fullSnapshot());
        initTest(getConfBuilder(TTL).cleanupFullSnapshot().build());
        testMultipleStateIds(str -> {
            ctx().ttlState.setCurrentNamespace(str);
        }, true);
    }

    private void testMultipleStateIds(Consumer<String> consumer, boolean z) throws Exception {
        takeAndRestoreSnapshot();
        this.timeProvider.time = 0L;
        consumer.accept("id2");
        ctx().update(ctx().updateEmpty);
        takeAndRestoreSnapshot();
        this.timeProvider.time = 50L;
        consumer.accept("id1");
        ctx().update(ctx().updateEmpty);
        consumer.accept("id2");
        ctx().update(ctx().updateUnexpired);
        this.timeProvider.time = 120L;
        takeAndRestoreSnapshot();
        consumer.accept("id1");
        Assert.assertEquals(UNEXPIRED_AVAIL, ctx().getUpdateEmpty, ctx().get());
        consumer.accept("id2");
        Assert.assertEquals(UPDATED_UNEXPIRED_AVAIL, ctx().getUnexpired, ctx().get());
        takeAndRestoreSnapshot();
        this.timeProvider.time = 170L;
        consumer.accept("id2");
        ctx().update(ctx().updateExpired);
        this.timeProvider.time = 230L;
        takeAndRestoreSnapshot();
        this.timeProvider.time = z ? 170L : this.timeProvider.time;
        consumer.accept("id1");
        Assert.assertEquals(EXPIRED_UNAVAIL, ctx().emptyValue, ctx().get());
        consumer.accept("id2");
        Assert.assertEquals(UPDATED_UNEXPIRED_AVAIL, ctx().getUpdateExpired, ctx().get());
        this.timeProvider.time = 300L;
        takeAndRestoreSnapshot();
        this.timeProvider.time = z ? 230L : this.timeProvider.time;
        consumer.accept("id1");
        Assert.assertEquals(EXPIRED_UNAVAIL, ctx().emptyValue, ctx().get());
        consumer.accept("id2");
        Assert.assertEquals(EXPIRED_UNAVAIL, ctx().emptyValue, ctx().get());
    }

    @Test
    public void testSnapshotChangeRestore() throws Exception {
        initTest();
        this.timeProvider.time = 0L;
        this.sbetc.setCurrentKey("k1");
        ctx().update(ctx().updateEmpty);
        this.timeProvider.time = 50L;
        this.sbetc.setCurrentKey("k1");
        ctx().update(ctx().updateUnexpired);
        this.timeProvider.time = TTL;
        this.sbetc.setCurrentKey("k2");
        ctx().update(ctx().updateEmpty);
        KeyedStateHandle takeSnapshot = this.sbetc.takeSnapshot();
        this.timeProvider.time = 170L;
        this.sbetc.setCurrentKey("k1");
        ctx().update(ctx().updateExpired);
        this.sbetc.setCurrentKey("k2");
        ctx().update(ctx().updateUnexpired);
        restoreSnapshot(takeSnapshot, 10);
        this.timeProvider.time = 180L;
        this.sbetc.setCurrentKey("k1");
        Assert.assertEquals(EXPIRED_UNAVAIL, ctx().emptyValue, ctx().get());
        this.sbetc.setCurrentKey("k2");
        Assert.assertEquals(UNEXPIRED_AVAIL, ctx().getUpdateEmpty, ctx().get());
    }

    @Test(expected = StateMigrationException.class)
    public void testRestoreTtlAndRegisterNonTtlStateCompatFailure() throws Exception {
        Assume.assumeThat(this, CoreMatchers.not(CoreMatchers.instanceOf(MockTtlStateTest.class)));
        initTest();
        this.timeProvider.time = 0L;
        ctx().update(ctx().updateEmpty);
        this.sbetc.createAndRestoreKeyedStateBackend(this.sbetc.takeSnapshot());
        this.sbetc.setCurrentKey("defaultKey");
        this.sbetc.createState(ctx().createStateDescriptor(), "");
    }

    @Test
    public void testIncrementalCleanup() throws Exception {
        Assume.assumeTrue(incrementalCleanupSupported());
        initTest(getConfBuilder(TTL).cleanupIncrementally(5, true).build());
        this.timeProvider.time = 0L;
        updateKeys(0, INC_CLEANUP_ALL_KEYS, ctx().updateEmpty);
        this.timeProvider.time = 50L;
        updateKeys(0, 160, ctx().updateUnexpired);
        RunnableFuture<SnapshotResult<KeyedStateHandle>> triggerSnapshot = this.sbetc.triggerSnapshot();
        updateKeys(160, 320, ctx().updateUnexpired);
        this.timeProvider.time = 120L;
        triggerMoreIncrementalCleanupByOtherOps();
        checkExpiredKeys(320, INC_CLEANUP_ALL_KEYS);
        KeyedStateHandle keyedStateHandle = (KeyedStateHandle) triggerSnapshot.get().getJobManagerOwnedSnapshot();
        this.timeProvider.time = 50L;
        restoreSnapshot(keyedStateHandle, 10);
        checkUnexpiredKeys(160, INC_CLEANUP_ALL_KEYS, ctx().getUpdateEmpty);
        this.timeProvider.time = 120L;
        for (int i = 80; i < 200; i++) {
            this.sbetc.setCurrentKey(Integer.toString(i));
            ctx().ttlState.clear();
        }
        checkUnexpiredKeys(0, 80, ctx().getUnexpired);
        triggerMoreIncrementalCleanupByOtherOps();
        checkExpiredKeys(160, 320);
        this.timeProvider.time = 170L;
        checkExpiredKeys(80, INC_CLEANUP_ALL_KEYS);
        checkExpiredKeys(0, 80);
    }

    private <T> void updateKeys(int i, int i2, T t) throws Exception {
        for (int i3 = i; i3 < i2; i3++) {
            this.sbetc.setCurrentKey(Integer.toString(i3));
            ctx().update(t);
        }
    }

    private <T> void checkUnexpiredKeys(int i, int i2, T t) throws Exception {
        for (int i3 = i; i3 < i2; i3++) {
            this.sbetc.setCurrentKey(Integer.toString(i3));
            Assert.assertEquals(UNEXPIRED_AVAIL, t, ctx().get());
        }
    }

    private void checkExpiredKeys(int i, int i2) throws Exception {
        for (int i3 = i; i3 < i2; i3++) {
            this.sbetc.setCurrentKey(Integer.toString(i3));
            Assert.assertEquals("Original state should be cleared", ctx().emptyValue, ctx().getOriginal());
        }
    }

    private void triggerMoreIncrementalCleanupByOtherOps() throws Exception {
        for (int i = INC_CLEANUP_ALL_KEYS; i < 1940; i++) {
            this.sbetc.setCurrentKey(Integer.toString(i));
            if (i % 2 == 0) {
                ctx().get();
            } else {
                ctx().update(ctx().updateEmpty);
            }
        }
    }

    @After
    public void tearDown() throws Exception {
        this.sbetc.dispose();
    }
}
