package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;

import java.io.IOException;
import java.util.List;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestBoundedCountingSource;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestCountingSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.values.KV;
import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
import org.junit.Assert;
import org.junit.Test;

/**
 * Unit tests for {@link FlinkSourceSplitEnumerator}, driven through Flink's
 * {@link TestingSplitEnumeratorContext} so that split assignments can be inspected directly.
 */
public class FlinkSourceSplitEnumeratorTest {

    /**
     * Registers two readers against a bounded counting source and checks that each of them ends
     * up with five splits plus the "no more splits" signal, and that every split reports an
     * estimated size of one byte.
     *
     * @throws IOException if the enumerator fails while running or closing
     */
    @Test
    public void testAssignSplitsWithBoundedSource() throws IOException {
        TestingSplitEnumeratorContext<FlinkSourceSplit<KV<Integer, Integer>>> context =
                new TestingSplitEnumeratorContext<>(2);
        assignSplits(context, new TestBoundedCountingSource(10, 10), 10);

        Assert.assertEquals(2, context.getSplitAssignments().size());
        context.getSplitAssignments().forEach((subtask, state) -> {
            Assert.assertEquals("Each subtask should have 5 assigned splits", 5, state.getAssignedSplits().size());
            Assert.assertTrue("Each subtask should have received NoMoreSplits", state.hasReceivedNoMoreSplitsSignal());
            state.getAssignedSplits().forEach(split -> {
                try {
                    // NOTE(review): relies on getBeamSplitSource() exposing getEstimatedSizeBytes();
                    // confirm whether a cast to a bounded source type is required here.
                    Assert.assertEquals(1, split.getBeamSplitSource().getEstimatedSizeBytes(FlinkPipelineOptions.defaults()));
                } catch (Exception e) {
                    Assert.fail("Received exception" + e);
                }
            });
        });
    }

    /**
     * Registers two readers against an unbounded counting source and checks that every subtask
     * that received an assignment holds two splits plus the "no more splits" signal.
     *
     * @throws IOException if the enumerator fails while running or closing
     */
    @Test
    public void testAssignSplitsWithUnboundedSource() throws IOException {
        TestingSplitEnumeratorContext<FlinkSourceSplit<KV<Integer, Integer>>> context =
                new TestingSplitEnumeratorContext<>(5);
        assignSplits(context, new TestCountingSource(10), 10);

        context.getSplitAssignments().forEach((subtask, state) -> {
            Assert.assertEquals("Each subtask should have 2 assigned splits", 2, state.getAssignedSplits().size());
            Assert.assertTrue("Each subtask should have received NoMoreSplits", state.hasReceivedNoMoreSplitsSignal());
        });
    }

    /**
     * Hands the splits assigned to reader 0 back to the enumerator, re-adds the reader, and
     * checks that the reader's assignment list has grown from five to ten entries.
     *
     * @throws IOException if the enumerator fails while running or closing
     */
    @Test
    public void testAddSplitsBack() throws IOException {
        TestingSplitEnumeratorContext<FlinkSourceSplit<KV<Integer, Integer>>> context =
                new TestingSplitEnumeratorContext<>(2);
        // try-with-resources closes the enumerator exactly once, on both the success and the
        // failure path, and attaches any close() failure as a suppressed exception.
        try (FlinkSourceSplitEnumerator<KV<Integer, Integer>> enumerator =
                new FlinkSourceSplitEnumerator<>(
                        context, new TestBoundedCountingSource(10, 10), FlinkPipelineOptions.defaults(), 10)) {
            enumerator.start();
            context.registerReader(0, "0");
            enumerator.addReader(0);
            context.getExecutorService().triggerAll();

            List<FlinkSourceSplit<KV<Integer, Integer>>> splitsForReader =
                    context.getSplitAssignments().get(0).getAssignedSplits();
            Assert.assertEquals(5, splitsForReader.size());

            enumerator.addSplitsBack(splitsForReader, 0);
            enumerator.addReader(0);
            // The same list instance is re-checked: the test expects the returned splits to be
            // appended to reader 0's existing assignment record.
            Assert.assertEquals(10, splitsForReader.size());
        }
    }

    /**
     * Starts an enumerator over {@code source}, registers readers 0 and 1 with it, and closes it.
     * Pending executor tasks are triggered after the first reader is added and before the second
     * one is registered.
     *
     * @param testingSplitEnumeratorContext context that records the resulting split assignments
     * @param source Beam source handed to the enumerator
     * @param numSplits value passed as the enumerator's last constructor argument
     * @throws IOException if the enumerator fails while running or closing
     */
    private void assignSplits(
            TestingSplitEnumeratorContext<FlinkSourceSplit<KV<Integer, Integer>>> testingSplitEnumeratorContext,
            Source<KV<Integer, Integer>> source,
            int numSplits)
            throws IOException {
        try (FlinkSourceSplitEnumerator<KV<Integer, Integer>> enumerator =
                new FlinkSourceSplitEnumerator<>(
                        testingSplitEnumeratorContext, source, FlinkPipelineOptions.defaults(), numSplits)) {
            enumerator.start();
            testingSplitEnumeratorContext.registerReader(0, "0");
            enumerator.addReader(0);
            testingSplitEnumeratorContext.getExecutorService().triggerAll();
            testingSplitEnumeratorContext.registerReader(1, "1");
            enumerator.addReader(1);
        }
    }
}
