package org.apache.beam.sdk.io.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/* loaded from: input_file:org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.class */
public class ReadFromKafkaDoFnTest {
    // The single topic partition that every mock consumer and source descriptor in this test uses.
    private final TopicPartition topicPartition = new TopicPartition("topic", 0);

    // JUnit rule used by the tests that expect a specific exception type and message.
    @Rule
    public ExpectedException thrown = ExpectedException.none();
    // Mock consumer that serves synthetic records, and the DoFn under test wired to it.
    private final SimpleMockKafkaConsumer consumer = new SimpleMockKafkaConsumer(OffsetResetStrategy.NONE, this.topicPartition);
    private final ReadFromKafkaDoFn<String, String> dofnInstance = ReadFromKafkaDoFn.create(makeReadSourceDescriptor(this.consumer), RECORDS);
    // Mock consumer whose position()/seek() always throw, and a DoFn wired to it.
    private final ExceptionMockKafkaConsumer exceptionConsumer = new ExceptionMockKafkaConsumer(OffsetResetStrategy.NONE, this.topicPartition);
    private final ReadFromKafkaDoFn<String, String> exceptionDofnInstance = ReadFromKafkaDoFn.create(makeReadSourceDescriptor(this.exceptionConsumer), RECORDS);
    // Output tag passed to ReadFromKafkaDoFn.create; MockMultiOutputReceiver maps it to the "good records" receiver.
    private static final TupleTag<KV<KafkaSourceDescriptor, KafkaRecord<String, String>>> RECORDS = new TupleTag<>();
    // Element type descriptor used to build the empty input PCollection in testBoundedness.
    private static final TypeDescriptor<KafkaSourceDescriptor> KAFKA_SOURCE_DESCRIPTOR_TYPE_DESCRIPTOR = new TypeDescriptor<KafkaSourceDescriptor>() { // from class: org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFnTest.4
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest$BoundednessVisitor.class */
    public static class BoundednessVisitor extends Pipeline.PipelineVisitor.Defaults {
        /** Every unbounded {@link PCollection} seen by {@link #visitValue}, in visit order. */
        final List<PCollection<?>> unboundedPCollections = new ArrayList<>();

        BoundednessVisitor() {
        }

        /**
         * Records {@code pValue} when it is a {@link PCollection} whose boundedness is
         * {@code UNBOUNDED}; every other value is ignored.
         */
        public void visitValue(PValue pValue, TransformHierarchy.Node node) {
            if (!(pValue instanceof PCollection)) {
                return;
            }
            PCollection<?> candidate = (PCollection<?>) pValue;
            boolean unbounded = candidate.isBounded() == PCollection.IsBounded.UNBOUNDED;
            if (unbounded) {
                unboundedPCollections.add(candidate);
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest$ExceptionMockKafkaConsumer.class */
    private static class ExceptionMockKafkaConsumer extends MockConsumer<byte[], byte[]> {
        /** The only partition this consumer advertises through {@link #listTopics()}. */
        private final TopicPartition topicPartition;

        public ExceptionMockKafkaConsumer(OffsetResetStrategy offsetResetStrategy, TopicPartition topicPartition) {
            super(offsetResetStrategy);
            this.topicPartition = topicPartition;
        }

        /** Always fails with a {@link KafkaException} carrying the message "PositionException". */
        public synchronized long position(TopicPartition topicPartition) {
            throw new KafkaException("PositionException");
        }

        /** Always fails with a {@link KafkaException} carrying the message "SeekException". */
        public synchronized void seek(TopicPartition topicPartition, long j) {
            throw new KafkaException("SeekException");
        }

        /** Reports exactly one topic holding the configured partition, with no node information. */
        public synchronized Map<String, List<PartitionInfo>> listTopics() {
            String topic = this.topicPartition.topic();
            int partition = this.topicPartition.partition();
            PartitionInfo info = new PartitionInfo(topic, partition, (Node) null, (Node[]) null, (Node[]) null);
            List<PartitionInfo> partitions = ImmutableList.of(info);
            return ImmutableMap.of(topic, partitions);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest$FailingDeserializer.class */
    /**
     * A {@link Deserializer} that fails on every record, used to exercise the bad-record paths
     * of the DoFn under test.
     */
    public static class FailingDeserializer implements Deserializer<String> {
        /**
         * Implements {@code Deserializer#deserialize(String, byte[])}. The method must carry this
         * exact name: under the decompiler-generated name the class did not implement the
         * interface's abstract method, so the intentional failure could never be triggered.
         *
         * @param topic topic the record came from (unused)
         * @param data serialized bytes (unused)
         * @return never returns normally
         * @throws SerializationException always
         */
        public String deserialize(String topic, byte[] data) {
            throw new SerializationException("Intentional serialization exception");
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest$MockMultiOutputReceiver.class */
    /**
     * In-memory {@link DoFn.MultiOutputReceiver} that collects the DoFn's main output and its
     * bad-record output into separate lists so tests can inspect them.
     */
    private static class MockMultiOutputReceiver implements DoFn.MultiOutputReceiver {
        MockOutputReceiver<KV<KafkaSourceDescriptor, KafkaRecord<String, String>>> mockOutputReceiver;
        MockOutputReceiver<BadRecord> badOutputReceiver;

        private MockMultiOutputReceiver() {
            this.mockOutputReceiver = new MockOutputReceiver<>();
            this.badOutputReceiver = new MockOutputReceiver<>();
        }

        /**
         * Returns the receiver registered for {@code tupleTag}.
         *
         * @param tupleTag either {@code RECORDS} or {@code BadRecordRouter.BAD_RECORD_TAG}
         * @return the matching in-memory receiver
         * @throws RuntimeException with message "Invalid Tag" for any other tag
         */
        // The casts are unchecked but safe: each tag is matched by equality before its receiver
        // is returned, so T is the element type of that receiver.
        @SuppressWarnings("unchecked")
        public <T> DoFn.OutputReceiver<T> get(TupleTag<T> tupleTag) {
            if (ReadFromKafkaDoFnTest.RECORDS.equals(tupleTag)) {
                return (DoFn.OutputReceiver<T>) this.mockOutputReceiver;
            }
            if (BadRecordRouter.BAD_RECORD_TAG.equals(tupleTag)) {
                return (DoFn.OutputReceiver<T>) this.badOutputReceiver;
            }
            throw new RuntimeException("Invalid Tag");
        }

        /** Returns the elements emitted to the {@code RECORDS} tag so far. */
        public List<KV<KafkaSourceDescriptor, KafkaRecord<String, String>>> getGoodRecords() {
            return this.mockOutputReceiver.getOutputs();
        }

        /** Returns the elements emitted to the bad-record tag so far. */
        public List<BadRecord> getBadRecords() {
            return this.badOutputReceiver.getOutputs();
        }

        /** Row output is not supported by this mock; always returns {@code null}. */
        public <T> DoFn.OutputReceiver<Row> getRowReceiver(TupleTag<T> tupleTag) {
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest$MockOutputReceiver.class */
    public static class MockOutputReceiver<T> implements DoFn.OutputReceiver<T> {
        /** Everything handed to this receiver, in emission order. */
        private final List<T> records = new ArrayList<>();

        private MockOutputReceiver() {
        }

        /** Appends {@code t} to the captured outputs. */
        public void output(T t) {
            records.add(t);
        }

        /** Appends {@code t} to the captured outputs; the timestamp is not recorded. */
        public void outputWithTimestamp(T t, Instant instant) {
            records.add(t);
        }

        /** Returns the live backing list of captured outputs (not a copy). */
        public List<T> getOutputs() {
            return records;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest$SimpleMockKafkaConsumer.class */
    /**
     * Mock consumer bound to a single {@link TopicPartition}. It fabricates records with the
     * key "key" and the value "value" on every poll, and lets tests script the reported position,
     * the offsets returned for start/stop timestamps, and whether the partition still exists.
     */
    public static class SimpleMockKafkaConsumer extends MockConsumer<byte[], byte[]> {
        private final TopicPartition topicPartition;
        private boolean isRemoved;
        private long currentPos;
        private long startOffset;
        private KV<Long, Instant> startOffsetForTime;
        private KV<Long, Instant> stopOffsetForTime;
        private long numOfRecordsPerPoll;

        public SimpleMockKafkaConsumer(OffsetResetStrategy offsetResetStrategy, TopicPartition topicPartition) {
            super(offsetResetStrategy);
            this.isRemoved = false;
            this.currentPos = 0L;
            this.startOffset = 0L;
            this.startOffsetForTime = KV.of(0L, Instant.now());
            // The null must be typed as Instant so the pair is a KV<Long, Instant>.
            this.stopOffsetForTime = KV.of(Long.MAX_VALUE, (Instant) null);
            this.topicPartition = topicPartition;
        }

        /** Restores every scripted value to its default so state does not leak between tests. */
        public void reset() {
            this.isRemoved = false;
            this.currentPos = 0L;
            this.startOffset = 0L;
            this.startOffsetForTime = KV.of(0L, Instant.now());
            this.stopOffsetForTime = KV.of(Long.MAX_VALUE, (Instant) null);
            this.numOfRecordsPerPoll = 0L;
        }

        /** Makes {@link #listTopics()} report no topics at all. */
        public void setRemoved() {
            this.isRemoved = true;
        }

        /**
         * Sets the poll size. Note that {@link #poll(Duration)} emits {@code count + 1} records,
         * so {@code -1} is the value that produces an empty poll.
         */
        public void setNumOfRecordsPerPoll(long count) {
            this.numOfRecordsPerPoll = count;
        }

        /** Sets the value returned by {@link #position(TopicPartition)}. */
        public void setCurrentPos(long position) {
            this.currentPos = position;
        }

        /** Scripts the offset that {@link #offsetsForTimes(Map)} returns for {@code time}. */
        public void setStartOffsetForTime(long offset, Instant time) {
            this.startOffsetForTime = KV.of(offset, time);
        }

        /** Scripts the offset that {@link #offsetsForTimes(Map)} returns for {@code time}. */
        public void setStopOffsetForTime(long offset, Instant time) {
            this.stopOffsetForTime = KV.of(offset, time);
        }

        /** Reports the configured topic with its one partition, or nothing once removed. */
        public synchronized Map<String, List<PartitionInfo>> listTopics() {
            if (this.isRemoved) {
                return ImmutableMap.of();
            }
            String topic = this.topicPartition.topic();
            List<PartitionInfo> partitions = ImmutableList.of(new PartitionInfo(topic, this.topicPartition.partition(), (Node) null, (Node[]) null, (Node[]) null));
            return ImmutableMap.of(topic, partitions);
        }

        /** Only verifies that exactly the configured partition is being assigned. */
        public synchronized void assign(Collection<TopicPartition> partitions) {
            Assert.assertEquals(this.topicPartition, Iterables.getOnlyElement(partitions));
        }

        /** Remembers {@code offset} as the offset of the first record of subsequent polls. */
        public synchronized void seek(TopicPartition partition, long offset) {
            Assert.assertEquals(this.topicPartition, partition);
            this.startOffset = offset;
        }

        /**
         * Fabricates {@code numOfRecordsPerPoll + 1} consecutive records starting at the last
         * sought offset. The inclusive upper bound is deliberate and the tests rely on it.
         */
        public synchronized ConsumerRecords<byte[], byte[]> poll(Duration timeout) {
            if (this.topicPartition == null) {
                return ConsumerRecords.empty();
            }
            List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
            for (long i = 0; i <= this.numOfRecordsPerPoll; i++) {
                records.add(new ConsumerRecord<>(this.topicPartition.topic(), this.topicPartition.partition(), this.startOffset + i, "key".getBytes(Charsets.UTF_8), "value".getBytes(Charsets.UTF_8)));
            }
            if (records.isEmpty()) {
                return ConsumerRecords.empty();
            }
            return new ConsumerRecords<>(ImmutableMap.of(this.topicPartition, records));
        }

        /**
         * Answers a single-partition timestamp lookup with the scripted start or stop offset when
         * the timestamp matches the corresponding scripted instant, and with offset 0 otherwise.
         * An unset (null) instant never matches.
         */
        public synchronized Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
            Assert.assertEquals(this.topicPartition, Iterables.getOnlyElement(timestampsToSearch.keySet()));
            long timeToSearch = Iterables.getOnlyElement(timestampsToSearch.values());
            long offset = 0L;
            if (matchesTimestamp(this.startOffsetForTime, timeToSearch)) {
                offset = this.startOffsetForTime.getKey();
            } else if (matchesTimestamp(this.stopOffsetForTime, timeToSearch)) {
                offset = this.stopOffsetForTime.getKey();
            }
            return ImmutableMap.of(this.topicPartition, new OffsetAndTimestamp(offset, timeToSearch));
        }

        /** Returns the scripted current position after checking the partition. */
        public synchronized long position(TopicPartition partition) {
            Assert.assertEquals(this.topicPartition, partition);
            return this.currentPos;
        }

        /** Null-safe check that the scripted instant of {@code offsetForTime} equals the timestamp. */
        private static boolean matchesTimestamp(KV<Long, Instant> offsetForTime, long timestampMillis) {
            Instant instant = offsetForTime.getValue();
            return instant != null && instant.getMillis() == timestampMillis;
        }
    }

    /**
     * Builds a String/String read configuration whose consumer factory always hands back the
     * given {@code consumer}, so no real Kafka connection is attempted.
     *
     * @param consumer the mock consumer every factory call should return
     * @return the configured read transform
     */
    private KafkaIO.ReadSourceDescriptors<String, String> makeReadSourceDescriptor(final Consumer<byte[], byte[]> consumer) {
        // The explicit type witness is required: in a call chain the key/value types cannot be
        // inferred from the return type and would otherwise default to Object.
        return KafkaIO.ReadSourceDescriptors.<String, String>read()
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withConsumerFactoryFn(new SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>() {
                public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
                    return consumer;
                }
            })
            .withBootstrapServers("bootstrap_server");
    }

    /**
     * Builds a String/String read configuration that uses {@link FailingDeserializer} for both
     * key and value, and whose consumer factory always hands back the given {@code consumer}.
     *
     * @param consumer the mock consumer every factory call should return
     * @return the configured read transform
     */
    private KafkaIO.ReadSourceDescriptors<String, String> makeFailingReadSourceDescriptor(final Consumer<byte[], byte[]> consumer) {
        // The explicit type witness is required: in a call chain the key/value types cannot be
        // inferred from the return type and would otherwise default to Object.
        return KafkaIO.ReadSourceDescriptors.<String, String>read()
            .withKeyDeserializer(FailingDeserializer.class)
            .withValueDeserializer(FailingDeserializer.class)
            .withConsumerFactoryFn(new SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>() {
                public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
                    return consumer;
                }
            })
            .withBootstrapServers("bootstrap_server");
    }

    /**
     * Builds the {@code i} records a test expects the DoFn to emit: consecutive offsets starting
     * at {@code j} on the test partition, each with timestamp -1, no timestamp type, empty
     * headers and the given key/value, paired with {@code kafkaSourceDescriptor}.
     */
    private List<KV<KafkaSourceDescriptor, KafkaRecord<String, String>>> createExpectedRecords(KafkaSourceDescriptor kafkaSourceDescriptor, long j, int i, String str, String str2) {
        List<KV<KafkaSourceDescriptor, KafkaRecord<String, String>>> expected = new ArrayList<>();
        int index = 0;
        while (index < i) {
            KafkaRecord<String, String> record = new KafkaRecord<String, String>(
                topicPartition.topic(),
                topicPartition.partition(),
                j + index,
                -1L,
                KafkaTimestampType.NO_TIMESTAMP_TYPE,
                new RecordHeaders(),
                KV.of(str, str2));
            expected.add(KV.of(kafkaSourceDescriptor, record));
            index++;
        }
        return expected;
    }

    /** Runs before each test: sets up both DoFn instances, then resets the shared mock consumer. */
    @Before
    public void setUp() throws Exception {
        dofnInstance.setup();
        exceptionDofnInstance.setup();
        // Clear any values a previous test scripted on the consumer.
        consumer.reset();
    }

    /**
     * With an explicit start offset of 10 in the descriptor, the restriction is expected to
     * begin at 10 even though the consumer reports position 5 and a scripted time offset of 15.
     */
    @Test
    public void testInitialRestrictionWhenHasStartOffset() throws Exception {
        consumer.setStartOffsetForTime(15L, Instant.now());
        consumer.setCurrentPos(5L);
        KafkaSourceDescriptor descriptor = KafkaSourceDescriptor.of(topicPartition, 10L, (Instant) null, (Long) null, (Instant) null, ImmutableList.of());

        OffsetRange restriction = dofnInstance.initialRestriction(descriptor);

        Assert.assertEquals(new OffsetRange(10L, Long.MAX_VALUE), restriction);
    }

    /**
     * With explicit start (10) and stop (20) offsets in the descriptor, the restriction is
     * expected to be [10, 20) regardless of the scripted time offsets and consumer position.
     */
    @Test
    public void testInitialRestrictionWhenHasStopOffset() throws Exception {
        consumer.setStartOffsetForTime(15L, Instant.now());
        consumer.setStopOffsetForTime(18L, Instant.now());
        consumer.setCurrentPos(5L);
        KafkaSourceDescriptor descriptor = KafkaSourceDescriptor.of(topicPartition, 10L, (Instant) null, 20L, (Instant) null, ImmutableList.of());

        OffsetRange restriction = dofnInstance.initialRestriction(descriptor);

        Assert.assertEquals(new OffsetRange(10L, 20L), restriction);
    }

    /**
     * With only a start time in the descriptor, the restriction is expected to begin at the
     * offset the consumer returns for that time (10), not at the consumer position (5).
     */
    @Test
    public void testInitialRestrictionWhenHasStartTime() throws Exception {
        Instant startTime = Instant.now();
        consumer.setStartOffsetForTime(10L, startTime);
        consumer.setCurrentPos(5L);
        KafkaSourceDescriptor descriptor = KafkaSourceDescriptor.of(topicPartition, (Long) null, startTime, (Long) null, (Instant) null, ImmutableList.of());

        OffsetRange restriction = dofnInstance.initialRestriction(descriptor);

        Assert.assertEquals(new OffsetRange(10L, Long.MAX_VALUE), restriction);
    }

    /**
     * With start and stop times in the descriptor, both ends of the restriction are expected to
     * come from the offsets the consumer returns for those times (10 and 100).
     */
    @Test
    public void testInitialRestrictionWhenHasStopTime() throws Exception {
        Instant startTime = Instant.now();
        Instant stopTime = startTime.plus(org.joda.time.Duration.millis(2000L));
        consumer.setStartOffsetForTime(10L, startTime);
        consumer.setStopOffsetForTime(100L, stopTime);
        consumer.setCurrentPos(5L);
        KafkaSourceDescriptor descriptor = KafkaSourceDescriptor.of(topicPartition, (Long) null, startTime, (Long) null, stopTime, ImmutableList.of());

        OffsetRange restriction = dofnInstance.initialRestriction(descriptor);

        Assert.assertEquals(new OffsetRange(10L, 100L), restriction);
    }

    /**
     * With no offsets or times in the descriptor, the restriction is expected to start at the
     * consumer's current position (5).
     */
    @Test
    public void testInitialRestrictionWithConsumerPosition() throws Exception {
        consumer.setCurrentPos(5L);
        KafkaSourceDescriptor descriptor = KafkaSourceDescriptor.of(topicPartition, (Long) null, (Instant) null, (Long) null, (Instant) null, ImmutableList.of());

        OffsetRange restriction = dofnInstance.initialRestriction(descriptor);

        Assert.assertEquals(new OffsetRange(5L, Long.MAX_VALUE), restriction);
    }

    /**
     * The exception-throwing consumer's "PositionException" is expected to propagate out of
     * {@code initialRestriction}.
     */
    @Test
    public void testInitialRestrictionWithException() throws Exception {
        thrown.expect(KafkaException.class);
        thrown.expectMessage("PositionException");
        KafkaSourceDescriptor descriptor = KafkaSourceDescriptor.of(topicPartition, (Long) null, (Instant) null, (Long) null, (Instant) null, ImmutableList.of());

        exceptionDofnInstance.initialRestriction(descriptor);
    }

    /**
     * Processing the range [5, 8) is expected to stop and to emit exactly the three records at
     * offsets 5, 6 and 7.
     */
    @Test
    public void testProcessElement() throws Exception {
        MockMultiOutputReceiver receiver = new MockMultiOutputReceiver();
        consumer.setNumOfRecordsPerPoll(3L);
        long startOffset = 5L;
        OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(startOffset, startOffset + 3));
        KafkaSourceDescriptor descriptor = KafkaSourceDescriptor.of(topicPartition, (Long) null, (Instant) null, (Long) null, (Instant) null, (List) null);

        DoFn.ProcessContinuation continuation = dofnInstance.processElement(descriptor, tracker, (WatermarkEstimator) null, receiver);

        Assert.assertEquals(DoFn.ProcessContinuation.stop(), continuation);
        Assert.assertEquals(createExpectedRecords(descriptor, startOffset, 3, "key", "value"), receiver.getGoodRecords());
    }

    /**
     * Processing 1000 records is expected to stop and to leave a "rawSize/&lt;partition&gt;"
     * distribution with sum 8000, count 1000, min 8 and max 8 in the current metrics container.
     */
    @Test
    public void testRawSizeMetric() throws Exception {
        MetricsContainerImpl container = new MetricsContainerImpl("any");
        // Remember whatever container was installed before, so this test does not leak its own
        // container into later tests that run on the same thread.
        org.apache.beam.sdk.metrics.MetricsContainer previous = MetricsEnvironment.setCurrentContainer(container);
        try {
            MockMultiOutputReceiver receiver = new MockMultiOutputReceiver();
            consumer.setNumOfRecordsPerPoll(1000L);
            OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, 1000L));
            KafkaSourceDescriptor descriptor = KafkaSourceDescriptor.of(topicPartition, (Long) null, (Instant) null, (Long) null, (Instant) null, (List) null);

            DoFn.ProcessContinuation continuation = dofnInstance.processElement(descriptor, tracker, (WatermarkEstimator) null, receiver);
            Assert.assertEquals(DoFn.ProcessContinuation.stop(), continuation);

            // Presumably 8 = the byte lengths of the mock's "key" (3) plus "value" (5).
            DistributionData expected = DistributionData.create(8000L, 1000L, 8L, 8L);
            DistributionData actual = container.getDistribution(MetricName.named("KafkaIOReader", "rawSize/" + topicPartition)).getCumulative();
            // Expected value first, so a failure message labels the two sides correctly.
            Assert.assertEquals(expected, actual);
        } finally {
            MetricsEnvironment.setCurrentContainer(previous);
        }
    }

    /**
     * When the consumer returns no records (poll size -1), processing is expected to ask to be
     * resumed and to emit nothing.
     */
    @Test
    public void testProcessElementWithEmptyPoll() throws Exception {
        MockMultiOutputReceiver receiver = new MockMultiOutputReceiver();
        consumer.setNumOfRecordsPerPoll(-1L);
        OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE));
        KafkaSourceDescriptor descriptor = KafkaSourceDescriptor.of(topicPartition, (Long) null, (Instant) null, (Long) null, (Instant) null, (List) null);

        DoFn.ProcessContinuation continuation = dofnInstance.processElement(descriptor, tracker, (WatermarkEstimator) null, receiver);

        Assert.assertEquals(DoFn.ProcessContinuation.resume(), continuation);
        Assert.assertTrue(receiver.getGoodRecords().isEmpty());
    }

    /**
     * When the consumer no longer lists the topic, processing is expected to stop even though
     * records are available.
     */
    @Test
    public void testProcessElementWhenTopicPartitionIsRemoved() throws Exception {
        MockMultiOutputReceiver receiver = new MockMultiOutputReceiver();
        consumer.setRemoved();
        consumer.setNumOfRecordsPerPoll(10L);
        OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE));
        KafkaSourceDescriptor descriptor = KafkaSourceDescriptor.of(topicPartition, (Long) null, (Instant) null, (Long) null, (Instant) null, (List) null);

        DoFn.ProcessContinuation continuation = dofnInstance.processElement(descriptor, tracker, (WatermarkEstimator) null, receiver);

        Assert.assertEquals(DoFn.ProcessContinuation.stop(), continuation);
    }

    /**
     * When the configured check-stop-reading function returns true for the partition, processing
     * is expected to stop and to leave the tracker in a state where {@code checkDone()} passes.
     */
    @Test
    public void testProcessElementWhenTopicPartitionIsStopped() throws Exception {
        MockMultiOutputReceiver receiver = new MockMultiOutputReceiver();
        SerializableFunction<TopicPartition, Boolean> alwaysStop = new SerializableFunction<TopicPartition, Boolean>() {
            public Boolean apply(TopicPartition candidate) {
                Assert.assertTrue(candidate.equals(ReadFromKafkaDoFnTest.this.topicPartition));
                return true;
            }
        };
        ReadFromKafkaDoFn<String, String> stoppingDoFn = ReadFromKafkaDoFn.create(makeReadSourceDescriptor(consumer).toBuilder().setCheckStopReadingFn(alwaysStop).build(), RECORDS);
        stoppingDoFn.setup();
        consumer.setNumOfRecordsPerPoll(10L);
        OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE));
        KafkaSourceDescriptor descriptor = KafkaSourceDescriptor.of(topicPartition, (Long) null, (Instant) null, (Long) null, (Instant) null, (List) null);

        DoFn.ProcessContinuation continuation = stoppingDoFn.processElement(descriptor, tracker, (WatermarkEstimator) null, receiver);

        tracker.checkDone();
        Assert.assertEquals(DoFn.ProcessContinuation.stop(), continuation);
    }

    /**
     * The exception-throwing consumer's "SeekException" is expected to propagate out of
     * {@code processElement}.
     */
    @Test
    public void testProcessElementWithException() throws Exception {
        thrown.expect(KafkaException.class);
        thrown.expectMessage("SeekException");
        MockMultiOutputReceiver receiver = new MockMultiOutputReceiver();
        OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE));
        KafkaSourceDescriptor descriptor = KafkaSourceDescriptor.of(topicPartition, (Long) null, (Instant) null, (Long) null, (Instant) null, (List) null);

        exceptionDofnInstance.processElement(descriptor, tracker, (WatermarkEstimator) null, receiver);
    }

    /**
     * Without a bad-record error handler configured, a deserialization failure is expected to
     * propagate out of {@code processElement} as a {@link SerializationException}.
     */
    @Test
    public void testProcessElementWithDeserializationExceptionDefaultRecordHandler() throws Exception {
        thrown.expect(SerializationException.class);
        thrown.expectMessage("Intentional serialization exception");
        MockMultiOutputReceiver receiver = new MockMultiOutputReceiver();
        OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE));
        consumer.setNumOfRecordsPerPoll(1L);
        ReadFromKafkaDoFn<String, String> failingDoFn = ReadFromKafkaDoFn.create(makeFailingReadSourceDescriptor(consumer), RECORDS);
        failingDoFn.setup();
        KafkaSourceDescriptor descriptor = KafkaSourceDescriptor.of(topicPartition, (Long) null, (Instant) null, (Long) null, (Instant) null, (List) null);

        failingDoFn.processElement(descriptor, tracker, (WatermarkEstimator) null, receiver);

        // Only reached if processElement did not throw; the rule then fails the test anyway.
        Assert.assertEquals("OutputRecordSize", 0L, receiver.getGoodRecords().size());
        Assert.assertEquals("OutputErrorSize", 0L, receiver.getBadRecords().size());
    }

    /**
     * With a bad-record error handler configured, a record that fails to deserialize is expected
     * to be routed to the bad-record output instead of failing the call: zero good records and
     * one bad record for the range [0, 1).
     */
    @Test
    public void testProcessElementWithDeserializationExceptionRecordingRecordHandler() throws Exception {
        MockMultiOutputReceiver receiver = new MockMultiOutputReceiver();
        OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, 1L));
        consumer.setNumOfRecordsPerPoll(1L);
        ReadFromKafkaDoFn<String, String> failingDoFn = ReadFromKafkaDoFn.create(makeFailingReadSourceDescriptor(consumer).withBadRecordErrorHandler(new ErrorHandler.DefaultErrorHandler()), RECORDS);
        failingDoFn.setup();
        KafkaSourceDescriptor descriptor = KafkaSourceDescriptor.of(topicPartition, (Long) null, (Instant) null, (Long) null, (Instant) null, (List) null);

        failingDoFn.processElement(descriptor, tracker, (WatermarkEstimator) null, receiver);

        Assert.assertEquals("OutputRecordSize", 0L, receiver.getGoodRecords().size());
        Assert.assertEquals("OutputErrorSize", 1L, receiver.getBadRecords().size());
    }

    /** A read configured with {@code withBounded()} is expected to produce no unbounded PCollections. */
    @Test
    public void testBounded() {
        BoundednessVisitor visitor = testBoundedness(descriptors -> descriptors.withBounded());
        Assert.assertEquals(0L, visitor.unboundedPCollections.size());
    }

    /** A read left at its default configuration is expected to produce at least one unbounded PCollection. */
    @Test
    public void testUnbounded() {
        BoundednessVisitor visitor = testBoundedness(descriptors -> descriptors);
        Assert.assertNotEquals(0L, visitor.unboundedPCollections.size());
    }

    /**
     * Builds (without running) a pipeline that applies the DoFn under test to an empty input,
     * using the read configuration returned by {@code function}, and returns a visitor holding
     * every unbounded PCollection found in that pipeline.
     */
    private BoundednessVisitor testBoundedness(Function<KafkaIO.ReadSourceDescriptors<String, String>, KafkaIO.ReadSourceDescriptors<String, String>> function) {
        TestPipeline pipeline = TestPipeline.create();
        pipeline
            .apply(Create.empty(KAFKA_SOURCE_DESCRIPTOR_TYPE_DESCRIPTOR))
            .apply(ParDo.of(ReadFromKafkaDoFn.create(function.apply(makeReadSourceDescriptor(consumer)), RECORDS)))
            .setCoder(KvCoder.of(SerializableCoder.of(KafkaSourceDescriptor.class), KafkaRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));

        BoundednessVisitor visitor = new BoundednessVisitor();
        pipeline.traverseTopologically(visitor);
        return visitor;
    }
}
