package org.apache.beam.sdk.io;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.runners.dataflow.TestCountingSource;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.class */
public class BoundedReadFromUnboundedSourceTest implements Serializable {
    private static final int NUM_RECORDS = 100;
    private static List<Integer> finalizeTracker = null;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest$Checker.class */
    public static class Checker implements SerializableFunction<Iterable<KV<Integer, Integer>>, Void> {
        private final boolean dedup;
        private final boolean timeBound;

        Checker(boolean z, boolean z2) {
            this.dedup = z;
            this.timeBound = z2;
        }

        public Void apply(Iterable<KV<Integer, Integer>> iterable) {
            ArrayList arrayList = new ArrayList();
            for (KV<Integer, Integer> kv : iterable) {
                Assert.assertEquals(0L, ((Integer) kv.getKey()).intValue());
                arrayList.add(kv.getValue());
            }
            if (this.timeBound) {
                Assert.assertTrue(arrayList.size() >= 1);
            } else if (this.dedup) {
                Assert.assertTrue(arrayList.size() > 10 && arrayList.size() <= BoundedReadFromUnboundedSourceTest.NUM_RECORDS);
            } else {
                Assert.assertEquals(100L, arrayList.size());
            }
            Collections.sort(arrayList);
            for (int i = 0; i < arrayList.size(); i++) {
                Assert.assertEquals(i, ((Integer) arrayList.get(i)).intValue());
            }
            if (BoundedReadFromUnboundedSourceTest.finalizeTracker == null) {
                return null;
            }
            Assert.assertThat(BoundedReadFromUnboundedSourceTest.finalizeTracker, Matchers.containsInAnyOrder(new Integer[]{Integer.valueOf(arrayList.size() - 1)}));
            return null;
        }
    }

    @Test
    @Category({RunnableOnService.class})
    public void testNoDedup() throws Exception {
        test(false, false);
    }

    @Test
    @Category({RunnableOnService.class})
    public void testDedup() throws Exception {
        test(true, false);
    }

    @Test
    @Category({RunnableOnService.class})
    public void testTimeBound() throws Exception {
        test(false, true);
    }

    @Test
    public void testForwardsDisplayData() {
        TestCountingSource testCountingSource = new TestCountingSource(1234) { // from class: org.apache.beam.sdk.io.BoundedReadFromUnboundedSourceTest.1
            public void populateDisplayData(DisplayData.Builder builder) {
                builder.add(DisplayData.item("foo", "bar"));
            }
        };
        Assert.assertThat(DisplayData.from(Read.from(testCountingSource).withMaxNumRecords(5L)), DisplayDataMatchers.includesDisplayDataFrom(testCountingSource));
    }

    private void test(boolean z, boolean z2) throws Exception {
        TestPipeline create = TestPipeline.create();
        TestCountingSource withoutSplitting = new TestCountingSource(Integer.MAX_VALUE).withoutSplitting();
        if (z) {
            withoutSplitting = withoutSplitting.withDedup();
        }
        PCollection apply = z2 ? (PCollection) create.apply(Read.from(withoutSplitting).withMaxReadTime(Duration.millis(200L))) : create.apply(Read.from(withoutSplitting).withMaxNumRecords(100L));
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < NUM_RECORDS; i++) {
            arrayList.add(KV.of(0, Integer.valueOf(i)));
        }
        PAssert.that(apply).satisfies(new Checker(z, z2));
        create.run();
    }
}
