package org.apache.flink.streaming.api.operators;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import junit.framework.TestCase;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Preconditions;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.class */
public class AbstractStreamOperatorTest {

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest$CustomRawKeyedStateTestOperator.class */
    /**
     * Test operator that opts into custom raw keyed state: on snapshot it writes a fixed byte
     * payload for each configured key group, and on state initialization it reads back one payload
     * per restored key group and remembers it by key group id.
     */
    private static class CustomRawKeyedStateTestOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
        private static final long serialVersionUID = 1;

        /** Payload written for every key group; its length is also the number of bytes read back. */
        private final byte[] snapshotBytes;

        /** Ids of the key groups for which the payload is written on snapshot. */
        private final List<Integer> keyGroupsToWrite;

        /** Bytes read during {@code initializeState}, keyed by key group id. */
        private Map<Integer, byte[]> restoredRawKeyedState;

        /**
         * @param bArr payload to snapshot; copied so later changes by the caller are not observed
         * @param list key group ids to write; must not be null
         */
        CustomRawKeyedStateTestOperator(byte[] bArr, List<Integer> list) {
            this.snapshotBytes = Arrays.copyOf(bArr, bArr.length);
            this.keyGroupsToWrite = Preconditions.checkNotNull(list);
        }

        /** Elements are ignored; this operator only exercises snapshot and restore. */
        public void processElement(StreamRecord<String> streamRecord) throws Exception {
            // intentionally empty
        }

        /** Signals that this operator manages the raw keyed state stream itself. */
        protected boolean isUsingCustomRawKeyedState() {
            return true;
        }

        /** Writes {@code snapshotBytes} once per configured key group, then closes the raw stream. */
        public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
            super.snapshotState(stateSnapshotContext);
            KeyedStateCheckpointOutputStream rawKeyedStateOut = stateSnapshotContext.getRawKeyedOperatorStateOutput();
            for (int keyGroupId : this.keyGroupsToWrite) {
                rawKeyedStateOut.startNewKeyGroup(keyGroupId);
                rawKeyedStateOut.write(this.snapshotBytes);
            }
            rawKeyedStateOut.close();
        }

        /** Reads up to {@code snapshotBytes.length} bytes from every restored key group stream. */
        public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
            super.initializeState(stateInitializationContext);
            this.restoredRawKeyedState = new HashMap<>();
            for (KeyGroupStatePartitionStreamProvider provider : stateInitializationContext.getRawKeyedStateInputs()) {
                byte[] restored = new byte[this.snapshotBytes.length];
                java.io.InputStream in = provider.getStream();
                // A single read() may return fewer bytes than requested, so keep reading until
                // the buffer is full or the stream ends.
                int filled = 0;
                while (filled < restored.length) {
                    int readNow = in.read(restored, filled, restored.length - filled);
                    if (readNow < 0) {
                        break;
                    }
                    filled += readNow;
                }
                this.restoredRawKeyedState.put(provider.getKeyGroupId(), restored);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest$TestKeySelector.class */
    /** Key selector that keys a {@code Tuple2<Integer, String>} by its first field. */
    public static class TestKeySelector implements KeySelector<Tuple2<Integer, String>, Integer> {
        private static final long serialVersionUID = 1;

        /** Returns field {@code f0} of the given tuple as the key. */
        public Integer getKey(Tuple2<Integer, String> tuple2) throws Exception {
            final Integer key = tuple2.f0;
            return key;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest$TestOperator.class */
    /**
     * Keyed test operator controlled by textual commands in the second tuple field. A command has
     * the form {@code NAME} or {@code NAME:ARGUMENT}:
     *
     * <ul>
     *   <li>{@code SET_STATE:value} stores {@code value} in the value state of the current key
     *   <li>{@code DELETE_STATE} clears that state
     *   <li>{@code SET_EVENT_TIME_TIMER:ts} registers an event-time timer at {@code ts}
     *   <li>{@code SET_PROC_TIME_TIMER:ts} registers a processing-time timer at {@code ts}
     *   <li>{@code EMIT_STATE} emits {@code ON_ELEMENT:<key>:<state>}
     * </ul>
     *
     * Timer callbacks emit {@code ON_EVENT_TIME:<state>} and {@code ON_PROC_TIME:<state>}.
     */
    public static class TestOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<Tuple2<Integer, String>, String>, Triggerable<Integer, VoidNamespace> {
        private static final long serialVersionUID = 1;

        /** Timer service obtained in {@link #open()}; transient, so it is not serialized. */
        private transient InternalTimerService<VoidNamespace> timerService;

        /** Descriptor of the string value state named {@code "state"}. */
        private final ValueStateDescriptor<String> stateDescriptor;

        private TestOperator() {
            this.stateDescriptor = new ValueStateDescriptor<>("state", StringSerializer.INSTANCE);
        }

        /** Opens the operator and looks up the {@code "test-timers"} timer service. */
        public void open() throws Exception {
            super.open();
            this.timerService = getInternalTimerService("test-timers", VoidNamespaceSerializer.INSTANCE, this);
        }

        /**
         * Executes the command carried in {@code f1} of the element.
         *
         * @throws IllegalArgumentException if the command name is not one of the supported commands
         */
        public void processElement(StreamRecord<Tuple2<Integer, String>> streamRecord) throws Exception {
            Tuple2<Integer, String> input = streamRecord.getValue();
            String[] command = input.f1.split(":");
            switch (command[0]) {
                case "SET_STATE":
                    getPartitionedState(this.stateDescriptor).update(command[1]);
                    break;
                case "DELETE_STATE":
                    getPartitionedState(this.stateDescriptor).clear();
                    break;
                case "SET_EVENT_TIME_TIMER":
                    this.timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, Long.parseLong(command[1]));
                    break;
                case "SET_PROC_TIME_TIMER":
                    this.timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, Long.parseLong(command[1]));
                    break;
                case "EMIT_STATE":
                    this.output.collect(new StreamRecord<>("ON_ELEMENT:" + input.f0 + ":" + currentState()));
                    break;
                default:
                    throw new IllegalArgumentException("Unknown command: " + command[0]);
            }
        }

        /** Emits {@code ON_EVENT_TIME:<state>} when an event-time timer fires. */
        public void onEventTime(InternalTimer<Integer, VoidNamespace> internalTimer) throws Exception {
            this.output.collect(new StreamRecord<>("ON_EVENT_TIME:" + currentState()));
        }

        /** Emits {@code ON_PROC_TIME:<state>} when a processing-time timer fires. */
        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> internalTimer) throws Exception {
            this.output.collect(new StreamRecord<>("ON_PROC_TIME:" + currentState()));
        }

        /** Reads the value state described by {@link #stateDescriptor}; may be {@code null}. */
        private String currentState() throws Exception {
            return getPartitionedState(this.stateDescriptor).value();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest$WatermarkTestingOperator.class */
    /**
     * Two-input test operator that registers an event-time timer at the value of every incoming
     * element (from either input) and emits the timestamp of each event-time timer that fires.
     */
    private static class WatermarkTestingOperator extends AbstractStreamOperator<Long> implements TwoInputStreamOperator<Long, Long, Long>, Triggerable<Integer, VoidNamespace> {
        /** Timer service obtained in {@link #open()}. */
        private transient InternalTimerService<VoidNamespace> timerService;

        private WatermarkTestingOperator() {
        }

        /** Opens the operator and looks up the {@code "test-timers"} timer service. */
        public void open() throws Exception {
            super.open();
            this.timerService = getInternalTimerService("test-timers", VoidNamespaceSerializer.INSTANCE, this);
        }

        /** Forwards the timestamp of the fired event-time timer downstream. */
        public void onEventTime(InternalTimer<Integer, VoidNamespace> internalTimer) throws Exception {
            final long firedAt = internalTimer.getTimestamp();
            this.output.collect(new StreamRecord<>(Long.valueOf(firedAt)));
        }

        /** Processing-time timers are ignored by this operator. */
        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> internalTimer) throws Exception {
        }

        /** Registers an event-time timer at the value of an element from the first input. */
        public void processElement1(StreamRecord<Long> streamRecord) throws Exception {
            registerEventTimerFor(streamRecord);
        }

        /** Registers an event-time timer at the value of an element from the second input. */
        public void processElement2(StreamRecord<Long> streamRecord) throws Exception {
            registerEventTimerFor(streamRecord);
        }

        /** Shared handling for both inputs: the element value is used as the timer timestamp. */
        private void registerEventTimerFor(StreamRecord<Long> element) {
            final long timerTimestamp = element.getValue();
            this.timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, timerTimestamp);
        }
    }

    /**
     * Creates the default harness by delegating to {@link #createTestHarness(int, int, int)} with
     * the arguments {@code (1, 1, 0)}.
     */
    protected KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness() throws Exception {
        // Presumably: max parallelism 1, a single subtask, subtask index 0.
        final int first = 1;
        final int second = 1;
        final int third = 0;
        return createTestHarness(first, second, third);
    }

    /**
     * Creates a keyed harness around a fresh {@link TestOperator}, keyed by {@link TestKeySelector}
     * with integer keys.
     *
     * <p>The three numbers are passed through to the harness constructor in order. Judging by how
     * the tests in this class call it, they are presumably the max parallelism, the number of
     * subtasks and the subtask index; confirm against the harness constructor.
     *
     * @param maxParallelism first harness sizing argument
     * @param numSubtasks second harness sizing argument
     * @param subtaskIndex third harness sizing argument
     */
    protected KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness(int maxParallelism, int numSubtasks, int subtaskIndex) throws Exception {
        return new KeyedOneInputStreamOperatorTestHarness<>(
                new TestOperator(),
                new TestKeySelector(),
                BasicTypeInfo.INT_TYPE_INFO,
                maxParallelism,
                numSubtasks,
                subtaskIndex);
    }

    /**
     * Creates a keyed harness for an arbitrary one-input operator.
     *
     * <p>Note the argument order: the three numbers come first here but are handed to the harness
     * constructor after the operator, key selector and key type.
     *
     * @param first first harness sizing argument (presumably max parallelism)
     * @param second second harness sizing argument (presumably number of subtasks)
     * @param third third harness sizing argument (presumably subtask index)
     * @param operator operator under test
     * @param selector extracts the key from each input element
     * @param keyType type information of the key
     */
    protected <K, IN, OUT> KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> createTestHarness(int first, int second, int third, OneInputStreamOperator<IN, OUT> operator, KeySelector<IN, K> selector, TypeInformation<K> keyType) throws Exception {
        final KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(operator, selector, keyType, first, second, third);
        return harness;
    }

    /**
     * Sets different state values for keys 0 and 1 and checks that emitting the state for each key
     * yields that key's own value, in the order the emit commands were sent.
     */
    @Test
    public void testStateDoesNotInterfere() throws Exception {
        // try-with-resources closes the harness on every path and keeps a close failure from
        // replacing an assertion failure.
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness = createTestHarness()) {
            testHarness.open();

            testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0L);
            testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0L);

            testHarness.processElement(new Tuple2<>(1, "EMIT_STATE"), 0L);
            testHarness.processElement(new Tuple2<>(0, "EMIT_STATE"), 0L);

            MatcherAssert.assertThat(
                    extractResult(testHarness),
                    Matchers.contains("ON_ELEMENT:1:CIAO", "ON_ELEMENT:0:HELLO"));
        }
    }

    /**
     * Registers event-time timers at 10 (key 0) and 20 (key 1) and checks that each watermark
     * fires only the timer that is due, and that the callback sees the state of the timer's key.
     */
    @Test
    public void testEventTimeTimersDontInterfere() throws Exception {
        // try-with-resources closes the harness on every path, including assertion failures.
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness = createTestHarness()) {
            testHarness.open();
            testHarness.processWatermark(0L);

            testHarness.processElement(new Tuple2<>(1, "SET_EVENT_TIME_TIMER:20"), 0L);
            testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0L);
            testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0L);
            testHarness.processElement(new Tuple2<>(0, "SET_EVENT_TIME_TIMER:10"), 0L);

            testHarness.processWatermark(10L);
            MatcherAssert.assertThat(extractResult(testHarness), Matchers.contains("ON_EVENT_TIME:HELLO"));

            testHarness.processWatermark(20L);
            MatcherAssert.assertThat(extractResult(testHarness), Matchers.contains("ON_EVENT_TIME:CIAO"));
        }
    }

    /**
     * Registers processing-time timers at 10 (key 0) and 20 (key 1) and checks that advancing the
     * processing time fires only the timer that is due, with the state of the timer's key.
     */
    @Test
    public void testProcessingTimeTimersDontInterfere() throws Exception {
        // try-with-resources closes the harness on every path, including assertion failures.
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness = createTestHarness()) {
            testHarness.open();
            testHarness.setProcessingTime(0L);

            testHarness.processElement(new Tuple2<>(1, "SET_PROC_TIME_TIMER:20"), 0L);
            testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0L);
            testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0L);
            testHarness.processElement(new Tuple2<>(0, "SET_PROC_TIME_TIMER:10"), 0L);

            testHarness.setProcessingTime(10L);
            MatcherAssert.assertThat(extractResult(testHarness), Matchers.contains("ON_PROC_TIME:HELLO"));

            testHarness.setProcessingTime(20L);
            MatcherAssert.assertThat(extractResult(testHarness), Matchers.contains("ON_PROC_TIME:CIAO"));
        }
    }

    /**
     * Snapshots a harness holding processing-time timers at 10 (key 0) and 20 (key 1), restores the
     * snapshot into a fresh harness and checks that the restored timers still fire when the
     * processing time is advanced.
     */
    @Test
    public void testEnsureProcessingTimeTimerRegisteredOnRestore() throws Exception {
        OperatorSubtaskState snapshot;

        // Phase 1: register timers and state, then snapshot. The harness is closed on every path.
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness = createTestHarness()) {
            testHarness.open();
            testHarness.setProcessingTime(0L);

            testHarness.processElement(new Tuple2<>(1, "SET_PROC_TIME_TIMER:20"), 0L);
            testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0L);
            testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0L);
            testHarness.processElement(new Tuple2<>(0, "SET_PROC_TIME_TIMER:10"), 0L);

            snapshot = testHarness.snapshot(0L, 0L);
        }

        // Phase 2: restore into a new harness; the processing time is set before setup/restore.
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> restoredHarness = createTestHarness()) {
            restoredHarness.setProcessingTime(0L);
            restoredHarness.setup();
            restoredHarness.initializeState(snapshot);
            restoredHarness.open();

            restoredHarness.setProcessingTime(10L);
            MatcherAssert.assertThat(extractResult(restoredHarness), Matchers.contains("ON_PROC_TIME:HELLO"));

            restoredHarness.setProcessingTime(20L);
            MatcherAssert.assertThat(extractResult(restoredHarness), Matchers.contains("ON_PROC_TIME:CIAO"));
        }
    }

    /**
     * Registers a processing-time timer at 10 and an event-time timer at 20 for the same key and
     * checks that advancing the watermark fires only the event-time timer, while advancing the
     * processing time afterwards fires only the processing-time timer.
     */
    @Test
    public void testProcessingTimeAndEventTimeDontInterfere() throws Exception {
        // try-with-resources closes the harness on every path, including assertion failures.
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness = createTestHarness()) {
            testHarness.open();
            testHarness.setProcessingTime(0L);
            testHarness.processWatermark(0L);

            testHarness.processElement(new Tuple2<>(0, "SET_PROC_TIME_TIMER:10"), 0L);
            testHarness.processElement(new Tuple2<>(0, "SET_EVENT_TIME_TIMER:20"), 0L);
            testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0L);

            testHarness.processWatermark(20L);
            MatcherAssert.assertThat(extractResult(testHarness), Matchers.contains("ON_EVENT_TIME:HELLO"));

            testHarness.setProcessingTime(10L);
            MatcherAssert.assertThat(extractResult(testHarness), Matchers.contains("ON_PROC_TIME:HELLO"));
        }
    }

    /**
     * Checks that state and timers follow their key when one snapshot is split across two restored
     * harnesses.
     *
     * <p>A harness created with {@code (10, 1, 0)} registers timers and state for one key from key
     * groups 0-4 (timers at 10, state {@code HELLO}) and one key from key groups 5-9 (timers at 20,
     * state {@code CIAO}). The snapshot is repartitioned with
     * {@code repartitionOperatorState(snapshot, 10, 1, 2, index)} for index 0 and 1 and restored
     * into harnesses created with {@code (10, 2, 0)} and {@code (10, 2, 1)}. The first must fire
     * only the {@code HELLO} timers, the second only the {@code CIAO} timers.
     */
    @Test
    public void testStateAndTimerStateShufflingScalingUp() throws Exception {
        final int maxParallelism = 10;

        // Two adjacent key group ranges and one key from each of them.
        KeyGroupRange lowerKeyGroups = new KeyGroupRange(0, 4);
        KeyGroupRange upperKeyGroups = new KeyGroupRange(lowerKeyGroups.getEndKeyGroup() + 1, 9);
        int lowerKey = getKeyInKeyGroupRange(lowerKeyGroups, maxParallelism);
        int upperKey = getKeyInKeyGroupRange(upperKeyGroups, maxParallelism);

        OperatorSubtaskState snapshot;

        // Phase 1: a single harness holds the timers and state of both keys.
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness = createTestHarness(maxParallelism, 1, 0)) {
            testHarness.open();
            testHarness.processWatermark(0L);
            testHarness.setProcessingTime(0L);

            testHarness.processElement(new Tuple2<>(lowerKey, "SET_EVENT_TIME_TIMER:10"), 0L);
            testHarness.processElement(new Tuple2<>(upperKey, "SET_EVENT_TIME_TIMER:20"), 0L);
            testHarness.processElement(new Tuple2<>(lowerKey, "SET_PROC_TIME_TIMER:10"), 0L);
            testHarness.processElement(new Tuple2<>(upperKey, "SET_PROC_TIME_TIMER:20"), 0L);
            testHarness.processElement(new Tuple2<>(lowerKey, "SET_STATE:HELLO"), 0L);
            testHarness.processElement(new Tuple2<>(upperKey, "SET_STATE:CIAO"), 0L);

            TestCase.assertTrue(extractResult(testHarness).isEmpty());

            snapshot = testHarness.snapshot(0L, 0L);
        }

        // Phase 2: restore the share for index 0; only the timers of the lower key may fire.
        OperatorSubtaskState initState1 = AbstractStreamOperatorTestHarness.repartitionOperatorState(snapshot, maxParallelism, 1, 2, 0);
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness1 = createTestHarness(maxParallelism, 2, 0)) {
            testHarness1.setup();
            testHarness1.initializeState(initState1);
            testHarness1.open();

            testHarness1.processWatermark(10L);
            MatcherAssert.assertThat(extractResult(testHarness1), Matchers.contains("ON_EVENT_TIME:HELLO"));
            TestCase.assertTrue(extractResult(testHarness1).isEmpty());

            // The timer at 20 belongs to the other key, so nothing may be emitted here.
            testHarness1.processWatermark(20L);
            TestCase.assertTrue(extractResult(testHarness1).isEmpty());

            testHarness1.setProcessingTime(10L);
            MatcherAssert.assertThat(extractResult(testHarness1), Matchers.contains("ON_PROC_TIME:HELLO"));
            TestCase.assertTrue(extractResult(testHarness1).isEmpty());

            testHarness1.setProcessingTime(20L);
            TestCase.assertTrue(extractResult(testHarness1).isEmpty());
        }

        // Phase 3: restore the share for index 1; only the timers of the upper key may fire.
        OperatorSubtaskState initState2 = AbstractStreamOperatorTestHarness.repartitionOperatorState(snapshot, maxParallelism, 1, 2, 1);
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness2 = createTestHarness(maxParallelism, 2, 1)) {
            testHarness2.setup();
            testHarness2.initializeState(initState2);
            testHarness2.open();

            testHarness2.processWatermark(10L);
            TestCase.assertTrue(extractResult(testHarness2).isEmpty());

            testHarness2.processWatermark(20L);
            MatcherAssert.assertThat(extractResult(testHarness2), Matchers.contains("ON_EVENT_TIME:CIAO"));

            testHarness2.setProcessingTime(10L);
            TestCase.assertTrue(extractResult(testHarness2).isEmpty());

            testHarness2.setProcessingTime(20L);
            MatcherAssert.assertThat(extractResult(testHarness2), Matchers.contains("ON_PROC_TIME:CIAO"));
            TestCase.assertTrue(extractResult(testHarness2).isEmpty());
        }
    }

    /**
     * Checks that state and timers from two snapshots are all available after being merged into a
     * single restored harness.
     *
     * <p>A harness created with {@code (10, 2, 0)} registers timers at 30 and state {@code HELLO}
     * for a key from key groups 0-4; a harness created with {@code (10, 2, 1)} registers timers at
     * 40 and state {@code CIAO} for a key from key groups 5-9. Both snapshots are combined with
     * {@code repackageState}, repartitioned with {@code (10, 2, 1, 0)} and restored into a harness
     * created with {@code (10, 1, 0)}, which must fire all four timers with the matching state.
     */
    @Test
    public void testStateAndTimerStateShufflingScalingDown() throws Exception {
        final int maxParallelism = 10;

        // Two adjacent key group ranges and one key from each of them.
        KeyGroupRange lowerKeyGroups = new KeyGroupRange(0, 4);
        KeyGroupRange upperKeyGroups = new KeyGroupRange(lowerKeyGroups.getEndKeyGroup() + 1, 9);
        int lowerKey = getKeyInKeyGroupRange(lowerKeyGroups, maxParallelism);
        int upperKey = getKeyInKeyGroupRange(upperKeyGroups, maxParallelism);

        OperatorSubtaskState snapshot1;
        OperatorSubtaskState snapshot2;

        // Phase 1: first harness, lower key only. Each harness is closed exactly once.
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness1 = createTestHarness(maxParallelism, 2, 0)) {
            testHarness1.setup();
            testHarness1.open();
            testHarness1.processWatermark(0L);
            testHarness1.setProcessingTime(0L);

            testHarness1.processElement(new Tuple2<>(lowerKey, "SET_EVENT_TIME_TIMER:30"), 0L);
            testHarness1.processElement(new Tuple2<>(lowerKey, "SET_PROC_TIME_TIMER:30"), 0L);
            testHarness1.processElement(new Tuple2<>(lowerKey, "SET_STATE:HELLO"), 0L);

            snapshot1 = testHarness1.snapshot(0L, 0L);
        }

        // Phase 2: second harness, upper key only.
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness2 = createTestHarness(maxParallelism, 2, 1)) {
            testHarness2.setup();
            testHarness2.open();
            testHarness2.processWatermark(0L);
            testHarness2.setProcessingTime(0L);

            testHarness2.processElement(new Tuple2<>(upperKey, "SET_EVENT_TIME_TIMER:40"), 0L);
            testHarness2.processElement(new Tuple2<>(upperKey, "SET_PROC_TIME_TIMER:40"), 0L);
            testHarness2.processElement(new Tuple2<>(upperKey, "SET_STATE:CIAO"), 0L);

            snapshot2 = testHarness2.snapshot(0L, 0L);
        }

        // Phase 3: merge both snapshots and restore them into a single harness.
        OperatorSubtaskState repackagedState = AbstractStreamOperatorTestHarness.repackageState(snapshot1, snapshot2);
        OperatorSubtaskState initSubTaskState = AbstractStreamOperatorTestHarness.repartitionOperatorState(repackagedState, maxParallelism, 2, 1, 0);
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness3 = createTestHarness(maxParallelism, 1, 0)) {
            testHarness3.setup();
            testHarness3.initializeState(initSubTaskState);
            testHarness3.open();

            testHarness3.processWatermark(30L);
            MatcherAssert.assertThat(extractResult(testHarness3), Matchers.contains("ON_EVENT_TIME:HELLO"));
            TestCase.assertTrue(extractResult(testHarness3).isEmpty());

            testHarness3.processWatermark(40L);
            MatcherAssert.assertThat(extractResult(testHarness3), Matchers.contains("ON_EVENT_TIME:CIAO"));
            TestCase.assertTrue(extractResult(testHarness3).isEmpty());

            testHarness3.setProcessingTime(30L);
            MatcherAssert.assertThat(extractResult(testHarness3), Matchers.contains("ON_PROC_TIME:HELLO"));
            TestCase.assertTrue(extractResult(testHarness3).isEmpty());

            testHarness3.setProcessingTime(40L);
            MatcherAssert.assertThat(extractResult(testHarness3), Matchers.contains("ON_PROC_TIME:CIAO"));
            TestCase.assertTrue(extractResult(testHarness3).isEmpty());
        }
    }

    /**
     * Takes a snapshot from a harness running a {@code CustomRawKeyedStateTestOperator}, restores a
     * second harness (same operator instance) from that snapshot, and then checks the operator's
     * {@code restoredRawKeyedState} against the bytes and key groups it was constructed with.
     *
     * <p>Both harnesses are managed with try-with-resources so each one is closed exactly once,
     * whether or not the body throws. The final assertion runs only after the restoring harness
     * has been closed.
     */
    @Test
    public void testCustomRawKeyedStateSnapshotAndRestore() throws Exception {
        final List<Integer> keyGroupsToWrite = Arrays.asList(2, 3, 8);
        final byte[] snapshotData = "TEST".getBytes();
        final CustomRawKeyedStateTestOperator testOperator =
                new CustomRawKeyedStateTestOperator(snapshotData, keyGroupsToWrite);

        // Phase 1: run a fresh harness just long enough to take a snapshot.
        final OperatorSubtaskState snapshot;
        try (KeyedOneInputStreamOperatorTestHarness<String, String, ?> snapshotHarness =
                createTestHarness(10, 1, 0, testOperator, input -> input, BasicTypeInfo.STRING_TYPE_INFO)) {
            snapshotHarness.setup();
            snapshotHarness.open();
            snapshot = snapshotHarness.snapshot(0L, 0L);
        }

        // Phase 2: restore a second harness from that snapshot.
        try (KeyedOneInputStreamOperatorTestHarness<String, String, ?> restoreHarness =
                createTestHarness(10, 1, 0, testOperator, input -> input, BasicTypeInfo.STRING_TYPE_INFO)) {
            restoreHarness.setup();
            restoreHarness.initializeState(snapshot);
            restoreHarness.open();
        }

        MatcherAssert.assertThat(
                testOperator.restoredRawKeyedState,
                hasRestoredKeyGroupsWith(snapshotData, keyGroupsToWrite));
    }

    /**
     * Drives a two-input keyed harness around a {@code WatermarkTestingOperator} and checks the
     * harness output as input 2 switches between {@link WatermarkStatus#IDLE} and
     * {@link WatermarkStatus#ACTIVE} while input 1 keeps advancing its watermark.
     *
     * <p>The harness is managed with try-with-resources so it is closed exactly once on every path.
     */
    @Test
    public void testIdleWatermarkHandling() throws Exception {
        final WatermarkTestingOperator testOperator = new WatermarkTestingOperator();
        final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        // Every element maps to the same key.
        final KeySelector<Long, Integer> constantKeySelector = value -> 0;

        try (KeyedTwoInputStreamOperatorTestHarness<Integer, Long, Long, ?> testHarness =
                new KeyedTwoInputStreamOperatorTestHarness<>(
                        testOperator,
                        constantKeySelector,
                        constantKeySelector,
                        BasicTypeInfo.INT_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();

            testHarness.processElement1(1L, 1L);
            testHarness.processElement1(3L, 3L);
            testHarness.processElement1(4L, 4L);

            // Only input 1 has reported a watermark so far; nothing is expected downstream yet.
            testHarness.processWatermark1(new Watermark(1L));
            MatcherAssert.assertThat(testHarness.getOutput(), Matchers.empty());

            // Once input 2 is idle, record 1 and watermark 1 are expected in the output.
            testHarness.processWatermarkStatus2(WatermarkStatus.IDLE);
            expectedOutput.add(new StreamRecord<>(1L));
            expectedOutput.add(new Watermark(1L));
            TestHarnessUtil.assertOutputEquals("Output was not correct", expectedOutput, testHarness.getOutput());

            // With input 2 still idle, watermark 3 on input 1 is expected to yield record 3 and watermark 3.
            testHarness.processWatermark1(new Watermark(3L));
            expectedOutput.add(new StreamRecord<>(3L));
            expectedOutput.add(new Watermark(3L));
            TestHarnessUtil.assertOutputEquals("Output was not correct", expectedOutput, testHarness.getOutput());

            // After input 2 becomes active again, watermark 4 on input 1 must not add any output
            // (presumably input 2 holds the combined watermark back; the operator is defined elsewhere).
            testHarness.processWatermarkStatus2(WatermarkStatus.ACTIVE);
            testHarness.processWatermark1(new Watermark(4L));
            TestHarnessUtil.assertOutputEquals("Output was not correct", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Marks both inputs of a two-input keyed harness as {@link WatermarkStatus#IDLE} and checks that
     * the harness output consists of exactly one {@link WatermarkStatus#IDLE} element.
     *
     * <p>The harness is managed with try-with-resources so it is closed exactly once on every path.
     */
    @Test
    public void testIdlenessForwarding() throws Exception {
        final WatermarkTestingOperator testOperator = new WatermarkTestingOperator();
        final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        // Every element maps to the same key.
        final KeySelector<Long, Integer> constantKeySelector = value -> 0;

        try (KeyedTwoInputStreamOperatorTestHarness<Integer, Long, Long, ?> testHarness =
                new KeyedTwoInputStreamOperatorTestHarness<>(
                        testOperator,
                        constantKeySelector,
                        constantKeySelector,
                        BasicTypeInfo.INT_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();

            testHarness.processWatermarkStatus1(WatermarkStatus.IDLE);
            testHarness.processWatermarkStatus2(WatermarkStatus.IDLE);

            expectedOutput.add(WatermarkStatus.IDLE);
            TestHarnessUtil.assertOutputEquals("Output was not correct", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Drains the given harness: returns the values of the stream records it has emitted so far and
     * then clears the harness output, so a following call only sees newer output.
     *
     * @param oneInputStreamOperatorTestHarness harness whose output is read and cleared
     * @param <T> output element type of the harness
     * @return the record values in the order the harness reported them
     */
    private <T> List<T> extractResult(OneInputStreamOperatorTestHarness<?, T> oneInputStreamOperatorTestHarness) {
        final List<T> values = new ArrayList<>();
        for (StreamRecord<? extends T> element : oneInputStreamOperatorTestHarness.extractOutputStreamRecords()) {
            // Skip null entries; only actual records carry a value.
            if (element != null) {
                values.add(element.getValue());
            }
        }
        oneInputStreamOperatorTestHarness.getOutput().clear();
        return values;
    }

    /**
     * Draws random integers until one is found whose key group, as computed by
     * {@link KeyGroupRangeAssignment#assignToKeyGroup} for the given second argument, lies inside
     * {@code keyGroupRange}.
     *
     * <p>The generator is seeded from the current wall-clock time, so the returned key differs
     * between runs. The loop only ends once a matching key has been drawn.
     *
     * @param keyGroupRange range the key's key group must fall into
     * @param i value forwarded as the second argument of {@code assignToKeyGroup}
     * @return a randomly chosen key that maps into {@code keyGroupRange}
     */
    private static int getKeyInKeyGroupRange(KeyGroupRange keyGroupRange, int i) {
        final Random generator = new Random(System.currentTimeMillis());
        int candidate;
        do {
            candidate = generator.nextInt();
        } while (!keyGroupRange.contains(KeyGroupRangeAssignment.assignToKeyGroup(Integer.valueOf(candidate), i)));
        return candidate;
    }

    /**
     * Builds a matcher for a restored key-group map.
     *
     * <p>A map matches when it has exactly as many entries as {@code list} has elements and, for
     * every key group in {@code list}, the mapped byte array equals {@code bArr}.
     *
     * @param bArr bytes expected for each listed key group
     * @param list key groups expected to be present in the map
     * @return matcher that checks a map against the expected key groups and bytes
     */
    private static Matcher<Map<Integer, byte[]>> hasRestoredKeyGroupsWith(final byte[] bArr, final List<Integer> list) {
        return new TypeSafeMatcher<Map<Integer, byte[]>>() {
            @Override
            public boolean matchesSafely(Map<Integer, byte[]> map) {
                final boolean sameSize = map.size() == list.size();
                if (!sameSize) {
                    return false;
                }
                for (int keyGroup : list) {
                    final byte[] restored = map.get(keyGroup);
                    if (!Arrays.equals(restored, bArr)) {
                        return false;
                    }
                }
                return true;
            }

            @Override
            public void describeTo(Description description) {
                final String expectation =
                        "Key groups: " + list + " with snapshot data " + Arrays.toString(bArr);
                description.appendText(expectation);
            }
        };
    }

    /**
     * Synthetic lambda-deserialization hook as recovered by the decompiler.
     *
     * <p>Looks up the implementation method name carried by {@code serializedLambda}, verifies the
     * remaining fields (method kind, functional interface class/method/signature, implementing
     * class and implementation signature) and, on a full match, returns a fresh instance of the
     * corresponding lambda used by the tests in this class.
     *
     * <p>NOTE(review): this looks like the method a Java compiler emits for serializable lambdas
     * rather than hand-written code; {@code boolean z = -1}, {@code z = 2} and the repeated
     * {@code case true:} labels appear to be decompiler artifacts of an int selector and will
     * likely not compile as written - confirm before relying on this method.
     *
     * @param serializedLambda serialized form describing the lambda to recreate
     * @return a new lambda equivalent to the one described by {@code serializedLambda}
     * @throws IllegalArgumentException if the description matches none of the known lambdas
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Selector for the second switch; stays at its initial value when no name matches.
        boolean z = -1;
        // Stage 1: dispatch on the name's hash code, then confirm with equals() to rule out collisions.
        switch (implMethodName.hashCode()) {
            case -1754167214:
                if (implMethodName.equals("lambda$testCustomRawKeyedStateSnapshotAndRestore$aeea360d$1")) {
                    z = true;
                    break;
                }
                break;
            case -1754167213:
                if (implMethodName.equals("lambda$testCustomRawKeyedStateSnapshotAndRestore$aeea360d$2")) {
                    z = false;
                    break;
                }
                break;
            case 718303944:
                if (implMethodName.equals("lambda$testIdleWatermarkHandling$bf5bd099$1")) {
                    z = 2;
                    break;
                }
                break;
            case 780924631:
                if (implMethodName.equals("lambda$testIdlenessForwarding$bf5bd099$1")) {
                    z = 3;
                    break;
                }
                break;
        }
        // Stage 2: for the selected lambda, validate every remaining field before recreating it.
        switch (z) {
            case false:
                // String -> String key selector (second lambda of testCustomRawKeyedStateSnapshotAndRestore).
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Ljava/lang/String;")) {
                    return str2 -> {
                        return str2;
                    };
                }
                break;
            case true:
                // String -> String key selector (first lambda of testCustomRawKeyedStateSnapshotAndRestore).
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Ljava/lang/String;")) {
                    return str -> {
                        return str;
                    };
                }
                break;
            case true:
                // Long -> Integer constant key selector; presumably selector value 2 (testIdleWatermarkHandling).
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Integer;")) {
                    return l -> {
                        return 0;
                    };
                }
                break;
            case true:
                // Long -> Integer constant key selector; presumably selector value 3 (testIdlenessForwarding).
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Integer;")) {
                    return l2 -> {
                        return 0;
                    };
                }
                break;
        }
        // Unknown name, or a known name whose remaining fields did not match.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
