package org.apache.beam.sdk.nexmark.sources;

import java.io.IOException;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
import org.apache.beam.sdk.nexmark.model.Event;
import org.apache.beam.sdk.nexmark.sources.Generator;
import org.apache.beam.sdk.nexmark.sources.UnboundedEventSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests that readers created by {@link UnboundedEventSource} yield the same events as a
 * {@link Generator} built from the same configuration, including when the reader is repeatedly
 * checkpointed and re-created from its checkpoint.
 */
@RunWith(JUnit4.class)
public class UnboundedEventSourceTest {

    /**
     * Remembers the person and auction ids seen so far so that duplicates fail the test, and
     * compares events read from a reader against those produced by a model generator.
     */
    private static class EventIdChecker {
        private final Set<Long> seenPersonIds = new HashSet<>();
        private final Set<Long> seenAuctionIds = new HashSet<>();

        /**
         * Records the id carried by a new-auction or new-person event and fails if that id was
         * already recorded. Events that carry neither are ignored.
         *
         * @param event the event to record
         */
        public void add(Event event) {
            if (event.newAuction != null) {
                long auctionId = event.newAuction.id;
                Assert.assertTrue("duplicate auction id " + auctionId, seenAuctionIds.add(auctionId));
            } else if (event.newPerson != null) {
                long personId = event.newPerson.id;
                Assert.assertTrue("duplicate person id " + personId, seenPersonIds.add(personId));
            }
        }

        /**
         * Advances the reader {@code i} times, checking that each event it yields has the same
         * string form as the next event from {@code generator}, and records each event's id.
         *
         * @param i number of events to read and compare
         * @param unboundedReader reader under test
         * @param generator model generator supplying the expected events
         * @throws IOException if the reader fails while advancing
         */
        public void add(int i, UnboundedSource.UnboundedReader<Event> unboundedReader, Generator generator) throws IOException {
            for (int index = 0; index < i; index++) {
                Assert.assertTrue("model generator exhausted at event " + index + " of " + i, generator.hasNext());
                // Only the string form of the expected event is compared, so keep just that.
                String expected = generator.next().getValue().toString();
                Assert.assertTrue("reader exhausted at event " + index + " of " + i, unboundedReader.advance());
                Event actual = unboundedReader.getCurrent();
                Assert.assertEquals(expected, actual.toString());
                add(actual);
            }
        }
    }

    /**
     * Builds a generator configuration from {@link NexmarkConfiguration#DEFAULT} using the
     * current wall-clock time.
     *
     * @param j the count passed as the fourth {@code GeneratorConfig} constructor argument
     *     (presumably the maximum number of events; the test reads exactly this many before
     *     expecting the reader to be exhausted)
     */
    private static GeneratorConfig makeConfig(long j) {
        return new GeneratorConfig(NexmarkConfiguration.DEFAULT, System.currentTimeMillis(), 0L, j, 0L);
    }

    /**
     * Reads the source in randomly sized batches, replacing the reader after every batch with a
     * new one created from the previous reader's checkpoint, and checks that the combined stream
     * matches the model generator and that the last reader has nothing left once all events
     * have been consumed.
     */
    @Test
    public void resumeFromCheckpoint() throws IOException {
        // Fixed seed keeps the batch sizes reproducible between runs.
        Random random = new Random(297L);
        int remaining = 47293;
        GeneratorConfig config = makeConfig(remaining);
        Generator modelGenerator = new Generator(config);
        EventIdChecker checker = new EventIdChecker();
        PipelineOptions options = TestPipeline.testingPipelineOptions();
        UnboundedEventSource source = new UnboundedEventSource(config, 1, 0L, false);
        UnboundedSource.UnboundedReader<Event> reader = source.createReader(options, (Generator.Checkpoint) null);
        try {
            while (remaining > 0) {
                int batchSize = Math.min(459 + random.nextInt(455), remaining);
                System.out.printf("reading %d...%n", batchSize);
                checker.add(batchSize, reader, modelGenerator);
                remaining -= batchSize;
                System.out.printf("splitting with %d remaining...%n", remaining);
                Generator.Checkpoint checkpoint = (Generator.Checkpoint) reader.getCheckpointMark();
                // Create the replacement first, then release the superseded reader so that no
                // reader is left unclosed across a checkpoint/resume cycle.
                UnboundedSource.UnboundedReader<Event> resumed = source.createReader(options, checkpoint);
                reader.close();
                reader = resumed;
            }
            Assert.assertFalse("reader still has events after the full stream was read", reader.advance());
        } finally {
            reader.close();
        }
    }
}
