package org.apache.beam.sdk.io.kafka;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest.class */
public class KafkaIOTest {

    @Rule
    public ExpectedException thrown = ExpectedException.none();
    private static final MockProducer<Integer, Long> MOCK_PRODUCER = new MockProducer<Integer, Long>(false, new KafkaIO.CoderBasedKafkaSerializer(), new KafkaIO.CoderBasedKafkaSerializer()) { // from class: org.apache.beam.sdk.io.kafka.KafkaIOTest.4
        public void flush() {
            while (completeNext()) {
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e) {
                }
            }
        }
    };
    private static final Object MOCK_PRODUCER_LOCK = new Object();

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$AssertMultipleOf.class */
    private static class AssertMultipleOf implements SerializableFunction<Iterable<Long>, Void> {
        private final int num;

        public AssertMultipleOf(int i) {
            this.num = i;
        }

        public Void apply(Iterable<Long> iterable) {
            Iterator<Long> it = iterable.iterator();
            while (it.hasNext()) {
                Assert.assertEquals(0L, it.next().longValue() % this.num);
            }
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$ConsumerFactoryFn.class */
    public static class ConsumerFactoryFn implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
        private final List<String> topics;
        private final int partitionsPerTopic;
        private final int numElements;
        private final OffsetResetStrategy offsetResetStrategy;

        public ConsumerFactoryFn(List<String> list, int i, int i2, OffsetResetStrategy offsetResetStrategy) {
            this.topics = list;
            this.partitionsPerTopic = i;
            this.numElements = i2;
            this.offsetResetStrategy = offsetResetStrategy;
        }

        public Consumer<byte[], byte[]> apply(Map<String, Object> map) {
            return KafkaIOTest.mkMockConsumer(this.topics, this.partitionsPerTopic, this.numElements, this.offsetResetStrategy);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$ElementValueDiff.class */
    private static class ElementValueDiff extends DoFn<Long, Long> {
        private ElementValueDiff() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<Long, Long>.ProcessContext processContext) throws Exception {
            processContext.output(Long.valueOf(((Long) processContext.element()).longValue() - processContext.timestamp().getMillis()));
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$InjectedErrorException.class */
    private static class InjectedErrorException extends RuntimeException {
        public InjectedErrorException(String str) {
            super(str);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$ProducerFactoryFn.class */
    private static class ProducerFactoryFn implements SerializableFunction<Map<String, Object>, Producer<Integer, Long>> {
        private ProducerFactoryFn() {
        }

        public Producer<Integer, Long> apply(Map<String, Object> map) {
            return KafkaIOTest.MOCK_PRODUCER;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$ProducerSendCompletionThread.class */
    private static class ProducerSendCompletionThread {
        private final int maxErrors;
        private final int errorFrequency;
        private final AtomicBoolean done;
        private final ExecutorService injectorThread;
        private int numCompletions;

        ProducerSendCompletionThread() {
            this(0, 0);
        }

        ProducerSendCompletionThread(int i, int i2) {
            this.done = new AtomicBoolean(false);
            this.numCompletions = 0;
            this.maxErrors = i;
            this.errorFrequency = i2;
            this.injectorThread = Executors.newSingleThreadExecutor();
        }

        ProducerSendCompletionThread start() {
            this.injectorThread.submit(new Runnable() { // from class: org.apache.beam.sdk.io.kafka.KafkaIOTest.ProducerSendCompletionThread.1
                @Override // java.lang.Runnable
                public void run() {
                    boolean completeNext;
                    int i = 0;
                    while (!ProducerSendCompletionThread.this.done.get()) {
                        if (i >= ProducerSendCompletionThread.this.maxErrors || (ProducerSendCompletionThread.this.numCompletions + 1) % ProducerSendCompletionThread.this.errorFrequency != 0) {
                            completeNext = KafkaIOTest.MOCK_PRODUCER.completeNext();
                        } else {
                            completeNext = KafkaIOTest.MOCK_PRODUCER.errorNext(new InjectedErrorException("Injected Error #" + (i + 1)));
                            if (completeNext) {
                                i++;
                            }
                        }
                        if (completeNext) {
                            ProducerSendCompletionThread.access$808(ProducerSendCompletionThread.this);
                        } else {
                            try {
                                Thread.sleep(1L);
                            } catch (InterruptedException e) {
                            }
                        }
                    }
                }
            });
            return this;
        }

        void shutdown() {
            this.done.set(true);
            this.injectorThread.shutdown();
            try {
                Assert.assertTrue(this.injectorThread.awaitTermination(10L, TimeUnit.SECONDS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        static /* synthetic */ int access$808(ProducerSendCompletionThread producerSendCompletionThread) {
            int i = producerSendCompletionThread.numCompletions;
            producerSendCompletionThread.numCompletions = i + 1;
            return i;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$RemoveKafkaMetadata.class */
    private static class RemoveKafkaMetadata<K, V> extends DoFn<KafkaRecord<K, V>, KV<K, V>> {
        private RemoveKafkaMetadata() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<KafkaRecord<K, V>, KV<K, V>>.ProcessContext processContext) throws Exception {
            processContext.output(((KafkaRecord) processContext.element()).getKV());
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$ValueAsTimestampFn.class */
    private static class ValueAsTimestampFn implements SerializableFunction<KV<Integer, Long>, Instant> {
        private ValueAsTimestampFn() {
        }

        public Instant apply(KV<Integer, Long> kv) {
            return new Instant(kv.getValue());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static MockConsumer<byte[], byte[]> mkMockConsumer(List<String> list, int i, int i2, OffsetResetStrategy offsetResetStrategy) {
        ArrayList arrayList = new ArrayList();
        final HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (String str : list) {
            ArrayList arrayList2 = new ArrayList(i);
            for (int i3 = 0; i3 < i; i3++) {
                TopicPartition topicPartition = new TopicPartition(str, i3);
                arrayList.add(topicPartition);
                arrayList2.add(new PartitionInfo(str, i3, (Node) null, (Node[]) null, (Node[]) null));
                hashMap.put(topicPartition, new ArrayList());
            }
            hashMap2.put(str, arrayList2);
        }
        int size = arrayList.size();
        long[] jArr = new long[size];
        for (int i4 = 0; i4 < i2; i4++) {
            int i5 = i4 % size;
            TopicPartition topicPartition2 = (TopicPartition) arrayList.get(i5);
            List list2 = (List) hashMap.get(topicPartition2);
            String str2 = topicPartition2.topic();
            int partition = topicPartition2.partition();
            long j = jArr[i5];
            jArr[i5] = j + 1;
            list2.add(new ConsumerRecord(str2, partition, j, ByteBuffer.wrap(new byte[4]).putInt(i4).array(), ByteBuffer.wrap(new byte[8]).putLong(i4).array()));
        }
        final AtomicReference atomicReference = new AtomicReference(Collections.emptyList());
        final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<byte[], byte[]>(offsetResetStrategy) { // from class: org.apache.beam.sdk.io.kafka.KafkaIOTest.1
            public void assign(List<TopicPartition> list3) {
                super.assign(list3);
                atomicReference.set(ImmutableList.copyOf(list3));
                for (TopicPartition topicPartition3 : list3) {
                    updateBeginningOffsets(ImmutableMap.of(topicPartition3, 0L));
                    updateEndOffsets(ImmutableMap.of(topicPartition3, Long.valueOf(((List) hashMap.get(topicPartition3)).size())));
                }
            }
        };
        for (String str3 : list) {
            mockConsumer.updatePartitions(str3, (List) hashMap2.get(str3));
        }
        mockConsumer.schedulePollTask(new Runnable() { // from class: org.apache.beam.sdk.io.kafka.KafkaIOTest.2
            @Override // java.lang.Runnable
            public void run() {
                for (TopicPartition topicPartition3 : (List) atomicReference.get()) {
                    long position = mockConsumer.position(topicPartition3);
                    for (ConsumerRecord consumerRecord : (List) hashMap.get(topicPartition3)) {
                        if (consumerRecord.offset() >= position) {
                            mockConsumer.addRecord(consumerRecord);
                        }
                    }
                }
                mockConsumer.schedulePollTask(this);
            }
        });
        return mockConsumer;
    }

    private static KafkaIO.TypedRead<Integer, Long> mkKafkaReadTransform(int i, @Nullable SerializableFunction<KV<Integer, Long>, Instant> serializableFunction) {
        ImmutableList of = ImmutableList.of("topic_a", "topic_b");
        KafkaIO.Read withMaxNumRecords = KafkaIO.read().withBootstrapServers("none").withTopics(of).withConsumerFactoryFn(new ConsumerFactoryFn(of, 10, i, OffsetResetStrategy.EARLIEST)).withKeyCoder(BigEndianIntegerCoder.of()).withValueCoder(BigEndianLongCoder.of()).withMaxNumRecords(i);
        return serializableFunction != null ? withMaxNumRecords.withTimestampFn(serializableFunction) : withMaxNumRecords;
    }

    public static void addCountingAsserts(PCollection<Long> pCollection, long j) {
        PAssert.thatSingleton(pCollection.apply("Count", Count.globally())).isEqualTo(Long.valueOf(j));
        PAssert.thatSingleton(pCollection.apply(RemoveDuplicates.create()).apply("UniqueCount", Count.globally())).isEqualTo(Long.valueOf(j));
        PAssert.thatSingleton(pCollection.apply("Min", Min.globally())).isEqualTo(0L);
        PAssert.thatSingleton(pCollection.apply("Max", Max.globally())).isEqualTo(Long.valueOf(j - 1));
    }

    @Test
    @Category({NeedsRunner.class})
    public void testUnboundedSource() {
        TestPipeline create = TestPipeline.create();
        addCountingAsserts(create.apply(mkKafkaReadTransform(1000, new ValueAsTimestampFn()).withoutMetadata()).apply(Values.create()), 1000);
        create.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testUnboundedSourceWithExplicitPartitions() {
        TestPipeline create = TestPipeline.create();
        PCollection apply = create.apply(KafkaIO.read().withBootstrapServers("none").withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5))).withConsumerFactoryFn(new ConsumerFactoryFn(ImmutableList.of("test"), 10, 1000, OffsetResetStrategy.EARLIEST)).withValueCoder(BigEndianLongCoder.of()).withMaxNumRecords(1000 / 10).withoutMetadata()).apply(Values.create());
        PAssert.that(apply).satisfies(new AssertMultipleOf(5));
        PAssert.thatSingleton(apply.apply(Count.globally())).isEqualTo(Long.valueOf(1000 / 10));
        create.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testUnboundedSourceTimestamps() {
        TestPipeline create = TestPipeline.create();
        PCollection apply = create.apply(mkKafkaReadTransform(1000, new ValueAsTimestampFn()).withoutMetadata()).apply(Values.create());
        addCountingAsserts(apply, 1000);
        PAssert.thatSingleton(apply.apply("TimestampDiff", ParDo.of(new ElementValueDiff())).apply("RemoveDuplicateTimestamps", RemoveDuplicates.create())).isEqualTo(0L);
        create.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testUnboundedSourceSplits() throws Exception {
        TestPipeline create = TestPipeline.create();
        List generateInitialSplits = mkKafkaReadTransform(1000, null).makeSource().generateInitialSplits(10, create.getOptions());
        Assert.assertEquals("Expected exact splitting", 10, generateInitialSplits.size());
        long j = 1000 / 10;
        Assert.assertEquals("Expected even splits", 1000, j * 10);
        PCollectionList empty = PCollectionList.empty(create);
        for (int i = 0; i < generateInitialSplits.size(); i++) {
            empty = empty.and(create.apply("split" + i, Read.from((UnboundedSource) generateInitialSplits.get(i)).withMaxNumRecords(j)).apply("Remove Metadata " + i, ParDo.of(new RemoveKafkaMetadata())).apply("collection " + i, Values.create()));
        }
        addCountingAsserts(empty.apply(Flatten.pCollections()), 1000);
        create.run();
    }

    private static void advanceOnce(UnboundedSource.UnboundedReader<?> unboundedReader, boolean z) throws IOException {
        if (z || !unboundedReader.start()) {
            while (!unboundedReader.advance()) {
                try {
                    Thread.sleep(1L);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    @Test
    public void testUnboundedSourceCheckpointMark() throws Exception {
        UnboundedSource unboundedSource = (UnboundedSource) mkKafkaReadTransform(85, new ValueAsTimestampFn()).makeSource().generateInitialSplits(1, PipelineOptionsFactory.fromArgs(new String[0]).create()).get(0);
        UnboundedSource.UnboundedReader createReader = unboundedSource.createReader((PipelineOptions) null, (UnboundedSource.CheckpointMark) null);
        int i = 0;
        while (i < 20) {
            advanceOnce(createReader, i > 0);
            i++;
        }
        Assert.assertEquals(19L, ((Long) ((KafkaRecord) createReader.getCurrent()).getKV().getValue()).longValue());
        Assert.assertEquals(19L, createReader.getCurrentTimestamp().getMillis());
        UnboundedSource.UnboundedReader createReader2 = unboundedSource.createReader((PipelineOptions) null, (KafkaCheckpointMark) CoderUtils.clone(unboundedSource.getCheckpointMarkCoder(), createReader.getCheckpointMark()));
        int i2 = 20;
        while (i2 < 85) {
            advanceOnce(createReader2, i2 > 20);
            Assert.assertEquals(i2, ((Long) ((KafkaRecord) createReader2.getCurrent()).getKV().getValue()).longValue());
            Assert.assertEquals(i2, createReader2.getCurrentTimestamp().getMillis());
            i2++;
        }
    }

    @Test
    public void testUnboundedSourceCheckpointMarkWithEmptyPartitions() throws Exception {
        UnboundedSource unboundedSource = (UnboundedSource) mkKafkaReadTransform(5, new ValueAsTimestampFn()).makeSource().generateInitialSplits(1, PipelineOptionsFactory.fromArgs(new String[0]).create()).get(0);
        UnboundedSource.UnboundedReader createReader = unboundedSource.createReader((PipelineOptions) null, (UnboundedSource.CheckpointMark) null);
        int i = 0;
        while (i < 5) {
            advanceOnce(createReader, i > 0);
            i++;
        }
        KafkaCheckpointMark kafkaCheckpointMark = (KafkaCheckpointMark) CoderUtils.clone(unboundedSource.getCheckpointMarkCoder(), createReader.getCheckpointMark());
        ImmutableList of = ImmutableList.of("topic_a", "topic_b");
        UnboundedSource.UnboundedReader createReader2 = ((UnboundedSource) KafkaIO.read().withBootstrapServers("none").withTopics(of).withConsumerFactoryFn(new ConsumerFactoryFn(of, 10, 100, OffsetResetStrategy.LATEST)).withKeyCoder(BigEndianIntegerCoder.of()).withValueCoder(BigEndianLongCoder.of()).withMaxNumRecords(100).withTimestampFn(new ValueAsTimestampFn()).makeSource().generateInitialSplits(1, PipelineOptionsFactory.fromArgs(new String[0]).create()).get(0)).createReader((PipelineOptions) null, kafkaCheckpointMark);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        long j = 5;
        while (true) {
            long j2 = j;
            if (j2 >= 100) {
                Assert.assertThat(arrayList2, IsIterableContainingInAnyOrder.containsInAnyOrder(arrayList.toArray()));
                return;
            }
            advanceOnce(createReader2, j2 > ((long) 5));
            arrayList.add(Long.valueOf(j2));
            arrayList2.add(((KafkaRecord) createReader2.getCurrent()).getKV().getValue());
            j = j2 + 1;
        }
    }

    @Test
    public void testSink() throws Exception {
        synchronized (MOCK_PRODUCER_LOCK) {
            MOCK_PRODUCER.clear();
            ProducerSendCompletionThread start = new ProducerSendCompletionThread().start();
            TestPipeline create = TestPipeline.create();
            create.apply(mkKafkaReadTransform(1000, new ValueAsTimestampFn()).withoutMetadata()).apply(KafkaIO.write().withBootstrapServers("none").withTopic("test").withKeyCoder(BigEndianIntegerCoder.of()).withValueCoder(BigEndianLongCoder.of()).withProducerFactoryFn(new ProducerFactoryFn()));
            create.run();
            start.shutdown();
            verifyProducerRecords("test", 1000, false);
        }
    }

    @Test
    public void testValuesSink() throws Exception {
        synchronized (MOCK_PRODUCER_LOCK) {
            MOCK_PRODUCER.clear();
            ProducerSendCompletionThread start = new ProducerSendCompletionThread().start();
            TestPipeline create = TestPipeline.create();
            create.apply(mkKafkaReadTransform(1000, new ValueAsTimestampFn()).withoutMetadata()).apply(Values.create()).apply(KafkaIO.write().withBootstrapServers("none").withTopic("test").withKeyCoder(BigEndianIntegerCoder.of()).withValueCoder(BigEndianLongCoder.of()).withProducerFactoryFn(new ProducerFactoryFn()).values());
            create.run();
            start.shutdown();
            verifyProducerRecords("test", 1000, true);
        }
    }

    @Test
    public void testSinkWithSendErrors() throws Throwable {
        this.thrown.expect(InjectedErrorException.class);
        this.thrown.expectMessage("Injected Error #1");
        synchronized (MOCK_PRODUCER_LOCK) {
            MOCK_PRODUCER.clear();
            TestPipeline create = TestPipeline.create();
            ProducerSendCompletionThread start = new ProducerSendCompletionThread(10, 100).start();
            create.apply(mkKafkaReadTransform(1000, new ValueAsTimestampFn()).withoutMetadata()).apply(KafkaIO.write().withBootstrapServers("none").withTopic("test").withKeyCoder(BigEndianIntegerCoder.of()).withValueCoder(BigEndianLongCoder.of()).withProducerFactoryFn(new ProducerFactoryFn()));
            try {
                try {
                    create.run();
                    start.shutdown();
                } catch (Throwable th) {
                    start.shutdown();
                    throw th;
                }
            } catch (Pipeline.PipelineExecutionException e) {
                throw e.getCause().getCause();
            }
        }
    }

    private static void verifyProducerRecords(String str, int i, boolean z) {
        List history = MOCK_PRODUCER.history();
        Collections.sort(history, new Comparator<ProducerRecord<Integer, Long>>() { // from class: org.apache.beam.sdk.io.kafka.KafkaIOTest.3
            @Override // java.util.Comparator
            public int compare(ProducerRecord<Integer, Long> producerRecord, ProducerRecord<Integer, Long> producerRecord2) {
                return Long.compare(((Long) producerRecord.value()).longValue(), ((Long) producerRecord2.value()).longValue());
            }
        });
        for (int i2 = 0; i2 < i; i2++) {
            ProducerRecord producerRecord = (ProducerRecord) history.get(i2);
            Assert.assertEquals(str, producerRecord.topic());
            if (z) {
                Assert.assertNull(producerRecord.key());
            } else {
                Assert.assertEquals(i2, ((Integer) producerRecord.key()).intValue());
            }
            Assert.assertEquals(i2, ((Long) producerRecord.value()).longValue());
        }
    }
}
