/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaSourceDescriptor;
import org.apache.beam.sdk.io.kafka.KafkaTimestampType;
import org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class ReadFromKafkaDoFnTest {
    // Shared fixture. Declaration order matters: instance initializers run top to bottom, and each
    // initializer below reads the field declared immediately before it.

    /** The single topic/partition every test in this class reads from. */
    private final TopicPartition topicPartition = new TopicPartition("topic", 0);

    /** Hand-rolled consumer double bound to {@link #topicPartition}. */
    private final SimpleMockKafkaConsumer consumer =
        new SimpleMockKafkaConsumer(OffsetResetStrategy.NONE, topicPartition);

    /** DoFn under test, wired so that its consumer factory always returns {@link #consumer}. */
    private final ReadFromKafkaDoFn<String, String> dofnInstance =
        new ReadFromKafkaDoFn(makeReadSourceDescriptor(consumer));

    /**
     * Builds the {@code ReadSourceDescriptors} configuration used to construct the DoFn under test.
     *
     * <p>Both key and value use {@link StringDeserializer}, the bootstrap server is the placeholder
     * {@code "bootstrap_server"}, and the consumer factory ignores its config map and always hands
     * back {@code kafkaMockConsumer}.
     *
     * @param kafkaMockConsumer consumer instance the factory function returns on every call
     * @return the configured read transform
     */
    private KafkaIO.ReadSourceDescriptors<String, String> makeReadSourceDescriptor(final Consumer kafkaMockConsumer) {
        // The explicit <String, String> witness is required: read() sits in receiver position, so
        // there is no target type and K/V would otherwise be inferred as Object, which makes
        // withKeyDeserializer(StringDeserializer.class) fail to type-check.
        return KafkaIO.ReadSourceDescriptors.<String, String>read()
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withConsumerFactoryFn(new SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>() {
                @Override
                @SuppressWarnings("unchecked") // the parameter is a raw Consumer supplied by the tests
                public Consumer<byte[], byte[]> apply(Map<String, Object> input) {
                    return kafkaMockConsumer;
                }
            })
            .withBootstrapServers("bootstrap_server");
    }

    /**
     * Builds the list of outputs a test expects the DoFn to emit for a run of consecutive offsets.
     *
     * <p>Every record is addressed to this test's topic partition, carries timestamp {@code -1} with
     * {@link KafkaTimestampType#NO_TIMESTAMP_TYPE}, empty headers, and the same key/value pair.
     *
     * @param descriptor source descriptor used as the key of every expected output
     * @param startOffset offset of the first expected record
     * @param numRecords number of consecutive records to generate; non-positive yields an empty list
     * @param key record key shared by all generated records
     * @param value record value shared by all generated records
     * @return expected outputs in ascending offset order
     */
    private List<KV<KafkaSourceDescriptor, KafkaRecord<String, String>>> createExpectedRecords(KafkaSourceDescriptor descriptor, long startOffset, int numRecords, String key, String value) {
        List<KV<KafkaSourceDescriptor, KafkaRecord<String, String>>> expected = new ArrayList<>();
        for (int i = 0; i < numRecords; ++i) {
            // Fully parameterized construction: no raw KafkaRecord and no cast between unrelated
            // KV parameterizations, so the element type is checked by the compiler.
            KafkaRecord<String, String> record =
                new KafkaRecord<>(
                    topicPartition.topic(),
                    topicPartition.partition(),
                    startOffset + i,
                    -1L,
                    KafkaTimestampType.NO_TIMESTAMP_TYPE,
                    new RecordHeaders(),
                    KV.of(key, value));
            expected.add(KV.of(descriptor, record));
        }
        return expected;
    }

    /**
     * Runs before each test: invokes the DoFn's {@code setup()} and then restores the shared mock
     * consumer to its default state so settings from a previous test do not leak.
     */
    @Before
    public void setUp() throws Exception {
        dofnInstance.setup();
        consumer.reset();
    }

    /**
     * A descriptor with an explicit start offset (10) must yield the restriction
     * {@code [10, Long.MAX_VALUE)}, even though the mock consumer reports a different position (5)
     * and a different offset-for-time (15).
     */
    @Test
    public void testInitialRestrictionWhenHasStartOffset() throws Exception {
        final long startOffset = 10L;

        // Values that differ from the expected start offset, so a wrong source would be visible.
        consumer.setStartOffsetForTime(15L, Instant.now());
        consumer.setCurrentPos(5L);

        KafkaSourceDescriptor descriptor =
            KafkaSourceDescriptor.of(topicPartition, startOffset, null, null, null, ImmutableList.of());
        OffsetRange restriction = dofnInstance.initialRestriction(descriptor);

        Assert.assertEquals(new OffsetRange(startOffset, Long.MAX_VALUE), restriction);
    }

    /**
     * A descriptor with explicit start (10) and stop (20) offsets must yield the restriction
     * {@code [10, 20)}, regardless of the differing offsets the mock consumer is primed with.
     */
    @Test
    public void testInitialRestrictionWhenHasStopOffset() throws Exception {
        final long startOffset = 10L;
        final long stopOffset = 20L;

        // Values that differ from the expected bounds, so a wrong source would be visible.
        consumer.setStartOffsetForTime(15L, Instant.now());
        consumer.setStopOffsetForTime(18L, Instant.now());
        consumer.setCurrentPos(5L);

        KafkaSourceDescriptor descriptor =
            KafkaSourceDescriptor.of(topicPartition, startOffset, null, stopOffset, null, ImmutableList.of());
        OffsetRange restriction = dofnInstance.initialRestriction(descriptor);

        Assert.assertEquals(new OffsetRange(startOffset, stopOffset), restriction);
    }

    /**
     * A descriptor carrying only a start read time must yield a restriction that begins at the
     * offset the consumer maps to that time (10) and is unbounded above.
     */
    @Test
    public void testInitialRestrictionWhenHasStartTime() throws Exception {
        final long offsetAtStartTime = 10L;
        Instant startTime = Instant.now();

        consumer.setStartOffsetForTime(offsetAtStartTime, startTime);
        // The consumer position differs from the expected offset on purpose.
        consumer.setCurrentPos(5L);

        KafkaSourceDescriptor descriptor =
            KafkaSourceDescriptor.of(topicPartition, null, startTime, null, null, ImmutableList.of());
        OffsetRange restriction = dofnInstance.initialRestriction(descriptor);

        Assert.assertEquals(new OffsetRange(offsetAtStartTime, Long.MAX_VALUE), restriction);
    }

    /**
     * A descriptor carrying start and stop read times must yield a restriction bounded by the
     * offsets the consumer maps to those times: {@code [10, 100)}.
     */
    @Test
    public void testInitialRestrictionWhenHasStopTime() throws Exception {
        final long offsetAtStartTime = 10L;
        final long offsetAtStopTime = 100L;
        Instant startTime = Instant.now();
        // Stop time is two seconds after the start time so the two lookups are distinguishable.
        Instant stopTime = startTime.plus(Duration.millis(2000L));

        consumer.setStartOffsetForTime(offsetAtStartTime, startTime);
        consumer.setStopOffsetForTime(offsetAtStopTime, stopTime);
        consumer.setCurrentPos(5L);

        KafkaSourceDescriptor descriptor =
            KafkaSourceDescriptor.of(topicPartition, null, startTime, null, stopTime, ImmutableList.of());
        OffsetRange restriction = dofnInstance.initialRestriction(descriptor);

        Assert.assertEquals(new OffsetRange(offsetAtStartTime, offsetAtStopTime), restriction);
    }

    /**
     * With no offsets or times on the descriptor, the restriction must start at the consumer's
     * reported position (5) and be unbounded above.
     */
    @Test
    public void testInitialRestrictionWithConsumerPosition() throws Exception {
        final long consumerPosition = 5L;
        consumer.setCurrentPos(5L);

        KafkaSourceDescriptor descriptor =
            KafkaSourceDescriptor.of(topicPartition, null, null, null, null, ImmutableList.of());
        OffsetRange restriction = dofnInstance.initialRestriction(descriptor);

        Assert.assertEquals(new OffsetRange(consumerPosition, Long.MAX_VALUE), restriction);
    }

    /**
     * Processing the restriction {@code [5, 8)} must emit exactly the three records at offsets
     * 5, 6 and 7 (key {@code "key"}, value {@code "value"}) and return {@code stop()}.
     */
    @Test
    public void testProcessElement() throws Exception {
        final long firstOffset = 5L;
        MockOutputReceiver receiver = new MockOutputReceiver();
        consumer.setNumOfRecordsPerPoll(3L);

        OffsetRangeTracker tracker =
            new OffsetRangeTracker(new OffsetRange(firstOffset, firstOffset + 3L));
        KafkaSourceDescriptor descriptor =
            KafkaSourceDescriptor.of(topicPartition, null, null, null, null, null);

        DoFn.ProcessContinuation continuation =
            dofnInstance.processElement(descriptor, tracker, null, (DoFn.OutputReceiver) receiver);

        Assert.assertEquals(DoFn.ProcessContinuation.stop(), continuation);
        Assert.assertEquals(
            createExpectedRecords(descriptor, firstOffset, 3, "key", "value"),
            receiver.getOutputs());
    }

    /**
     * When the consumer's poll yields no records, processing must return {@code resume()} and
     * emit nothing.
     */
    @Test
    public void testProcessElementWithEmptyPoll() throws Exception {
        MockOutputReceiver receiver = new MockOutputReceiver();
        // -1 makes the mock's inclusive record loop run zero times, i.e. an empty poll.
        consumer.setNumOfRecordsPerPoll(-1L);

        OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE));
        KafkaSourceDescriptor descriptor =
            KafkaSourceDescriptor.of(topicPartition, null, null, null, null, null);

        DoFn.ProcessContinuation continuation =
            dofnInstance.processElement(descriptor, tracker, null, (DoFn.OutputReceiver) receiver);

        Assert.assertEquals(DoFn.ProcessContinuation.resume(), continuation);
        Assert.assertTrue(receiver.getOutputs().isEmpty());
    }

    /**
     * When the mock consumer is marked removed (its {@code listTopics()} then returns an empty
     * map), processing must return {@code stop()}.
     */
    @Test
    public void testProcessElementWhenTopicPartitionIsRemoved() throws Exception {
        MockOutputReceiver receiver = new MockOutputReceiver();
        consumer.setRemoved();
        consumer.setNumOfRecordsPerPoll(10L);

        OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE));
        KafkaSourceDescriptor descriptor =
            KafkaSourceDescriptor.of(topicPartition, null, null, null, null, null);

        DoFn.ProcessContinuation continuation =
            dofnInstance.processElement(descriptor, tracker, null, (DoFn.OutputReceiver) receiver);

        Assert.assertEquals(DoFn.ProcessContinuation.stop(), continuation);
    }

    /**
     * When the configured check-stop-reading function returns {@code true} for the topic
     * partition, processing must return {@code stop()}. The function also asserts it is asked
     * about this test's topic partition.
     */
    @Test
    public void testProcessElementWhenTopicPartitionIsStopped() throws Exception {
        MockOutputReceiver receiver = new MockOutputReceiver();

        // Kept as an anonymous class (rather than a lambda) to match the original construction.
        SerializableFunction<TopicPartition, Boolean> alwaysStop =
            new SerializableFunction<TopicPartition, Boolean>() {
                @Override
                public Boolean apply(TopicPartition input) {
                    Assert.assertTrue(input.equals(ReadFromKafkaDoFnTest.this.topicPartition));
                    return true;
                }
            };
        // A dedicated DoFn instance is built here; setUp() only prepares the shared one.
        ReadFromKafkaDoFn instance =
            new ReadFromKafkaDoFn(
                makeReadSourceDescriptor(consumer)
                    .toBuilder()
                    .setCheckStopReadingFn(alwaysStop)
                    .build());

        consumer.setNumOfRecordsPerPoll(10L);
        OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE));
        KafkaSourceDescriptor descriptor =
            KafkaSourceDescriptor.of(topicPartition, null, null, null, null, null);

        DoFn.ProcessContinuation continuation =
            instance.processElement(descriptor, tracker, null, (DoFn.OutputReceiver) receiver);

        Assert.assertEquals(DoFn.ProcessContinuation.stop(), continuation);
    }

    private static class MockOutputReceiver
    implements DoFn.OutputReceiver<KV<KafkaSourceDescriptor, KafkaRecord<String, String>>> {
        private final List<KV<KafkaSourceDescriptor, KafkaRecord<String, String>>> records = new ArrayList<KV<KafkaSourceDescriptor, KafkaRecord<String, String>>>();

        private MockOutputReceiver() {
        }

        public void output(KV<KafkaSourceDescriptor, KafkaRecord<String, String>> output) {
        }

        public void outputWithTimestamp(KV<KafkaSourceDescriptor, KafkaRecord<String, String>> output, @UnknownKeyFor @NonNull @Initialized Instant timestamp) {
            this.records.add(output);
        }

        public List<KV<KafkaSourceDescriptor, KafkaRecord<String, String>>> getOutputs() {
            return this.records;
        }
    }

    private static class SimpleMockKafkaConsumer
    extends MockConsumer<byte[], byte[]> {
        private final TopicPartition topicPartition;
        private boolean isRemoved = false;
        private long currentPos = 0L;
        private long startOffset = 0L;
        private KV<Long, Instant> startOffsetForTime = KV.of((Object)0L, (Object)Instant.now());
        private KV<Long, Instant> stopOffsetForTime = KV.of((Object)Long.MAX_VALUE, null);
        private long numOfRecordsPerPoll;

        public SimpleMockKafkaConsumer(OffsetResetStrategy offsetResetStrategy, TopicPartition topicPartition) {
            super(offsetResetStrategy);
            this.topicPartition = topicPartition;
        }

        public void reset() {
            this.isRemoved = false;
            this.currentPos = 0L;
            this.startOffset = 0L;
            this.startOffsetForTime = KV.of((Object)0L, (Object)Instant.now());
            this.stopOffsetForTime = KV.of((Object)Long.MAX_VALUE, null);
            this.numOfRecordsPerPoll = 0L;
        }

        public void setRemoved() {
            this.isRemoved = true;
        }

        public void setNumOfRecordsPerPoll(long num) {
            this.numOfRecordsPerPoll = num;
        }

        public void setCurrentPos(long pos) {
            this.currentPos = pos;
        }

        public void setStartOffsetForTime(long offset, Instant time) {
            this.startOffsetForTime = KV.of((Object)offset, (Object)time);
        }

        public void setStopOffsetForTime(long offset, Instant time) {
            this.stopOffsetForTime = KV.of((Object)offset, (Object)time);
        }

        public synchronized Map<String, List<PartitionInfo>> listTopics() {
            if (this.isRemoved) {
                return ImmutableMap.of();
            }
            return ImmutableMap.of((Object)this.topicPartition.topic(), (Object)ImmutableList.of((Object)new PartitionInfo(this.topicPartition.topic(), this.topicPartition.partition(), null, null, null)));
        }

        public synchronized void assign(Collection<TopicPartition> partitions) {
            Assert.assertTrue((boolean)((TopicPartition)Iterables.getOnlyElement(partitions)).equals((Object)this.topicPartition));
        }

        public synchronized void seek(TopicPartition partition, long offset) {
            Assert.assertTrue((boolean)partition.equals((Object)this.topicPartition));
            this.startOffset = offset;
        }

        public synchronized ConsumerRecords<byte[], byte[]> poll(java.time.Duration timeout) {
            if (this.topicPartition == null) {
                return ConsumerRecords.empty();
            }
            String key = "key";
            String value = "value";
            ArrayList<ConsumerRecord> records = new ArrayList<ConsumerRecord>();
            for (long i = 0L; i <= this.numOfRecordsPerPoll; ++i) {
                records.add(new ConsumerRecord(this.topicPartition.topic(), this.topicPartition.partition(), this.startOffset + i, (Object)key.getBytes(Charsets.UTF_8), (Object)value.getBytes(Charsets.UTF_8)));
            }
            if (records.isEmpty()) {
                return ConsumerRecords.empty();
            }
            return new ConsumerRecords((Map)ImmutableMap.of((Object)this.topicPartition, records));
        }

        public synchronized Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
            Assert.assertTrue((boolean)((TopicPartition)Iterables.getOnlyElement((Iterable)timestampsToSearch.keySet().stream().collect(Collectors.toList()))).equals((Object)this.topicPartition));
            Long timeToSearch = (Long)Iterables.getOnlyElement(timestampsToSearch.values());
            Long returnOffset = 0L;
            if (timeToSearch.longValue() == ((Instant)this.startOffsetForTime.getValue()).getMillis()) {
                returnOffset = (Long)this.startOffsetForTime.getKey();
            } else if (timeToSearch.longValue() == ((Instant)this.stopOffsetForTime.getValue()).getMillis()) {
                returnOffset = (Long)this.stopOffsetForTime.getKey();
            }
            return ImmutableMap.of((Object)this.topicPartition, (Object)new OffsetAndTimestamp(returnOffset.longValue(), timeToSearch.longValue()));
        }

        public synchronized long position(TopicPartition partition) {
            Assert.assertTrue((boolean)partition.equals((Object)this.topicPartition));
            return this.currentPos;
        }
    }
}

