package org.apache.beam.sdk.io.kafka;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.AvroGeneratedUser;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaMocks;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResultsMatchers;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.metrics.SourceMetrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.hamcrest.collection.IsIterableWithSize;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableCauseMatcher;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest.class */
public class KafkaIOTest {
    private static final String TIMESTAMP_START_MILLIS_CONFIG = "test.timestamp.start.millis";
    private static final String TIMESTAMP_TYPE_CONFIG = "test.timestamp.type";
    private static final Logger LOG = LoggerFactory.getLogger(KafkaIOTest.class);
    private static final Instant LOG_APPEND_START_TIME = new Instant(600000);
    private static final ConcurrentMap<String, MockProducer<Integer, Long>> MOCK_PRODUCER_MAP = new ConcurrentHashMap();

    @Rule
    public final transient TestPipeline p = TestPipeline.create();

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Rule
    public ExpectedLogs unboundedReaderExpectedLogs = ExpectedLogs.none(KafkaUnboundedReader.class);

    @Rule
    public ExpectedLogs kafkaIOExpectedLogs = ExpectedLogs.none(KafkaIO.class);

    @Rule
    public ExpectedException noMessagesException = ExpectedException.none();

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$AssertMultipleOf.class */
    private static class AssertMultipleOf implements SerializableFunction<Iterable<Long>, Void> {
        private final int num;

        AssertMultipleOf(int i) {
            this.num = i;
        }

        public Void apply(Iterable<Long> iterable) {
            Iterator<Long> it = iterable.iterator();
            while (it.hasNext()) {
                Assert.assertEquals(0L, it.next().longValue() % this.num);
            }
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$BaseAvroSerializableFunction.class */
    public static abstract class BaseAvroSerializableFunction implements SerializableFunction<Integer, byte[]> {
        static transient Serializer<AvroGeneratedUser> serializer = null;
        final String topic;
        final String schemaRegistryUrl;
        final boolean isKey;

        BaseAvroSerializableFunction(String str, String str2, boolean z) {
            this.topic = str;
            this.schemaRegistryUrl = str2;
            this.isKey = z;
        }

        static Serializer<AvroGeneratedUser> getSerializer(boolean z, String str) {
            if (serializer == null) {
                SchemaRegistryClient clientForScope = MockSchemaRegistry.getClientForScope(str);
                HashMap hashMap = new HashMap();
                hashMap.put("auto.register.schemas", true);
                hashMap.put("schema.registry.url", str);
                serializer = new KafkaAvroSerializer(clientForScope);
                serializer.configure(hashMap, z);
            }
            return serializer;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$ConsumerFactoryFn.class */
    public static class ConsumerFactoryFn implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
        private final List<String> topics;
        private final int partitionsPerTopic;
        private final int numElements;
        private final OffsetResetStrategy offsetResetStrategy;
        private SerializableFunction<Integer, byte[]> keyFunction;
        private SerializableFunction<Integer, byte[]> valueFunction;

        ConsumerFactoryFn(List<String> list, int i, int i2, OffsetResetStrategy offsetResetStrategy) {
            this.topics = list;
            this.partitionsPerTopic = i;
            this.numElements = i2;
            this.offsetResetStrategy = offsetResetStrategy;
            this.keyFunction = num -> {
                return ByteBuffer.wrap(new byte[4]).putInt(num.intValue()).array();
            };
            this.valueFunction = num2 -> {
                return ByteBuffer.wrap(new byte[8]).putLong(num2.intValue()).array();
            };
        }

        ConsumerFactoryFn(List<String> list, int i, int i2, OffsetResetStrategy offsetResetStrategy, SerializableFunction<Integer, byte[]> serializableFunction, SerializableFunction<Integer, byte[]> serializableFunction2) {
            this.topics = list;
            this.partitionsPerTopic = i;
            this.numElements = i2;
            this.offsetResetStrategy = offsetResetStrategy;
            this.keyFunction = serializableFunction;
            this.valueFunction = serializableFunction2;
        }

        public Consumer<byte[], byte[]> apply(Map<String, Object> map) {
            return KafkaIOTest.mkMockConsumer(this.topics, this.partitionsPerTopic, this.numElements, this.offsetResetStrategy, map, this.keyFunction, this.valueFunction);
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case -458405635:
                    if (implMethodName.equals("lambda$new$5e466ba2$1")) {
                        z = false;
                        break;
                    }
                    break;
                case -458405634:
                    if (implMethodName.equals("lambda$new$5e466ba2$2")) {
                        z = true;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/kafka/KafkaIOTest$ConsumerFactoryFn") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)[B")) {
                        return num -> {
                            return ByteBuffer.wrap(new byte[4]).putInt(num.intValue()).array();
                        };
                    }
                    break;
                case true:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/kafka/KafkaIOTest$ConsumerFactoryFn") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)[B")) {
                        return num2 -> {
                            return ByteBuffer.wrap(new byte[8]).putLong(num2.intValue()).array();
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$ElementValueDiff.class */
    private static class ElementValueDiff extends DoFn<Long, Long> {
        private ElementValueDiff() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<Long, Long>.ProcessContext processContext) throws Exception {
            processContext.output(Long.valueOf(((Long) processContext.element()).longValue() - processContext.timestamp().getMillis()));
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$InjectedErrorException.class */
    private static class InjectedErrorException extends RuntimeException {
        InjectedErrorException(String str) {
            super(str);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$IntegerDeserializerWithHeadersAssertor.class */
    public static class IntegerDeserializerWithHeadersAssertor extends IntegerDeserializer implements Deserializer<Integer> {
        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public Integer m5deserialize(String str, byte[] bArr) {
            Assert.assertEquals(false, Boolean.valueOf(ConsumerSpEL.deserializerSupportsHeaders()));
            return super.deserialize(str, bArr);
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public Integer m4deserialize(String str, Headers headers, byte[] bArr) {
            Assert.assertEquals(true, Boolean.valueOf(ConsumerSpEL.deserializerSupportsHeaders()));
            return super.deserialize(str, bArr);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$KV2ProducerRecord.class */
    private static class KV2ProducerRecord extends DoFn<KV<Integer, Long>, ProducerRecord<Integer, Long>> {
        final String topic;
        final Integer partition;
        final boolean isSingleTopic;
        final Long ts;
        final AbstractMap.SimpleEntry<String, String> header;

        KV2ProducerRecord(String str) {
            this(str, true);
        }

        KV2ProducerRecord(String str, Integer num) {
            this(str, true, null, null, num);
        }

        KV2ProducerRecord(String str, Long l) {
            this(str, true, l);
        }

        KV2ProducerRecord(String str, boolean z) {
            this(str, z, null);
        }

        KV2ProducerRecord(String str, boolean z, Long l) {
            this(str, z, l, null, null);
        }

        KV2ProducerRecord(String str, boolean z, Long l, AbstractMap.SimpleEntry<String, String> simpleEntry) {
            this(str, z, l, simpleEntry, null);
        }

        KV2ProducerRecord(String str, boolean z, Long l, AbstractMap.SimpleEntry<String, String> simpleEntry, Integer num) {
            this.topic = str;
            this.partition = num;
            this.isSingleTopic = z;
            this.ts = l;
            this.header = simpleEntry;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<KV<Integer, Long>, ProducerRecord<Integer, Long>>.ProcessContext processContext) {
            KV kv = (KV) processContext.element();
            List list = null;
            if (this.header != null) {
                list = Arrays.asList(new RecordHeader(this.header.getKey(), this.header.getValue().getBytes(StandardCharsets.UTF_8)));
            }
            if (this.isSingleTopic) {
                processContext.output(new ProducerRecord(this.topic, this.partition, this.ts, (Integer) kv.getKey(), (Long) kv.getValue(), list));
            } else if (((Integer) kv.getKey()).intValue() % 2 == 0) {
                processContext.output(new ProducerRecord(this.topic + "_2", this.partition, this.ts, (Integer) kv.getKey(), (Long) kv.getValue(), list));
            } else {
                processContext.output(new ProducerRecord(this.topic + "_1", this.partition, this.ts, (Integer) kv.getKey(), (Long) kv.getValue(), list));
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$KeyAvroSerializableFunction.class */
    private static class KeyAvroSerializableFunction extends BaseAvroSerializableFunction {
        KeyAvroSerializableFunction(String str, String str2) {
            super(str, str2, true);
        }

        public byte[] apply(Integer num) {
            return getSerializer(this.isKey, this.schemaRegistryUrl).serialize(this.topic, new AvroGeneratedUser("KeyName" + num, num, "color" + num));
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$LongDeserializerWithHeadersAssertor.class */
    public static class LongDeserializerWithHeadersAssertor extends LongDeserializer implements Deserializer<Long> {
        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public Long m7deserialize(String str, byte[] bArr) {
            Assert.assertEquals(false, Boolean.valueOf(ConsumerSpEL.deserializerSupportsHeaders()));
            return super.deserialize(str, bArr);
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public Long m6deserialize(String str, Headers headers, byte[] bArr) {
            Assert.assertEquals(true, Boolean.valueOf(ConsumerSpEL.deserializerSupportsHeaders()));
            return super.deserialize(str, bArr);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$MockProducerWrapper.class */
    private static class MockProducerWrapper implements AutoCloseable {
        final String producerKey = String.valueOf(ThreadLocalRandom.current().nextLong());
        final MockProducer<Integer, Long> mockProducer = new MockProducer<Integer, Long>(false, new IntegerSerializer(), new LongSerializer()) { // from class: org.apache.beam.sdk.io.kafka.KafkaIOTest.MockProducerWrapper.1
            public synchronized void flush() {
                while (completeNext()) {
                    try {
                        Thread.sleep(10L);
                    } catch (InterruptedException e) {
                    }
                }
            }
        };
        private static Method closedMethod;

        MockProducerWrapper() {
            Assert.assertNull(KafkaIOTest.MOCK_PRODUCER_MAP.putIfAbsent(this.producerKey, this.mockProducer));
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            KafkaIOTest.MOCK_PRODUCER_MAP.remove(this.producerKey);
            try {
                if (closedMethod == null || !((Boolean) closedMethod.invoke(this.mockProducer, new Object[0])).booleanValue()) {
                    this.mockProducer.close();
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        static {
            try {
                closedMethod = MockProducer.class.getMethod("closed", new Class[0]);
            } catch (NoSuchMethodException e) {
                closedMethod = null;
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$ProducerFactoryFn.class */
    private static class ProducerFactoryFn implements SerializableFunction<Map<String, Object>, Producer<Integer, Long>> {
        final String producerKey;

        ProducerFactoryFn(String str) {
            this.producerKey = str;
        }

        public Producer<Integer, Long> apply(Map<String, Object> map) {
            ((Serializer) Utils.newInstance(((Class) map.get("key.serializer")).asSubclass(Serializer.class))).configure(map, true);
            ((Serializer) Utils.newInstance(((Class) map.get("value.serializer")).asSubclass(Serializer.class))).configure(map, false);
            return (Producer) KafkaIOTest.MOCK_PRODUCER_MAP.get(this.producerKey);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$ProducerSendCompletionThread.class */
    private static class ProducerSendCompletionThread {
        private final MockProducer<Integer, Long> mockProducer;
        private final int maxErrors;
        private final int errorFrequency;
        private final AtomicBoolean done;
        private final ExecutorService injectorThread;
        private int numCompletions;

        ProducerSendCompletionThread(MockProducer<Integer, Long> mockProducer) {
            this(mockProducer, 0, 0);
        }

        ProducerSendCompletionThread(MockProducer<Integer, Long> mockProducer, int i, int i2) {
            this.done = new AtomicBoolean(false);
            this.numCompletions = 0;
            this.mockProducer = mockProducer;
            this.maxErrors = i;
            this.errorFrequency = i2;
            this.injectorThread = Executors.newSingleThreadExecutor();
        }

        ProducerSendCompletionThread start() {
            this.injectorThread.submit(() -> {
                boolean completeNext;
                int i = 0;
                while (!this.done.get()) {
                    if (i >= this.maxErrors || (this.numCompletions + 1) % this.errorFrequency != 0) {
                        completeNext = this.mockProducer.completeNext();
                    } else {
                        completeNext = this.mockProducer.errorNext(new InjectedErrorException("Injected Error #" + (i + 1)));
                        if (completeNext) {
                            i++;
                        }
                    }
                    if (completeNext) {
                        this.numCompletions++;
                    } else {
                        try {
                            Thread.sleep(1L);
                        } catch (InterruptedException e) {
                        }
                    }
                }
            });
            return this;
        }

        void shutdown() {
            this.done.set(true);
            this.injectorThread.shutdown();
            try {
                Assert.assertTrue(this.injectorThread.awaitTermination(10L, TimeUnit.SECONDS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$RemoveKafkaMetadata.class */
    private static class RemoveKafkaMetadata<K, V> extends DoFn<KafkaRecord<K, V>, KV<K, V>> {
        private RemoveKafkaMetadata() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<KafkaRecord<K, V>, KV<K, V>>.ProcessContext processContext) {
            processContext.output(((KafkaRecord) processContext.element()).getKV());
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$TimestampPolicyWithEndOfSource.class */
    static class TimestampPolicyWithEndOfSource<K, V> implements TimestampPolicyFactory<K, V> {
        private final long maxOffset;

        TimestampPolicyWithEndOfSource(long j) {
            this.maxOffset = j;
        }

        public TimestampPolicy<K, V> createTimestampPolicy(TopicPartition topicPartition, Optional<Instant> optional) {
            return new TimestampPolicy<K, V>() { // from class: org.apache.beam.sdk.io.kafka.KafkaIOTest.TimestampPolicyWithEndOfSource.1
                long lastOffset = 0;
                Instant lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;

                public Instant getTimestampForRecord(TimestampPolicy.PartitionContext partitionContext, KafkaRecord<K, V> kafkaRecord) {
                    this.lastOffset = kafkaRecord.getOffset();
                    this.lastTimestamp = new Instant(kafkaRecord.getTimestamp());
                    return this.lastTimestamp;
                }

                public Instant getWatermark(TimestampPolicy.PartitionContext partitionContext) {
                    return this.lastOffset < TimestampPolicyWithEndOfSource.this.maxOffset ? this.lastTimestamp : BoundedWindow.TIMESTAMP_MAX_VALUE;
                }
            };
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$ValueAsTimestampFn.class */
    static class ValueAsTimestampFn implements SerializableFunction<KV<Integer, Long>, Instant> {
        public Instant apply(KV<Integer, Long> kv) {
            return new Instant(kv.getValue());
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOTest$ValueAvroSerializableFunction.class */
    private static class ValueAvroSerializableFunction extends BaseAvroSerializableFunction {
        ValueAvroSerializableFunction(String str, String str2) {
            super(str, str2, false);
        }

        public byte[] apply(Integer num) {
            return getSerializer(this.isKey, this.schemaRegistryUrl).serialize(this.topic, new AvroGeneratedUser("ValueName" + num, num, "color" + num));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static MockConsumer<byte[], byte[]> mkMockConsumer(List<String> list, int i, int i2, OffsetResetStrategy offsetResetStrategy, final Map<String, Object> map, SerializableFunction<Integer, byte[]> serializableFunction, SerializableFunction<Integer, byte[]> serializableFunction2) {
        final ArrayList arrayList = new ArrayList();
        final HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (String str : list) {
            ArrayList arrayList2 = new ArrayList(i);
            for (int i3 = 0; i3 < i; i3++) {
                TopicPartition topicPartition = new TopicPartition(str, i3);
                arrayList.add(topicPartition);
                arrayList2.add(new PartitionInfo(str, i3, (Node) null, (Node[]) null, (Node[]) null));
                hashMap.put(topicPartition, new ArrayList());
            }
            hashMap2.put(str, arrayList2);
        }
        int size = arrayList.size();
        final long[] jArr = new long[size];
        long longValue = ((Long) map.getOrDefault(TIMESTAMP_START_MILLIS_CONFIG, Long.valueOf(LOG_APPEND_START_TIME.getMillis()))).longValue();
        TimestampType forName = TimestampType.forName((String) map.getOrDefault(TIMESTAMP_TYPE_CONFIG, TimestampType.LOG_APPEND_TIME.toString()));
        for (int i4 = 0; i4 < i2; i4++) {
            int i5 = i4 % size;
            TopicPartition topicPartition2 = (TopicPartition) arrayList.get(i5);
            byte[] bArr = (byte[]) serializableFunction.apply(Integer.valueOf(i4));
            byte[] bArr2 = (byte[]) serializableFunction2.apply(Integer.valueOf(i4));
            List list2 = (List) hashMap.get(topicPartition2);
            String str2 = topicPartition2.topic();
            int partition = topicPartition2.partition();
            long j = jArr[i5];
            jArr[i5] = j + 1;
            list2.add(new ConsumerRecord(str2, partition, j, longValue + Duration.standardSeconds(i4).getMillis(), forName, 0L, bArr.length, bArr2.length, bArr, bArr2));
        }
        final AtomicReference atomicReference = new AtomicReference(Collections.emptyList());
        final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<byte[], byte[]>(offsetResetStrategy) { // from class: org.apache.beam.sdk.io.kafka.KafkaIOTest.1
            public synchronized void assign(Collection<TopicPartition> collection) {
                super.assign(collection);
                atomicReference.set(ImmutableList.copyOf(collection));
                for (TopicPartition topicPartition3 : collection) {
                    updateBeginningOffsets(ImmutableMap.of(topicPartition3, 0L));
                    updateEndOffsets(ImmutableMap.of(topicPartition3, Long.valueOf(((List) hashMap.get(topicPartition3)).size())));
                }
            }

            public synchronized Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> map2) {
                Stream<Map.Entry<TopicPartition, Long>> stream = map2.entrySet().stream();
                long[] jArr2 = jArr;
                List list3 = arrayList;
                return (Map) stream.map(entry -> {
                    long j2 = jArr2[list3.indexOf(entry.getKey())];
                    long longValue2 = ((Long) entry.getValue()).longValue();
                    return new AbstractMap.SimpleEntry((TopicPartition) entry.getKey(), longValue2 >= j2 ? null : new OffsetAndTimestamp(longValue2, longValue2));
                }).collect(Collectors.toMap((v0) -> {
                    return v0.getKey();
                }, (v0) -> {
                    return v0.getValue();
                }));
            }
        };
        for (String str3 : list) {
            mockConsumer.updatePartitions(str3, (List) hashMap2.get(str3));
        }
        mockConsumer.schedulePollTask(new Runnable() { // from class: org.apache.beam.sdk.io.kafka.KafkaIOTest.2
            @Override // java.lang.Runnable
            public void run() {
                int i6 = 0;
                for (TopicPartition topicPartition3 : (List) atomicReference.get()) {
                    long position = mockConsumer.position(topicPartition3);
                    for (ConsumerRecord consumerRecord : (List) hashMap.get(topicPartition3)) {
                        if (consumerRecord.offset() >= position) {
                            mockConsumer.addRecord(consumerRecord);
                            i6++;
                        }
                    }
                }
                if (i6 == 0) {
                    if (map.get("inject.error.at.eof") != null) {
                        mockConsumer.setException(new KafkaException("Injected error in consumer.poll()"));
                    }
                    Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
                }
                mockConsumer.schedulePollTask(this);
            }
        });
        return mockConsumer;
    }

    static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(int i, SerializableFunction<KV<Integer, Long>, Instant> serializableFunction) {
        return mkKafkaReadTransform(i, Integer.valueOf(i), serializableFunction);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(int i, Integer num, SerializableFunction<KV<Integer, Long>, Instant> serializableFunction) {
        ImmutableList of = ImmutableList.of("topic_a", "topic_b");
        KafkaIO.Read<Integer, Long> withValueDeserializer = KafkaIO.read().withBootstrapServers("myServer1:9092,myServer2:9092").withTopics(of).withConsumerFactoryFn(new ConsumerFactoryFn(of, 10, i, OffsetResetStrategy.EARLIEST)).withKeyDeserializer(IntegerDeserializer.class).withValueDeserializer(LongDeserializer.class);
        if (num != null) {
            withValueDeserializer = withValueDeserializer.withMaxNumRecords(num.intValue());
        }
        return serializableFunction != null ? withValueDeserializer.withTimestampFn(serializableFunction) : withValueDeserializer;
    }

    public static void addCountingAsserts(PCollection<Long> pCollection, long j) {
        addCountingAsserts(pCollection, j, j, 0L, j - 1);
    }

    public static void addCountingAsserts(PCollection<Long> pCollection, long j, long j2, long j3, long j4) {
        PAssert.thatSingleton(pCollection.apply("Count", Count.globally())).isEqualTo(Long.valueOf(j));
        PAssert.thatSingleton(pCollection.apply(Distinct.create()).apply("UniqueCount", Count.globally())).isEqualTo(Long.valueOf(j2));
        PAssert.thatSingleton(pCollection.apply("Min", Min.globally())).isEqualTo(Long.valueOf(j3));
        PAssert.thatSingleton(pCollection.apply("Max", Max.globally())).isEqualTo(Long.valueOf(j4));
    }

    @Test
    public void testReadAvroGenericRecordsWithConfluentSchemaRegistry() {
        String str = "my_topic-key";
        String str2 = "my_topic-value";
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            arrayList.add(KV.of(new AvroGeneratedUser("KeyName" + i, Integer.valueOf(i), "color" + i), new AvroGeneratedUser("ValueName" + i, Integer.valueOf(i), "color" + i)));
        }
        PAssert.that(this.p.apply(KafkaIO.read().withBootstrapServers("localhost:9092").withTopic("my_topic").withKeyDeserializer(ConfluentSchemaRegistryDeserializerProviderTest.mockDeserializerProvider("mock://my-scope-name", str, null)).withValueDeserializer(ConfluentSchemaRegistryDeserializerProviderTest.mockDeserializerProvider("mock://my-scope-name", str2, null)).withConsumerFactoryFn(new ConsumerFactoryFn(ImmutableList.of("my_topic"), 1, 100, OffsetResetStrategy.EARLIEST, new KeyAvroSerializableFunction("my_topic", "mock://my-scope-name"), new ValueAvroSerializableFunction("my_topic", "mock://my-scope-name"))).withMaxNumRecords(100).withoutMetadata())).containsInAnyOrder(arrayList);
        this.p.run();
    }

    @Test
    public void testReadAvroSpecificRecordsWithConfluentSchemaRegistry() {
        String str = "my_topic-value";
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            arrayList.add(KV.of(Integer.valueOf(i), new AvroGeneratedUser("ValueName" + i, Integer.valueOf(i), "color" + i)));
        }
        PAssert.that(this.p.apply(KafkaIO.read().withBootstrapServers("localhost:9092").withTopic("my_topic").withKeyDeserializer(IntegerDeserializer.class).withValueDeserializer(ConfluentSchemaRegistryDeserializerProviderTest.mockDeserializerProvider("mock://my-scope-name", str, null)).withConsumerFactoryFn(new ConsumerFactoryFn(ImmutableList.of("my_topic"), 1, 100, OffsetResetStrategy.EARLIEST, num -> {
            return ByteBuffer.wrap(new byte[4]).putInt(num.intValue()).array();
        }, new ValueAvroSerializableFunction("my_topic", "mock://my-scope-name"))).withMaxNumRecords(100).withoutMetadata())).containsInAnyOrder(arrayList);
        this.p.run();
    }

    @Test
    public void testDeserializationWithHeaders() throws Exception {
        addCountingAsserts(this.p.apply(KafkaIO.read().withBootstrapServers("none").withTopic("my_topic").withConsumerFactoryFn(new ConsumerFactoryFn(ImmutableList.of("my_topic"), 10, 1000, OffsetResetStrategy.EARLIEST)).withMaxNumRecords(1000).withKeyDeserializerAndCoder(IntegerDeserializerWithHeadersAssertor.class, BigEndianIntegerCoder.of()).withValueDeserializerAndCoder(LongDeserializerWithHeadersAssertor.class, BigEndianLongCoder.of()).withoutMetadata()).apply(Values.create()), 1000);
        this.p.run();
    }

    @Test
    public void testUnboundedSource() {
        addCountingAsserts(this.p.apply(mkKafkaReadTransform(1000, new ValueAsTimestampFn()).withoutMetadata()).apply(Values.create()), 1000);
        this.p.run();
    }

    @Test
    public void testRiskyConfigurationWarnsProperly() {
        this.p.getOptions().as(KafkaIO.Read.FakeFlinkPipelineOptions.class).setCheckpointingInterval(1L);
        addCountingAsserts(this.p.apply(mkKafkaReadTransform(1000, new ValueAsTimestampFn()).withConsumerConfigUpdates(ImmutableMap.of("enable.auto.commit", true)).withoutMetadata()).apply(Values.create()), 1000);
        this.kafkaIOExpectedLogs.verifyWarn("When using the Flink runner with checkpointingInterval enabled");
        this.p.run();
    }

    @Test
    public void testUnreachableKafkaBrokers() {
        this.thrown.expect(Exception.class);
        this.thrown.expectMessage("Reader-0: Timeout while initializing partition 'test-0'");
        addCountingAsserts(this.p.apply(KafkaIO.read().withBootstrapServers("8.8.8.8:9092").withTopicPartitions(ImmutableList.of(new TopicPartition("test", 0))).withKeyDeserializer(IntegerDeserializer.class).withValueDeserializer(LongDeserializer.class).withConsumerConfigUpdates(ImmutableMap.of("heartbeat.interval.ms", 5, "session.timeout.ms", 8, "fetch.max.wait.ms", 8, "default.api.timeout.ms", 10)).withMaxNumRecords(10L).withoutMetadata()).apply(Values.create()), 1000);
        this.p.run();
    }

    @Test
    public void testResolveDefaultApiTimeout() {
        Assert.assertEquals(Duration.millis(20L), KafkaUnboundedReader.resolveDefaultApiTimeout(KafkaIO.read().withConsumerConfigUpdates(ImmutableMap.of("default.api.timeout.ms", 20, "request.timeout.ms", 30))));
        Assert.assertEquals(Duration.millis(60L), KafkaUnboundedReader.resolveDefaultApiTimeout(KafkaIO.read().withConsumerConfigUpdates(ImmutableMap.of("request.timeout.ms", 30))));
        Assert.assertEquals(Duration.millis(60000L), KafkaUnboundedReader.resolveDefaultApiTimeout(KafkaIO.read()));
    }

    @Test
    public void testUnboundedSourceWithSingleTopic() {
        addCountingAsserts(this.p.apply(KafkaIO.read().withBootstrapServers("none").withTopic("my_topic").withConsumerFactoryFn(new ConsumerFactoryFn(ImmutableList.of("my_topic"), 10, 1000, OffsetResetStrategy.EARLIEST)).withMaxNumRecords(1000).withKeyDeserializer(IntegerDeserializer.class).withValueDeserializer(LongDeserializer.class).withoutMetadata()).apply(Values.create()), 1000);
        this.p.run();
    }

    @Test
    public void testUnboundedSourceWithExplicitPartitions() {
        PCollection apply = this.p.apply(KafkaIO.read().withBootstrapServers("none").withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5))).withConsumerFactoryFn(new ConsumerFactoryFn(ImmutableList.of("test"), 10, 1000, OffsetResetStrategy.EARLIEST)).withKeyDeserializer(ByteArrayDeserializer.class).withValueDeserializer(LongDeserializer.class).withMaxNumRecords(1000 / 10).withoutMetadata()).apply(Values.create());
        PAssert.that(apply).satisfies(new AssertMultipleOf(5));
        PAssert.thatSingleton(apply.apply(Count.globally())).isEqualTo(Long.valueOf(1000 / 10));
        this.p.run();
    }

    @Test
    public void testUnboundedSourceWithWrongTopic() {
        this.thrown.expect(Pipeline.PipelineExecutionException.class);
        this.thrown.expectCause(Matchers.instanceOf(IllegalStateException.class));
        this.thrown.expectMessage("Could not find any partitions info. Please check Kafka configuration and make sure that provided topics exist.");
        this.p.apply(KafkaIO.read().withBootstrapServers("none").withTopic("wrong_topic").withConsumerFactoryFn(new ConsumerFactoryFn(ImmutableList.of("my_topic"), 10, 1000, OffsetResetStrategy.EARLIEST)).withMaxNumRecords(1000).withKeyDeserializer(IntegerDeserializer.class).withValueDeserializer(LongDeserializer.class).withoutMetadata()).apply(Values.create());
        this.p.run();
    }

    @Test
    public void testUnboundedSourceTimestamps() {
        PCollection apply = this.p.apply(mkKafkaReadTransform(1000, new ValueAsTimestampFn()).withoutMetadata()).apply(Values.create());
        addCountingAsserts(apply, 1000);
        PAssert.thatSingleton(apply.apply("TimestampDiff", ParDo.of(new ElementValueDiff())).apply("DistinctTimestamps", Distinct.create())).isEqualTo(0L);
        this.p.run();
    }

    @Test
    public void testUnboundedSourceLogAppendTimestamps() {
        PCollection apply = this.p.apply(mkKafkaReadTransform(1000, null).withLogAppendTime().withoutMetadata()).apply(Values.create());
        addCountingAsserts(apply, 1000);
        PAssert.thatSingleton(apply.apply(MapElements.into(TypeDescriptors.longs()).via(l -> {
            return Long.valueOf(LOG_APPEND_START_TIME.plus(Duration.standardSeconds(l.longValue())).getMillis());
        })).apply("TimestampDiff", ParDo.of(new ElementValueDiff())).apply("DistinctTimestamps", Distinct.create())).isEqualTo(0L);
        this.p.run();
    }

    @Test
    public void testUnboundedSourceCustomTimestamps() {
        PCollection apply = this.p.apply(mkKafkaReadTransform(1000, null).withTimestampPolicyFactory((topicPartition, optional) -> {
            return new CustomTimestampPolicyWithLimitedDelay(kafkaRecord -> {
                return new Instant(TimeUnit.SECONDS.toMillis(((Long) kafkaRecord.getKV().getValue()).longValue()) + 80000);
            }, Duration.ZERO, optional);
        }).withoutMetadata()).apply(Values.create());
        addCountingAsserts(apply, 1000L);
        PAssert.thatSingleton(apply.apply(MapElements.into(TypeDescriptors.longs()).via(l -> {
            return Long.valueOf(TimeUnit.SECONDS.toMillis(l.longValue()) + 80000);
        })).apply("TimestampDiff", ParDo.of(new ElementValueDiff())).apply("DistinctTimestamps", Distinct.create())).isEqualTo(0L);
        this.p.run();
    }

    @Test
    public void testUnboundedSourceCreateTimestamps() {
        PCollection apply = this.p.apply(mkKafkaReadTransform(1000, null).withCreateTime(Duration.ZERO).withConsumerConfigUpdates(ImmutableMap.of(TIMESTAMP_TYPE_CONFIG, "CreateTime", TIMESTAMP_START_MILLIS_CONFIG, 50000L)).withoutMetadata()).apply(Values.create());
        addCountingAsserts(apply, 1000L);
        PAssert.thatSingleton(apply.apply(MapElements.into(TypeDescriptors.longs()).via(l -> {
            return Long.valueOf(TimeUnit.SECONDS.toMillis(l.longValue()) + 50000);
        })).apply("TimestampDiff", ParDo.of(new ElementValueDiff())).apply("DistinctTimestamps", Distinct.create())).isEqualTo(0L);
        this.p.run();
    }

    @Test
    public void testUnboundedSourceWithExceptionInKafkaFetch() {
        this.thrown.expectCause(Matchers.isA(IOException.class));
        this.thrown.expectCause(ThrowableMessageMatcher.hasMessage(Matchers.containsString("Exception while reading from Kafka")));
        this.thrown.expectCause(ThrowableCauseMatcher.hasCause(Matchers.isA(KafkaException.class)));
        this.thrown.expectCause(ThrowableCauseMatcher.hasCause(ThrowableMessageMatcher.hasMessage(Matchers.containsString("Injected error in consumer.poll()"))));
        addCountingAsserts(this.p.apply(KafkaIO.read().withBootstrapServers("none").withTopic("my_topic").withConsumerFactoryFn(new ConsumerFactoryFn(ImmutableList.of("my_topic"), 10, 1000, OffsetResetStrategy.EARLIEST)).withMaxNumRecords(2 * 1000).withConsumerConfigUpdates(ImmutableMap.of("inject.error.at.eof", true)).withKeyDeserializer(IntegerDeserializer.class).withValueDeserializer(LongDeserializer.class).withoutMetadata()).apply(Values.create()), 1000);
        this.p.run();
    }

    @Test
    @Ignore
    public void testUnboundedSourceWithoutBoundedWrapper() {
        this.p.apply("readFromKafka", KafkaIO.read().withBootstrapServers("testUnboundedSourceWithoutBoundedWrapper").withTopic("testUnboundedSourceWithoutBoundedWrapper").withConsumerFactoryFn(new ConsumerFactoryFn(ImmutableList.of("testUnboundedSourceWithoutBoundedWrapper"), 10, 1000, OffsetResetStrategy.EARLIEST)).withKeyDeserializer(ByteArrayDeserializer.class).withValueDeserializer(LongDeserializer.class).withTimestampPolicyFactory(new TimestampPolicyWithEndOfSource(99L)).withoutMetadata()).apply(Values.create()).apply(Window.into(FixedWindows.of(Duration.standardDays(100L))));
        PipelineResult run = this.p.run();
        MetricName name = SourceMetrics.elementsRead().getName();
        MatcherAssert.assertThat(run.metrics().queryMetrics(MetricsFilter.builder().addNameFilter(MetricNameFilter.inNamespace(name.getNamespace())).build()).getCounters(), Matchers.hasItem(MetricResultsMatchers.attemptedMetricsResult(name.getNamespace(), name.getName(), "readFromKafka", 1000L)));
    }

    @Test
    public void testUnboundedSourceSplits() throws Exception {
        List split = mkKafkaReadTransform(1000, null).withKeyDeserializerAndCoder(IntegerDeserializer.class, BigEndianIntegerCoder.of()).withValueDeserializerAndCoder(LongDeserializer.class, BigEndianLongCoder.of()).makeSource().split(10, this.p.getOptions());
        Assert.assertEquals("Expected exact splitting", 10, split.size());
        long j = 1000 / 10;
        Assert.assertEquals("Expected even splits", 1000, j * 10);
        PCollectionList empty = PCollectionList.empty(this.p);
        for (int i = 0; i < split.size(); i++) {
            empty = empty.and(this.p.apply("split" + i, Read.from((UnboundedSource) split.get(i)).withMaxNumRecords(j)).apply("Remove Metadata " + i, ParDo.of(new RemoveKafkaMetadata())).apply("collection " + i, Values.create()));
        }
        addCountingAsserts(empty.apply(Flatten.pCollections()), 1000);
        this.p.run();
    }

    private static void advanceOnce(UnboundedSource.UnboundedReader<?> unboundedReader, boolean z) throws IOException {
        if (z || !unboundedReader.start()) {
            while (!unboundedReader.advance()) {
                try {
                    Thread.sleep(1L);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    @Test
    public void testUnboundedSourceCheckpointMark() throws Exception {
        UnboundedSource unboundedSource = (UnboundedSource) mkKafkaReadTransform(85, new ValueAsTimestampFn()).makeSource().split(1, PipelineOptionsFactory.create()).get(0);
        UnboundedSource.UnboundedReader createReader = unboundedSource.createReader((PipelineOptions) null, (UnboundedSource.CheckpointMark) null);
        int i = 0;
        while (i < 20) {
            advanceOnce(createReader, i > 0);
            i++;
        }
        Assert.assertEquals(19L, ((Long) ((KafkaRecord) createReader.getCurrent()).getKV().getValue()).longValue());
        Assert.assertEquals(19L, createReader.getCurrentTimestamp().getMillis());
        UnboundedSource.UnboundedReader createReader2 = unboundedSource.createReader((PipelineOptions) null, (KafkaCheckpointMark) CoderUtils.clone(unboundedSource.getCheckpointMarkCoder(), createReader.getCheckpointMark()));
        int i2 = 20;
        while (i2 < 85) {
            advanceOnce(createReader2, i2 > 20);
            Assert.assertEquals(i2, ((Long) ((KafkaRecord) createReader2.getCurrent()).getKV().getValue()).longValue());
            Assert.assertEquals(i2, createReader2.getCurrentTimestamp().getMillis());
            i2++;
        }
    }

    @Test
    public void testUnboundedSourceCheckpointMarkWithEmptyPartitions() throws Exception {
        UnboundedSource unboundedSource = (UnboundedSource) mkKafkaReadTransform(5, new ValueAsTimestampFn()).makeSource().split(1, PipelineOptionsFactory.create()).get(0);
        UnboundedSource.UnboundedReader createReader = unboundedSource.createReader((PipelineOptions) null, (UnboundedSource.CheckpointMark) null);
        int i = 0;
        while (i < 5) {
            advanceOnce(createReader, i > 0);
            i++;
        }
        KafkaCheckpointMark kafkaCheckpointMark = (KafkaCheckpointMark) CoderUtils.clone(unboundedSource.getCheckpointMarkCoder(), createReader.getCheckpointMark());
        ImmutableList of = ImmutableList.of("topic_a", "topic_b");
        UnboundedSource.UnboundedReader createReader2 = ((UnboundedSource) KafkaIO.read().withBootstrapServers("none").withTopics(of).withConsumerFactoryFn(new ConsumerFactoryFn(of, 10, 100, OffsetResetStrategy.LATEST)).withKeyDeserializer(IntegerDeserializer.class).withValueDeserializer(LongDeserializer.class).withMaxNumRecords(100).withTimestampFn(new ValueAsTimestampFn()).makeSource().split(1, PipelineOptionsFactory.create()).get(0)).createReader((PipelineOptions) null, kafkaCheckpointMark);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        long j = 5;
        while (true) {
            long j2 = j;
            if (j2 >= 100) {
                MatcherAssert.assertThat(arrayList2, IsIterableContainingInAnyOrder.containsInAnyOrder(arrayList.toArray()));
                return;
            }
            advanceOnce(createReader2, j2 > ((long) 5));
            arrayList.add(Long.valueOf(j2));
            arrayList2.add((Long) ((KafkaRecord) createReader2.getCurrent()).getKV().getValue());
            j = j2 + 1;
        }
    }

    @Test
    public void testUnboundedSourceMetrics() {
        this.p.apply("readFromKafka", mkKafkaReadTransform(1000, new ValueAsTimestampFn()).withConsumerConfigUpdates(ImmutableMap.of("group.id", "test.group")).commitOffsetsInFinalize().withoutMetadata());
        PipelineResult run = this.p.run();
        MetricName name = SourceMetrics.elementsRead().getName();
        MetricName name2 = SourceMetrics.elementsReadBySplit("0").getName();
        MetricName name3 = SourceMetrics.bytesRead().getName();
        MetricName name4 = SourceMetrics.bytesReadBySplit("0").getName();
        MetricName name5 = SourceMetrics.backlogElementsOfSplit("0").getName();
        MetricName name6 = SourceMetrics.backlogBytesOfSplit("0").getName();
        Iterable counters = run.metrics().allMetrics().getCounters();
        MatcherAssert.assertThat(counters, Matchers.hasItem(MetricResultsMatchers.attemptedMetricsResult(name.getNamespace(), name.getName(), "readFromKafka", 1000L)));
        MatcherAssert.assertThat(counters, Matchers.hasItem(MetricResultsMatchers.attemptedMetricsResult(name2.getNamespace(), name2.getName(), "readFromKafka", 1000L)));
        MatcherAssert.assertThat(counters, Matchers.hasItem(MetricResultsMatchers.attemptedMetricsResult(name3.getNamespace(), name3.getName(), "readFromKafka", 12000L)));
        MatcherAssert.assertThat(counters, Matchers.hasItem(MetricResultsMatchers.attemptedMetricsResult(name4.getNamespace(), name4.getName(), "readFromKafka", 12000L)));
        MatcherAssert.assertThat(run.metrics().queryMetrics(MetricsFilter.builder().addNameFilter(MetricNameFilter.named(name5.getNamespace(), name5.getName())).build()).getGauges(), IsIterableWithSize.iterableWithSize(1));
        MatcherAssert.assertThat(run.metrics().queryMetrics(MetricsFilter.builder().addNameFilter(MetricNameFilter.named(name6.getNamespace(), name6.getName())).build()).getGauges(), IsIterableWithSize.iterableWithSize(1));
        MetricQueryResults queryMetrics = run.metrics().queryMetrics(MetricsFilter.builder().addNameFilter(MetricNameFilter.named("KafkaIOReader", "checkpointMarkCommitsEnqueued")).build());
        MatcherAssert.assertThat(queryMetrics.getCounters(), IsIterableWithSize.iterableWithSize(1));
        MatcherAssert.assertThat((Long) ((MetricResult) queryMetrics.getCounters().iterator().next()).getAttempted(), Matchers.greaterThan(0L));
    }

    @Test
    public void testUnboundedReaderLogsCommitFailure() throws Exception {
        ImmutableList of = ImmutableList.of("topic_a");
        UnboundedSource.UnboundedReader createReader = KafkaIO.read().withBootstrapServers("myServer1:9092,myServer2:9092").withTopics(of).withConsumerFactoryFn(new KafkaMocks.PositionErrorConsumerFactory()).withKeyDeserializer(IntegerDeserializer.class).withValueDeserializer(LongDeserializer.class).makeSource().createReader((PipelineOptions) null, (UnboundedSource.CheckpointMark) null);
        createReader.start();
        this.unboundedReaderExpectedLogs.verifyWarn("exception while fetching latest offset for partition");
        createReader.close();
    }

    @Test
    public void testSink() throws Exception {
        MockProducerWrapper mockProducerWrapper = new MockProducerWrapper();
        try {
            ProducerSendCompletionThread start = new ProducerSendCompletionThread(mockProducerWrapper.mockProducer).start();
            this.p.apply(mkKafkaReadTransform(1000, new ValueAsTimestampFn()).withoutMetadata()).apply(KafkaIO.write().withBootstrapServers("none").withTopic("test").withKeySerializer(IntegerSerializer.class).withValueSerializer(LongSerializer.class).withInputTimestamp().withProducerFactoryFn(new ProducerFactoryFn(mockProducerWrapper.producerKey)));
            this.p.run();
            start.shutdown();
            verifyProducerRecords(mockProducerWrapper.mockProducer, "test", 1000, false, true);
            $closeResource(null, mockProducerWrapper);
        } catch (Throwable th) {
            $closeResource(null, mockProducerWrapper);
            throw th;
        }
    }

    @Test
    public void testValuesSink() throws Exception {
        MockProducerWrapper mockProducerWrapper = new MockProducerWrapper();
        Throwable th = null;
        try {
            try {
                ProducerSendCompletionThread start = new ProducerSendCompletionThread(mockProducerWrapper.mockProducer).start();
                this.p.apply(mkKafkaReadTransform(1000, new ValueAsTimestampFn()).withoutMetadata()).apply(Values.create()).apply(KafkaIO.write().withBootstrapServers("none").withTopic("test").withValueSerializer(LongSerializer.class).withProducerFactoryFn(new ProducerFactoryFn(mockProducerWrapper.producerKey)).values());
                this.p.run();
                start.shutdown();
                verifyProducerRecords(mockProducerWrapper.mockProducer, "test", 1000, true, false);
                $closeResource(null, mockProducerWrapper);
            } finally {
            }
        } catch (Throwable th2) {
            $closeResource(th, mockProducerWrapper);
            throw th2;
        }
    }

    @Test
    public void testRecordsSink() throws Exception {
        MockProducerWrapper mockProducerWrapper = new MockProducerWrapper();
        Throwable th = null;
        try {
            try {
                ProducerSendCompletionThread start = new ProducerSendCompletionThread(mockProducerWrapper.mockProducer).start();
                this.p.apply(mkKafkaReadTransform(1000, new ValueAsTimestampFn()).withoutMetadata()).apply(ParDo.of(new KV2ProducerRecord("test"))).setCoder(ProducerRecordCoder.of(VarIntCoder.of(), VarLongCoder.of())).apply(KafkaIO.writeRecords().withBootstrapServers("none").withTopic("test").withKeySerializer(IntegerSerializer.class).withValueSerializer(LongSerializer.class).withInputTimestamp().withProducerFactoryFn(new ProducerFactoryFn(mockProducerWrapper.producerKey)));
                this.p.run();
                start.shutdown();
                verifyProducerRecords(mockProducerWrapper.mockProducer, "test", 1000, false, true);
                $closeResource(null, mockProducerWrapper);
            } finally {
            }
        } catch (Throwable th2) {
            $closeResource(th, mockProducerWrapper);
            throw th2;
        }
    }

    @Test
    public void testSinkToMultipleTopics() throws Exception {
        MockProducerWrapper mockProducerWrapper = new MockProducerWrapper();
        Throwable th = null;
        try {
            try {
                ProducerSendCompletionThread start = new ProducerSendCompletionThread(mockProducerWrapper.mockProducer).start();
                this.p.apply(mkKafkaReadTransform(1000, new ValueAsTimestampFn()).withoutMetadata()).apply(ParDo.of(new KV2ProducerRecord("test", false))).setCoder(ProducerRecordCoder.of(VarIntCoder.of(), VarLongCoder.of())).apply(KafkaIO.writeRecords().withBootstrapServers("none").withKeySerializer(IntegerSerializer.class).withValueSerializer(LongSerializer.class).withInputTimestamp().withProducerFactoryFn(new ProducerFactoryFn(mockProducerWrapper.producerKey)));
                this.p.run();
                start.shutdown();
                List history = mockProducerWrapper.mockProducer.history();
                for (int i = 0; i < 1000; i++) {
                    ProducerRecord producerRecord = (ProducerRecord) history.get(i);
                    if (i % 2 == 0) {
                        Assert.assertEquals("test_2", producerRecord.topic());
                    } else {
                        Assert.assertEquals("test_1", producerRecord.topic());
                    }
                    Assert.assertEquals(i, ((Integer) producerRecord.key()).intValue());
                    Assert.assertEquals(i, ((Long) producerRecord.value()).longValue());
                    Assert.assertEquals(i, producerRecord.timestamp().intValue());
                    Assert.assertEquals(0L, producerRecord.headers().toArray().length);
                }
                $closeResource(null, mockProducerWrapper);
            } finally {
            }
        } catch (Throwable th2) {
            $closeResource(th, mockProducerWrapper);
            throw th2;
        }
    }

    @Test
    public void testKafkaWriteHeaders() throws Exception {
        AbstractMap.SimpleEntry simpleEntry = new AbstractMap.SimpleEntry("header_key", "header_value");
        MockProducerWrapper mockProducerWrapper = new MockProducerWrapper();
        try {
            ProducerSendCompletionThread start = new ProducerSendCompletionThread(mockProducerWrapper.mockProducer).start();
            this.p.apply(mkKafkaReadTransform(1, new ValueAsTimestampFn()).withoutMetadata()).apply(ParDo.of(new KV2ProducerRecord("test", true, Long.valueOf(System.currentTimeMillis()), simpleEntry))).setCoder(ProducerRecordCoder.of(VarIntCoder.of(), VarLongCoder.of())).apply(KafkaIO.writeRecords().withBootstrapServers("none").withKeySerializer(IntegerSerializer.class).withValueSerializer(LongSerializer.class).withInputTimestamp().withProducerFactoryFn(new ProducerFactoryFn(mockProducerWrapper.producerKey)));
            this.p.run();
            start.shutdown();
            List history = mockProducerWrapper.mockProducer.history();
            for (int i = 0; i < 1; i++) {
                Headers headers = ((ProducerRecord) history.get(i)).headers();
                Assert.assertNotNull(headers);
                Header[] array = headers.toArray();
                Assert.assertEquals(1L, array.length);
                Assert.assertEquals(simpleEntry.getKey(), array[0].key());
                Assert.assertEquals(simpleEntry.getValue(), new String(array[0].value(), StandardCharsets.UTF_8));
            }
        } finally {
            $closeResource(null, mockProducerWrapper);
        }
    }

    @Test
    public void testSinkProducerRecordsWithCustomTS() throws Exception {
        MockProducerWrapper mockProducerWrapper = new MockProducerWrapper();
        Throwable th = null;
        try {
            try {
                ProducerSendCompletionThread start = new ProducerSendCompletionThread(mockProducerWrapper.mockProducer).start();
                Long valueOf = Long.valueOf(System.currentTimeMillis());
                this.p.apply(mkKafkaReadTransform(1000, new ValueAsTimestampFn()).withoutMetadata()).apply(ParDo.of(new KV2ProducerRecord("test", valueOf))).setCoder(ProducerRecordCoder.of(VarIntCoder.of(), VarLongCoder.of())).apply(KafkaIO.writeRecords().withBootstrapServers("none").withKeySerializer(IntegerSerializer.class).withValueSerializer(LongSerializer.class).withProducerFactoryFn(new ProducerFactoryFn(mockProducerWrapper.producerKey)));
                this.p.run();
                start.shutdown();
                List history = mockProducerWrapper.mockProducer.history();
                for (int i = 0; i < 1000; i++) {
                    ProducerRecord producerRecord = (ProducerRecord) history.get(i);
                    Assert.assertEquals("test", producerRecord.topic());
                    Assert.assertEquals(i, ((Integer) producerRecord.key()).intValue());
                    Assert.assertEquals(i, ((Long) producerRecord.value()).longValue());
                    Assert.assertEquals(valueOf, producerRecord.timestamp());
                }
                $closeResource(null, mockProducerWrapper);
            } finally {
            }
        } catch (Throwable th2) {
            $closeResource(th, mockProducerWrapper);
            throw th2;
        }
    }

    @Test
    public void testSinkProducerRecordsWithCustomPartition() throws Exception {
        MockProducerWrapper mockProducerWrapper = new MockProducerWrapper();
        try {
            ProducerSendCompletionThread start = new ProducerSendCompletionThread(mockProducerWrapper.mockProducer).start();
            this.p.apply(mkKafkaReadTransform(1000, new ValueAsTimestampFn()).withoutMetadata()).apply(ParDo.of(new KV2ProducerRecord("test", (Integer) 1))).setCoder(ProducerRecordCoder.of(VarIntCoder.of(), VarLongCoder.of())).apply(KafkaIO.writeRecords().withBootstrapServers("none").withKeySerializer(IntegerSerializer.class).withValueSerializer(LongSerializer.class).withProducerFactoryFn(new ProducerFactoryFn(mockProducerWrapper.producerKey)));
            this.p.run();
            start.shutdown();
            List history = mockProducerWrapper.mockProducer.history();
            for (int i = 0; i < 1000; i++) {
                ProducerRecord producerRecord = (ProducerRecord) history.get(i);
                Assert.assertEquals("test", producerRecord.topic());
                Assert.assertEquals(1, producerRecord.partition());
                Assert.assertEquals(i, ((Integer) producerRecord.key()).intValue());
                Assert.assertEquals(i, ((Long) producerRecord.value()).longValue());
            }
        } finally {
            $closeResource(null, mockProducerWrapper);
        }
    }

    @Test
    public void testExactlyOnceSink() {
        if (!ProducerSpEL.supportsTransactions()) {
            LOG.warn("testExactlyOnceSink() is disabled as Kafka client version does not support transactions.");
            return;
        }
        MockProducerWrapper mockProducerWrapper = new MockProducerWrapper();
        Throwable th = null;
        try {
            try {
                ProducerSendCompletionThread start = new ProducerSendCompletionThread(mockProducerWrapper.mockProducer).start();
                this.p.apply(mkKafkaReadTransform(1000, new ValueAsTimestampFn()).withoutMetadata()).apply(KafkaIO.write().withBootstrapServers("none").withTopic("test").withKeySerializer(IntegerSerializer.class).withValueSerializer(LongSerializer.class).withEOS(1, "test").withConsumerFactoryFn(new ConsumerFactoryFn(Lists.newArrayList(new String[]{"test"}), 10, 10, OffsetResetStrategy.EARLIEST)).withPublishTimestampFunction((kv, instant) -> {
                    return instant;
                }).withProducerFactoryFn(new ProducerFactoryFn(mockProducerWrapper.producerKey)));
                this.p.run();
                start.shutdown();
                verifyProducerRecords(mockProducerWrapper.mockProducer, "test", 1000, false, true);
                $closeResource(null, mockProducerWrapper);
            } finally {
            }
        } catch (Throwable th2) {
            $closeResource(th, mockProducerWrapper);
            throw th2;
        }
    }

    @Test
    public void testExactlyOnceSinkWithSendException() throws Throwable {
        if (!ProducerSpEL.supportsTransactions()) {
            LOG.warn("testExactlyOnceSink() is disabled as Kafka client version does not support transactions.");
            return;
        }
        this.thrown.expect(KafkaException.class);
        this.thrown.expectMessage("fakeException");
        this.p.apply(Create.of(ImmutableList.of(KV.of(1, 1L), KV.of(2, 2L)))).apply(KafkaIO.write().withBootstrapServers("none").withTopic("test").withKeySerializer(IntegerSerializer.class).withValueSerializer(LongSerializer.class).withEOS(1, "testException").withConsumerFactoryFn(new ConsumerFactoryFn(Lists.newArrayList(new String[]{"test"}), 10, 10, OffsetResetStrategy.EARLIEST)).withProducerFactoryFn(new KafkaMocks.SendErrorProducerFactory()));
        try {
            this.p.run();
        } catch (Pipeline.PipelineExecutionException e) {
            throw e.getCause();
        }
    }

    @Test
    public void testSinkWithSendErrors() throws Throwable {
        this.thrown.expect(InjectedErrorException.class);
        this.thrown.expectMessage("Injected Error #1");
        MockProducerWrapper mockProducerWrapper = new MockProducerWrapper();
        try {
            ProducerSendCompletionThread start = new ProducerSendCompletionThread(mockProducerWrapper.mockProducer, 10, 100).start();
            this.p.apply(mkKafkaReadTransform(1000, new ValueAsTimestampFn()).withoutMetadata()).apply(KafkaIO.write().withBootstrapServers("none").withTopic("test").withKeySerializer(IntegerSerializer.class).withValueSerializer(LongSerializer.class).withProducerFactoryFn(new ProducerFactoryFn(mockProducerWrapper.producerKey)));
            try {
                try {
                    this.p.run();
                    start.shutdown();
                } catch (Pipeline.PipelineExecutionException e) {
                    throw e.getCause().getCause();
                }
            } catch (Throwable th) {
                start.shutdown();
                throw th;
            }
        } finally {
            $closeResource(null, mockProducerWrapper);
        }
    }

    @Test
    public void testUnboundedSourceStartReadTime() {
        Assume.assumeTrue(ConsumerSpEL.hasOffsetsForTimes());
        int i = 1000 / 2;
        addCountingAsserts(this.p.apply(mkKafkaReadTransform(1000, Integer.valueOf(i), new ValueAsTimestampFn()).withStartReadTime(new Instant((1000 / 20) / 2)).withoutMetadata()).apply(Values.create()), i, i, i, 1000 - 1);
        this.p.run();
    }

    @Test
    public void testUnboundedSourceStartReadTimeException() {
        Assume.assumeTrue(ConsumerSpEL.hasOffsetsForTimes());
        this.noMessagesException.expect(RuntimeException.class);
        this.p.apply(mkKafkaReadTransform(1000, 1000, new ValueAsTimestampFn()).withStartReadTime(new Instant(1000 / 20)).withoutMetadata()).apply(Values.create());
        this.p.run();
    }

    @Test
    public void testSourceDisplayData() {
        DisplayData from = DisplayData.from(mkKafkaReadTransform(10, null));
        MatcherAssert.assertThat(from, DisplayDataMatchers.hasDisplayItem("topics", "topic_a,topic_b"));
        MatcherAssert.assertThat(from, DisplayDataMatchers.hasDisplayItem("enable.auto.commit", false));
        MatcherAssert.assertThat(from, DisplayDataMatchers.hasDisplayItem("bootstrap.servers", "myServer1:9092,myServer2:9092"));
        MatcherAssert.assertThat(from, DisplayDataMatchers.hasDisplayItem("auto.offset.reset", "latest"));
        MatcherAssert.assertThat(from, DisplayDataMatchers.hasDisplayItem("receive.buffer.bytes", 524288L));
    }

    @Test
    public void testSourceWithExplicitPartitionsDisplayData() {
        DisplayData from = DisplayData.from(KafkaIO.readBytes().withBootstrapServers("myServer1:9092,myServer2:9092").withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5), new TopicPartition("test", 6))).withConsumerFactoryFn(new ConsumerFactoryFn(Lists.newArrayList(new String[]{"test"}), 10, 10, OffsetResetStrategy.EARLIEST)));
        MatcherAssert.assertThat(from, DisplayDataMatchers.hasDisplayItem("topicPartitions", "test-5,test-6"));
        MatcherAssert.assertThat(from, DisplayDataMatchers.hasDisplayItem("enable.auto.commit", false));
        MatcherAssert.assertThat(from, DisplayDataMatchers.hasDisplayItem("bootstrap.servers", "myServer1:9092,myServer2:9092"));
        MatcherAssert.assertThat(from, DisplayDataMatchers.hasDisplayItem("auto.offset.reset", "latest"));
        MatcherAssert.assertThat(from, DisplayDataMatchers.hasDisplayItem("receive.buffer.bytes", 524288L));
    }

    @Test
    public void testSinkDisplayData() {
        MockProducerWrapper mockProducerWrapper = new MockProducerWrapper();
        try {
            DisplayData from = DisplayData.from(KafkaIO.write().withBootstrapServers("myServerA:9092,myServerB:9092").withTopic("myTopic").withValueSerializer(LongSerializer.class).withProducerFactoryFn(new ProducerFactoryFn(mockProducerWrapper.producerKey)).withProducerConfigUpdates(ImmutableMap.of("retry.backoff.ms", 100)));
            MatcherAssert.assertThat(from, DisplayDataMatchers.hasDisplayItem("topic", "myTopic"));
            MatcherAssert.assertThat(from, DisplayDataMatchers.hasDisplayItem("bootstrap.servers", "myServerA:9092,myServerB:9092"));
            MatcherAssert.assertThat(from, DisplayDataMatchers.hasDisplayItem("retries", 3L));
            MatcherAssert.assertThat(from, DisplayDataMatchers.hasDisplayItem("retry.backoff.ms", 100L));
            $closeResource(null, mockProducerWrapper);
        } catch (Throwable th) {
            $closeResource(null, mockProducerWrapper);
            throw th;
        }
    }

    @Test
    public void testSinkMetrics() throws Exception {
        MockProducerWrapper mockProducerWrapper = new MockProducerWrapper();
        Throwable th = null;
        try {
            try {
                ProducerSendCompletionThread start = new ProducerSendCompletionThread(mockProducerWrapper.mockProducer).start();
                this.p.apply(mkKafkaReadTransform(1000, new ValueAsTimestampFn()).withoutMetadata()).apply("writeToKafka", KafkaIO.write().withBootstrapServers("none").withTopic("test").withKeySerializer(IntegerSerializer.class).withValueSerializer(LongSerializer.class).withProducerFactoryFn(new ProducerFactoryFn(mockProducerWrapper.producerKey)));
                PipelineResult run = this.p.run();
                MetricName name = SinkMetrics.elementsWritten().getName();
                MatcherAssert.assertThat(run.metrics().queryMetrics(MetricsFilter.builder().addNameFilter(MetricNameFilter.inNamespace(name.getNamespace())).build()).getCounters(), Matchers.hasItem(MetricResultsMatchers.attemptedMetricsResult(name.getNamespace(), name.getName(), "writeToKafka", 1000L)));
                start.shutdown();
                $closeResource(null, mockProducerWrapper);
            } finally {
            }
        } catch (Throwable th2) {
            $closeResource(th, mockProducerWrapper);
            throw th2;
        }
    }

    private static void verifyProducerRecords(MockProducer<Integer, Long> mockProducer, String str, int i, boolean z, boolean z2) {
        List history = mockProducer.history();
        history.sort(Comparator.comparingLong((v0) -> {
            return v0.value();
        }));
        for (int i2 = 0; i2 < i; i2++) {
            ProducerRecord producerRecord = (ProducerRecord) history.get(i2);
            Assert.assertEquals(str, producerRecord.topic());
            if (z) {
                Assert.assertNull(producerRecord.key());
            } else {
                Assert.assertEquals(i2, ((Integer) producerRecord.key()).intValue());
            }
            Assert.assertEquals(i2, ((Long) producerRecord.value()).longValue());
            if (z2) {
                Assert.assertEquals(i2, producerRecord.timestamp().intValue());
            }
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -2109520322:
                if (implMethodName.equals("lambda$testExactlyOnceSink$3a211909$1")) {
                    z = 6;
                    break;
                }
                break;
            case -1782809936:
                if (implMethodName.equals("lambda$testUnboundedSourceCustomTimestamps$512d00d3$1")) {
                    z = 3;
                    break;
                }
                break;
            case -1149948253:
                if (implMethodName.equals("lambda$testUnboundedSourceLogAppendTimestamps$512d00d3$1")) {
                    z = true;
                    break;
                }
                break;
            case -533145083:
                if (implMethodName.equals("lambda$testUnboundedSourceCreateTimestamps$512d00d3$1")) {
                    z = 5;
                    break;
                }
                break;
            case 1496276146:
                if (implMethodName.equals("lambda$testUnboundedSourceCustomTimestamps$43268ee4$1")) {
                    z = 4;
                    break;
                }
                break;
            case 1698252133:
                if (implMethodName.equals("lambda$testReadAvroSpecificRecordsWithConfluentSchemaRegistry$b1ef6876$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1734684864:
                if (implMethodName.equals("lambda$testUnboundedSourceCustomTimestamps$97fa7083$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/kafka/TimestampPolicyFactory") && serializedLambda.getFunctionalInterfaceMethodName().equals("createTimestampPolicy") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/kafka/common/TopicPartition;Ljava/util/Optional;)Lorg/apache/beam/sdk/io/kafka/TimestampPolicy;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/kafka/KafkaIOTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/kafka/common/TopicPartition;Ljava/util/Optional;)Lorg/apache/beam/sdk/io/kafka/TimestampPolicy;")) {
                    return (topicPartition, optional) -> {
                        return new CustomTimestampPolicyWithLimitedDelay(kafkaRecord -> {
                            return new Instant(TimeUnit.SECONDS.toMillis(((Long) kafkaRecord.getKV().getValue()).longValue()) + 80000);
                        }, Duration.ZERO, optional);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/kafka/KafkaIOTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Long;")) {
                    return l -> {
                        return Long.valueOf(LOG_APPEND_START_TIME.plus(Duration.standardSeconds(l.longValue())).getMillis());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/kafka/KafkaIOTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)[B")) {
                    return num -> {
                        return ByteBuffer.wrap(new byte[4]).putInt(num.intValue()).array();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/kafka/KafkaIOTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Long;")) {
                    return l2 -> {
                        return Long.valueOf(TimeUnit.SECONDS.toMillis(l2.longValue()) + 80000);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/kafka/KafkaIOTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/kafka/KafkaRecord;)Lorg/joda/time/Instant;")) {
                    return kafkaRecord -> {
                        return new Instant(TimeUnit.SECONDS.toMillis(((Long) kafkaRecord.getKV().getValue()).longValue()) + 80000);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/kafka/KafkaIOTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Long;")) {
                    return l3 -> {
                        return Long.valueOf(TimeUnit.SECONDS.toMillis(l3.longValue()) + 50000);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/kafka/KafkaPublishTimestampFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("getTimestamp") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Lorg/joda/time/Instant;)Lorg/joda/time/Instant;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/kafka/KafkaIOTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/values/KV;Lorg/joda/time/Instant;)Lorg/joda/time/Instant;")) {
                    return (kv, instant) -> {
                        return instant;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            th.addSuppressed(th2);
        }
    }
}
