package org.apache.beam.runners.samza.adapter;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.adapter.TestUnboundedSource;
import org.apache.beam.runners.samza.adapter.UnboundedSourceSystem;
import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.samza.Partition;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.MessageType;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStreamPartition;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/runners/samza/adapter/UnboundedSourceSystemTest.class */
/**
 * Tests for {@link UnboundedSourceSystem.Consumer}.
 *
 * <p>Each test builds a scripted {@link TestUnboundedSource}, registers one or more
 * {@link SystemStreamPartition}s with a consumer, starts it, polls it through
 * {@link #consumeUntilTimeoutOrWatermark}, and compares the received envelopes with the expected
 * ones. The consumer is always stopped in a {@code finally} block so that a failing assertion
 * does not leave a started consumer behind.
 *
 * <p>NOTE(review): {@code mo2build()} looks like a decompiler-assigned name for the builder's
 * terminal method; it is kept as-is because it must match whatever {@code TestUnboundedSource}
 * declares in this code base.
 */
public class UnboundedSourceSystemTest {
    /** How long, in milliseconds, a test keeps polling for messages before giving up. */
    private static final long DEFAULT_TIMEOUT_MILLIS = 1000;
    /** Currently not referenced by any test in this class. */
    private static final long DEFAULT_WATERMARK_TIMEOUT_MILLIS = 1000;
    /** Typed {@code null} used as the start offset when registering a partition. */
    private static final String NULL_STRING = null;
    /** Partition 0 of the test stream; used by all single-partition tests. */
    private static final SystemStreamPartition DEFAULT_SSP = ssp(0);
    /** Coder used by {@link #offset(int)} to serialize checkpoint marks. */
    private static final Coder<TestCheckpointMark> CHECKPOINT_MARK_CODER = TestUnboundedSource.createBuilder().mo2build().getCheckpointMarkCoder();

    /** A source without elements yields no messages between start and stop. */
    @Test
    public void testConsumerStartStop() throws IOException, InterruptedException {
        UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
                createConsumer(TestUnboundedSource.createBuilder().mo2build());
        consumer.register(DEFAULT_SSP, offset(0));
        consumer.start();
        try {
            Assert.assertEquals(
                    Collections.emptyList(),
                    consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
        } finally {
            consumer.stop();
        }
    }

    /** A single element is delivered as one element message at offset 0. */
    @Test
    public void testConsumeOneMessage() throws IOException, InterruptedException {
        UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
                createConsumer(TestUnboundedSource.createBuilder().addElements("test").mo2build());
        consumer.register(DEFAULT_SSP, NULL_STRING);
        consumer.start();
        try {
            Assert.assertEquals(
                    Collections.singletonList(
                            TestSourceHelpers.createElementMessage(DEFAULT_SSP, offset(0), "test", BoundedWindow.TIMESTAMP_MIN_VALUE)),
                    consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
        } finally {
            consumer.stop();
        }
    }

    /** Advancing the watermark to the maximum timestamp is followed by an end-of-stream message. */
    @Test
    public void testMaxWatermarkTriggersEndOfStreamMessage() throws IOException, InterruptedException {
        UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
                createConsumer(
                        TestUnboundedSource.createBuilder()
                                .addElements("test")
                                .advanceWatermarkTo(BoundedWindow.TIMESTAMP_MAX_VALUE)
                                .mo2build());
        consumer.register(DEFAULT_SSP, NULL_STRING);
        consumer.start();
        try {
            // The helper returns as soon as it sees a watermark, so a second round of polling is
            // needed to pick up whatever follows it.
            List<IncomingMessageEnvelope> actual =
                    consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS);
            actual.addAll(consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
            Assert.assertEquals(
                    Arrays.asList(
                            TestSourceHelpers.createElementMessage(DEFAULT_SSP, offset(0), "test", BoundedWindow.TIMESTAMP_MIN_VALUE),
                            TestSourceHelpers.createWatermarkMessage(DEFAULT_SSP, BoundedWindow.TIMESTAMP_MAX_VALUE),
                            TestSourceHelpers.createEndOfStreamMessage(DEFAULT_SSP)),
                    actual);
        } finally {
            consumer.stop();
        }
    }

    /** Elements added after {@code setTimestamp} carry the new timestamp; earlier ones do not. */
    @Test
    public void testAdvanceTimestamp() throws IOException, InterruptedException {
        Instant timestamp = Instant.now();
        UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
                createConsumer(
                        TestUnboundedSource.createBuilder()
                                .addElements("before")
                                .setTimestamp(timestamp)
                                .addElements("after")
                                .mo2build());
        consumer.register(DEFAULT_SSP, NULL_STRING);
        consumer.start();
        try {
            Assert.assertEquals(
                    Arrays.asList(
                            TestSourceHelpers.createElementMessage(DEFAULT_SSP, offset(0), "before", BoundedWindow.TIMESTAMP_MIN_VALUE),
                            TestSourceHelpers.createElementMessage(DEFAULT_SSP, offset(1), "after", timestamp)),
                    consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
        } finally {
            consumer.stop();
        }
    }

    /** Several elements are delivered in order with consecutive offsets. */
    @Test
    public void testConsumeMultipleMessages() throws IOException, InterruptedException {
        Instant timestamp = Instant.now();
        UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
                createConsumer(
                        TestUnboundedSource.createBuilder()
                                .setTimestamp(timestamp)
                                .addElements("test", "a", "few", "messages")
                                .mo2build());
        consumer.register(DEFAULT_SSP, NULL_STRING);
        consumer.start();
        try {
            Assert.assertEquals(
                    Arrays.asList(
                            TestSourceHelpers.createElementMessage(DEFAULT_SSP, offset(0), "test", timestamp),
                            TestSourceHelpers.createElementMessage(DEFAULT_SSP, offset(1), "a", timestamp),
                            TestSourceHelpers.createElementMessage(DEFAULT_SSP, offset(2), "few", timestamp),
                            TestSourceHelpers.createElementMessage(DEFAULT_SSP, offset(3), "messages", timestamp)),
                    consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
        } finally {
            consumer.stop();
        }
    }

    /** A watermark advance is delivered as a watermark message after the preceding elements. */
    @Test
    public void testAdvanceWatermark() throws IOException, InterruptedException {
        Instant first = Instant.now();
        Instant second = first.plus(Duration.millis(1L));
        UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
                createConsumer(
                        TestUnboundedSource.createBuilder()
                                .setTimestamp(first)
                                .addElements("first")
                                .setTimestamp(second)
                                .addElements("second")
                                .advanceWatermarkTo(first)
                                .mo2build());
        consumer.register(DEFAULT_SSP, NULL_STRING);
        consumer.start();
        try {
            Assert.assertEquals(
                    Arrays.asList(
                            TestSourceHelpers.createElementMessage(DEFAULT_SSP, offset(0), "first", first),
                            TestSourceHelpers.createElementMessage(DEFAULT_SSP, offset(1), "second", second),
                            TestSourceHelpers.createWatermarkMessage(DEFAULT_SSP, first)),
                    consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
        } finally {
            consumer.stop();
        }
    }

    /** Two watermark advances are each delivered after the elements that precede them. */
    @Test
    @Ignore("https://github.com/apache/beam/issues/20376")
    public void testMultipleAdvanceWatermark() throws IOException, InterruptedException {
        Instant first = Instant.now();
        Instant second = first.plus(Duration.millis(1L));
        Instant third = first.plus(Duration.millis(2L));
        UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
                createConsumer(
                        TestUnboundedSource.createBuilder()
                                .setTimestamp(first)
                                .addElements("first")
                                .advanceWatermarkTo(first)
                                .noElements()
                                .setTimestamp(second)
                                .addElements("second")
                                .setTimestamp(third)
                                .addElements("third")
                                .advanceWatermarkTo(second)
                                .mo2build());
        consumer.register(DEFAULT_SSP, NULL_STRING);
        consumer.start();
        try {
            Assert.assertEquals(
                    Arrays.asList(
                            TestSourceHelpers.createElementMessage(DEFAULT_SSP, offset(0), "first", first),
                            TestSourceHelpers.createWatermarkMessage(DEFAULT_SSP, first)),
                    consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
            Assert.assertEquals(
                    Arrays.asList(
                            TestSourceHelpers.createElementMessage(DEFAULT_SSP, offset(1), "second", second),
                            TestSourceHelpers.createElementMessage(DEFAULT_SSP, offset(2), "third", third),
                            TestSourceHelpers.createWatermarkMessage(DEFAULT_SSP, second)),
                    consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
        } finally {
            consumer.stop();
        }
    }

    /** An exception scripted as the source's first event surfaces, wrapped, from polling. */
    @Test
    public void testReaderThrowsAtStart() throws Exception {
        IOException expected = new IOException("Expected exception");
        UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
                createConsumer(TestUnboundedSource.createBuilder().addException(expected).mo2build());
        consumer.register(DEFAULT_SSP, NULL_STRING);
        consumer.start();
        try {
            TestSourceHelpers.expectWrappedException(expected, () -> {
                return consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS);
            });
        } finally {
            consumer.stop();
        }
    }

    /** An exception scripted after several elements surfaces, wrapped, from polling. */
    @Test
    public void testReaderThrowsAtAdvance() throws Exception {
        IOException expected = new IOException("Expected exception");
        UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
                createConsumer(
                        TestUnboundedSource.createBuilder()
                                .addElements("test", "a", "few", "good", "messages", "then", "...")
                                .addException(expected)
                                .mo2build());
        consumer.register(DEFAULT_SSP, offset(0));
        consumer.start();
        try {
            TestSourceHelpers.expectWrappedException(expected, () -> {
                return consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS);
            });
        } finally {
            consumer.stop();
        }
    }

    /**
     * Polling returns only the element scripted before the latch while the latch is closed, and
     * the remaining element and watermark after it has been counted down.
     */
    @Test
    public void testTimeout() throws Exception {
        CountDownLatch advanceLatch = new CountDownLatch(1);
        Instant first = Instant.now();
        Instant second = first.plus(Duration.millis(1L));
        UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
                createConsumer(
                        TestUnboundedSource.createBuilder()
                                .setTimestamp(first)
                                .addElements("before")
                                .addLatch(advanceLatch)
                                .setTimestamp(second)
                                .addElements("after")
                                .advanceWatermarkTo(second)
                                .mo2build());
        consumer.register(DEFAULT_SSP, NULL_STRING);
        consumer.start();
        try {
            Assert.assertEquals(
                    Collections.singletonList(
                            TestSourceHelpers.createElementMessage(DEFAULT_SSP, offset(0), "before", first)),
                    consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
            advanceLatch.countDown();
            Assert.assertEquals(
                    Arrays.asList(
                            TestSourceHelpers.createElementMessage(DEFAULT_SSP, offset(1), "after", second),
                            TestSourceHelpers.createWatermarkMessage(DEFAULT_SSP, second)),
                    consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
        } finally {
            consumer.stop();
        }
    }

    /**
     * Registers three partitions with offsets 10, 5 and 8 and expects each split's element to be
     * delivered with the next offset (11, 6 and 9).
     */
    @Test
    public void testRestartFromCheckpoint() throws IOException, InterruptedException {
        TestUnboundedSource.SplittableBuilder splits = TestUnboundedSource.createSplits(3);
        splits.forSplit(0).addElements("split-0");
        splits.forSplit(1).addElements("split-1");
        splits.forSplit(2).addElements("split-2");
        UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
                createConsumer(splits.mo2build(), 3);
        consumer.register(ssp(0), offset(10));
        consumer.register(ssp(1), offset(5));
        consumer.register(ssp(2), offset(8));
        consumer.start();
        try {
            Assert.assertEquals(
                    Collections.singletonList(
                            TestSourceHelpers.createElementMessage(ssp(0), offset(11), "split-0", BoundedWindow.TIMESTAMP_MIN_VALUE)),
                    consumeUntilTimeoutOrWatermark(consumer, ssp(0), DEFAULT_TIMEOUT_MILLIS));
            Assert.assertEquals(
                    Collections.singletonList(
                            TestSourceHelpers.createElementMessage(ssp(1), offset(6), "split-1", BoundedWindow.TIMESTAMP_MIN_VALUE)),
                    consumeUntilTimeoutOrWatermark(consumer, ssp(1), DEFAULT_TIMEOUT_MILLIS));
            Assert.assertEquals(
                    Collections.singletonList(
                            TestSourceHelpers.createElementMessage(ssp(2), offset(9), "split-2", BoundedWindow.TIMESTAMP_MIN_VALUE)),
                    consumeUntilTimeoutOrWatermark(consumer, ssp(2), DEFAULT_TIMEOUT_MILLIS));
        } finally {
            consumer.stop();
        }
    }

    /** Creates a consumer over {@code testUnboundedSource} with a max source parallelism of 1. */
    private static UnboundedSourceSystem.Consumer<String, TestCheckpointMark> createConsumer(TestUnboundedSource<String> testUnboundedSource) {
        return createConsumer(testUnboundedSource, 1);
    }

    /**
     * Creates a consumer over {@code testUnboundedSource}.
     *
     * @param testUnboundedSource the scripted source to read from
     * @param maxSourceParallelism value passed to {@code setMaxSourceParallelism}
     * @return a new, not yet started consumer using a watermark interval of 0, a fresh metrics
     *     registry, and the step name {@code "test-step"}
     */
    private static UnboundedSourceSystem.Consumer<String, TestCheckpointMark> createConsumer(TestUnboundedSource<String> testUnboundedSource, int maxSourceParallelism) {
        SamzaPipelineOptions options = PipelineOptionsFactory.as(SamzaPipelineOptions.class);
        options.setWatermarkInterval(0L);
        options.setMaxSourceParallelism(maxSourceParallelism);
        return new UnboundedSourceSystem.Consumer<>(
                testUnboundedSource, options, new SamzaMetricsContainer(new MetricsRegistryMap()), "test-step");
    }

    /**
     * Polls {@code consumer} repeatedly for {@code partition} and accumulates the envelopes until
     * either the last accumulated envelope is a watermark message or more than
     * {@code timeoutMillis} milliseconds of wall-clock time have elapsed.
     *
     * @param consumer the started consumer to poll
     * @param partition the partition to poll for
     * @param timeoutMillis maximum time to keep polling; must be {@code >= 0}
     * @return a mutable list of every envelope received, in arrival order
     * @throws InterruptedException if a poll is interrupted
     */
    public static List<IncomingMessageEnvelope> consumeUntilTimeoutOrWatermark(SystemConsumer consumer, SystemStreamPartition partition, long timeoutMillis) throws InterruptedException {
        Assert.assertTrue("Expected timeoutMillis (" + timeoutMillis + ") >= 0", timeoutMillis >= 0);
        final List<IncomingMessageEnvelope> accumulator = new ArrayList<>();
        final long start = System.currentTimeMillis();
        long now = start;
        while (timeoutMillis + start >= now) {
            // NOTE(review): inside this loop (now - start) - timeoutMillis is never positive, so
            // the consumer is always polled with a non-positive timeout. How the consumer treats
            // such a value is not visible here; confirm the intent before changing the expression.
            accumulator.addAll(pollOnce(consumer, partition, (now - start) - timeoutMillis));
            if (!accumulator.isEmpty()) {
                IncomingMessageEnvelope last = accumulator.get(accumulator.size() - 1);
                if (MessageType.of(last.getMessage()) == MessageType.WATERMARK) {
                    break;
                }
            }
            now = System.currentTimeMillis();
        }
        return accumulator;
    }

    /**
     * Polls {@code consumer} once for {@code partition} and returns the envelopes for it, asserting
     * that the result is keyed by exactly that partition and maps it to a non-null list.
     */
    private static List<IncomingMessageEnvelope> pollOnce(SystemConsumer consumer, SystemStreamPartition partition, long timeoutMillis) throws InterruptedException {
        Set<SystemStreamPartition> partitions = Collections.singleton(partition);
        Map<SystemStreamPartition, List<IncomingMessageEnvelope>> polled = consumer.poll(partitions, timeoutMillis);
        Assert.assertEquals(partitions, polled.keySet());
        List<IncomingMessageEnvelope> envelopes = polled.get(partition);
        Assert.assertNotNull(envelopes);
        return envelopes;
    }

    /**
     * Encodes {@code TestCheckpointMark.of(i)} with {@link #CHECKPOINT_MARK_CODER} and returns the
     * bytes as a Base64 string.
     */
    private static String offset(int i) throws IOException {
        ByteArrayOutputStream encoded = new ByteArrayOutputStream();
        CHECKPOINT_MARK_CODER.encode(TestCheckpointMark.of(i), encoded);
        return Base64.getEncoder().encodeToString(encoded.toByteArray());
    }

    /** Returns partition {@code i} of the {@code "default-system"} system and stream. */
    private static SystemStreamPartition ssp(int i) {
        return new SystemStreamPartition("default-system", "default-system", new Partition(i));
    }
}
