package org.apache.beam.sdk.io.kafka;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.serialization.InstantDeserializer;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricResultsMatchers;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.metrics.SourceMetrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.collect.Lists;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Utils;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.hamcrest.collection.IsIterableWithSize;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest.class */
public class KafkaIOTest {

    /** Pipeline under test; created via {@link TestPipeline#create()} for each test instance. */
    @Rule
    public final transient TestPipeline p = TestPipeline.create();

    /** Expected-exception rule used by the tests that assert on a thrown type and message. */
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /** Expected-exception rule used by {@code testUnboundedSourceStartReadTimeException}. */
    @Rule
    public ExpectedException noMessagesException = ExpectedException.none();

    /** Expected-exception rule; NOTE(review): no use is visible in this part of the file. */
    @Rule
    public ExpectedException cannotInferException = ExpectedException.none();
    /**
     * Mock producer shared by all sink tests and returned by {@code ProducerFactoryFn}.
     *
     * <p>It is constructed with {@code false} as its first argument (MockProducer's auto-complete
     * flag), so sends stay pending until something calls {@code completeNext()} or
     * {@code errorNext()}; {@code ProducerSendCompletionThread} does that in the background.
     */
    private static final MockProducer<Integer, Long> MOCK_PRODUCER = new MockProducer<Integer, Long>(false, new IntegerSerializer(), new LongSerializer()) {
        /**
         * Completes pending sends one at a time, pausing 10 ms after each, until
         * {@code completeNext()} reports that nothing is left.
         */
        @Override
        public void flush() {
            while (completeNext()) {
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e) {
                    // Keep draining so flush() still returns with nothing pending, but do not
                    // lose the interrupt: restore the flag for whoever called us.
                    Thread.currentThread().interrupt();
                }
            }
        }
    };
    /** Monitor the sink tests synchronize on while they clear and use the shared MOCK_PRODUCER. */
    private static final Object MOCK_PRODUCER_LOCK = new Object();

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$AssertMultipleOf.class */
    private static class AssertMultipleOf implements SerializableFunction<Iterable<Long>, Void> {
        private final int num;

        public AssertMultipleOf(int i) {
            this.num = i;
        }

        public Void apply(Iterable<Long> iterable) {
            Iterator<Long> it = iterable.iterator();
            while (it.hasNext()) {
                Assert.assertEquals(0L, it.next().longValue() % this.num);
            }
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$ConsumerFactoryFn.class */
    public static class ConsumerFactoryFn implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
        private final List<String> topics;
        private final int partitionsPerTopic;
        private final int numElements;
        private final OffsetResetStrategy offsetResetStrategy;

        public ConsumerFactoryFn(List<String> list, int i, int i2, OffsetResetStrategy offsetResetStrategy) {
            this.topics = list;
            this.partitionsPerTopic = i;
            this.numElements = i2;
            this.offsetResetStrategy = offsetResetStrategy;
        }

        public Consumer<byte[], byte[]> apply(Map<String, Object> map) {
            return KafkaIOTest.mkMockConsumer(this.topics, this.partitionsPerTopic, this.numElements, this.offsetResetStrategy);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$DeserializerWithInterfaces.class */
    private static class DeserializerWithInterfaces implements DummyInterface<String>, DummyNonparametricInterface, Deserializer<Long> {
        private DeserializerWithInterfaces() {
        }

        public void configure(Map<String, ?> map, boolean z) {
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public Long m1deserialize(String str, byte[] bArr) {
            return 0L;
        }

        public void close() {
        }
    }

    /** Member-less generic interface; in the visible code it is implemented only by DeserializerWithInterfaces. */
    private interface DummyInterface<T> {
    }

    /** Member-less non-generic interface; in the visible code it is implemented only by DeserializerWithInterfaces. */
    private interface DummyNonparametricInterface {
    }

    /** Emits, for each input, the element's value minus its timestamp in milliseconds. */
    private static class ElementValueDiff extends DoFn<Long, Long> {
        private ElementValueDiff() {
        }

        /**
         * Outputs {@code element - timestampMillis} for the current element.
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<Long, Long>.ProcessContext ctx) throws Exception {
            long value = ((Long) ctx.element()).longValue();
            long timestampMillis = ctx.timestamp().getMillis();
            ctx.output(Long.valueOf(value - timestampMillis));
        }
    }

    /** Unchecked exception type used for the send failures injected by ProducerSendCompletionThread. */
    private static class InjectedErrorException extends RuntimeException {
        /**
         * @param message detail message for the injected failure
         */
        public InjectedErrorException(String message) {
            super(message);
        }
    }

    /**
     * Empty value type produced by NonInferableObjectDeserializer.
     * NOTE(review): judging by the name, it exists so that no coder can be inferred for it;
     * the tests that would confirm this are outside the visible part of the file.
     * (Decompiler note: access was widened from private.)
     */
    public static class NonInferableObject {
        private NonInferableObject() {
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$NonInferableObjectDeserializer.class */
    private static class NonInferableObjectDeserializer implements Deserializer<NonInferableObject> {
        private NonInferableObjectDeserializer() {
        }

        public void configure(Map<String, ?> map, boolean z) {
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public NonInferableObject m2deserialize(String str, byte[] bArr) {
            return new NonInferableObject();
        }

        public void close() {
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$ProducerFactoryFn.class */
    private static class ProducerFactoryFn implements SerializableFunction<Map<String, Object>, Producer<Integer, Long>> {
        private ProducerFactoryFn() {
        }

        public Producer<Integer, Long> apply(Map<String, Object> map) {
            if (map.get("key.serializer") != null) {
                ((Serializer) Utils.newInstance(((Class) map.get("key.serializer")).asSubclass(Serializer.class))).configure(map, true);
            }
            ((Serializer) Utils.newInstance(((Class) map.get("value.serializer")).asSubclass(Serializer.class))).configure(map, false);
            return KafkaIOTest.MOCK_PRODUCER;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$ProducerSendCompletionThread.class */
    private static class ProducerSendCompletionThread {
        private final int maxErrors;
        private final int errorFrequency;
        private final AtomicBoolean done;
        private final ExecutorService injectorThread;
        private int numCompletions;

        ProducerSendCompletionThread() {
            this(0, 0);
        }

        ProducerSendCompletionThread(int i, int i2) {
            this.done = new AtomicBoolean(false);
            this.numCompletions = 0;
            this.maxErrors = i;
            this.errorFrequency = i2;
            this.injectorThread = Executors.newSingleThreadExecutor();
        }

        ProducerSendCompletionThread start() {
            this.injectorThread.submit(new Runnable() { // from class: org.apache.beam.sdk.io.kafka.KafkaIOTest.ProducerSendCompletionThread.1
                @Override // java.lang.Runnable
                public void run() {
                    boolean completeNext;
                    int i = 0;
                    while (!ProducerSendCompletionThread.this.done.get()) {
                        if (i >= ProducerSendCompletionThread.this.maxErrors || (ProducerSendCompletionThread.this.numCompletions + 1) % ProducerSendCompletionThread.this.errorFrequency != 0) {
                            completeNext = KafkaIOTest.MOCK_PRODUCER.completeNext();
                        } else {
                            completeNext = KafkaIOTest.MOCK_PRODUCER.errorNext(new InjectedErrorException("Injected Error #" + (i + 1)));
                            if (completeNext) {
                                i++;
                            }
                        }
                        if (completeNext) {
                            ProducerSendCompletionThread.access$908(ProducerSendCompletionThread.this);
                        } else {
                            try {
                                Thread.sleep(1L);
                            } catch (InterruptedException e) {
                            }
                        }
                    }
                }
            });
            return this;
        }

        void shutdown() {
            this.done.set(true);
            this.injectorThread.shutdown();
            try {
                Assert.assertTrue(this.injectorThread.awaitTermination(10L, TimeUnit.SECONDS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        static /* synthetic */ int access$908(ProducerSendCompletionThread producerSendCompletionThread) {
            int i = producerSendCompletionThread.numCompletions;
            producerSendCompletionThread.numCompletions = i + 1;
            return i;
        }
    }

    /** Maps each {@code KafkaRecord} to the key/value pair returned by its {@code getKV()}. */
    private static class RemoveKafkaMetadata<K, V> extends DoFn<KafkaRecord<K, V>, KV<K, V>> {
        private RemoveKafkaMetadata() {
        }

        /** Outputs the current record's key/value pair. */
        @DoFn.ProcessElement
        public void processElement(DoFn<KafkaRecord<K, V>, KV<K, V>>.ProcessContext ctx) throws Exception {
            KafkaRecord kafkaRecord = (KafkaRecord) ctx.element();
            ctx.output(kafkaRecord.getKV());
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$ValueAsTimestampFn.class */
    private static class ValueAsTimestampFn implements SerializableFunction<KV<Integer, Long>, Instant> {
        private ValueAsTimestampFn() {
        }

        public Instant apply(KV<Integer, Long> kv) {
            return new Instant(kv.getValue());
        }
    }

    /**
     * Builds a {@link MockConsumer} pre-loaded with generated records.
     *
     * <p>For every topic in {@code list}, {@code i} partitions are created. Record number
     * {@code n} (0-based, {@code n < i2}) goes to partition {@code n % totalPartitions} in
     * partition-creation order, receives that partition's next offset, and carries {@code n} as a
     * 4-byte int key and an 8-byte long value.
     *
     * <p>(Decompiler note: access was widened from private.)
     *
     * @param list topics to create partitions for
     * @param i number of partitions per topic
     * @param i2 total number of records to generate across all partitions
     * @param offsetResetStrategy strategy passed to the {@link MockConsumer} constructor
     * @return the configured mock consumer
     */
    public static MockConsumer<byte[], byte[]> mkMockConsumer(List<String> list, int i, int i2, OffsetResetStrategy offsetResetStrategy) {
        // Every partition of every topic, in creation order; a partition's index here is also
        // its index into the offset counters in jArr.
        final ArrayList arrayList = new ArrayList();
        // Partition -> records generated for that partition.
        final HashMap hashMap = new HashMap();
        // Topic -> PartitionInfo list, registered with the consumer after it is constructed.
        HashMap hashMap2 = new HashMap();
        for (String str : list) {
            ArrayList arrayList2 = new ArrayList(i);
            for (int i3 = 0; i3 < i; i3++) {
                TopicPartition topicPartition = new TopicPartition(str, i3);
                arrayList.add(topicPartition);
                arrayList2.add(new PartitionInfo(str, i3, (Node) null, (Node[]) null, (Node[]) null));
                hashMap.put(topicPartition, new ArrayList());
            }
            hashMap2.put(str, arrayList2);
        }
        int size = arrayList.size();
        // Next offset to hand out per partition; after the loop it equals the record count.
        final long[] jArr = new long[size];
        for (int i4 = 0; i4 < i2; i4++) {
            // Round-robin assignment of records to partitions.
            int i5 = i4 % size;
            TopicPartition topicPartition2 = (TopicPartition) arrayList.get(i5);
            List list2 = (List) hashMap.get(topicPartition2);
            String str2 = topicPartition2.topic();
            int partition = topicPartition2.partition();
            long j = jArr[i5];
            jArr[i5] = j + 1;
            // Key is the record number as a 4-byte int, value is the same number as an 8-byte long.
            list2.add(new ConsumerRecord(str2, partition, j, ByteBuffer.wrap(new byte[4]).putInt(i4).array(), ByteBuffer.wrap(new byte[8]).putLong(i4).array()));
        }
        // Partitions most recently passed to assign(); read by the poll task below.
        final AtomicReference atomicReference = new AtomicReference(Collections.emptyList());
        final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<byte[], byte[]>(offsetResetStrategy) {
            // Remembers the assignment and publishes beginning offset 0 and an end offset equal
            // to the number of generated records for each assigned partition.
            public void assign(List<TopicPartition> list3) {
                super.assign(list3);
                atomicReference.set(ImmutableList.copyOf(list3));
                for (TopicPartition topicPartition3 : list3) {
                    updateBeginningOffsets(ImmutableMap.of(topicPartition3, 0L));
                    updateEndOffsets(ImmutableMap.of(topicPartition3, Long.valueOf(((List) hashMap.get(topicPartition3)).size())));
                }
            }

            // Builds OffsetAndTimestamp values reflectively, using the requested timestamp as
            // the first constructor argument.
            // NOTE(review): the class is presumably loaded by name so this file still compiles
            // against Kafka clients that lack it - confirm.
            public Map offsetsForTimes(Map<TopicPartition, Long> map) {
                HashMap hashMap3 = new HashMap();
                try {
                    Constructor<?> declaredConstructor = Class.forName("org.apache.kafka.clients.consumer.OffsetAndTimestamp").getDeclaredConstructor(Long.TYPE, Long.TYPE);
                    for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
                        // Number of records generated for this partition.
                        long j2 = jArr[arrayList.indexOf(entry.getKey())];
                        Long value = entry.getValue();
                        if (value.longValue() >= j2) {
                            // NOTE(review): null is then passed for a primitive long constructor
                            // parameter, which presumably makes newInstance throw an unchecked
                            // IllegalArgumentException that the catch below does not cover.
                            // testUnboundedSourceStartReadTimeException appears to rely on a
                            // failure here - confirm before changing.
                            value = null;
                        }
                        hashMap3.put(entry.getKey(), declaredConstructor.newInstance(entry.getValue(), value));
                    }
                    return hashMap3;
                } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
                    throw new RuntimeException(e);
                }
            }
        };
        for (String str3 : list) {
            mockConsumer.updatePartitions(str3, (List) hashMap2.get(str3));
        }
        // Poll task: for each assigned partition, add every record at or beyond the current
        // position, then reschedule itself so the next poll is fed the same way.
        mockConsumer.schedulePollTask(new Runnable() {
            @Override
            public void run() {
                for (TopicPartition topicPartition3 : (List) atomicReference.get()) {
                    long position = mockConsumer.position(topicPartition3);
                    for (ConsumerRecord consumerRecord : (List) hashMap.get(topicPartition3)) {
                        if (consumerRecord.offset() >= position) {
                            mockConsumer.addRecord(consumerRecord);
                        }
                    }
                }
                mockConsumer.schedulePollTask(this);
            }
        });
        return mockConsumer;
    }

    /**
     * Convenience overload that reads at most as many records as the mock consumer holds.
     *
     * @param numElements number of generated records, also used as the maximum to read
     * @param timestampFn optional timestamp function; may be {@code null}
     * @return the configured read transform
     */
    private static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(int numElements, @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) {
        final int maxNumRecords = numElements;
        return mkKafkaReadTransform(numElements, maxNumRecords, timestampFn);
    }

    /**
     * Creates a KafkaIO read over topics {@code topic_a} and {@code topic_b}, backed by a mock
     * consumer with 10 partitions per topic and {@code EARLIEST} offset reset.
     *
     * @param numElements number of records the mock consumer is pre-loaded with
     * @param maxNumRecords maximum number of records the transform reads
     * @param timestampFn optional timestamp function; when {@code null} none is set
     * @return the configured read transform
     */
    private static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(int numElements, int maxNumRecords, @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) {
        List<String> topics = ImmutableList.of("topic_a", "topic_b");

        // The explicit <Integer, Long> witness is required: without it the key/value types
        // cannot be inferred from the start of the builder chain.
        KafkaIO.Read<Integer, Long> reader = KafkaIO.<Integer, Long>read()
            .withBootstrapServers("myServer1:9092,myServer2:9092")
            .withTopics(topics)
            .withConsumerFactoryFn(new ConsumerFactoryFn(topics, 10, numElements, OffsetResetStrategy.EARLIEST))
            .withKeyDeserializer(IntegerDeserializer.class)
            .withValueDeserializer(LongDeserializer.class)
            .withMaxNumRecords(maxNumRecords);

        if (timestampFn == null) {
            return reader;
        }
        return reader.withTimestampFn(timestampFn);
    }

    /**
     * Adds assertions that {@code input} holds exactly the values {@code 0 .. numElements - 1}
     * as far as count, distinct count, minimum and maximum can tell.
     *
     * @param input collection to check
     * @param numElements expected element count
     */
    public static void addCountingAsserts(PCollection<Long> input, long numElements) {
        long expectedUnique = numElements;
        long expectedMin = 0L;
        long expectedMax = numElements - 1;
        addCountingAsserts(input, numElements, expectedUnique, expectedMin, expectedMax);
    }

    /**
     * Adds four singleton assertions on {@code input}: total count, distinct count, minimum and
     * maximum.
     *
     * @param input collection to check
     * @param count expected number of elements
     * @param uniqueCount expected number of distinct elements
     * @param min expected smallest element
     * @param max expected largest element
     */
    public static void addCountingAsserts(PCollection<Long> input, long count, long uniqueCount, long min, long max) {
        PCollection<Long> total = input.apply("Count", Count.<Long>globally());
        PAssert.thatSingleton(total).isEqualTo(count);

        PCollection<Long> distinctTotal = input.apply(Distinct.<Long>create()).apply("UniqueCount", Count.<Long>globally());
        PAssert.thatSingleton(distinctTotal).isEqualTo(uniqueCount);

        PCollection<Long> smallest = input.apply("Min", Min.<Long>globally());
        PAssert.thatSingleton(smallest).isEqualTo(min);

        PCollection<Long> largest = input.apply("Max", Max.<Long>globally());
        PAssert.thatSingleton(largest).isEqualTo(max);
    }

    /** Reads 1000 mock records without metadata and checks the values are 0..999. */
    @Test
    public void testUnboundedSource() {
        final int numElements = 1000;
        PCollection<Long> values = this.p
            .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
            .apply(Values.<Long>create());
        addCountingAsserts(values, numElements);
        this.p.run();
    }

    /**
     * Expects the pipeline to fail with a partition-initialization timeout when reading from the
     * bootstrap address {@code 8.8.8.8:9092} with very short client timeouts. No mock consumer is
     * installed, so this test uses the real consumer.
     */
    @Test
    public void testUnreachableKafkaBrokers() {
        this.thrown.expect(Exception.class);
        this.thrown.expectMessage("Reader-0: Timeout while initializing partition 'test-0'");

        final int numElements = 1000;
        // The explicit <Integer, Long> witness is required for the deserializer setters to type-check.
        PCollection<Long> values = this.p
            .apply(KafkaIO.<Integer, Long>read()
                .withBootstrapServers("8.8.8.8:9092")
                .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 0)))
                .withKeyDeserializer(IntegerDeserializer.class)
                .withValueDeserializer(LongDeserializer.class)
                .updateConsumerProperties(ImmutableMap.<String, Object>of(
                    "request.timeout.ms", 10,
                    "heartbeat.interval.ms", 5,
                    "session.timeout.ms", 8,
                    "fetch.max.wait.ms", 8))
                .withMaxNumRecords(10L)
                .withoutMetadata())
            .apply(Values.<Long>create());

        addCountingAsserts(values, numElements);
        this.p.run();
    }

    /** Same as the basic source test, but configured through {@code withTopic} with one topic. */
    @Test
    public void testUnboundedSourceWithSingleTopic() {
        final int numElements = 1000;
        final String topic = "my_topic";

        // The explicit <Integer, Long> witness is required for the deserializer setters to type-check.
        PCollection<Long> values = this.p
            .apply(KafkaIO.<Integer, Long>read()
                .withBootstrapServers("none")
                .withTopic(topic)
                .withConsumerFactoryFn(new ConsumerFactoryFn(ImmutableList.of(topic), 10, numElements, OffsetResetStrategy.EARLIEST))
                .withMaxNumRecords(numElements)
                .withKeyDeserializer(IntegerDeserializer.class)
                .withValueDeserializer(LongDeserializer.class)
                .withoutMetadata())
            .apply(Values.<Long>create());

        addCountingAsserts(values, numElements);
        this.p.run();
    }

    /**
     * Reads only partition 5 of a 10-partition mock topic holding 1000 records and checks that
     * a tenth of the records arrive and that every value is a multiple of 5.
     */
    @Test
    public void testUnboundedSourceWithExplicitPartitions() {
        final int numElements = 1000;
        final int expectedCount = numElements / 10;

        // Keys are read as raw bytes here, hence the <byte[], Long> witness.
        PCollection<Long> values = this.p
            .apply(KafkaIO.<byte[], Long>read()
                .withBootstrapServers("none")
                .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5)))
                .withConsumerFactoryFn(new ConsumerFactoryFn(ImmutableList.of("test"), 10, numElements, OffsetResetStrategy.EARLIEST))
                .withKeyDeserializer(ByteArrayDeserializer.class)
                .withValueDeserializer(LongDeserializer.class)
                .withMaxNumRecords(expectedCount)
                .withoutMetadata())
            .apply(Values.<Long>create());

        // Records are dealt round-robin over 10 partitions, so partition 5 holds 5, 15, 25, ...
        PAssert.that(values).satisfies(new AssertMultipleOf(5));
        PAssert.thatSingleton(values.apply(Count.<Long>globally())).isEqualTo(Long.valueOf(expectedCount));
        this.p.run();
    }

    /**
     * Checks that, with {@code ValueAsTimestampFn}, every element's timestamp equals its value:
     * the set of distinct (value - timestamp) differences must be the single value 0.
     */
    @Test
    public void testUnboundedSourceTimestamps() {
        final int numElements = 1000;

        // Typed (not raw) so the chained apply() calls below resolve.
        PCollection<Long> values = this.p
            .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
            .apply(Values.<Long>create());
        addCountingAsserts(values, numElements);

        PCollection<Long> diffs = values
            .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
            .apply("DistinctTimestamps", Distinct.<Long>create());
        PAssert.thatSingleton(diffs).isEqualTo(0L);

        this.p.run();
    }

    /**
     * Splits the source 10 ways, reads an equal share from each split and checks that the
     * flattened result contains the values 0..999.
     */
    @Test
    public void testUnboundedSourceSplits() throws Exception {
        final int numElements = 1000;
        final int numSplits = 10;

        List<? extends UnboundedSource<KafkaRecord<Integer, Long>, ?>> splits =
            mkKafkaReadTransform(numElements, null)
                .withKeyDeserializerAndCoder(IntegerDeserializer.class, BigEndianIntegerCoder.of())
                .withValueDeserializerAndCoder(LongDeserializer.class, BigEndianLongCoder.of())
                .makeSource()
                .split(numSplits, this.p.getOptions());
        Assert.assertEquals("Expected exact splitting", numSplits, splits.size());

        long elementsPerSplit = numElements / numSplits;
        Assert.assertEquals("Expected even splits", numElements, elementsPerSplit * numSplits);

        // One bounded read per split; each gets a unique step name based on its index.
        PCollectionList<Long> perSplit = PCollectionList.empty(this.p);
        for (int i = 0; i < splits.size(); i++) {
            perSplit = perSplit.and(
                this.p.apply("split" + i, Read.from(splits.get(i)).withMaxNumRecords(elementsPerSplit))
                    .apply("Remove Metadata " + i, ParDo.of(new RemoveKafkaMetadata<Integer, Long>()))
                    .apply("collection " + i, Values.<Long>create()));
        }

        PCollection<Long> all = perSplit.apply(Flatten.<Long>pCollections());
        addCountingAsserts(all, numElements);
        this.p.run();
    }

    /**
     * Moves {@code unboundedReader} forward by exactly one record, polling every millisecond
     * until one is available.
     *
     * @param unboundedReader reader to advance
     * @param z {@code true} if the reader has already been started; when {@code false},
     *     {@code start()} is called first and polling only happens if it yields no record
     * @throws IOException if the reader fails
     * @throws RuntimeException if the wait is interrupted (the interrupt flag is restored)
     */
    private static void advanceOnce(UnboundedSource.UnboundedReader<?> unboundedReader, boolean z) throws IOException {
        if (z || !unboundedReader.start()) {
            while (!unboundedReader.advance()) {
                try {
                    Thread.sleep(1L);
                } catch (InterruptedException e) {
                    // Restore the flag before converting to unchecked, matching how
                    // ProducerSendCompletionThread.shutdown() treats interruption.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        }
    }

    /**
     * Reads 20 of 85 records, checkpoints, then resumes a new reader from a coder round-tripped
     * copy of the checkpoint and checks the remaining records arrive in order with matching
     * timestamps.
     */
    @Test
    public void testUnboundedSourceCheckpointMark() throws Exception {
        final int numElements = 85;
        final int numToSkip = 20;

        UnboundedSource source = (UnboundedSource) mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).makeSource().split(1, PipelineOptionsFactory.create()).get(0);
        UnboundedSource.UnboundedReader reader = source.createReader((PipelineOptions) null, (UnboundedSource.CheckpointMark) null);

        // Consume the first numToSkip records; only the very first call has to start the reader.
        for (int n = 0; n < numToSkip; n++) {
            advanceOnce(reader, n > 0);
        }

        long lastValue = ((Long) ((KafkaRecord) reader.getCurrent()).getKV().getValue()).longValue();
        Assert.assertEquals(numToSkip - 1, lastValue);
        Assert.assertEquals(numToSkip - 1, reader.getCurrentTimestamp().getMillis());

        // Round-trip the checkpoint through its coder before resuming from it.
        KafkaCheckpointMark mark = (KafkaCheckpointMark) CoderUtils.clone(source.getCheckpointMarkCoder(), reader.getCheckpointMark());
        UnboundedSource.UnboundedReader resumed = source.createReader((PipelineOptions) null, mark);

        for (int n = numToSkip; n < numElements; n++) {
            advanceOnce(resumed, n > numToSkip);
            long value = ((Long) ((KafkaRecord) resumed.getCurrent()).getKV().getValue()).longValue();
            Assert.assertEquals(n, value);
            Assert.assertEquals(n, resumed.getCurrentTimestamp().getMillis());
        }
    }

    /**
     * Checkpoints after reading 5 records from a 5-record source (20 partitions, so most are
     * empty), then resumes with that checkpoint on a 100-record source using {@code LATEST}
     * offset reset and checks that values 5..99 are all read, in any order.
     */
    @Test
    public void testUnboundedSourceCheckpointMarkWithEmptyPartitions() throws Exception {
        final int initialNumElements = 5;
        final int numElements = 100;
        final int numToSkip = 5;

        UnboundedSource source = (UnboundedSource) mkKafkaReadTransform(initialNumElements, new ValueAsTimestampFn()).makeSource().split(1, PipelineOptionsFactory.create()).get(0);
        UnboundedSource.UnboundedReader reader = source.createReader((PipelineOptions) null, (UnboundedSource.CheckpointMark) null);
        for (int n = 0; n < initialNumElements; n++) {
            advanceOnce(reader, n > 0);
        }

        // Round-trip the checkpoint through its coder before handing it to the second reader.
        KafkaCheckpointMark mark = (KafkaCheckpointMark) CoderUtils.clone(source.getCheckpointMarkCoder(), reader.getCheckpointMark());

        List<String> topics = ImmutableList.of("topic_a", "topic_b");
        // The explicit <Integer, Long> witness is required for the deserializer setters to type-check.
        UnboundedSource resumedSource = (UnboundedSource) KafkaIO.<Integer, Long>read()
            .withBootstrapServers("none")
            .withTopics(topics)
            .withConsumerFactoryFn(new ConsumerFactoryFn(topics, 10, numElements, OffsetResetStrategy.LATEST))
            .withKeyDeserializer(IntegerDeserializer.class)
            .withValueDeserializer(LongDeserializer.class)
            .withMaxNumRecords(numElements)
            .withTimestampFn(new ValueAsTimestampFn())
            .makeSource()
            .split(1, PipelineOptionsFactory.create())
            .get(0);
        UnboundedSource.UnboundedReader resumed = resumedSource.createReader((PipelineOptions) null, mark);

        List<Long> expected = new ArrayList<>();
        List<Long> actual = new ArrayList<>();
        for (long n = numToSkip; n < numElements; n++) {
            advanceOnce(resumed, n > numToSkip);
            expected.add(Long.valueOf(n));
            actual.add((Long) ((KafkaRecord) resumed.getCurrent()).getKV().getValue());
        }
        Assert.assertThat(actual, IsIterableContainingInAnyOrder.containsInAnyOrder(expected.toArray()));
    }

    /**
     * Runs a 1000-record read and checks the element/byte counters (overall and for split "0")
     * plus the presence of exactly one gauge each for backlog elements and backlog bytes.
     */
    @Test
    public void testUnboundedSourceMetrics() {
        final String stepName = "readFromKafka";
        this.p.apply(stepName, mkKafkaReadTransform(1000, new ValueAsTimestampFn()).withoutMetadata());
        PipelineResult result = this.p.run();

        MetricName elementsRead = SourceMetrics.elementsRead().getName();
        MetricName elementsReadBySplit = SourceMetrics.elementsReadBySplit("0").getName();
        MetricName bytesRead = SourceMetrics.bytesRead().getName();
        MetricName bytesReadBySplit = SourceMetrics.bytesReadBySplit("0").getName();
        MetricName backlogElements = SourceMetrics.backlogElementsOfSplit("0").getName();
        MetricName backlogBytes = SourceMetrics.backlogBytesOfSplit("0").getName();

        Iterable counters = result.metrics().queryMetrics(MetricsFilter.builder().build()).counters();

        Assert.assertThat(counters, Matchers.hasItem(MetricResultsMatchers.attemptedMetricsResult(elementsRead.namespace(), elementsRead.name(), stepName, 1000L)));
        Assert.assertThat(counters, Matchers.hasItem(MetricResultsMatchers.attemptedMetricsResult(elementsReadBySplit.namespace(), elementsReadBySplit.name(), stepName, 1000L)));
        // 12000 bytes = 1000 records x (4-byte key + 8-byte value), as generated by mkMockConsumer.
        Assert.assertThat(counters, Matchers.hasItem(MetricResultsMatchers.attemptedMetricsResult(bytesRead.namespace(), bytesRead.name(), stepName, 12000L)));
        Assert.assertThat(counters, Matchers.hasItem(MetricResultsMatchers.attemptedMetricsResult(bytesReadBySplit.namespace(), bytesReadBySplit.name(), stepName, 12000L)));

        MetricsFilter backlogElementsFilter = MetricsFilter.builder().addNameFilter(MetricNameFilter.named(backlogElements.namespace(), backlogElements.name())).build();
        Assert.assertThat(result.metrics().queryMetrics(backlogElementsFilter).gauges(), IsIterableWithSize.iterableWithSize(1));

        MetricsFilter backlogBytesFilter = MetricsFilter.builder().addNameFilter(MetricNameFilter.named(backlogBytes.namespace(), backlogBytes.name())).build();
        Assert.assertThat(result.metrics().queryMetrics(backlogBytesFilter).gauges(), IsIterableWithSize.iterableWithSize(1));
    }

    /**
     * Pipes 1000 mock records into a KafkaIO sink backed by the shared mock producer and
     * verifies the records it received.
     */
    @Test
    public void testSink() throws Exception {
        final int numElements = 1000;
        final String topic = "test";

        // The mock producer is shared static state, so sink tests serialize on this lock.
        synchronized (MOCK_PRODUCER_LOCK) {
            MOCK_PRODUCER.clear();
            ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread().start();
            try {
                // The explicit <Integer, Long> witness is required for the serializer setters to type-check.
                this.p.apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
                    .apply(KafkaIO.<Integer, Long>write()
                        .withBootstrapServers("none")
                        .withTopic(topic)
                        .withKeySerializer(IntegerSerializer.class)
                        .withValueSerializer(LongSerializer.class)
                        .withProducerFactoryFn(new ProducerFactoryFn()));
                this.p.run();
            } finally {
                // Always stop the background thread, even if the pipeline fails.
                completionThread.shutdown();
            }
            verifyProducerRecords(topic, numElements, false);
        }
    }

    /**
     * Like {@code testSink}, but writes only values (no key serializer) through
     * {@code KafkaIO.write().values()}.
     */
    @Test
    public void testValuesSink() throws Exception {
        final int numElements = 1000;
        final String topic = "test";

        // The mock producer is shared static state, so sink tests serialize on this lock.
        synchronized (MOCK_PRODUCER_LOCK) {
            MOCK_PRODUCER.clear();
            ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread().start();
            try {
                // The explicit <Integer, Long> witness is required for the serializer setter to type-check.
                this.p.apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
                    .apply(Values.<Long>create())
                    .apply(KafkaIO.<Integer, Long>write()
                        .withBootstrapServers("none")
                        .withTopic(topic)
                        .withValueSerializer(LongSerializer.class)
                        .withProducerFactoryFn(new ProducerFactoryFn())
                        .values());
                this.p.run();
            } finally {
                // Always stop the background thread, even if the pipeline fails.
                completionThread.shutdown();
            }
            verifyProducerRecords(topic, numElements, true);
        }
    }

    /**
     * Injects send failures (at most 10, on every 100th completion) and expects the first
     * injected error to surface from the pipeline.
     */
    @Test
    public void testSinkWithSendErrors() throws Throwable {
        this.thrown.expect(InjectedErrorException.class);
        this.thrown.expectMessage("Injected Error #1");

        final int numElements = 1000;

        // The mock producer is shared static state, so sink tests serialize on this lock.
        synchronized (MOCK_PRODUCER_LOCK) {
            MOCK_PRODUCER.clear();
            ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread(10, 100).start();
            // The explicit <Integer, Long> witness is required for the serializer setters to type-check.
            this.p.apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
                .apply(KafkaIO.<Integer, Long>write()
                    .withBootstrapServers("none")
                    .withTopic("test")
                    .withKeySerializer(IntegerSerializer.class)
                    .withValueSerializer(LongSerializer.class)
                    .withProducerFactoryFn(new ProducerFactoryFn()));
            try {
                this.p.run();
            } catch (Pipeline.PipelineExecutionException e) {
                // Unwrap two levels so the ExpectedException rule sees the injected error itself.
                throw e.getCause().getCause();
            } finally {
                completionThread.shutdown();
            }
        }
    }

    /**
     * With a start read time of 25 on a 1000-record mock source, expects 500 distinct values
     * ranging from 500 to 999. Skipped when the consumer lacks {@code offsetsForTimes}.
     */
    @Test
    public void testUnboundedSourceStartReadTime() {
        Assume.assumeTrue(new ConsumerSpEL().hasOffsetsForTimes());

        final int numElements = 1000;
        final int maxNumRecords = numElements / 2;
        // The mock source has 20 partitions; this is half of the per-partition record count.
        final int startTime = (numElements / 20) / 2;

        PCollection<Long> values = this.p
            .apply(mkKafkaReadTransform(numElements, maxNumRecords, new ValueAsTimestampFn())
                .withStartReadTime(new Instant(startTime))
                .withoutMetadata())
            .apply(Values.<Long>create());

        addCountingAsserts(values, maxNumRecords, maxNumRecords, maxNumRecords, numElements - 1);
        this.p.run();
    }

    /**
     * Expects a {@link RuntimeException} when the start read time (50) is not below the number
     * of records per mock partition. Skipped when the consumer lacks {@code offsetsForTimes}.
     */
    @Test
    public void testUnboundedSourceStartReadTimeException() {
        Assume.assumeTrue(new ConsumerSpEL().hasOffsetsForTimes());
        this.noMessagesException.expect(RuntimeException.class);

        final int numElements = 1000;
        // The mock source has 20 partitions, so this equals the per-partition record count.
        final int startTime = numElements / 20;

        this.p
            .apply(mkKafkaReadTransform(numElements, numElements, new ValueAsTimestampFn())
                .withStartReadTime(new Instant(startTime))
                .withoutMetadata())
            .apply(Values.<Long>create());
        this.p.run();
    }

    /** Checks the display data exposed by a topic-based read transform. */
    @Test
    public void testSourceDisplayData() {
        KafkaIO.Read<Integer, Long> read = mkKafkaReadTransform(10, null);
        DisplayData displayData = DisplayData.from(read);

        Assert.assertThat(displayData, DisplayDataMatchers.hasDisplayItem("topics", "topic_a,topic_b"));
        Assert.assertThat(displayData, DisplayDataMatchers.hasDisplayItem("enable.auto.commit", false));
        Assert.assertThat(displayData, DisplayDataMatchers.hasDisplayItem("bootstrap.servers", "myServer1:9092,myServer2:9092"));
        Assert.assertThat(displayData, DisplayDataMatchers.hasDisplayItem("auto.offset.reset", "latest"));
        Assert.assertThat(displayData, DisplayDataMatchers.hasDisplayItem("receive.buffer.bytes", 524288L));
    }

    /**
     * Checks the display data of a read transform configured with explicit topic partitions
     * ({@code test-5} and {@code test-6}) instead of topic names.
     */
    @Test
    public void testSourceWithExplicitPartitionsDisplayData() {
        DisplayData displayData =
                DisplayData.from(
                        KafkaIO.read()
                                .withBootstrapServers("myServer1:9092,myServer2:9092")
                                .withTopicPartitions(
                                        ImmutableList.of(
                                                new TopicPartition("test", 5),
                                                new TopicPartition("test", 6)))
                                .withConsumerFactoryFn(
                                        new ConsumerFactoryFn(
                                                Lists.newArrayList("test"),
                                                10,
                                                10,
                                                OffsetResetStrategy.EARLIEST))
                                .withKeyDeserializer(ByteArrayDeserializer.class)
                                .withValueDeserializer(LongDeserializer.class));

        Assert.assertThat(
                displayData, DisplayDataMatchers.hasDisplayItem("topicPartitions", "test-5,test-6"));
        Assert.assertThat(
                displayData, DisplayDataMatchers.hasDisplayItem("enable.auto.commit", false));
        Assert.assertThat(
                displayData,
                DisplayDataMatchers.hasDisplayItem("bootstrap.servers", "myServer1:9092,myServer2:9092"));
        Assert.assertThat(
                displayData, DisplayDataMatchers.hasDisplayItem("auto.offset.reset", "latest"));
        Assert.assertThat(
                displayData, DisplayDataMatchers.hasDisplayItem("receive.buffer.bytes", 524288L));
    }

    /**
     * Checks the display data of a write transform: the configured topic, the bootstrap servers
     * and a {@code retries} entry with value 3.
     */
    @Test
    public void testSinkDisplayData() {
        DisplayData displayData =
                DisplayData.from(
                        KafkaIO.write()
                                .withBootstrapServers("myServerA:9092,myServerB:9092")
                                .withTopic("myTopic")
                                .withValueSerializer(LongSerializer.class)
                                .withProducerFactoryFn(new ProducerFactoryFn()));

        Assert.assertThat(displayData, DisplayDataMatchers.hasDisplayItem("topic", "myTopic"));
        Assert.assertThat(
                displayData,
                DisplayDataMatchers.hasDisplayItem("bootstrap.servers", "myServerA:9092,myServerB:9092"));
        Assert.assertThat(displayData, DisplayDataMatchers.hasDisplayItem("retries", 3L));
    }

    /**
     * Checks which value coder {@code KafkaIO.inferCoder} wraps for several deserializer classes,
     * using a default {@link CoderRegistry}.
     */
    @Test
    public void testInferKeyCoder() {
        CoderRegistry registry = CoderRegistry.createDefault();

        Assert.assertTrue(
                KafkaIO.inferCoder(registry, LongDeserializer.class).getValueCoder()
                        instanceof VarLongCoder);
        Assert.assertTrue(
                KafkaIO.inferCoder(registry, StringDeserializer.class).getValueCoder()
                        instanceof StringUtf8Coder);
        Assert.assertTrue(
                KafkaIO.inferCoder(registry, InstantDeserializer.class).getValueCoder()
                        instanceof InstantCoder);
        Assert.assertTrue(
                KafkaIO.inferCoder(registry, DeserializerWithInterfaces.class).getValueCoder()
                        instanceof VarLongCoder);
    }

    /**
     * Expects a {@link RuntimeException} when a coder is inferred for
     * {@code NonInferableObjectDeserializer}.
     */
    @Test
    public void testInferKeyCoderFailure() throws Exception {
        // The expectation is registered before the registry is created, as in the original order.
        this.cannotInferException.expect(RuntimeException.class);

        CoderRegistry registry = CoderRegistry.createDefault();
        KafkaIO.inferCoder(registry, NonInferableObjectDeserializer.class);
    }

    /**
     * Runs a read-then-write pipeline against the shared mock producer and asserts that the
     * attempted value of the {@code SinkMetrics.elementsWritten()} counter for the
     * {@code "writeToKafka"} step is 1000.
     *
     * <p>The whole test holds {@code MOCK_PRODUCER_LOCK}. The send-completion thread is shut down
     * in a {@code finally} block so it is not leaked when the pipeline run or the assertion fails.
     *
     * @throws Exception if the pipeline run or the completion-thread shutdown fails
     */
    @Test
    public void testSinkMetrics() throws Exception {
        synchronized (MOCK_PRODUCER_LOCK) {
            MOCK_PRODUCER.clear();
            ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread().start();
            try {
                this.p
                        .apply(mkKafkaReadTransform(1000, new ValueAsTimestampFn()).withoutMetadata())
                        .apply(
                                "writeToKafka",
                                KafkaIO.write()
                                        .withBootstrapServers("none")
                                        .withTopic("test")
                                        .withKeySerializer(IntegerSerializer.class)
                                        .withValueSerializer(LongSerializer.class)
                                        .withProducerFactoryFn(new ProducerFactoryFn()));
                PipelineResult result = this.p.run();

                MetricName elementsWritten = SinkMetrics.elementsWritten().getName();
                // Query only the namespace of the metric under test.
                MetricsFilter namespaceFilter =
                        MetricsFilter.builder()
                                .addNameFilter(MetricNameFilter.inNamespace(elementsWritten.namespace()))
                                .build();
                Assert.assertThat(
                        result.metrics().queryMetrics(namespaceFilter).counters(),
                        Matchers.hasItem(
                                MetricResultsMatchers.attemptedMetricsResult(
                                        elementsWritten.namespace(),
                                        elementsWritten.name(),
                                        "writeToKafka",
                                        1000L)));
            } finally {
                // Always stop the helper thread, including on a failed run or failed assertion.
                completionThread.shutdown();
            }
        }
    }

    /**
     * Verifies the first {@code i} records in {@code MOCK_PRODUCER}'s history after sorting that
     * history by record value in ascending order.
     *
     * <p>For each position {@code n} in {@code [0, i)} the record must have topic {@code str} and
     * value {@code n}; its key must be {@code null} when {@code z} is true and {@code n}
     * otherwise.
     *
     * @param str expected topic of every checked record
     * @param i number of records to check, starting from the smallest value
     * @param z whether the records are expected to carry a {@code null} key
     */
    private static void verifyProducerRecords(String str, int i, boolean z) {
        // NOTE(review): sorts the list returned by history() in place; presumably that list is a
        // private copy handed out by the mock producer - confirm.
        List<ProducerRecord<Integer, Long>> history = MOCK_PRODUCER.history();
        Collections.sort(history, new Comparator<ProducerRecord<Integer, Long>>() {
            @Override
            public int compare(ProducerRecord<Integer, Long> left, ProducerRecord<Integer, Long> right) {
                return Long.compare(left.value().longValue(), right.value().longValue());
            }
        });
        for (int index = 0; index < i; index++) {
            ProducerRecord<Integer, Long> record = history.get(index);
            Assert.assertEquals(str, record.topic());
            if (z) {
                Assert.assertNull(record.key());
            } else {
                Assert.assertEquals(index, record.key().intValue());
            }
            Assert.assertEquals(index, record.value().longValue());
        }
    }
}
