package org.apache.flink.streaming.api.operators.sorted.state;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceTest.class */
public class BatchExecutionInternalTimeServiceTest extends TestLogger {
    /** Key serializer shared by every test; it is passed to the state backends and timer services created below. */
    public static final IntSerializer KEY_SERIALIZER = new IntSerializer();

    /** JUnit rule through which individual tests declare the exception type and message they expect. */
    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceTest$DummyKeyContext.class */
    /**
     * Inert {@link KeyContext} handed to the time service manager in these tests: it discards every
     * key it is given and always reports {@code null} as the current key.
     */
    private static class DummyKeyContext implements KeyContext {
        /** Always returns {@code null}; this context never remembers a key. */
        public Object getCurrentKey() {
            return null;
        }

        /** Intentionally empty: the supplied key is dropped. */
        public void setCurrentKey(Object ignoredKey) {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceTest$LambdaTrigger.class */
    /**
     * {@link Triggerable} whose event-time and processing-time callbacks are plain {@link Consumer}s.
     *
     * <p>Instances are obtained through {@link #eventTimeTrigger} or {@link #processingTimeTrigger};
     * each factory installs the caller's handler for one kind of timer and a handler that fails the
     * test for the other kind.
     *
     * @param <K> key type of the timers
     * @param <N> namespace type of the timers
     */
    private static class LambdaTrigger<K, N> implements Triggerable<K, N> {
        /** Receives every timer passed to {@link #onEventTime}. */
        private final Consumer<InternalTimer<K, N>> eventTimeHandler;
        /** Receives every timer passed to {@link #onProcessingTime}. */
        private final Consumer<InternalTimer<K, N>> processingTimeHandler;

        private LambdaTrigger(Consumer<InternalTimer<K, N>> onEventTimer, Consumer<InternalTimer<K, N>> onProcessingTimer) {
            eventTimeHandler = onEventTimer;
            processingTimeHandler = onProcessingTimer;
        }

        /**
         * Creates a trigger that forwards event-time timers to {@code consumer} and fails the test if
         * a processing-time timer is delivered.
         */
        public static <K, N> LambdaTrigger<K, N> eventTimeTrigger(Consumer<InternalTimer<K, N>> consumer) {
            Consumer<InternalTimer<K, N>> rejectProcessingTime =
                    unexpectedTimer -> Assert.fail("We did not expect processing timer to be triggered.");
            return new LambdaTrigger<>(consumer, rejectProcessingTime);
        }

        /**
         * Creates a trigger that forwards processing-time timers to {@code consumer} and fails the
         * test if an event-time timer is delivered.
         */
        public static <K, N> LambdaTrigger<K, N> processingTimeTrigger(Consumer<InternalTimer<K, N>> consumer) {
            Consumer<InternalTimer<K, N>> rejectEventTime =
                    unexpectedTimer -> Assert.fail("We did not expect event timer to be triggered.");
            return new LambdaTrigger<>(rejectEventTime, consumer);
        }

        /** Hands the timer to the event-time handler. */
        public void onEventTime(InternalTimer<K, N> internalTimer) throws Exception {
            eventTimeHandler.accept(internalTimer);
        }

        /** Hands the timer to the processing-time handler. */
        public void onProcessingTime(InternalTimer<K, N> internalTimer) throws Exception {
            processingTimeHandler.accept(internalTimer);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceTest$TriggerWithTimerServiceAccess.class */
    /**
     * {@link Triggerable} whose callbacks receive, next to the fired timer, the
     * {@link InternalTimerService} previously stored through {@link #setTimerService}.
     *
     * <p>If {@link #setTimerService} has not been called, the handlers are given {@code null} as the
     * timer service.
     *
     * @param <K> key type of the timers
     * @param <N> namespace type of the timers
     */
    private static class TriggerWithTimerServiceAccess<K, N> implements Triggerable<K, N> {
        /** Timer service passed along to the handlers; assigned after construction. */
        private InternalTimerService<N> timerService;
        /** Receives every timer passed to {@link #onEventTime}. */
        private final BiConsumer<InternalTimer<K, N>, InternalTimerService<N>> eventTimeHandler;
        /** Receives every timer passed to {@link #onProcessingTime}. */
        private final BiConsumer<InternalTimer<K, N>, InternalTimerService<N>> processingTimeHandler;

        private TriggerWithTimerServiceAccess(
                BiConsumer<InternalTimer<K, N>, InternalTimerService<N>> onEventTimer,
                BiConsumer<InternalTimer<K, N>, InternalTimerService<N>> onProcessingTimer) {
            eventTimeHandler = onEventTimer;
            processingTimeHandler = onProcessingTimer;
        }

        /**
         * Creates a trigger that forwards event-time timers to {@code biConsumer} and fails the test
         * if a processing-time timer is delivered.
         */
        public static <K, N> TriggerWithTimerServiceAccess<K, N> eventTimeTrigger(BiConsumer<InternalTimer<K, N>, InternalTimerService<N>> biConsumer) {
            BiConsumer<InternalTimer<K, N>, InternalTimerService<N>> rejectProcessingTime =
                    (unexpectedTimer, service) -> Assert.fail("We did not expect processing timer to be triggered.");
            return new TriggerWithTimerServiceAccess<>(biConsumer, rejectProcessingTime);
        }

        /**
         * Creates a trigger that forwards processing-time timers to {@code biConsumer} and fails the
         * test if an event-time timer is delivered.
         */
        public static <K, N> TriggerWithTimerServiceAccess<K, N> processingTimeTrigger(BiConsumer<InternalTimer<K, N>, InternalTimerService<N>> biConsumer) {
            BiConsumer<InternalTimer<K, N>, InternalTimerService<N>> rejectEventTime =
                    (unexpectedTimer, service) -> Assert.fail("We did not expect event timer to be triggered.");
            return new TriggerWithTimerServiceAccess<>(rejectEventTime, biConsumer);
        }

        /** Stores the timer service that later callbacks will receive. */
        public void setTimerService(InternalTimerService<N> internalTimerService) {
            timerService = internalTimerService;
        }

        /** Hands the timer and the stored timer service to the event-time handler. */
        public void onEventTime(InternalTimer<K, N> internalTimer) throws Exception {
            eventTimeHandler.accept(internalTimer, timerService);
        }

        /** Hands the timer and the stored timer service to the processing-time handler. */
        public void onProcessingTime(InternalTimer<K, N> internalTimer) throws Exception {
            processingTimeHandler.accept(internalTimer, timerService);
        }
    }

    /**
     * Builds a keyed state backend through {@link MemoryStateBackend} and passes it to
     * {@link BatchExecutionInternalTimeServiceManager}'s {@code create} method.
     *
     * <p>Despite the method name, the test passes only if an {@link IllegalStateException} carrying
     * the message registered below is thrown.
     */
    @Test
    public void testBatchExecutionManagerCanBeInstantiatedWithBatchStateBackend() throws Exception {
        expectedException.expect(IllegalStateException.class);
        expectedException.expectMessage("Batch execution specific time service can work only with BatchExecutionKeyedStateBackend");

        MockEnvironment environment = MockEnvironment.builder().build();

        // The backend is created inline as the first argument of create(); the remaining arguments
        // mirror the ones the other tests in this class pass.
        BatchExecutionInternalTimeServiceManager.create(
                new MemoryStateBackend().createKeyedStateBackend(
                        environment,
                        new JobID(),
                        "dummy",
                        KEY_SERIALIZER,
                        2,
                        new KeyGroupRange(0, 1),
                        environment.getTaskKvStateRegistry(),
                        TtlTimeProvider.DEFAULT,
                        new UnregisteredMetricsGroup(),
                        Collections.emptyList(),
                        new CloseableRegistry()),
                getClass().getClassLoader(),
                new DummyKeyContext(),
                new TestProcessingTimeService(),
                Collections.emptyList(),
                StreamTaskCancellationContext.alwaysRunning());
    }

    /**
     * Expects {@code forEachEventTimeTimer} on a {@link BatchExecutionInternalTimeService} to throw an
     * {@link UnsupportedOperationException} with the message registered below; the test fails if the
     * supplied callback is ever invoked.
     */
    @Test
    public void testForEachEventTimeTimerUnsupported() {
        expectedException.expect(UnsupportedOperationException.class);
        expectedException.expectMessage("The BatchExecutionInternalTimeService should not be used in State Processor API");

        BatchExecutionInternalTimeService timeService = new BatchExecutionInternalTimeService(
                new TestProcessingTimeService(),
                LambdaTrigger.eventTimeTrigger(ignoredTimer -> {
                }));

        timeService.forEachEventTimeTimer(
                (namespace, timestamp) -> Assert.fail("The forEachEventTimeTimer() should not be supported"));
    }

    /**
     * Expects {@code forEachProcessingTimeTimer} on a {@link BatchExecutionInternalTimeService} to
     * throw an {@link UnsupportedOperationException} with the message registered below; the test fails
     * if the supplied callback is ever invoked.
     *
     * @throws Exception declared so the test compiles whether or not
     *     {@code forEachProcessingTimeTimer} declares a checked exception
     */
    @Test
    public void testForEachProcessingTimeTimerUnsupported() throws Exception {
        expectedException.expect(UnsupportedOperationException.class);
        expectedException.expectMessage("The BatchExecutionInternalTimeService should not be used in State Processor API");

        BatchExecutionInternalTimeService timeService = new BatchExecutionInternalTimeService(
                new TestProcessingTimeService(),
                LambdaTrigger.eventTimeTrigger(ignoredTimer -> {
                }));

        // Exercise the processing-time variant. Calling forEachEventTimeTimer() here would only
        // duplicate testForEachEventTimeTimerUnsupported and leave this method untested.
        timeService.forEachProcessingTimeTimer(
                (namespace, timestamp) -> Assert.fail("The forEachProcessingTimeTimer() should not be supported"));
    }

    /**
     * Registers event-time timers for key 1, advances the watermark, swaps the timer for another one
     * and then switches to key 2.
     *
     * <p>The test expects exactly one firing, for timestamp 150: the timer at 123 must not have fired
     * when the watermark was advanced, and the timer at 150 must have fired by the time the key has
     * changed.
     */
    @Test
    public void testFiringEventTimeTimers() throws Exception {
        ArrayList<Long> firedTimestamps = new ArrayList<>();
        BatchExecutionKeyedStateBackend keyedBackend =
                new BatchExecutionKeyedStateBackend(KEY_SERIALIZER, new KeyGroupRange(0, 1), new ExecutionConfig());
        InternalTimeServiceManager timeServiceManager = BatchExecutionInternalTimeServiceManager.create(
                keyedBackend,
                getClass().getClassLoader(),
                new DummyKeyContext(),
                new TestProcessingTimeService(),
                Collections.emptyList(),
                StreamTaskCancellationContext.alwaysRunning());
        InternalTimerService timerService = timeServiceManager.getInternalTimerService(
                "test",
                KEY_SERIALIZER,
                new VoidNamespaceSerializer(),
                LambdaTrigger.eventTimeTrigger(timer -> firedTimestamps.add(timer.getTimestamp())));

        keyedBackend.setCurrentKey(1);
        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, 123L);

        // Advancing the watermark past 123 is expected to leave the timer unfired, so it can still
        // be deleted and replaced afterwards.
        timeServiceManager.advanceWatermark(new Watermark(1000L));
        timerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, 123L);
        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, 150L);

        // Moving on to another key is the last step before the assertion.
        keyedBackend.setCurrentKey(2);

        Assert.assertThat(firedTimestamps, CoreMatchers.equalTo(Collections.singletonList(150L)));
    }

    /**
     * Registers an event-time timer for key 1 and then sets key 1 again; the test expects that no
     * timer has fired afterwards.
     */
    @Test
    public void testSettingSameKeyDoesNotFireTimers() {
        ArrayList<Long> firedTimestamps = new ArrayList<>();
        BatchExecutionKeyedStateBackend keyedBackend =
                new BatchExecutionKeyedStateBackend(KEY_SERIALIZER, new KeyGroupRange(0, 1), new ExecutionConfig());
        InternalTimeServiceManager timeServiceManager = BatchExecutionInternalTimeServiceManager.create(
                keyedBackend,
                getClass().getClassLoader(),
                new DummyKeyContext(),
                new TestProcessingTimeService(),
                Collections.emptyList(),
                StreamTaskCancellationContext.alwaysRunning());
        InternalTimerService timerService = timeServiceManager.getInternalTimerService(
                "test",
                KEY_SERIALIZER,
                new VoidNamespaceSerializer(),
                LambdaTrigger.eventTimeTrigger(timer -> firedTimestamps.add(timer.getTimestamp())));

        keyedBackend.setCurrentKey(1);
        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, 123L);

        // Re-selecting the key that is already current.
        keyedBackend.setCurrentKey(1);

        Assert.assertThat(firedTimestamps, CoreMatchers.equalTo(Collections.emptyList()));
    }

    /**
     * Checks the value reported by {@code currentWatermark()} at each step of a two-key run.
     *
     * <p>Outside the timer callback the test expects {@link Long#MIN_VALUE} at every checkpoint
     * (after creation, after registering a timer, after advancing the watermark and after switching
     * keys). Inside the event-time callback it expects {@link Long#MAX_VALUE}. At the end both
     * registered timers (123 and 124) must have fired, in that order.
     */
    @Test
    public void testCurrentWatermark() throws Exception {
        ArrayList<Long> firedTimestamps = new ArrayList<>();
        BatchExecutionKeyedStateBackend keyedBackend =
                new BatchExecutionKeyedStateBackend(KEY_SERIALIZER, new KeyGroupRange(0, 1), new ExecutionConfig());
        InternalTimeServiceManager timeServiceManager = BatchExecutionInternalTimeServiceManager.create(
                keyedBackend,
                getClass().getClassLoader(),
                new DummyKeyContext(),
                new TestProcessingTimeService(),
                Collections.emptyList(),
                StreamTaskCancellationContext.alwaysRunning());

        TriggerWithTimerServiceAccess trigger = TriggerWithTimerServiceAccess.eventTimeTrigger((timer, service) -> {
            // Expectation while a timer is firing.
            Assert.assertThat(service.currentWatermark(), CoreMatchers.equalTo(Long.MAX_VALUE));
            firedTimestamps.add(timer.getTimestamp());
        });
        InternalTimerService timerService =
                timeServiceManager.getInternalTimerService("test", KEY_SERIALIZER, new VoidNamespaceSerializer(), trigger);
        trigger.setTimerService(timerService);

        // Freshly created service.
        Assert.assertThat(timerService.currentWatermark(), CoreMatchers.equalTo(Long.MIN_VALUE));

        keyedBackend.setCurrentKey(1);
        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, 123L);
        Assert.assertThat(timerService.currentWatermark(), CoreMatchers.equalTo(Long.MIN_VALUE));

        // An intermediate watermark is expected to leave the reported value untouched.
        timeServiceManager.advanceWatermark(new Watermark(1000L));
        Assert.assertThat(timerService.currentWatermark(), CoreMatchers.equalTo(Long.MIN_VALUE));

        // Same expectation after moving to the next key.
        keyedBackend.setCurrentKey(2);
        Assert.assertThat(timerService.currentWatermark(), CoreMatchers.equalTo(Long.MIN_VALUE));

        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, 124L);
        timeServiceManager.advanceWatermark(Watermark.MAX_WATERMARK);

        Assert.assertThat(firedTimestamps, CoreMatchers.equalTo(Arrays.asList(123L, 124L)));
    }

    /**
     * Registers a processing-time timer for key 1 and then switches to key 2.
     *
     * <p>The test expects that the registration leaves the {@link TestProcessingTimeService} with
     * zero active timers, and that the timer at 150 has fired once the key has changed.
     */
    @Test
    public void testProcessingTimeTimers() {
        ArrayList<Long> firedTimestamps = new ArrayList<>();
        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
        BatchExecutionKeyedStateBackend keyedBackend =
                new BatchExecutionKeyedStateBackend(KEY_SERIALIZER, new KeyGroupRange(0, 1), new ExecutionConfig());
        InternalTimeServiceManager timeServiceManager = BatchExecutionInternalTimeServiceManager.create(
                keyedBackend,
                getClass().getClassLoader(),
                new DummyKeyContext(),
                processingTimeService,
                Collections.emptyList(),
                StreamTaskCancellationContext.alwaysRunning());
        InternalTimerService timerService = timeServiceManager.getInternalTimerService(
                "test",
                KEY_SERIALIZER,
                new VoidNamespaceSerializer(),
                LambdaTrigger.processingTimeTrigger(timer -> firedTimestamps.add(timer.getTimestamp())));

        keyedBackend.setCurrentKey(1);
        timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, 150L);

        // Nothing is expected to be scheduled on the underlying processing time service.
        Assert.assertThat(processingTimeService.getNumActiveTimers(), CoreMatchers.equalTo(0));

        keyedBackend.setCurrentKey(2);

        Assert.assertThat(firedTimestamps, CoreMatchers.equalTo(Collections.singletonList(150L)));
    }

    /**
     * Uses an event-time callback that, on every firing, registers a further event-time timer 20
     * time units later.
     *
     * <p>The test expects only the originally registered timer (150) to be recorded after switching
     * from key 1 to key 2, i.e. the timer registered from inside the callback must not show up.
     */
    @Test
    public void testIgnoringEventTimeTimersFromWithinCallback() {
        ArrayList<Long> firedTimestamps = new ArrayList<>();
        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
        BatchExecutionKeyedStateBackend keyedBackend =
                new BatchExecutionKeyedStateBackend(KEY_SERIALIZER, new KeyGroupRange(0, 1), new ExecutionConfig());
        InternalTimeServiceManager timeServiceManager = BatchExecutionInternalTimeServiceManager.create(
                keyedBackend,
                getClass().getClassLoader(),
                new DummyKeyContext(),
                processingTimeService,
                Collections.emptyList(),
                StreamTaskCancellationContext.alwaysRunning());

        TriggerWithTimerServiceAccess trigger = TriggerWithTimerServiceAccess.eventTimeTrigger((timer, service) -> {
            firedTimestamps.add(timer.getTimestamp());
            // Follow-up registration made while the timer is firing.
            service.registerEventTimeTimer(VoidNamespace.INSTANCE, timer.getTimestamp() + 20);
        });
        InternalTimerService timerService =
                timeServiceManager.getInternalTimerService("test", KEY_SERIALIZER, new VoidNamespaceSerializer(), trigger);
        trigger.setTimerService(timerService);

        keyedBackend.setCurrentKey(1);
        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, 150L);
        Assert.assertThat(processingTimeService.getNumActiveTimers(), CoreMatchers.equalTo(0));

        keyedBackend.setCurrentKey(2);

        Assert.assertThat(firedTimestamps, CoreMatchers.equalTo(Collections.singletonList(150L)));
    }

    /**
     * Uses a processing-time callback that, on every firing, registers a further processing-time
     * timer 20 time units later.
     *
     * <p>The test expects only the originally registered timer (150) to be recorded after switching
     * from key 1 to key 2, i.e. the timer registered from inside the callback must not show up.
     */
    @Test
    public void testIgnoringProcessingTimeTimersFromWithinCallback() {
        ArrayList<Long> firedTimestamps = new ArrayList<>();
        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
        BatchExecutionKeyedStateBackend keyedBackend =
                new BatchExecutionKeyedStateBackend(KEY_SERIALIZER, new KeyGroupRange(0, 1), new ExecutionConfig());
        InternalTimeServiceManager timeServiceManager = BatchExecutionInternalTimeServiceManager.create(
                keyedBackend,
                getClass().getClassLoader(),
                new DummyKeyContext(),
                processingTimeService,
                Collections.emptyList(),
                StreamTaskCancellationContext.alwaysRunning());

        TriggerWithTimerServiceAccess trigger = TriggerWithTimerServiceAccess.processingTimeTrigger((timer, service) -> {
            firedTimestamps.add(timer.getTimestamp());
            // Follow-up registration made while the timer is firing.
            service.registerProcessingTimeTimer(VoidNamespace.INSTANCE, timer.getTimestamp() + 20);
        });
        InternalTimerService timerService =
                timeServiceManager.getInternalTimerService("test", KEY_SERIALIZER, new VoidNamespaceSerializer(), trigger);
        trigger.setTimerService(timerService);

        keyedBackend.setCurrentKey(1);
        timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, 150L);
        Assert.assertThat(processingTimeService.getNumActiveTimers(), CoreMatchers.equalTo(0));

        keyedBackend.setCurrentKey(2);

        Assert.assertThat(firedTimestamps, CoreMatchers.equalTo(Collections.singletonList(150L)));
    }
}
