package org.apache.flink.streaming.api.functions;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.class */
public class StatefulSequenceSourceTest {

    /* loaded from: input_file:org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest$BlockingSourceContext.class */
    private static class BlockingSourceContext implements SourceFunction.SourceContext<Long> {
        private final String name;
        private final OneShotLatch latchToTrigger;
        private final OneShotLatch latchToWait;
        private final ConcurrentHashMap<String, List<Long>> collector;
        private final int threshold;
        private int counter = 0;
        private final Object lock = new Object();
        private final List<Long> localOutput = new ArrayList();

        public BlockingSourceContext(String str, OneShotLatch oneShotLatch, OneShotLatch oneShotLatch2, ConcurrentHashMap<String, List<Long>> concurrentHashMap, int i) {
            this.name = str;
            this.latchToTrigger = oneShotLatch;
            this.latchToWait = oneShotLatch2;
            this.collector = concurrentHashMap;
            this.threshold = i;
            if (this.collector.put(str, this.localOutput) != null) {
                Assert.fail();
            }
        }

        public void collectWithTimestamp(Long l, long j) {
            collect(l);
        }

        public void collect(Long l) {
            this.localOutput.add(l);
            int i = this.counter + 1;
            this.counter = i;
            if (i == this.threshold) {
                this.latchToTrigger.trigger();
                try {
                    if (!this.latchToWait.isTriggered()) {
                        this.latchToWait.await();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        public void emitWatermark(Watermark watermark) {
            throw new UnsupportedOperationException();
        }

        public void markAsTemporarilyIdle() {
            throw new UnsupportedOperationException();
        }

        public Object getCheckpointLock() {
            return this.lock;
        }

        public void close() {
        }
    }

    @Test
    public void testCheckpointRestore() throws Exception {
        HashSet hashSet = new HashSet();
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 > 100) {
                break;
            }
            hashSet.add(Long.valueOf(j2));
            j = j2 + 1;
        }
        final ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        final OneShotLatch oneShotLatch = new OneShotLatch();
        final OneShotLatch oneShotLatch2 = new OneShotLatch();
        final OneShotLatch oneShotLatch3 = new OneShotLatch();
        final OneShotLatch oneShotLatch4 = new OneShotLatch();
        final StatefulSequenceSource statefulSequenceSource = new StatefulSequenceSource(0L, 100L);
        AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness = new AbstractStreamOperatorTestHarness((StreamOperator) new StreamSource(statefulSequenceSource), 2, 2, 0);
        abstractStreamOperatorTestHarness.open();
        final StatefulSequenceSource statefulSequenceSource2 = new StatefulSequenceSource(0L, 100L);
        AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness2 = new AbstractStreamOperatorTestHarness((StreamOperator) new StreamSource(statefulSequenceSource2), 2, 2, 1);
        abstractStreamOperatorTestHarness2.open();
        final Throwable[] thArr = new Throwable[3];
        Thread thread = new Thread() { // from class: org.apache.flink.streaming.api.functions.StatefulSequenceSourceTest.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    statefulSequenceSource.run(new BlockingSourceContext("1", oneShotLatch, oneShotLatch2, concurrentHashMap, 21));
                } catch (Throwable th) {
                    th.printStackTrace();
                    thArr[0] = th;
                }
            }
        };
        Thread thread2 = new Thread() { // from class: org.apache.flink.streaming.api.functions.StatefulSequenceSourceTest.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    statefulSequenceSource2.run(new BlockingSourceContext("2", oneShotLatch3, oneShotLatch4, concurrentHashMap, 32));
                } catch (Throwable th) {
                    th.printStackTrace();
                    thArr[1] = th;
                }
            }
        };
        thread.start();
        thread2.start();
        if (!oneShotLatch.isTriggered()) {
            oneShotLatch.await();
        }
        if (!oneShotLatch3.isTriggered()) {
            oneShotLatch3.await();
        }
        OperatorSubtaskState repackageState = AbstractStreamOperatorTestHarness.repackageState(abstractStreamOperatorTestHarness.snapshot(0L, 0L), abstractStreamOperatorTestHarness2.snapshot(0L, 0L));
        final StatefulSequenceSource statefulSequenceSource3 = new StatefulSequenceSource(0L, 100L);
        StreamSource streamSource = new StreamSource(statefulSequenceSource3);
        OperatorSubtaskState repartitionOperatorState = AbstractStreamOperatorTestHarness.repartitionOperatorState(repackageState, 2, 2, 1, 0);
        AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness3 = new AbstractStreamOperatorTestHarness((StreamOperator) streamSource, 2, 1, 0);
        abstractStreamOperatorTestHarness3.setup();
        abstractStreamOperatorTestHarness3.initializeState(repartitionOperatorState);
        abstractStreamOperatorTestHarness3.open();
        final OneShotLatch oneShotLatch5 = new OneShotLatch();
        final OneShotLatch oneShotLatch6 = new OneShotLatch();
        oneShotLatch6.trigger();
        Thread thread3 = new Thread() { // from class: org.apache.flink.streaming.api.functions.StatefulSequenceSourceTest.3
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    statefulSequenceSource3.run(new BlockingSourceContext("3", oneShotLatch5, oneShotLatch6, concurrentHashMap, 3));
                } catch (Throwable th) {
                    th.printStackTrace();
                    thArr[2] = th;
                }
            }
        };
        thread3.start();
        thread3.join();
        Assert.assertEquals(3L, concurrentHashMap.size());
        HashSet hashSet2 = new HashSet(Math.abs(100) + 1);
        Iterator it = concurrentHashMap.entrySet().iterator();
        while (it.hasNext()) {
            List<Long> list = (List) concurrentHashMap.get((String) ((Map.Entry) it.next()).getKey());
            Assert.assertTrue(list.size() > 0);
            for (Long l : list) {
                if (!hashSet2.add(l)) {
                    Assert.fail("Duplicate entry: " + l);
                }
                if (!hashSet.contains(l)) {
                    Assert.fail("Unexpected element: " + l);
                }
            }
        }
        Assert.assertEquals(Math.abs(-100) + 1, hashSet2.size());
        oneShotLatch2.trigger();
        oneShotLatch4.trigger();
        thread.join();
        thread2.join();
    }
}
