/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.AvroGeneratedUser;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProviderTest;
import org.apache.beam.sdk.io.kafka.ConsumerSpEL;
import org.apache.beam.sdk.io.kafka.CustomTimestampPolicyWithLimitedDelay;
import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaPublishTimestampFunction;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader;
import org.apache.beam.sdk.io.kafka.ProducerRecordCoder;
import org.apache.beam.sdk.io.kafka.ProducerSpEL;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResultsMatchers;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.metrics.SourceMetrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.hamcrest.collection.IsIterableWithSize;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableCauseMatcher;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=JUnit4.class)
public class KafkaIOTest {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaIOTest.class);
    @Rule
    public final transient TestPipeline p = TestPipeline.create();
    @Rule
    public ExpectedException thrown = ExpectedException.none();
    private static final Instant LOG_APPEND_START_TIME = new Instant(600000L);
    private static final String TIMESTAMP_START_MILLIS_CONFIG = "test.timestamp.start.millis";
    private static final String TIMESTAMP_TYPE_CONFIG = "test.timestamp.type";
    @Rule
    public ExpectedException noMessagesException = ExpectedException.none();
    private static final ConcurrentMap<String, MockProducer<Integer, Long>> MOCK_PRODUCER_MAP = new ConcurrentHashMap<String, MockProducer<Integer, Long>>();

    private static MockConsumer<byte[], byte[]> mkMockConsumer(List<String> topics, int partitionsPerTopic, int numElements, OffsetResetStrategy offsetResetStrategy, Map<String, Object> config, SerializableFunction<Integer, byte[]> keyFunction, SerializableFunction<Integer, byte[]> valueFunction) {
        final ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
        final HashMap records = new HashMap();
        HashMap partitionMap = new HashMap();
        for (String topic : topics) {
            ArrayList<PartitionInfo> partIds = new ArrayList<PartitionInfo>(partitionsPerTopic);
            for (int i = 0; i < partitionsPerTopic; ++i) {
                TopicPartition tp = new TopicPartition(topic, i);
                partitions.add(tp);
                partIds.add(new PartitionInfo(topic, i, null, null, null));
                records.put(tp, new ArrayList());
            }
            partitionMap.put(topic, partIds);
        }
        int numPartitions = partitions.size();
        final long[] offsets = new long[numPartitions];
        long timestampStartMillis = (Long)config.getOrDefault(TIMESTAMP_START_MILLIS_CONFIG, LOG_APPEND_START_TIME.getMillis());
        TimestampType timestampType = TimestampType.forName((String)((String)config.getOrDefault(TIMESTAMP_TYPE_CONFIG, TimestampType.LOG_APPEND_TIME.toString())));
        for (int i = 0; i < numElements; ++i) {
            int pIdx = i % numPartitions;
            TopicPartition tp = (TopicPartition)partitions.get(pIdx);
            byte[] key = (byte[])keyFunction.apply((Object)i);
            byte[] value = (byte[])valueFunction.apply((Object)i);
            int n = pIdx;
            long l = offsets[n];
            offsets[n] = l + 1L;
            ((List)records.get(tp)).add(new ConsumerRecord(tp.topic(), tp.partition(), l, timestampStartMillis + Duration.standardSeconds((long)i).getMillis(), timestampType, 0L, key.length, value.length, (Object)key, (Object)value));
        }
        final AtomicReference assignedPartitions = new AtomicReference(Collections.emptyList());
        MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(offsetResetStrategy){

            public synchronized void assign(Collection<TopicPartition> assigned) {
                super.assign(assigned);
                assignedPartitions.set(ImmutableList.copyOf(assigned));
                for (TopicPartition tp : assigned) {
                    this.updateBeginningOffsets((Map)ImmutableMap.of((Object)tp, (Object)0L));
                    this.updateEndOffsets((Map)ImmutableMap.of((Object)tp, (Object)((List)records.get(tp)).size()));
                }
            }

            public synchronized Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
                return timestampsToSearch.entrySet().stream().map(e -> {
                    long maxOffset = offsets[partitions.indexOf(e.getKey())];
                    long offset = (Long)e.getValue();
                    OffsetAndTimestamp value = offset >= maxOffset ? null : new OffsetAndTimestamp(offset, offset);
                    return new AbstractMap.SimpleEntry<TopicPartition, OffsetAndTimestamp>((TopicPartition)e.getKey(), value);
                }).collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue));
            }
        };
        for (String topic : topics) {
            consumer.updatePartitions(topic, (List)partitionMap.get(topic));
        }
        Runnable recordEnqueueTask = new Runnable((MockConsumer)consumer, records, config){
            final /* synthetic */ MockConsumer val$consumer;
            final /* synthetic */ Map val$records;
            final /* synthetic */ Map val$config;
            {
                this.val$consumer = mockConsumer;
                this.val$records = map;
                this.val$config = map2;
            }

            @Override
            public void run() {
                int recordsAdded = 0;
                for (TopicPartition tp : (List)assignedPartitions.get()) {
                    long curPos = this.val$consumer.position(tp);
                    for (ConsumerRecord r : (List)this.val$records.get(tp)) {
                        if (r.offset() < curPos) continue;
                        this.val$consumer.addRecord(r);
                        ++recordsAdded;
                    }
                }
                if (recordsAdded == 0) {
                    if (this.val$config.get("inject.error.at.eof") != null) {
                        this.val$consumer.setException(new KafkaException("Injected error in consumer.poll()"));
                    }
                    Uninterruptibles.sleepUninterruptibly((long)10L, (TimeUnit)TimeUnit.MILLISECONDS);
                }
                this.val$consumer.schedulePollTask((Runnable)this);
            }
        };
        consumer.schedulePollTask(recordEnqueueTask);
        return consumer;
    }

    private static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(int numElements, @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) {
        return KafkaIOTest.mkKafkaReadTransform(numElements, numElements, timestampFn);
    }

    private static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(int numElements, int maxNumRecords, @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) {
        ImmutableList topics = ImmutableList.of((Object)"topic_a", (Object)"topic_b");
        KafkaIO.Read reader = KafkaIO.read().withBootstrapServers("myServer1:9092,myServer2:9092").withTopics((List)topics).withConsumerFactoryFn((SerializableFunction)new ConsumerFactoryFn((List<String>)topics, 10, numElements, OffsetResetStrategy.EARLIEST)).withKeyDeserializer(IntegerDeserializer.class).withValueDeserializer(LongDeserializer.class).withMaxNumRecords((long)maxNumRecords);
        if (timestampFn != null) {
            return reader.withTimestampFn(timestampFn);
        }
        return reader;
    }

    public static void addCountingAsserts(PCollection<Long> input, long numElements) {
        KafkaIOTest.addCountingAsserts(input, numElements, numElements, 0L, numElements - 1L);
    }

    public static void addCountingAsserts(PCollection<Long> input, long count, long uniqueCount, long min, long max) {
        PAssert.thatSingleton((PCollection)((PCollection)input.apply("Count", Count.globally()))).isEqualTo((Object)count);
        PAssert.thatSingleton((PCollection)((PCollection)((PCollection)input.apply((PTransform)Distinct.create())).apply("UniqueCount", Count.globally()))).isEqualTo((Object)uniqueCount);
        PAssert.thatSingleton((PCollection)((PCollection)input.apply("Min", (PTransform)Min.globally()))).isEqualTo((Object)min);
        PAssert.thatSingleton((PCollection)((PCollection)input.apply("Max", (PTransform)Max.globally()))).isEqualTo((Object)max);
    }

    @Test
    public void testReadAvroGenericRecordsWithConfluentSchemaRegistry() {
        int numElements = 100;
        String topic = "my_topic";
        String schemaRegistryUrl = "mock://my-scope-name";
        String keySchemaSubject = topic + "-key";
        String valueSchemaSubject = topic + "-value";
        ArrayList<KV> inputs = new ArrayList<KV>();
        for (int i = 0; i < numElements; ++i) {
            inputs.add(KV.of((Object)new AvroGeneratedUser("KeyName" + i, Integer.valueOf(i), "color" + i), (Object)new AvroGeneratedUser("ValueName" + i, Integer.valueOf(i), "color" + i)));
        }
        KafkaIO.Read reader = KafkaIO.read().withBootstrapServers("localhost:9092").withTopic(topic).withKeyDeserializer(ConfluentSchemaRegistryDeserializerProviderTest.mockDeserializerProvider(schemaRegistryUrl, keySchemaSubject, null)).withValueDeserializer(ConfluentSchemaRegistryDeserializerProviderTest.mockDeserializerProvider(schemaRegistryUrl, valueSchemaSubject, null)).withConsumerFactoryFn((SerializableFunction)new ConsumerFactoryFn((List<String>)ImmutableList.of((Object)topic), 1, numElements, OffsetResetStrategy.EARLIEST, new KeyAvroSerializableFunction(topic, schemaRegistryUrl), new ValueAvroSerializableFunction(topic, schemaRegistryUrl))).withMaxNumRecords((long)numElements);
        PCollection input = (PCollection)this.p.apply(reader.withoutMetadata());
        PAssert.that((PCollection)input).containsInAnyOrder(inputs);
        this.p.run();
    }

    @Test
    public void testReadAvroSpecificRecordsWithConfluentSchemaRegistry() {
        int numElements = 100;
        String topic = "my_topic";
        String schemaRegistryUrl = "mock://my-scope-name";
        String valueSchemaSubject = topic + "-value";
        ArrayList<KV> inputs = new ArrayList<KV>();
        for (int i2 = 0; i2 < numElements; ++i2) {
            inputs.add(KV.of((Object)i2, (Object)new AvroGeneratedUser("ValueName" + i2, Integer.valueOf(i2), "color" + i2)));
        }
        KafkaIO.Read reader = KafkaIO.read().withBootstrapServers("localhost:9092").withTopic(topic).withKeyDeserializer(IntegerDeserializer.class).withValueDeserializer(ConfluentSchemaRegistryDeserializerProviderTest.mockDeserializerProvider(schemaRegistryUrl, valueSchemaSubject, null)).withConsumerFactoryFn((SerializableFunction)new ConsumerFactoryFn((List<String>)ImmutableList.of((Object)topic), 1, numElements, OffsetResetStrategy.EARLIEST, (SerializableFunction<Integer, byte[]>)(SerializableFunction & Serializable)i -> ByteBuffer.wrap(new byte[4]).putInt((int)i).array(), new ValueAvroSerializableFunction(topic, schemaRegistryUrl))).withMaxNumRecords((long)numElements);
        PCollection input = (PCollection)this.p.apply(reader.withoutMetadata());
        PAssert.that((PCollection)input).containsInAnyOrder(inputs);
        this.p.run();
    }

    @Test
    public void testDeserializationWithHeaders() throws Exception {
        int numElements = 1000;
        String topic = "my_topic";
        KafkaIO.Read reader = KafkaIO.read().withBootstrapServers("none").withTopic("my_topic").withConsumerFactoryFn((SerializableFunction)new ConsumerFactoryFn((List<String>)ImmutableList.of((Object)topic), 10, numElements, OffsetResetStrategy.EARLIEST)).withMaxNumRecords((long)numElements).withKeyDeserializerAndCoder(IntegerDeserializerWithHeadersAssertor.class, (Coder)BigEndianIntegerCoder.of()).withValueDeserializerAndCoder(LongDeserializerWithHeadersAssertor.class, (Coder)BigEndianLongCoder.of());
        PCollection input = (PCollection)((PCollection)this.p.apply(reader.withoutMetadata())).apply((PTransform)Values.create());
        KafkaIOTest.addCountingAsserts((PCollection<Long>)input, numElements);
        this.p.run();
    }

    @Test
    public void testUnboundedSource() {
        int numElements = 1000;
        PCollection input = (PCollection)((PCollection)this.p.apply(KafkaIOTest.mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())).apply((PTransform)Values.create());
        KafkaIOTest.addCountingAsserts((PCollection<Long>)input, numElements);
        this.p.run();
    }

    @Test
    public void testUnreachableKafkaBrokers() {
        this.thrown.expect(Exception.class);
        this.thrown.expectMessage("Reader-0: Timeout while initializing partition 'test-0'");
        int numElements = 1000;
        PCollection input = (PCollection)((PCollection)this.p.apply(KafkaIO.read().withBootstrapServers("8.8.8.8:9092").withTopicPartitions((List)ImmutableList.of((Object)new TopicPartition("test", 0))).withKeyDeserializer(IntegerDeserializer.class).withValueDeserializer(LongDeserializer.class).withConsumerConfigUpdates((Map)ImmutableMap.of((Object)"heartbeat.interval.ms", (Object)5, (Object)"session.timeout.ms", (Object)8, (Object)"fetch.max.wait.ms", (Object)8, (Object)"default.api.timeout.ms", (Object)10)).withMaxNumRecords(10L).withoutMetadata())).apply((PTransform)Values.create());
        KafkaIOTest.addCountingAsserts((PCollection<Long>)input, numElements);
        this.p.run();
    }

    @Test
    public void testResolveDefaultApiTimeout() {
        String defaultApiTimeoutConfig = "default.api.timeout.ms";
        Assert.assertEquals((Object)Duration.millis((long)20L), (Object)KafkaUnboundedReader.resolveDefaultApiTimeout((KafkaIO.Read)KafkaIO.read().withConsumerConfigUpdates((Map)ImmutableMap.of((Object)"default.api.timeout.ms", (Object)20, (Object)"request.timeout.ms", (Object)30))));
        Assert.assertEquals((Object)Duration.millis((long)60L), (Object)KafkaUnboundedReader.resolveDefaultApiTimeout((KafkaIO.Read)KafkaIO.read().withConsumerConfigUpdates((Map)ImmutableMap.of((Object)"request.timeout.ms", (Object)30))));
        Assert.assertEquals((Object)Duration.millis((long)60000L), (Object)KafkaUnboundedReader.resolveDefaultApiTimeout((KafkaIO.Read)KafkaIO.read()));
    }

    @Test
    public void testUnboundedSourceWithSingleTopic() {
        int numElements = 1000;
        String topic = "my_topic";
        KafkaIO.Read reader = KafkaIO.read().withBootstrapServers("none").withTopic("my_topic").withConsumerFactoryFn((SerializableFunction)new ConsumerFactoryFn((List<String>)ImmutableList.of((Object)topic), 10, numElements, OffsetResetStrategy.EARLIEST)).withMaxNumRecords((long)numElements).withKeyDeserializer(IntegerDeserializer.class).withValueDeserializer(LongDeserializer.class);
        PCollection input = (PCollection)((PCollection)this.p.apply(reader.withoutMetadata())).apply((PTransform)Values.create());
        KafkaIOTest.addCountingAsserts((PCollection<Long>)input, numElements);
        this.p.run();
    }

    @Test
    public void testUnboundedSourceWithExplicitPartitions() {
        int numElements = 1000;
        ImmutableList topics = ImmutableList.of((Object)"test");
        KafkaIO.Read reader = KafkaIO.read().withBootstrapServers("none").withTopicPartitions((List)ImmutableList.of((Object)new TopicPartition("test", 5))).withConsumerFactoryFn((SerializableFunction)new ConsumerFactoryFn((List<String>)topics, 10, numElements, OffsetResetStrategy.EARLIEST)).withKeyDeserializer(ByteArrayDeserializer.class).withValueDeserializer(LongDeserializer.class).withMaxNumRecords((long)(numElements / 10));
        PCollection input = (PCollection)((PCollection)this.p.apply(reader.withoutMetadata())).apply((PTransform)Values.create());
        PAssert.that((PCollection)input).satisfies((SerializableFunction)new AssertMultipleOf(5));
        PAssert.thatSingleton((PCollection)((PCollection)input.apply(Count.globally()))).isEqualTo((Object)((long)numElements / 10L));
        this.p.run();
    }

    @Test
    public void testUnboundedSourceWithWrongTopic() {
        this.thrown.expect(Pipeline.PipelineExecutionException.class);
        this.thrown.expectCause(Matchers.instanceOf(IllegalStateException.class));
        this.thrown.expectMessage("Could not find any partitions info. Please check Kafka configuration and make sure that provided topics exist.");
        int numElements = 1000;
        KafkaIO.Read reader = KafkaIO.read().withBootstrapServers("none").withTopic("wrong_topic").withConsumerFactoryFn((SerializableFunction)new ConsumerFactoryFn((List<String>)ImmutableList.of((Object)"my_topic"), 10, numElements, OffsetResetStrategy.EARLIEST)).withMaxNumRecords((long)numElements).withKeyDeserializer(IntegerDeserializer.class).withValueDeserializer(LongDeserializer.class);
        ((PCollection)this.p.apply(reader.withoutMetadata())).apply((PTransform)Values.create());
        this.p.run();
    }

    @Test
    public void testUnboundedSourceTimestamps() {
        int numElements = 1000;
        PCollection input = (PCollection)((PCollection)this.p.apply(KafkaIOTest.mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())).apply((PTransform)Values.create());
        KafkaIOTest.addCountingAsserts((PCollection<Long>)input, numElements);
        PCollection diffs = (PCollection)((PCollection)input.apply("TimestampDiff", (PTransform)ParDo.of((DoFn)new ElementValueDiff()))).apply("DistinctTimestamps", (PTransform)Distinct.create());
        PAssert.thatSingleton((PCollection)diffs).isEqualTo((Object)0L);
        this.p.run();
    }

    @Test
    public void testUnboundedSourceLogAppendTimestamps() {
        int numElements = 1000;
        PCollection input = (PCollection)((PCollection)this.p.apply(KafkaIOTest.mkKafkaReadTransform(numElements, null).withLogAppendTime().withoutMetadata())).apply((PTransform)Values.create());
        KafkaIOTest.addCountingAsserts((PCollection<Long>)input, numElements);
        PCollection diffs = (PCollection)((PCollection)((PCollection)input.apply((PTransform)MapElements.into((TypeDescriptor)TypeDescriptors.longs()).via((SerializableFunction & Serializable)t -> LOG_APPEND_START_TIME.plus((ReadableDuration)Duration.standardSeconds((long)t)).getMillis()))).apply("TimestampDiff", (PTransform)ParDo.of((DoFn)new ElementValueDiff()))).apply("DistinctTimestamps", (PTransform)Distinct.create());
        PAssert.thatSingleton((PCollection)diffs).isEqualTo((Object)0L);
        this.p.run();
    }

    @Test
    public void testUnboundedSourceCustomTimestamps() {
        int numElements = 1000;
        long customTimestampStartMillis = 80000L;
        PCollection input = (PCollection)((PCollection)this.p.apply(KafkaIOTest.mkKafkaReadTransform(1000, null).withTimestampPolicyFactory((TimestampPolicyFactory & Serializable)(tp, prevWatermark) -> new CustomTimestampPolicyWithLimitedDelay((SerializableFunction & Serializable)record -> new Instant(TimeUnit.SECONDS.toMillis((Long)record.getKV().getValue()) + 80000L), Duration.ZERO, prevWatermark)).withoutMetadata())).apply((PTransform)Values.create());
        KafkaIOTest.addCountingAsserts((PCollection<Long>)input, 1000L);
        PCollection diffs = (PCollection)((PCollection)((PCollection)input.apply((PTransform)MapElements.into((TypeDescriptor)TypeDescriptors.longs()).via((SerializableFunction & Serializable)t -> TimeUnit.SECONDS.toMillis((long)t) + 80000L))).apply("TimestampDiff", (PTransform)ParDo.of((DoFn)new ElementValueDiff()))).apply("DistinctTimestamps", (PTransform)Distinct.create());
        PAssert.thatSingleton((PCollection)diffs).isEqualTo((Object)0L);
        this.p.run();
    }

    @Test
    public void testUnboundedSourceCreateTimestamps() {
        int numElements = 1000;
        long createTimestampStartMillis = 50000L;
        PCollection input = (PCollection)((PCollection)this.p.apply(KafkaIOTest.mkKafkaReadTransform(1000, null).withCreateTime(Duration.ZERO).withConsumerConfigUpdates((Map)ImmutableMap.of((Object)TIMESTAMP_TYPE_CONFIG, (Object)"CreateTime", (Object)TIMESTAMP_START_MILLIS_CONFIG, (Object)50000L)).withoutMetadata())).apply((PTransform)Values.create());
        KafkaIOTest.addCountingAsserts((PCollection<Long>)input, 1000L);
        PCollection diffs = (PCollection)((PCollection)((PCollection)input.apply((PTransform)MapElements.into((TypeDescriptor)TypeDescriptors.longs()).via((SerializableFunction & Serializable)t -> TimeUnit.SECONDS.toMillis((long)t) + 50000L))).apply("TimestampDiff", (PTransform)ParDo.of((DoFn)new ElementValueDiff()))).apply("DistinctTimestamps", (PTransform)Distinct.create());
        PAssert.thatSingleton((PCollection)diffs).isEqualTo((Object)0L);
        this.p.run();
    }

    @Test
    public void testUnboundedSourceWithExceptionInKafkaFetch() {
        this.thrown.expectCause(Matchers.isA(IOException.class));
        this.thrown.expectCause(ThrowableMessageMatcher.hasMessage((Matcher)Matchers.containsString((String)"Exception while reading from Kafka")));
        this.thrown.expectCause(ThrowableCauseMatcher.hasCause((Matcher)Matchers.isA(KafkaException.class)));
        this.thrown.expectCause(ThrowableCauseMatcher.hasCause((Matcher)ThrowableMessageMatcher.hasMessage((Matcher)Matchers.containsString((String)"Injected error in consumer.poll()"))));
        int numElements = 1000;
        String topic = "my_topic";
        KafkaIO.Read reader = KafkaIO.read().withBootstrapServers("none").withTopic("my_topic").withConsumerFactoryFn((SerializableFunction)new ConsumerFactoryFn((List<String>)ImmutableList.of((Object)topic), 10, numElements, OffsetResetStrategy.EARLIEST)).withMaxNumRecords((long)(2 * numElements)).withConsumerConfigUpdates((Map)ImmutableMap.of((Object)"inject.error.at.eof", (Object)true)).withKeyDeserializer(IntegerDeserializer.class).withValueDeserializer(LongDeserializer.class);
        PCollection input = (PCollection)((PCollection)this.p.apply(reader.withoutMetadata())).apply((PTransform)Values.create());
        KafkaIOTest.addCountingAsserts((PCollection<Long>)input, numElements);
        this.p.run();
    }

    @Test
    @Ignore
    public void testUnboundedSourceWithoutBoundedWrapper() {
        int numElements = 1000;
        int numPartitions = 10;
        String topic = "testUnboundedSourceWithoutBoundedWrapper";
        KafkaIO.Read reader = KafkaIO.read().withBootstrapServers(topic).withTopic(topic).withConsumerFactoryFn((SerializableFunction)new ConsumerFactoryFn((List<String>)ImmutableList.of((Object)topic), 10, 1000, OffsetResetStrategy.EARLIEST)).withKeyDeserializer(ByteArrayDeserializer.class).withValueDeserializer(LongDeserializer.class).withTimestampPolicyFactory(new TimestampPolicyWithEndOfSource(99L));
        ((PCollection)((PCollection)this.p.apply("readFromKafka", reader.withoutMetadata())).apply((PTransform)Values.create())).apply((PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardDays((long)100L))));
        PipelineResult result = this.p.run();
        MetricName elementsRead = SourceMetrics.elementsRead().getName();
        MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder().addNameFilter(MetricNameFilter.inNamespace((String)elementsRead.getNamespace())).build());
        MatcherAssert.assertThat((Object)metrics.getCounters(), (Matcher)Matchers.hasItem((Matcher)MetricResultsMatchers.attemptedMetricsResult((String)elementsRead.getNamespace(), (String)elementsRead.getName(), (String)"readFromKafka", (Object)1000L)));
    }

    @Test
    public void testUnboundedSourceSplits() throws Exception {
        int numElements = 1000;
        int numSplits = 10;
        UnboundedSource initial = KafkaIOTest.mkKafkaReadTransform(numElements, null).withKeyDeserializerAndCoder(IntegerDeserializer.class, (Coder)BigEndianIntegerCoder.of()).withValueDeserializerAndCoder(LongDeserializer.class, (Coder)BigEndianLongCoder.of()).makeSource();
        List splits = initial.split(numSplits, this.p.getOptions());
        Assert.assertEquals((String)"Expected exact splitting", (long)numSplits, (long)splits.size());
        long elementsPerSplit = numElements / numSplits;
        Assert.assertEquals((String)"Expected even splits", (long)numElements, (long)(elementsPerSplit * (long)numSplits));
        PCollectionList pcollections = PCollectionList.empty((Pipeline)this.p);
        for (int i = 0; i < splits.size(); ++i) {
            pcollections = pcollections.and((PCollection)((PCollection)((PCollection)this.p.apply("split" + i, (PTransform)Read.from((UnboundedSource)((UnboundedSource)splits.get(i))).withMaxNumRecords(elementsPerSplit))).apply("Remove Metadata " + i, (PTransform)ParDo.of(new RemoveKafkaMetadata()))).apply("collection " + i, (PTransform)Values.create()));
        }
        PCollection input = (PCollection)pcollections.apply((PTransform)Flatten.pCollections());
        KafkaIOTest.addCountingAsserts((PCollection<Long>)input, numElements);
        this.p.run();
    }

    private static void advanceOnce(UnboundedSource.UnboundedReader<?> reader, boolean isStarted) throws IOException {
        if (!isStarted && reader.start()) {
            return;
        }
        while (!reader.advance()) {
            try {
                Thread.sleep(1L);
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Test
    public void testUnboundedSourceCheckpointMark() throws Exception {
        int numElements = 85;
        UnboundedSource source = (UnboundedSource)KafkaIOTest.mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).makeSource().split(1, PipelineOptionsFactory.create()).get(0);
        UnboundedSource.UnboundedReader reader = source.createReader(null, null);
        int numToSkip = 20;
        for (int i = 0; i < 20; ++i) {
            KafkaIOTest.advanceOnce(reader, i > 0);
        }
        Assert.assertEquals((long)19L, (long)((Long)((KafkaRecord)reader.getCurrent()).getKV().getValue()));
        Assert.assertEquals((long)19L, (long)reader.getCurrentTimestamp().getMillis());
        KafkaCheckpointMark mark = (KafkaCheckpointMark)CoderUtils.clone((Coder)source.getCheckpointMarkCoder(), (Object)((KafkaCheckpointMark)reader.getCheckpointMark()));
        reader = source.createReader(null, (UnboundedSource.CheckpointMark)mark);
        for (int i = 20; i < numElements; ++i) {
            KafkaIOTest.advanceOnce(reader, i > 20);
            Assert.assertEquals((long)i, (long)((Long)((KafkaRecord)reader.getCurrent()).getKV().getValue()));
            Assert.assertEquals((long)i, (long)reader.getCurrentTimestamp().getMillis());
        }
    }

    @Test
    public void testUnboundedSourceCheckpointMarkWithEmptyPartitions() throws Exception {
        int initialNumElements = 5;
        UnboundedSource source = (UnboundedSource)KafkaIOTest.mkKafkaReadTransform(initialNumElements, new ValueAsTimestampFn()).makeSource().split(1, PipelineOptionsFactory.create()).get(0);
        UnboundedSource.UnboundedReader reader = source.createReader(null, null);
        for (int l = 0; l < initialNumElements; ++l) {
            KafkaIOTest.advanceOnce(reader, l > 0);
        }
        KafkaCheckpointMark mark = (KafkaCheckpointMark)CoderUtils.clone((Coder)source.getCheckpointMarkCoder(), (Object)((KafkaCheckpointMark)reader.getCheckpointMark()));
        int numElements = 100;
        ImmutableList topics = ImmutableList.of((Object)"topic_a", (Object)"topic_b");
        source = (UnboundedSource)KafkaIO.read().withBootstrapServers("none").withTopics((List)topics).withConsumerFactoryFn((SerializableFunction)new ConsumerFactoryFn((List<String>)topics, 10, numElements, OffsetResetStrategy.LATEST)).withKeyDeserializer(IntegerDeserializer.class).withValueDeserializer(LongDeserializer.class).withMaxNumRecords((long)numElements).withTimestampFn((SerializableFunction)new ValueAsTimestampFn()).makeSource().split(1, PipelineOptionsFactory.create()).get(0);
        reader = source.createReader(null, (UnboundedSource.CheckpointMark)mark);
        ArrayList<Long> expected = new ArrayList<Long>();
        ArrayList<Long> actual = new ArrayList<Long>();
        for (long i = (long)initialNumElements; i < (long)numElements; ++i) {
            KafkaIOTest.advanceOnce(reader, i > (long)initialNumElements);
            expected.add(i);
            actual.add((Long)((KafkaRecord)reader.getCurrent()).getKV().getValue());
        }
        MatcherAssert.assertThat(actual, (Matcher)IsIterableContainingInAnyOrder.containsInAnyOrder((Object[])expected.toArray()));
    }

    @Test
    public void testUnboundedSourceMetrics() {
        int numElements = 1000;
        String readStep = "readFromKafka";
        this.p.apply(readStep, KafkaIOTest.mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withConsumerConfigUpdates((Map)ImmutableMap.of((Object)"group.id", (Object)"test.group")).commitOffsetsInFinalize().withoutMetadata());
        PipelineResult result = this.p.run();
        String splitId = "0";
        MetricName elementsRead = SourceMetrics.elementsRead().getName();
        MetricName elementsReadBySplit = SourceMetrics.elementsReadBySplit((String)splitId).getName();
        MetricName bytesRead = SourceMetrics.bytesRead().getName();
        MetricName bytesReadBySplit = SourceMetrics.bytesReadBySplit((String)splitId).getName();
        MetricName backlogElementsOfSplit = SourceMetrics.backlogElementsOfSplit((String)splitId).getName();
        MetricName backlogBytesOfSplit = SourceMetrics.backlogBytesOfSplit((String)splitId).getName();
        MetricQueryResults metrics = result.metrics().allMetrics();
        Iterable counters = metrics.getCounters();
        MatcherAssert.assertThat((Object)counters, (Matcher)Matchers.hasItem((Matcher)MetricResultsMatchers.attemptedMetricsResult((String)elementsRead.getNamespace(), (String)elementsRead.getName(), (String)readStep, (Object)1000L)));
        MatcherAssert.assertThat((Object)counters, (Matcher)Matchers.hasItem((Matcher)MetricResultsMatchers.attemptedMetricsResult((String)elementsReadBySplit.getNamespace(), (String)elementsReadBySplit.getName(), (String)readStep, (Object)1000L)));
        MatcherAssert.assertThat((Object)counters, (Matcher)Matchers.hasItem((Matcher)MetricResultsMatchers.attemptedMetricsResult((String)bytesRead.getNamespace(), (String)bytesRead.getName(), (String)readStep, (Object)12000L)));
        MatcherAssert.assertThat((Object)counters, (Matcher)Matchers.hasItem((Matcher)MetricResultsMatchers.attemptedMetricsResult((String)bytesReadBySplit.getNamespace(), (String)bytesReadBySplit.getName(), (String)readStep, (Object)12000L)));
        MetricQueryResults backlogElementsMetrics = result.metrics().queryMetrics(MetricsFilter.builder().addNameFilter(MetricNameFilter.named((String)backlogElementsOfSplit.getNamespace(), (String)backlogElementsOfSplit.getName())).build());
        MatcherAssert.assertThat((Object)backlogElementsMetrics.getGauges(), (Matcher)IsIterableWithSize.iterableWithSize((int)1));
        MetricQueryResults backlogBytesMetrics = result.metrics().queryMetrics(MetricsFilter.builder().addNameFilter(MetricNameFilter.named((String)backlogBytesOfSplit.getNamespace(), (String)backlogBytesOfSplit.getName())).build());
        MatcherAssert.assertThat((Object)backlogBytesMetrics.getGauges(), (Matcher)IsIterableWithSize.iterableWithSize((int)1));
        MetricQueryResults commitsEnqueuedMetrics = result.metrics().queryMetrics(MetricsFilter.builder().addNameFilter(MetricNameFilter.named((String)"KafkaIOReader", (String)"checkpointMarkCommitsEnqueued")).build());
        MatcherAssert.assertThat((Object)commitsEnqueuedMetrics.getCounters(), (Matcher)IsIterableWithSize.iterableWithSize((int)1));
        MatcherAssert.assertThat((Object)((Long)((MetricResult)commitsEnqueuedMetrics.getCounters().iterator().next()).getAttempted()), (Matcher)Matchers.greaterThan((Comparable)Long.valueOf(0L)));
    }

    @Test
    public void testSink() throws Exception {
        int numElements = 1000;
        try (MockProducerWrapper producerWrapper = new MockProducerWrapper();){
            ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
            String topic = "test";
            ((PCollection)this.p.apply(KafkaIOTest.mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())).apply((PTransform)KafkaIO.write().withBootstrapServers("none").withTopic(topic).withKeySerializer(IntegerSerializer.class).withValueSerializer(LongSerializer.class).withInputTimestamp().withProducerFactoryFn((SerializableFunction)new ProducerFactoryFn(producerWrapper.producerKey)));
            this.p.run();
            completionThread.shutdown();
            KafkaIOTest.verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false, true);
        }
    }

    @Test
    public void testValuesSink() throws Exception {
        int numElements = 1000;
        try (MockProducerWrapper producerWrapper = new MockProducerWrapper();){
            ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
            String topic = "test";
            ((PCollection)((PCollection)this.p.apply(KafkaIOTest.mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())).apply((PTransform)Values.create())).apply(KafkaIO.write().withBootstrapServers("none").withTopic(topic).withValueSerializer(LongSerializer.class).withProducerFactoryFn((SerializableFunction)new ProducerFactoryFn(producerWrapper.producerKey)).values());
            this.p.run();
            completionThread.shutdown();
            KafkaIOTest.verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, true, false);
        }
    }

    @Test
    public void testRecordsSink() throws Exception {
        int numElements = 1000;
        try (MockProducerWrapper producerWrapper = new MockProducerWrapper();){
            ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
            String topic = "test";
            ((PCollection)((PCollection)this.p.apply(KafkaIOTest.mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())).apply((PTransform)ParDo.of((DoFn)new KV2ProducerRecord(topic)))).setCoder((Coder)ProducerRecordCoder.of((Coder)VarIntCoder.of(), (Coder)VarLongCoder.of())).apply((PTransform)KafkaIO.writeRecords().withBootstrapServers("none").withTopic(topic).withKeySerializer(IntegerSerializer.class).withValueSerializer(LongSerializer.class).withInputTimestamp().withProducerFactoryFn((SerializableFunction)new ProducerFactoryFn(producerWrapper.producerKey)));
            this.p.run();
            completionThread.shutdown();
            KafkaIOTest.verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false, true);
        }
    }

    @Test
    public void testSinkToMultipleTopics() throws Exception {
        int numElements = 1000;
        try (MockProducerWrapper producerWrapper = new MockProducerWrapper();){
            ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
            String defaultTopic = "test";
            ((PCollection)((PCollection)this.p.apply(KafkaIOTest.mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())).apply((PTransform)ParDo.of((DoFn)new KV2ProducerRecord(defaultTopic, false)))).setCoder((Coder)ProducerRecordCoder.of((Coder)VarIntCoder.of(), (Coder)VarLongCoder.of())).apply((PTransform)KafkaIO.writeRecords().withBootstrapServers("none").withKeySerializer(IntegerSerializer.class).withValueSerializer(LongSerializer.class).withInputTimestamp().withProducerFactoryFn((SerializableFunction)new ProducerFactoryFn(producerWrapper.producerKey)));
            this.p.run();
            completionThread.shutdown();
            List sent = producerWrapper.mockProducer.history();
            for (int i = 0; i < numElements; ++i) {
                ProducerRecord record = (ProducerRecord)sent.get(i);
                if (i % 2 == 0) {
                    Assert.assertEquals((Object)"test_2", (Object)record.topic());
                } else {
                    Assert.assertEquals((Object)"test_1", (Object)record.topic());
                }
                Assert.assertEquals((long)i, (long)((Integer)record.key()).intValue());
                Assert.assertEquals((long)i, (long)((Long)record.value()));
                Assert.assertEquals((long)i, (long)record.timestamp().intValue());
                Assert.assertEquals((long)0L, (long)record.headers().toArray().length);
            }
        }
    }

    @Test
    public void testKafkaWriteHeaders() throws Exception {
        int numElements = 1;
        AbstractMap.SimpleEntry<String, String> header = new AbstractMap.SimpleEntry<String, String>("header_key", "header_value");
        try (MockProducerWrapper producerWrapper = new MockProducerWrapper();){
            ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
            String defaultTopic = "test";
            ((PCollection)((PCollection)this.p.apply(KafkaIOTest.mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())).apply((PTransform)ParDo.of((DoFn)new KV2ProducerRecord(defaultTopic, true, System.currentTimeMillis(), header)))).setCoder((Coder)ProducerRecordCoder.of((Coder)VarIntCoder.of(), (Coder)VarLongCoder.of())).apply((PTransform)KafkaIO.writeRecords().withBootstrapServers("none").withKeySerializer(IntegerSerializer.class).withValueSerializer(LongSerializer.class).withInputTimestamp().withProducerFactoryFn((SerializableFunction)new ProducerFactoryFn(producerWrapper.producerKey)));
            this.p.run();
            completionThread.shutdown();
            List sent = producerWrapper.mockProducer.history();
            for (int i = 0; i < numElements; ++i) {
                ProducerRecord record = (ProducerRecord)sent.get(i);
                Headers headers = record.headers();
                Assert.assertNotNull((Object)headers);
                Header[] headersArray = headers.toArray();
                Assert.assertEquals((long)1L, (long)headersArray.length);
                Assert.assertEquals((Object)header.getKey(), (Object)headersArray[0].key());
                Assert.assertEquals((Object)header.getValue(), (Object)new String(headersArray[0].value(), StandardCharsets.UTF_8));
            }
        }
    }

    @Test
    public void testSinkProducerRecordsWithCustomTS() throws Exception {
        int numElements = 1000;
        try (MockProducerWrapper producerWrapper = new MockProducerWrapper();){
            ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
            String defaultTopic = "test";
            Long ts = System.currentTimeMillis();
            ((PCollection)((PCollection)this.p.apply(KafkaIOTest.mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())).apply((PTransform)ParDo.of((DoFn)new KV2ProducerRecord("test", ts)))).setCoder((Coder)ProducerRecordCoder.of((Coder)VarIntCoder.of(), (Coder)VarLongCoder.of())).apply((PTransform)KafkaIO.writeRecords().withBootstrapServers("none").withKeySerializer(IntegerSerializer.class).withValueSerializer(LongSerializer.class).withProducerFactoryFn((SerializableFunction)new ProducerFactoryFn(producerWrapper.producerKey)));
            this.p.run();
            completionThread.shutdown();
            List sent = producerWrapper.mockProducer.history();
            for (int i = 0; i < numElements; ++i) {
                ProducerRecord record = (ProducerRecord)sent.get(i);
                Assert.assertEquals((Object)"test", (Object)record.topic());
                Assert.assertEquals((long)i, (long)((Integer)record.key()).intValue());
                Assert.assertEquals((long)i, (long)((Long)record.value()));
                Assert.assertEquals((Object)ts, (Object)record.timestamp());
            }
        }
    }

    @Test
    public void testSinkProducerRecordsWithCustomPartition() throws Exception {
        int numElements = 1000;
        try (MockProducerWrapper producerWrapper = new MockProducerWrapper();){
            ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
            String defaultTopic = "test";
            Integer partition = 1;
            ((PCollection)((PCollection)this.p.apply(KafkaIOTest.mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())).apply((PTransform)ParDo.of((DoFn)new KV2ProducerRecord("test", partition)))).setCoder((Coder)ProducerRecordCoder.of((Coder)VarIntCoder.of(), (Coder)VarLongCoder.of())).apply((PTransform)KafkaIO.writeRecords().withBootstrapServers("none").withKeySerializer(IntegerSerializer.class).withValueSerializer(LongSerializer.class).withProducerFactoryFn((SerializableFunction)new ProducerFactoryFn(producerWrapper.producerKey)));
            this.p.run();
            completionThread.shutdown();
            List sent = producerWrapper.mockProducer.history();
            for (int i = 0; i < numElements; ++i) {
                ProducerRecord record = (ProducerRecord)sent.get(i);
                Assert.assertEquals((Object)"test", (Object)record.topic());
                Assert.assertEquals((Object)partition, (Object)record.partition());
                Assert.assertEquals((long)i, (long)((Integer)record.key()).intValue());
                Assert.assertEquals((long)i, (long)((Long)record.value()));
            }
        }
    }

    @Test
    public void testExactlyOnceSink() {
        if (!ProducerSpEL.supportsTransactions()) {
            LOG.warn("testExactlyOnceSink() is disabled as Kafka client version does not support transactions.");
            return;
        }
        int numElements = 1000;
        try (MockProducerWrapper producerWrapper = new MockProducerWrapper();){
            ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
            String topic = "test";
            ((PCollection)this.p.apply(KafkaIOTest.mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())).apply((PTransform)KafkaIO.write().withBootstrapServers("none").withTopic(topic).withKeySerializer(IntegerSerializer.class).withValueSerializer(LongSerializer.class).withEOS(1, "test").withConsumerFactoryFn((SerializableFunction)new ConsumerFactoryFn(Lists.newArrayList((Object[])new String[]{topic}), 10, 10, OffsetResetStrategy.EARLIEST)).withPublishTimestampFunction((KafkaPublishTimestampFunction & Serializable)(e, ts) -> ts).withProducerFactoryFn((SerializableFunction)new ProducerFactoryFn(producerWrapper.producerKey)));
            this.p.run();
            completionThread.shutdown();
            KafkaIOTest.verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false, true);
        }
    }

    @Test
    public void testSinkWithSendErrors() throws Throwable {
        this.thrown.expect(InjectedErrorException.class);
        this.thrown.expectMessage("Injected Error #1");
        int numElements = 1000;
        try (MockProducerWrapper producerWrapper = new MockProducerWrapper();){
            ProducerSendCompletionThread completionThreadWithErrors = new ProducerSendCompletionThread(producerWrapper.mockProducer, 10, 100).start();
            String topic = "test";
            ((PCollection)this.p.apply(KafkaIOTest.mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())).apply((PTransform)KafkaIO.write().withBootstrapServers("none").withTopic(topic).withKeySerializer(IntegerSerializer.class).withValueSerializer(LongSerializer.class).withProducerFactoryFn((SerializableFunction)new ProducerFactoryFn(producerWrapper.producerKey)));
            try {
                this.p.run();
            }
            catch (Pipeline.PipelineExecutionException e) {
                throw e.getCause().getCause();
            }
            finally {
                completionThreadWithErrors.shutdown();
            }
        }
    }

    @Test
    public void testUnboundedSourceStartReadTime() {
        Assume.assumeTrue((boolean)ConsumerSpEL.hasOffsetsForTimes());
        int numElements = 1000;
        int startTime = numElements / 20 / 2;
        int maxNumRecords = numElements / 2;
        PCollection input = (PCollection)((PCollection)this.p.apply(KafkaIOTest.mkKafkaReadTransform(numElements, maxNumRecords, new ValueAsTimestampFn()).withStartReadTime(new Instant((long)startTime)).withoutMetadata())).apply((PTransform)Values.create());
        KafkaIOTest.addCountingAsserts((PCollection<Long>)input, maxNumRecords, maxNumRecords, maxNumRecords, numElements - 1);
        this.p.run();
    }

    @Test
    public void testUnboundedSourceStartReadTimeException() {
        Assume.assumeTrue((boolean)ConsumerSpEL.hasOffsetsForTimes());
        this.noMessagesException.expect(RuntimeException.class);
        int numElements = 1000;
        int startTime = numElements / 20;
        ((PCollection)this.p.apply(KafkaIOTest.mkKafkaReadTransform(numElements, numElements, new ValueAsTimestampFn()).withStartReadTime(new Instant((long)startTime)).withoutMetadata())).apply((PTransform)Values.create());
        this.p.run();
    }

    @Test
    public void testSourceDisplayData() {
        KafkaIO.Read<Integer, Long> read = KafkaIOTest.mkKafkaReadTransform(10, null);
        DisplayData displayData = DisplayData.from(read);
        MatcherAssert.assertThat((Object)displayData, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"topics", (String)"topic_a,topic_b"));
        MatcherAssert.assertThat((Object)displayData, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"enable.auto.commit", (Boolean)false));
        MatcherAssert.assertThat((Object)displayData, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"bootstrap.servers", (String)"myServer1:9092,myServer2:9092"));
        MatcherAssert.assertThat((Object)displayData, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"auto.offset.reset", (String)"latest"));
        MatcherAssert.assertThat((Object)displayData, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"receive.buffer.bytes", (long)524288L));
    }

    @Test
    public void testSourceWithExplicitPartitionsDisplayData() {
        KafkaIO.Read read = KafkaIO.readBytes().withBootstrapServers("myServer1:9092,myServer2:9092").withTopicPartitions((List)ImmutableList.of((Object)new TopicPartition("test", 5), (Object)new TopicPartition("test", 6))).withConsumerFactoryFn((SerializableFunction)new ConsumerFactoryFn(Lists.newArrayList((Object[])new String[]{"test"}), 10, 10, OffsetResetStrategy.EARLIEST));
        DisplayData displayData = DisplayData.from((HasDisplayData)read);
        MatcherAssert.assertThat((Object)displayData, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"topicPartitions", (String)"test-5,test-6"));
        MatcherAssert.assertThat((Object)displayData, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"enable.auto.commit", (Boolean)false));
        MatcherAssert.assertThat((Object)displayData, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"bootstrap.servers", (String)"myServer1:9092,myServer2:9092"));
        MatcherAssert.assertThat((Object)displayData, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"auto.offset.reset", (String)"latest"));
        MatcherAssert.assertThat((Object)displayData, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"receive.buffer.bytes", (long)524288L));
    }

    @Test
    public void testSinkDisplayData() {
        try (MockProducerWrapper producerWrapper = new MockProducerWrapper();){
            KafkaIO.Write write = KafkaIO.write().withBootstrapServers("myServerA:9092,myServerB:9092").withTopic("myTopic").withValueSerializer(LongSerializer.class).withProducerFactoryFn((SerializableFunction)new ProducerFactoryFn(producerWrapper.producerKey)).withProducerConfigUpdates((Map)ImmutableMap.of((Object)"retry.backoff.ms", (Object)100));
            DisplayData displayData = DisplayData.from((HasDisplayData)write);
            MatcherAssert.assertThat((Object)displayData, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"topic", (String)"myTopic"));
            MatcherAssert.assertThat((Object)displayData, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"bootstrap.servers", (String)"myServerA:9092,myServerB:9092"));
            MatcherAssert.assertThat((Object)displayData, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"retries", (long)3L));
            MatcherAssert.assertThat((Object)displayData, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"retry.backoff.ms", (long)100L));
        }
    }

    @Test
    public void testSinkMetrics() throws Exception {
        int numElements = 1000;
        try (MockProducerWrapper producerWrapper = new MockProducerWrapper();){
            ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
            String topic = "test";
            ((PCollection)this.p.apply(KafkaIOTest.mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())).apply("writeToKafka", (PTransform)KafkaIO.write().withBootstrapServers("none").withTopic(topic).withKeySerializer(IntegerSerializer.class).withValueSerializer(LongSerializer.class).withProducerFactoryFn((SerializableFunction)new ProducerFactoryFn(producerWrapper.producerKey)));
            PipelineResult result = this.p.run();
            MetricName elementsWritten = SinkMetrics.elementsWritten().getName();
            MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder().addNameFilter(MetricNameFilter.inNamespace((String)elementsWritten.getNamespace())).build());
            MatcherAssert.assertThat((Object)metrics.getCounters(), (Matcher)Matchers.hasItem((Matcher)MetricResultsMatchers.attemptedMetricsResult((String)elementsWritten.getNamespace(), (String)elementsWritten.getName(), (String)"writeToKafka", (Object)1000L)));
            completionThread.shutdown();
        }
    }

    private static void verifyProducerRecords(MockProducer<Integer, Long> mockProducer, String topic, int numElements, boolean keyIsAbsent, boolean verifyTimestamp) {
        List sent = mockProducer.history();
        sent.sort(Comparator.comparingLong(ProducerRecord::value));
        for (int i = 0; i < numElements; ++i) {
            ProducerRecord record = (ProducerRecord)sent.get(i);
            Assert.assertEquals((Object)topic, (Object)record.topic());
            if (keyIsAbsent) {
                Assert.assertNull((Object)record.key());
            } else {
                Assert.assertEquals((long)i, (long)((Integer)record.key()).intValue());
            }
            Assert.assertEquals((long)i, (long)((Long)record.value()));
            if (!verifyTimestamp) continue;
            Assert.assertEquals((long)i, (long)record.timestamp().intValue());
        }
    }

    private static class ValueAvroSerializableFunction
    extends BaseAvroSerializableFunction {
        ValueAvroSerializableFunction(String topic, String schemaRegistryUrl) {
            super(topic, schemaRegistryUrl, false);
        }

        public byte[] apply(Integer i) {
            return ValueAvroSerializableFunction.getSerializer(this.isKey, this.schemaRegistryUrl).serialize(this.topic, (Object)new AvroGeneratedUser("ValueName" + i, i, "color" + i));
        }
    }

    private static class KeyAvroSerializableFunction
    extends BaseAvroSerializableFunction {
        KeyAvroSerializableFunction(String topic, String schemaRegistryUrl) {
            super(topic, schemaRegistryUrl, true);
        }

        public byte[] apply(Integer i) {
            return KeyAvroSerializableFunction.getSerializer(this.isKey, this.schemaRegistryUrl).serialize(this.topic, (Object)new AvroGeneratedUser("KeyName" + i, i, "color" + i));
        }
    }

    private static abstract class BaseAvroSerializableFunction
    implements SerializableFunction<Integer, byte[]> {
        static transient Serializer<AvroGeneratedUser> serializer = null;
        final String topic;
        final String schemaRegistryUrl;
        final boolean isKey;

        BaseAvroSerializableFunction(String topic, String schemaRegistryUrl, boolean isKey) {
            this.topic = topic;
            this.schemaRegistryUrl = schemaRegistryUrl;
            this.isKey = isKey;
        }

        static Serializer<AvroGeneratedUser> getSerializer(boolean isKey, String schemaRegistryUrl) {
            if (serializer == null) {
                SchemaRegistryClient mockRegistryClient = MockSchemaRegistry.getClientForScope((String)schemaRegistryUrl);
                HashMap<String, Object> map = new HashMap<String, Object>();
                map.put("auto.register.schemas", true);
                map.put("schema.registry.url", schemaRegistryUrl);
                serializer = new KafkaAvroSerializer(mockRegistryClient);
                serializer.configure(map, isKey);
            }
            return serializer;
        }
    }

    private static class ProducerSendCompletionThread {
        private final MockProducer<Integer, Long> mockProducer;
        private final int maxErrors;
        private final int errorFrequency;
        private final AtomicBoolean done = new AtomicBoolean(false);
        private final ExecutorService injectorThread;
        private int numCompletions = 0;

        ProducerSendCompletionThread(MockProducer<Integer, Long> mockProducer) {
            this(mockProducer, 0, 0);
        }

        ProducerSendCompletionThread(MockProducer<Integer, Long> mockProducer, int maxErrors, int errorFrequency) {
            this.mockProducer = mockProducer;
            this.maxErrors = maxErrors;
            this.errorFrequency = errorFrequency;
            this.injectorThread = Executors.newSingleThreadExecutor();
        }

        ProducerSendCompletionThread start() {
            this.injectorThread.submit(() -> {
                int errorsInjected = 0;
                while (!this.done.get()) {
                    boolean successful;
                    if (errorsInjected < this.maxErrors && (this.numCompletions + 1) % this.errorFrequency == 0) {
                        successful = this.mockProducer.errorNext((RuntimeException)new InjectedErrorException("Injected Error #" + (errorsInjected + 1)));
                        if (successful) {
                            ++errorsInjected;
                        }
                    } else {
                        successful = this.mockProducer.completeNext();
                    }
                    if (successful) {
                        ++this.numCompletions;
                        continue;
                    }
                    try {
                        Thread.sleep(1L);
                    }
                    catch (InterruptedException interruptedException) {}
                }
            });
            return this;
        }

        void shutdown() {
            this.done.set(true);
            this.injectorThread.shutdown();
            try {
                Assert.assertTrue((boolean)this.injectorThread.awaitTermination(10L, TimeUnit.SECONDS));
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    private static class InjectedErrorException
    extends RuntimeException {
        InjectedErrorException(String message) {
            super(message);
        }
    }

    private static class ProducerFactoryFn
    implements SerializableFunction<Map<String, Object>, Producer<Integer, Long>> {
        final String producerKey;

        ProducerFactoryFn(String producerKey) {
            this.producerKey = producerKey;
        }

        public Producer<Integer, Long> apply(Map<String, Object> config) {
            ((Serializer)Utils.newInstance(((Class)config.get("key.serializer")).asSubclass(Serializer.class))).configure(config, true);
            ((Serializer)Utils.newInstance(((Class)config.get("value.serializer")).asSubclass(Serializer.class))).configure(config, false);
            return (Producer)MOCK_PRODUCER_MAP.get(this.producerKey);
        }
    }

    private static class MockProducerWrapper
    implements AutoCloseable {
        final String producerKey = String.valueOf(ThreadLocalRandom.current().nextLong());
        final MockProducer<Integer, Long> mockProducer = new MockProducer<Integer, Long>(false, (Serializer)new IntegerSerializer(), (Serializer)new LongSerializer()){

            public synchronized void flush() {
                while (this.completeNext()) {
                    try {
                        Thread.sleep(10L);
                    }
                    catch (InterruptedException interruptedException) {}
                }
            }
        };
        private static Method closedMethod;

        MockProducerWrapper() {
            Assert.assertNull(MOCK_PRODUCER_MAP.putIfAbsent(this.producerKey, this.mockProducer));
        }

        @Override
        public void close() {
            MOCK_PRODUCER_MAP.remove(this.producerKey);
            try {
                if (closedMethod == null || !((Boolean)closedMethod.invoke(this.mockProducer, new Object[0])).booleanValue()) {
                    this.mockProducer.close();
                }
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        static {
            try {
                closedMethod = MockProducer.class.getMethod("closed", new Class[0]);
            }
            catch (NoSuchMethodException e) {
                closedMethod = null;
            }
        }
    }

    private static class KV2ProducerRecord
    extends DoFn<KV<Integer, Long>, ProducerRecord<Integer, Long>> {
        final String topic;
        final Integer partition;
        final boolean isSingleTopic;
        final Long ts;
        final AbstractMap.SimpleEntry<String, String> header;

        KV2ProducerRecord(String topic) {
            this(topic, true);
        }

        KV2ProducerRecord(String topic, Integer partition) {
            this(topic, true, null, null, partition);
        }

        KV2ProducerRecord(String topic, Long ts) {
            this(topic, true, ts);
        }

        KV2ProducerRecord(String topic, boolean isSingleTopic) {
            this(topic, isSingleTopic, null);
        }

        KV2ProducerRecord(String topic, boolean isSingleTopic, Long ts) {
            this(topic, isSingleTopic, ts, null, null);
        }

        KV2ProducerRecord(String topic, boolean isSingleTopic, Long ts, AbstractMap.SimpleEntry<String, String> header) {
            this(topic, isSingleTopic, ts, header, null);
        }

        KV2ProducerRecord(String topic, boolean isSingleTopic, Long ts, AbstractMap.SimpleEntry<String, String> header, Integer partition) {
            this.topic = topic;
            this.partition = partition;
            this.isSingleTopic = isSingleTopic;
            this.ts = ts;
            this.header = header;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext ctx) {
            KV kv = (KV)ctx.element();
            List<Header> headers = null;
            if (this.header != null) {
                headers = Arrays.asList(new RecordHeader(this.header.getKey(), this.header.getValue().getBytes(StandardCharsets.UTF_8)));
            }
            if (this.isSingleTopic) {
                ctx.output((Object)new ProducerRecord(this.topic, this.partition, this.ts, (Object)((Integer)kv.getKey()), (Object)((Long)kv.getValue()), headers));
            } else if ((Integer)kv.getKey() % 2 == 0) {
                ctx.output((Object)new ProducerRecord(this.topic + "_2", this.partition, this.ts, (Object)((Integer)kv.getKey()), (Object)((Long)kv.getValue()), headers));
            } else {
                ctx.output((Object)new ProducerRecord(this.topic + "_1", this.partition, this.ts, (Object)((Integer)kv.getKey()), (Object)((Long)kv.getValue()), headers));
            }
        }
    }

    private static class ValueAsTimestampFn
    implements SerializableFunction<KV<Integer, Long>, Instant> {
        private ValueAsTimestampFn() {
        }

        public Instant apply(KV<Integer, Long> input) {
            return new Instant(input.getValue());
        }
    }

    private static class RemoveKafkaMetadata<K, V>
    extends DoFn<KafkaRecord<K, V>, KV<K, V>> {
        private RemoveKafkaMetadata() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext ctx) {
            ctx.output((Object)((KafkaRecord)ctx.element()).getKV());
        }
    }

    static class TimestampPolicyWithEndOfSource<K, V>
    implements TimestampPolicyFactory<K, V> {
        private final long maxOffset;

        TimestampPolicyWithEndOfSource(long maxOffset) {
            this.maxOffset = maxOffset;
        }

        public TimestampPolicy<K, V> createTimestampPolicy(TopicPartition tp, Optional<Instant> previousWatermark) {
            return new TimestampPolicy<K, V>(){
                long lastOffset = 0L;
                Instant lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;

                public Instant getTimestampForRecord(TimestampPolicy.PartitionContext ctx, KafkaRecord<K, V> record) {
                    this.lastOffset = record.getOffset();
                    this.lastTimestamp = new Instant(record.getTimestamp());
                    return this.lastTimestamp;
                }

                public Instant getWatermark(TimestampPolicy.PartitionContext ctx) {
                    if (this.lastOffset < maxOffset) {
                        return this.lastTimestamp;
                    }
                    return BoundedWindow.TIMESTAMP_MAX_VALUE;
                }
            };
        }
    }

    private static class ElementValueDiff
    extends DoFn<Long, Long> {
        private ElementValueDiff() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) throws Exception {
            c.output((Object)((Long)c.element() - c.timestamp().getMillis()));
        }
    }

    public static class LongDeserializerWithHeadersAssertor
    extends LongDeserializer
    implements Deserializer<Long> {
        public Long deserialize(String topic, byte[] data) {
            Assert.assertEquals((Object)false, (Object)ConsumerSpEL.deserializerSupportsHeaders());
            return super.deserialize(topic, data);
        }

        public Long deserialize(String topic, Headers headers, byte[] data) {
            Assert.assertEquals((Object)true, (Object)ConsumerSpEL.deserializerSupportsHeaders());
            return super.deserialize(topic, data);
        }
    }

    public static class IntegerDeserializerWithHeadersAssertor
    extends IntegerDeserializer
    implements Deserializer<Integer> {
        public Integer deserialize(String topic, byte[] data) {
            Assert.assertEquals((Object)false, (Object)ConsumerSpEL.deserializerSupportsHeaders());
            return super.deserialize(topic, data);
        }

        public Integer deserialize(String topic, Headers headers, byte[] data) {
            Assert.assertEquals((Object)true, (Object)ConsumerSpEL.deserializerSupportsHeaders());
            return super.deserialize(topic, data);
        }
    }

    private static class AssertMultipleOf
    implements SerializableFunction<Iterable<Long>, Void> {
        private final int num;

        AssertMultipleOf(int num) {
            this.num = num;
        }

        public Void apply(Iterable<Long> values) {
            for (Long v : values) {
                Assert.assertEquals((long)0L, (long)(v % (long)this.num));
            }
            return null;
        }
    }

    private static class ConsumerFactoryFn
    implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
        private final List<String> topics;
        private final int partitionsPerTopic;
        private final int numElements;
        private final OffsetResetStrategy offsetResetStrategy;
        private SerializableFunction<Integer, byte[]> keyFunction;
        private SerializableFunction<Integer, byte[]> valueFunction;

        ConsumerFactoryFn(List<String> topics, int partitionsPerTopic, int numElements, OffsetResetStrategy offsetResetStrategy) {
            this.topics = topics;
            this.partitionsPerTopic = partitionsPerTopic;
            this.numElements = numElements;
            this.offsetResetStrategy = offsetResetStrategy;
            this.keyFunction = (SerializableFunction & Serializable)i -> ByteBuffer.wrap(new byte[4]).putInt((int)i).array();
            this.valueFunction = (SerializableFunction & Serializable)i -> ByteBuffer.wrap(new byte[8]).putLong(i.intValue()).array();
        }

        ConsumerFactoryFn(List<String> topics, int partitionsPerTopic, int numElements, OffsetResetStrategy offsetResetStrategy, SerializableFunction<Integer, byte[]> keyFunction, SerializableFunction<Integer, byte[]> valueFunction) {
            this.topics = topics;
            this.partitionsPerTopic = partitionsPerTopic;
            this.numElements = numElements;
            this.offsetResetStrategy = offsetResetStrategy;
            this.keyFunction = keyFunction;
            this.valueFunction = valueFunction;
        }

        public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
            return KafkaIOTest.mkMockConsumer(this.topics, this.partitionsPerTopic, this.numElements, this.offsetResetStrategy, config, (SerializableFunction<Integer, byte[]>)this.keyFunction, (SerializableFunction<Integer, byte[]>)this.valueFunction);
        }
    }
}

