package org.apache.beam.sdk.io;

import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/CountingInputTest.class */
public class CountingInputTest {

    /* loaded from: input_file:org/apache/beam/sdk/io/CountingInputTest$ElementValueDiff.class */
    private static class ElementValueDiff extends DoFn<Long, Long> {
        private ElementValueDiff() {
        }

        public void processElement(DoFn<Long, Long>.ProcessContext processContext) throws Exception {
            processContext.output(Long.valueOf(((Long) processContext.element()).longValue() - processContext.timestamp().getMillis()));
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/CountingInputTest$ValueAsTimestampFn.class */
    private static class ValueAsTimestampFn implements SerializableFunction<Long, Instant> {
        private ValueAsTimestampFn() {
        }

        public Instant apply(Long l) {
            return new Instant(l);
        }
    }

    public static void addCountingAsserts(PCollection<Long> pCollection, long j) {
        PAssert.thatSingleton(pCollection.apply("Count", Count.globally())).isEqualTo(Long.valueOf(j));
        PAssert.thatSingleton(pCollection.apply(RemoveDuplicates.create()).apply("UniqueCount", Count.globally())).isEqualTo(Long.valueOf(j));
        PAssert.thatSingleton(pCollection.apply("Min", Min.globally())).isEqualTo(0L);
        PAssert.thatSingleton(pCollection.apply("Max", Max.globally())).isEqualTo(Long.valueOf(j - 1));
    }

    @Test
    @Category({RunnableOnService.class})
    public void testBoundedInput() {
        TestPipeline create = TestPipeline.create();
        addCountingAsserts(create.apply(CountingInput.upTo(1000L)), 1000L);
        create.run();
    }

    @Test
    public void testBoundedDisplayData() {
        Assert.assertThat(DisplayData.from(CountingInput.upTo(1234L)), DisplayDataMatchers.hasDisplayItem("upTo", 1234L));
    }

    @Test
    @Category({RunnableOnService.class})
    public void testUnboundedInput() {
        TestPipeline create = TestPipeline.create();
        addCountingAsserts(create.apply(CountingInput.unbounded().withMaxNumRecords(1000L)), 1000L);
        create.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testUnboundedInputRate() {
        TestPipeline create = TestPipeline.create();
        Duration millis = Duration.millis(8L);
        addCountingAsserts(create.apply(CountingInput.unbounded().withRate(10L, millis).withMaxNumRecords(5000L)), 5000L);
        long millis2 = (millis.getMillis() * 5000) / 10;
        Instant now = Instant.now();
        create.run();
        Assert.assertThat(Boolean.valueOf(Instant.now().isAfter(now.plus(millis2))), Matchers.is(true));
    }

    @Test
    @Category({RunnableOnService.class})
    public void testUnboundedInputTimestamps() {
        TestPipeline create = TestPipeline.create();
        PCollection apply = create.apply(CountingInput.unbounded().withTimestampFn(new ValueAsTimestampFn()).withMaxNumRecords(1000L));
        addCountingAsserts(apply, 1000L);
        PAssert.thatSingleton(apply.apply("TimestampDiff", ParDo.of(new ElementValueDiff())).apply("RemoveDuplicateTimestamps", RemoveDuplicates.create())).isEqualTo(0L);
        create.run();
    }

    @Test
    public void testUnboundedDisplayData() {
        Duration standardHours = Duration.standardHours(5L);
        SerializableFunction<Long, Instant> serializableFunction = new SerializableFunction<Long, Instant>() { // from class: org.apache.beam.sdk.io.CountingInputTest.1
            public Instant apply(Long l) {
                return Instant.now();
            }
        };
        DisplayData from = DisplayData.from(CountingInput.unbounded().withMaxNumRecords(1234L).withMaxReadTime(standardHours).withTimestampFn(serializableFunction));
        Assert.assertThat(from, DisplayDataMatchers.hasDisplayItem("maxRecords", 1234L));
        Assert.assertThat(from, DisplayDataMatchers.hasDisplayItem("maxReadTime", standardHours));
        Assert.assertThat(from, DisplayDataMatchers.hasDisplayItem("timestampFn", serializableFunction.getClass()));
    }
}
