package org.apache.hudi.utilities.sources;

import com.google.protobuf.BoolValue;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.DoubleValue;
import com.google.protobuf.FloatValue;
import com.google.protobuf.Int32Value;
import com.google.protobuf.Int64Value;
import com.google.protobuf.Message;
import com.google.protobuf.StringValue;
import com.google.protobuf.UInt32Value;
import com.google.protobuf.UInt64Value;
import com.google.protobuf.util.JsonFormat;
import com.google.protobuf.util.Timestamps;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
import org.apache.hudi.utilities.schema.ProtoClassBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.test.proto.Nested;
import org.apache.hudi.utilities.test.proto.Sample;
import org.apache.hudi.utilities.test.proto.SampleEnum;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/hudi/utilities/sources/TestProtoKafkaSource.class */
public class TestProtoKafkaSource extends BaseTestKafkaSource {
    private static final JsonFormat.Printer PRINTER = JsonFormat.printer().omittingInsignificantWhitespace();
    private static final Random RANDOM = new Random();
    private static final String MOCK_REGISTRY_URL = "mock://127.0.0.1:8081";

    @Override // org.apache.hudi.utilities.sources.BaseTestKafkaSource
    protected TypedProperties createPropsForKafkaSource(String str, Long l, String str2) {
        TypedProperties typedProperties = new TypedProperties();
        typedProperties.setProperty("hoodie.streamer.source.kafka.topic", str);
        typedProperties.setProperty("bootstrap.servers", this.testUtils.brokerAddress());
        typedProperties.setProperty("auto.offset.reset", str2);
        typedProperties.setProperty("enable.auto.commit", "false");
        typedProperties.setProperty("hoodie.streamer.kafka.source.maxEvents", l != null ? String.valueOf(l) : String.valueOf(KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE.defaultValue()));
        typedProperties.setProperty("group.id", UUID.randomUUID().toString());
        typedProperties.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key(), Sample.class.getName());
        return typedProperties;
    }

    @Override // org.apache.hudi.utilities.sources.BaseTestKafkaSource
    protected SourceFormatAdapter createSource(TypedProperties typedProperties) {
        this.schemaProvider = new ProtoClassBasedSchemaProvider(typedProperties, jsc());
        return new SourceFormatAdapter(new ProtoKafkaSource(typedProperties, jsc(), spark(), this.metrics, new DefaultStreamContext(this.schemaProvider, this.sourceProfile)));
    }

    @Test
    public void testProtoKafkaSourceWithConfluentProtoDeserialization() {
        this.testUtils.createTopic("hoodie_test_testProtoKafkaSourceWithConfluentDeserializer", 2);
        TypedProperties createPropsForKafkaSource = createPropsForKafkaSource("hoodie_test_testProtoKafkaSourceWithConfluentDeserializer", null, "earliest");
        createPropsForKafkaSource.put(KafkaSourceConfig.KAFKA_PROTO_VALUE_DESERIALIZER_CLASS.key(), "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer");
        createPropsForKafkaSource.put("schema.registry.url", MOCK_REGISTRY_URL);
        createPropsForKafkaSource.put("hoodie.streamer.schemaprovider.registry.url", MOCK_REGISTRY_URL);
        createPropsForKafkaSource.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.key(), "true");
        createPropsForKafkaSource.remove(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key());
        ProtoKafkaSource protoKafkaSource = new ProtoKafkaSource(createPropsForKafkaSource, jsc(), spark(), new SchemaRegistryProvider(createPropsForKafkaSource, jsc()), this.metrics);
        List<Sample> createSampleMessages = createSampleMessages(1000);
        sendMessagesToKafkaWithConfluentSerializer("hoodie_test_testProtoKafkaSourceWithConfluentDeserializer", 2, createSampleMessages);
        Assertions.assertEquals(createSampleMessages.stream().map((v1) -> {
            return protoToJson(v1);
        }).collect(Collectors.toSet()), new HashSet(((JavaRDD) protoKafkaSource.fetchNext(Option.empty(), 1000L).getBatch().get()).map(message -> {
            return PRINTER.print(message);
        }).collect()));
    }

    @Test
    public void testProtoKafkaSourceWithFlattenWrappedPrimitives() {
        this.testUtils.createTopic("hoodie_test_testProtoKafkaSourceFlatten", 2);
        TypedProperties createPropsForKafkaSource = createPropsForKafkaSource("hoodie_test_testProtoKafkaSourceFlatten", null, "earliest");
        createPropsForKafkaSource.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.key(), "true");
        SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(new ProtoKafkaSource(createPropsForKafkaSource, jsc(), spark(), new ProtoClassBasedSchemaProvider(createPropsForKafkaSource, jsc()), this.metrics));
        Assertions.assertEquals(Option.empty(), sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
        sendMessagesToKafka("hoodie_test_testProtoKafkaSourceFlatten", 1000, 2);
        InputBatch fetchNewDataInAvroFormat = sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), 900L);
        Assertions.assertEquals(900L, ((JavaRDD) fetchNewDataInAvroFormat.getBatch().get()).count());
        Assertions.assertEquals(900L, ((Dataset) sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), 900L).getBatch().get()).count());
        sendMessagesToKafka("hoodie_test_testProtoKafkaSourceFlatten", 1000, 2);
        InputBatch fetchNewDataInRowFormat = sourceFormatAdapter.fetchNewDataInRowFormat(Option.of(fetchNewDataInAvroFormat.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(1100L, ((Dataset) fetchNewDataInRowFormat.getBatch().get()).count());
        InputBatch fetchNewDataInAvroFormat2 = sourceFormatAdapter.fetchNewDataInAvroFormat(Option.of(fetchNewDataInAvroFormat.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(((Dataset) fetchNewDataInRowFormat.getBatch().get()).count(), ((JavaRDD) fetchNewDataInAvroFormat2.getBatch().get()).count());
        Assertions.assertEquals(fetchNewDataInRowFormat.getCheckpointForNextBatch(), fetchNewDataInAvroFormat2.getCheckpointForNextBatch());
        InputBatch fetchNewDataInRowFormat2 = sourceFormatAdapter.fetchNewDataInRowFormat(Option.of(fetchNewDataInAvroFormat.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(((Dataset) fetchNewDataInRowFormat.getBatch().get()).count(), ((Dataset) fetchNewDataInRowFormat2.getBatch().get()).count());
        Assertions.assertEquals(fetchNewDataInRowFormat.getCheckpointForNextBatch(), fetchNewDataInRowFormat2.getCheckpointForNextBatch());
        Assertions.assertEquals(Option.empty(), sourceFormatAdapter.fetchNewDataInAvroFormat(Option.of(fetchNewDataInRowFormat.getCheckpointForNextBatch()), Long.MAX_VALUE).getBatch());
        Assertions.assertEquals(Option.empty(), sourceFormatAdapter.fetchNewDataInRowFormat(Option.of(fetchNewDataInRowFormat.getCheckpointForNextBatch()), Long.MAX_VALUE).getBatch());
    }

    private static List<Sample> createSampleMessages(int i) {
        return (List) IntStream.range(0, i).mapToObj(i2 -> {
            Sample.Builder primitiveBytes = Sample.newBuilder().setPrimitiveDouble(RANDOM.nextDouble()).setPrimitiveFloat(RANDOM.nextFloat()).setPrimitiveInt(RANDOM.nextInt()).setPrimitiveLong(RANDOM.nextLong()).setPrimitiveUnsignedInt(RANDOM.nextInt()).setPrimitiveUnsignedLong(RANDOM.nextLong()).setPrimitiveSignedInt(RANDOM.nextInt()).setPrimitiveSignedLong(RANDOM.nextLong()).setPrimitiveFixedInt(RANDOM.nextInt()).setPrimitiveFixedLong(RANDOM.nextLong()).setPrimitiveFixedSignedInt(RANDOM.nextInt()).setPrimitiveFixedSignedLong(RANDOM.nextLong()).setPrimitiveBoolean(RANDOM.nextBoolean()).setPrimitiveString(UUID.randomUUID().toString()).setPrimitiveBytes(ByteString.copyFrom(StringUtils.getUTF8Bytes(UUID.randomUUID().toString())));
            if (RANDOM.nextBoolean()) {
                HashMap hashMap = new HashMap();
                hashMap.put(UUID.randomUUID().toString(), Integer.valueOf(RANDOM.nextInt()));
                HashMap hashMap2 = new HashMap();
                hashMap2.put(UUID.randomUUID().toString(), generateRandomNestedMessage());
                primitiveBytes.addAllRepeatedPrimitive(Arrays.asList(Integer.valueOf(RANDOM.nextInt()), Integer.valueOf(RANDOM.nextInt()))).putAllMapPrimitive(hashMap).setNestedMessage(generateRandomNestedMessage()).addAllRepeatedMessage(Arrays.asList(generateRandomNestedMessage(), generateRandomNestedMessage())).putAllMapMessage(hashMap2).setWrappedString(StringValue.of(UUID.randomUUID().toString())).setWrappedInt(Int32Value.of(RANDOM.nextInt())).setWrappedLong(Int64Value.of(RANDOM.nextLong())).setWrappedUnsignedInt(UInt32Value.of(RANDOM.nextInt())).setWrappedUnsignedLong(UInt64Value.of(RANDOM.nextLong())).setWrappedDouble(DoubleValue.of(RANDOM.nextDouble())).setWrappedFloat(FloatValue.of(RANDOM.nextFloat())).setWrappedBoolean(BoolValue.of(RANDOM.nextBoolean())).setWrappedBytes(BytesValue.of(ByteString.copyFrom(StringUtils.getUTF8Bytes(UUID.randomUUID().toString())))).setEnum(SampleEnum.SECOND).setTimestamp(Timestamps.fromMillis(System.currentTimeMillis()));
            }
            return primitiveBytes.m219build();
        }).collect(Collectors.toList());
    }

    private static Nested generateRandomNestedMessage() {
        return Nested.newBuilder().setNestedInt(RANDOM.nextInt()).m124build();
    }

    @Override // org.apache.hudi.utilities.sources.BaseTestKafkaSource
    protected void sendMessagesToKafka(String str, int i, int i2) {
        List<Sample> createSampleMessages = createSampleMessages(i);
        KafkaProducer kafkaProducer = new KafkaProducer(getProducerProperties(false));
        Throwable th = null;
        for (int i3 = 0; i3 < createSampleMessages.size(); i3++) {
            try {
                try {
                    kafkaProducer.send(new ProducerRecord(str, Integer.toString(i3 % i2), createSampleMessages.get(i3).toByteArray()));
                } catch (Throwable th2) {
                    th = th2;
                    throw th2;
                }
            } catch (Throwable th3) {
                if (kafkaProducer != null) {
                    if (th != null) {
                        try {
                            kafkaProducer.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        kafkaProducer.close();
                    }
                }
                throw th3;
            }
        }
        if (kafkaProducer != null) {
            if (0 == 0) {
                kafkaProducer.close();
                return;
            }
            try {
                kafkaProducer.close();
            } catch (Throwable th5) {
                th.addSuppressed(th5);
            }
        }
    }

    private void sendMessagesToKafkaWithConfluentSerializer(String str, int i, List<Sample> list) {
        KafkaProducer kafkaProducer = new KafkaProducer(getProducerProperties(true));
        Throwable th = null;
        for (int i2 = 0; i2 < list.size(); i2++) {
            try {
                try {
                    kafkaProducer.send(new ProducerRecord(str, Integer.toString(i2 % i), list.get(i2)));
                } catch (Throwable th2) {
                    th = th2;
                    throw th2;
                }
            } catch (Throwable th3) {
                if (kafkaProducer != null) {
                    if (th != null) {
                        try {
                            kafkaProducer.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        kafkaProducer.close();
                    }
                }
                throw th3;
            }
        }
        if (kafkaProducer != null) {
            if (0 == 0) {
                kafkaProducer.close();
                return;
            }
            try {
                kafkaProducer.close();
            } catch (Throwable th5) {
                th.addSuppressed(th5);
            }
        }
    }

    private Properties getProducerProperties(boolean z) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.testUtils.brokerAddress());
        if (z) {
            properties.put("value.serializer", KafkaProtobufSerializer.class.getName());
            properties.put("value.deserializer", KafkaProtobufDeserializer.class.getName());
            properties.put("schema.registry.url", MOCK_REGISTRY_URL);
            properties.put("auto.register.schemas", "true");
        } else {
            properties.put("value.serializer", ByteArraySerializer.class.getName());
        }
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("acks", "all");
        return properties;
    }

    private String protoToJson(Message message) {
        try {
            return PRINTER.print(message);
        } catch (Exception e) {
            throw new RuntimeException("Failed to convert proto to json", e);
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 531710703:
                if (implMethodName.equals("lambda$testProtoKafkaSourceWithConfluentProtoDeserialization$b852b88$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/utilities/sources/TestProtoKafkaSource") && serializedLambda.getImplMethodSignature().equals("(Lcom/google/protobuf/Message;)Ljava/lang/String;")) {
                    return message -> {
                        return PRINTER.print(message);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
