package org.apache.flink.table.runtime.operators.wmassigners;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
import org.apache.flink.table.runtime.generated.WatermarkGenerator;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/** Tests for the watermark assigner operator built by {@link WatermarkAssignerOperatorFactory}. */
public class WatermarkAssignerOperatorTest extends WatermarkAssignerOperatorTestBase {
    // Generator used by testWatermarkAssignerWithIdleSource and testWatermarkAssignerOperator.
    // NOTE(review): the arguments are presumably (rowtime field index = 0, delay = 1); the
    // watermarks those tests expect trail the largest processed value by one, which is consistent
    // with that reading. Confirm against BoundedOutOfOrderWatermarkGenerator.
    private static final WatermarkGenerator WATERMARK_GENERATOR = new BoundedOutOfOrderWatermarkGenerator(0, 1);

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorTest$MyWatermarkGenerator.class */
    private static final class MyWatermarkGenerator extends WatermarkGenerator {
        private static final long serialVersionUID = 1;
        private static boolean openCalled = false;
        private static boolean closeCalled = false;
        private final int watermarkFieldIndex;

        private MyWatermarkGenerator(int i) {
            this.watermarkFieldIndex = i;
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            if (closeCalled) {
                Assertions.fail("Close called before open.");
            }
            openCalled = true;
        }

        @Nullable
        public Long currentWatermark(RowData rowData) throws Exception {
            if (!openCalled) {
                Assertions.fail("Open was not called before run.");
            }
            if (rowData.isNullAt(this.watermarkFieldIndex)) {
                return null;
            }
            return Long.valueOf(rowData.getLong(this.watermarkFieldIndex));
        }

        public void close() throws Exception {
            super.close();
            if (!openCalled) {
                Assertions.fail("Open was not called before close.");
            }
            closeCalled = true;
        }
    }

    /**
     * Checks the non-record output (watermarks and watermark statuses) of an operator created with
     * a third {@code createTestHarness} argument of 1000 and an auto-watermark interval of 50:
     * a periodic watermark, then IDLE after processing time passes 1000, then ACTIVE and a new
     * watermark once elements arrive again.
     */
    @Test
    public void testWatermarkAssignerWithIdleSource() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                createTestHarness(0, WATERMARK_GENERATOR, 1000L);
        testHarness.getExecutionConfig().setAutoWatermarkInterval(50L);
        testHarness.open();

        ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
        List<Object> expectedOutput = new ArrayList<>();

        testHarness.processElement(new StreamRecord<>(GenericRowData.of(1L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(2L)));
        // The expected output below contains no Watermark(2): the incoming watermark is not
        // expected to be forwarded.
        testHarness.processWatermark(new Watermark(2L));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(3L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(4L)));

        // Crossing the 50 ms watermark interval should emit the first watermark.
        testHarness.setProcessingTime(51L);
        expectedOutput.add(new Watermark(3L));
        Assertions.assertThat(filterOutRecords(output)).isEqualTo(expectedOutput);

        // No elements until processing time 1001: the stream is expected to be marked idle.
        testHarness.setProcessingTime(1001L);
        expectedOutput.add(WatermarkStatus.IDLE);
        Assertions.assertThat(filterOutRecords(output)).isEqualTo(expectedOutput);

        // New elements are expected to switch the status back to active.
        expectedOutput.add(WatermarkStatus.ACTIVE);
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(4L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(5L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(6L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(7L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(8L)));

        testHarness.setProcessingTime(1060L);
        expectedOutput.add(new Watermark(7L));
        Assertions.assertThat(filterOutRecords(output)).isEqualTo(expectedOutput);
    }

    /**
     * Feeds two batches of ascending elements through the operator and drains its output while
     * stepping processing time forward in increments of 10, validating every emitted item with
     * {@code validateElement}. Finally checks that a {@code Long.MAX_VALUE} watermark is emitted
     * after one is processed.
     */
    @Test
    public void testWatermarkAssignerOperator() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                createTestHarness(0, WATERMARK_GENERATOR, -1L);
        testHarness.getExecutionConfig().setAutoWatermarkInterval(50L);

        long currentTime = 0;
        testHarness.open();

        testHarness.processElement(new StreamRecord<>(GenericRowData.of(1L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(2L)));
        testHarness.processWatermark(new Watermark(2L));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(3L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(4L)));

        // validateElement's result is fed back into the next call: f0 as the second argument and
        // f1 as the third. NOTE(review): presumably (next expected element value, current
        // watermark) -- confirm against the base class.
        ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
        long nextElementValue = 1L;
        long lastWatermark = -1L;
        while (lastWatermark < 3) {
            if (!output.isEmpty()) {
                Object next = output.poll();
                Assertions.assertThat(next).isNotNull();
                Tuple2<Long, Long> update = validateElement(next, nextElementValue, lastWatermark);
                nextElementValue = update.f0;
                lastWatermark = update.f1;
                Assertions.assertThat(lastWatermark).isLessThan(nextElementValue);
            } else {
                // Nothing left to read: advance processing time so more output can be produced.
                currentTime += 10;
                testHarness.setProcessingTime(currentTime);
            }
        }
        output.clear();

        testHarness.processElement(new StreamRecord<>(GenericRowData.of(4L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(5L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(6L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(7L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(8L)));

        output = testHarness.getOutput();
        nextElementValue = 4L;
        lastWatermark = 2L;
        while (lastWatermark < 7) {
            if (!output.isEmpty()) {
                Object next = output.poll();
                Assertions.assertThat(next).isNotNull();
                Tuple2<Long, Long> update = validateElement(next, nextElementValue, lastWatermark);
                nextElementValue = update.f0;
                lastWatermark = update.f1;
                Assertions.assertThat(lastWatermark).isLessThan(nextElementValue);
            } else {
                currentTime += 10;
                testHarness.setProcessingTime(currentTime);
            }
        }
        output.clear();

        testHarness.processWatermark(new Watermark(Long.MAX_VALUE));
        Assertions.assertThat(((Watermark) testHarness.getOutput().poll()).getTimestamp())
                .isEqualTo(Long.MAX_VALUE);
    }

    /**
     * Runs the operator with {@link MyWatermarkGenerator} reading the watermark from row field 1
     * and an auto-watermark interval of 5, then checks the emitted watermarks, the total output
     * size, and that the generator's open and close were both invoked.
     */
    @Test
    public void testCustomizedWatermarkGenerator() throws Exception {
        // The lifecycle flags are static, so reset them before this run.
        MyWatermarkGenerator.openCalled = false;
        MyWatermarkGenerator.closeCalled = false;

        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                createTestHarness(0, new MyWatermarkGenerator(1), -1L);
        testHarness.getExecutionConfig().setAutoWatermarkInterval(5L);

        long currentTime = 0L;
        List<Watermark> expectedWatermarks = new ArrayList<>();

        testHarness.open();

        testHarness.processElement(new StreamRecord<>(GenericRowData.of(1L, 0L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(2L, 1L)));
        testHarness.processWatermark(new Watermark(2L));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(3L, 1L)));

        currentTime += 5;
        testHarness.setProcessingTime(currentTime);
        expectedWatermarks.add(new Watermark(1L));

        testHarness.processElement(new StreamRecord<>(GenericRowData.of(4L, 2L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(2L, 1L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(1L, 0L)));
        // Null watermark field: MyWatermarkGenerator returns null for this row.
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(6L, null)));

        currentTime += 5;
        testHarness.setProcessingTime(currentTime);
        expectedWatermarks.add(new Watermark(2L));

        // Watermark(8) is expected here without advancing processing time first.
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(9L, 8L)));
        expectedWatermarks.add(new Watermark(8L));

        testHarness.processElement(new StreamRecord<>(GenericRowData.of(8L, 7L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(10L, null)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(11L, 10L)));

        currentTime += 5;
        testHarness.setProcessingTime(currentTime);
        expectedWatermarks.add(new Watermark(10L));

        testHarness.close();
        expectedWatermarks.add(Watermark.MAX_WATERMARK);

        // 11 elements were processed above; the output holds them plus the expected watermarks.
        Assertions.assertThat(testHarness.getOutput()).hasSize(expectedWatermarks.size() + 11);
        Assertions.assertThat(extractWatermarks(testHarness.getOutput()))
                .isEqualTo(expectedWatermarks);
        Assertions.assertThat(MyWatermarkGenerator.openCalled).isTrue();
        Assertions.assertThat(MyWatermarkGenerator.closeCalled).isTrue();
    }

    /**
     * Creates a test harness around a {@link WatermarkAssignerOperatorFactory} whose generated
     * watermark generator always hands back the supplied {@code watermarkGenerator} instance
     * instead of instantiating generated code.
     *
     * @param rowtimeFieldIndex first factory argument (NOTE(review): presumably the index of the
     *     rowtime field -- confirm against the factory)
     * @param watermarkGenerator instance returned by both {@code newInstance} overloads
     * @param idleTimeout second factory argument (NOTE(review): presumably the idle timeout, with
     *     the tests passing -1 when idleness is not under test -- confirm against the factory)
     */
    private static OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
            int rowtimeFieldIndex, final WatermarkGenerator watermarkGenerator, long idleTimeout)
            throws Exception {
        // Both overloads must keep the name newInstance so they actually override the
        // superclass methods; otherwise the supplied instance is never used.
        GeneratedWatermarkGenerator generatedWatermarkGenerator =
                new GeneratedWatermarkGenerator(
                        watermarkGenerator.getClass().getName(), "", new Object[0]) {
                    @Override
                    public WatermarkGenerator newInstance(ClassLoader classLoader) {
                        return watermarkGenerator;
                    }

                    @Override
                    public WatermarkGenerator newInstance(ClassLoader classLoader, Object... args) {
                        return watermarkGenerator;
                    }
                };
        return new OneInputStreamOperatorTestHarness<>(
                new WatermarkAssignerOperatorFactory(
                        rowtimeFieldIndex, idleTimeout, generatedWatermarkGenerator));
    }
}
