package org.apache.beam.runners.samza.adapter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.adapter.BoundedSourceSystem;
import org.apache.beam.runners.samza.adapter.TestBoundedSource;
import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.apache.samza.Partition;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStreamPartition;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/runners/samza/adapter/BoundedSourceSystemTest.class */
/**
 * Tests for {@link BoundedSourceSystem.Consumer}, driven by sources built with {@link TestBoundedSource}.
 *
 * <p>Each test registers one or more partitions, starts the consumer, polls it until an
 * end-of-stream envelope arrives or a timeout elapses, and compares the polled envelopes with the
 * expected sequence. The consumer is always stopped in a {@code finally} block so that a failing
 * assertion does not leave a started consumer behind.
 */
public class BoundedSourceSystemTest {
    /** Maximum time, in milliseconds, a test keeps polling for envelopes. */
    private static final long DEFAULT_TIMEOUT_MILLIS = 1000;

    /** Partition 0 of the test stream; used by every single-partition test. */
    private static final SystemStreamPartition DEFAULT_SSP = new SystemStreamPartition("default-system", "default-system", new Partition(0));

    /** Typed {@code null} passed as the offset argument when registering partitions in {@link #testSplit()}. */
    private static final String NULL_STRING = null;

    /** An empty source yields only the max-timestamp watermark followed by end-of-stream. */
    @Test
    public void testConsumerStartStop() throws IOException, InterruptedException {
        BoundedSourceSystem.Consumer<String> consumer = startConsumer(TestBoundedSource.createBuilder().mo2build());
        try {
            Assert.assertEquals(
                    Arrays.asList(
                            TestSourceHelpers.createWatermarkMessage(DEFAULT_SSP, BoundedWindow.TIMESTAMP_MAX_VALUE),
                            TestSourceHelpers.createEndOfStreamMessage(DEFAULT_SSP)),
                    consumeUntilTimeoutOrEos(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
        } finally {
            consumer.stop();
        }
    }

    /** A single element is delivered at offset "0", then the watermark and end-of-stream. */
    @Test
    public void testConsumeOneMessage() throws IOException, InterruptedException {
        BoundedSourceSystem.Consumer<String> consumer = startConsumer(TestBoundedSource.createBuilder().addElements("test").mo2build());
        try {
            Assert.assertEquals(
                    Arrays.asList(
                            TestSourceHelpers.createElementMessage(DEFAULT_SSP, "0", "test", BoundedWindow.TIMESTAMP_MIN_VALUE),
                            TestSourceHelpers.createWatermarkMessage(DEFAULT_SSP, BoundedWindow.TIMESTAMP_MAX_VALUE),
                            TestSourceHelpers.createEndOfStreamMessage(DEFAULT_SSP)),
                    consumeUntilTimeoutOrEos(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
        } finally {
            consumer.stop();
        }
    }

    /** Elements added after {@code setTimestamp(now)} are expected to carry that timestamp. */
    @Test
    public void testAdvanceTimestamp() throws InterruptedException {
        Instant now = Instant.now();
        BoundedSourceSystem.Consumer<String> consumer =
                startConsumer(TestBoundedSource.createBuilder().addElements("before").setTimestamp(now).addElements("after").mo2build());
        try {
            Assert.assertEquals(
                    Arrays.asList(
                            TestSourceHelpers.createElementMessage(DEFAULT_SSP, "0", "before", BoundedWindow.TIMESTAMP_MIN_VALUE),
                            TestSourceHelpers.createElementMessage(DEFAULT_SSP, "1", "after", now),
                            TestSourceHelpers.createWatermarkMessage(DEFAULT_SSP, BoundedWindow.TIMESTAMP_MAX_VALUE),
                            TestSourceHelpers.createEndOfStreamMessage(DEFAULT_SSP)),
                    consumeUntilTimeoutOrEos(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
        } finally {
            consumer.stop();
        }
    }

    /** Several elements are delivered in order with offsets "0" through "3". */
    @Test
    public void testConsumeMultipleMessages() throws IOException, InterruptedException {
        BoundedSourceSystem.Consumer<String> consumer =
                startConsumer(TestBoundedSource.createBuilder().addElements("test", "a", "few", "messages").mo2build());
        try {
            Assert.assertEquals(
                    Arrays.asList(
                            TestSourceHelpers.createElementMessage(DEFAULT_SSP, "0", "test", BoundedWindow.TIMESTAMP_MIN_VALUE),
                            TestSourceHelpers.createElementMessage(DEFAULT_SSP, "1", "a", BoundedWindow.TIMESTAMP_MIN_VALUE),
                            TestSourceHelpers.createElementMessage(DEFAULT_SSP, "2", "few", BoundedWindow.TIMESTAMP_MIN_VALUE),
                            TestSourceHelpers.createElementMessage(DEFAULT_SSP, "3", "messages", BoundedWindow.TIMESTAMP_MIN_VALUE),
                            TestSourceHelpers.createWatermarkMessage(DEFAULT_SSP, BoundedWindow.TIMESTAMP_MAX_VALUE),
                            TestSourceHelpers.createEndOfStreamMessage(DEFAULT_SSP)),
                    consumeUntilTimeoutOrEos(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
        } finally {
            consumer.stop();
        }
    }

    /** An exception queued as the source's first event must surface, wrapped, while polling. */
    @Test
    public void testReaderThrowsAtStart() throws Exception {
        IOException expected = new IOException("Expected exception");
        BoundedSourceSystem.Consumer<String> consumer = startConsumer(TestBoundedSource.createBuilder().addException(expected).mo2build());
        try {
            TestSourceHelpers.expectWrappedException(expected, () -> {
                return consumeUntilTimeoutOrEos(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS);
            });
        } finally {
            consumer.stop();
        }
    }

    /** An exception queued after several good elements must surface, wrapped, while polling. */
    @Test
    public void testReaderThrowsAtAdvance() throws Exception {
        IOException expected = new IOException("Expected exception");
        BoundedSourceSystem.Consumer<String> consumer =
                startConsumer(
                        TestBoundedSource.createBuilder()
                                .addElements("test", "a", "few", "good", "messages", "then", "...")
                                .addException(expected)
                                .mo2build());
        try {
            TestSourceHelpers.expectWrappedException(expected, () -> {
                return consumeUntilTimeoutOrEos(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS);
            });
        } finally {
            consumer.stop();
        }
    }

    /**
     * While the source is held on a latch, polling must give up after the timeout with only the
     * elements produced so far; once the latch is released the remaining output is delivered.
     */
    @Test
    public void testTimeout() throws Exception {
        CountDownLatch latch = new CountDownLatch(1);
        BoundedSourceSystem.Consumer<String> consumer =
                startConsumer(TestBoundedSource.createBuilder().addElements("before").addLatch(latch).addElements("after").mo2build());
        try {
            Assert.assertEquals(
                    Collections.singletonList(
                            TestSourceHelpers.createElementMessage(DEFAULT_SSP, "0", "before", BoundedWindow.TIMESTAMP_MIN_VALUE)),
                    consumeUntilTimeoutOrEos(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
            latch.countDown();
            Assert.assertEquals(
                    Arrays.asList(
                            TestSourceHelpers.createElementMessage(DEFAULT_SSP, "1", "after", BoundedWindow.TIMESTAMP_MIN_VALUE),
                            TestSourceHelpers.createWatermarkMessage(DEFAULT_SSP, BoundedWindow.TIMESTAMP_MAX_VALUE),
                            TestSourceHelpers.createEndOfStreamMessage(DEFAULT_SSP)),
                    consumeUntilTimeoutOrEos(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
        } finally {
            // Release the latch even when the first assertion failed, so nothing stays parked on it
            // while the consumer shuts down. countDown() is a no-op once the count is zero.
            latch.countDown();
            consumer.stop();
        }
    }

    /**
     * With three splits and three registered partitions, each partition must deliver exactly one
     * split's element, and the three element offsets together must be {"0", "1", "2"}.
     */
    @Test
    public void testSplit() throws IOException, InterruptedException {
        final int splitCount = 3;
        // NOTE(review): declared raw, as in the original; parameterize if SplittableBuilder is generic.
        TestBoundedSource.SplittableBuilder createSplits = TestBoundedSource.createSplits(splitCount);
        for (int split = 0; split < splitCount; split++) {
            createSplits.forSplit(split).addElements("split-" + split);
        }
        BoundedSourceSystem.Consumer<String> consumer = createConsumer(createSplits.mo2build(), splitCount);
        try {
            for (int split = 0; split < splitCount; split++) {
                consumer.register(ssp(split), NULL_STRING);
            }
            consumer.start();

            Set<String> offsets = new HashSet<>();
            for (int split = 0; split < splitCount; split++) {
                SystemStreamPartition partition = ssp(split);
                List<IncomingMessageEnvelope> envelopes = consumeUntilTimeoutOrEos(consumer, partition, DEFAULT_TIMEOUT_MILLIS);
                // The offset of the element is not fixed per partition, so the expected element
                // reuses the observed offset; the set comparison below validates the offsets.
                String offset = envelopes.get(0).getOffset();
                Assert.assertEquals(
                        Arrays.asList(
                                TestSourceHelpers.createElementMessage(partition, offset, "split-" + split, BoundedWindow.TIMESTAMP_MIN_VALUE),
                                TestSourceHelpers.createWatermarkMessage(partition, BoundedWindow.TIMESTAMP_MAX_VALUE),
                                TestSourceHelpers.createEndOfStreamMessage(partition)),
                        envelopes);
                offsets.add(offset);
            }
            Assert.assertEquals(Sets.newHashSet("0", "1", "2"), offsets);
        } finally {
            consumer.stop();
        }
    }

    /**
     * Polls {@code systemConsumer} for {@code systemStreamPartition} until the most recently
     * received envelope is an end-of-stream marker or {@code timeoutMillis} has elapsed.
     *
     * <p>Kept {@code public static} because it is invoked from lambdas in this class.
     *
     * @param systemConsumer the consumer to poll
     * @param systemStreamPartition the single partition to poll
     * @param timeoutMillis overall time budget in milliseconds; must be {@code >= 0}
     * @return every envelope received, in arrival order (possibly empty)
     * @throws InterruptedException if interrupted while polling
     */
    public static List<IncomingMessageEnvelope> consumeUntilTimeoutOrEos(SystemConsumer systemConsumer, SystemStreamPartition systemStreamPartition, long timeoutMillis) throws InterruptedException {
        Assert.assertTrue("Expected timeoutMillis (" + timeoutMillis + ") >= 0", timeoutMillis >= 0);
        List<IncomingMessageEnvelope> received = new ArrayList<>();
        long startMillis = System.currentTimeMillis();
        long elapsedMillis = 0;
        // Comparing elapsed time against the budget (rather than start + budget against now)
        // avoids overflow for very large budgets.
        while (elapsedMillis <= timeoutMillis) {
            // Hand poll() the time that is actually left. The previous code passed
            // (elapsed - timeout), which is never positive here; negative timeouts are not a
            // bounded wait under the SystemConsumer#poll contract.
            long remainingMillis = timeoutMillis - elapsedMillis;
            received.addAll(pollOnce(systemConsumer, systemStreamPartition, remainingMillis));
            if (!received.isEmpty() && received.get(received.size() - 1).isEndOfStream()) {
                break;
            }
            elapsedMillis = System.currentTimeMillis() - startMillis;
        }
        return received;
    }

    /**
     * Performs one {@code poll} for a single partition and asserts that the result contains an
     * entry for exactly that partition with a non-null envelope list.
     */
    private static List<IncomingMessageEnvelope> pollOnce(SystemConsumer systemConsumer, SystemStreamPartition systemStreamPartition, long timeoutMillis) throws InterruptedException {
        Set<SystemStreamPartition> partitions = Collections.singleton(systemStreamPartition);
        Map<SystemStreamPartition, List<IncomingMessageEnvelope>> polled = systemConsumer.poll(partitions, timeoutMillis);
        Assert.assertEquals(partitions, polled.keySet());
        List<IncomingMessageEnvelope> envelopes = polled.get(systemStreamPartition);
        Assert.assertNotNull(envelopes);
        return envelopes;
    }

    /** Creates a single-split consumer, registers {@link #DEFAULT_SSP} at offset "0" and starts it. */
    private static BoundedSourceSystem.Consumer<String> startConsumer(BoundedSource<String> boundedSource) {
        BoundedSourceSystem.Consumer<String> consumer = createConsumer(boundedSource);
        consumer.register(DEFAULT_SSP, "0");
        consumer.start();
        return consumer;
    }

    /** Creates a consumer with a max source parallelism of 1. */
    private static BoundedSourceSystem.Consumer<String> createConsumer(BoundedSource<String> boundedSource) {
        return createConsumer(boundedSource, 1);
    }

    /**
     * Creates a consumer for {@code boundedSource} under step name "test-step", with the pipeline
     * option {@code maxSourceParallelism} set to {@code splitNum} and a fresh metrics registry.
     */
    private static BoundedSourceSystem.Consumer<String> createConsumer(BoundedSource<String> boundedSource, int splitNum) {
        SamzaPipelineOptions options = PipelineOptionsFactory.as(SamzaPipelineOptions.class);
        options.setMaxSourceParallelism(splitNum);
        return new BoundedSourceSystem.Consumer<>(boundedSource, options, new SamzaMetricsContainer(new MetricsRegistryMap()), "test-step");
    }

    /** Returns partition {@code partition} of the same system/stream used by {@link #DEFAULT_SSP}. */
    private static SystemStreamPartition ssp(int partition) {
        return new SystemStreamPartition("default-system", "default-system", new Partition(partition));
    }
}
