/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.io.kafka.KafkaReadSchemaTransformConfiguration;
import org.apache.beam.sdk.io.kafka.KafkaReadSchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class KafkaReadSchemaTransformProviderTest {
    private static final String AVRO_SCHEMA = "{\"type\":\"record\",\"namespace\":\"com.example\",\"name\":\"FullName\",\"fields\":[{\"name\":\"first\",\"type\":\"string\"},{\"name\":\"last\",\"type\":\"string\"}]}";
    private static final String PROTO_SCHEMA = "syntax = \"proto3\";\n\nmessage MyMessage {\n  int32 id = 1;\n  string name = 2;\n  bool active = 3;\n\n  // Nested field\n  message Address {\n    string street = 1;\n    string city = 2;\n    string state = 3;\n    string zip_code = 4;\n  }\n\n  Address address = 4;\n}";

    @Test
    public void testValidConfigurations() {
        Assert.assertThrows(AssertionError.class, () -> KafkaReadSchemaTransformConfiguration.builder().setFormat("UNUSUAL_FORMAT").setTopic("a_valid_topic").setBootstrapServers("a_valid_server").build().validate());
        Assert.assertThrows(IllegalStateException.class, () -> KafkaReadSchemaTransformConfiguration.builder().setFormat("UNUSUAL_FORMAT").setBootstrapServers("a_valid_server").build().validate());
        Assert.assertThrows(IllegalStateException.class, () -> KafkaReadSchemaTransformConfiguration.builder().setFormat("UNUSUAL_FORMAT").setTopic("a_valid_topic").build().validate());
    }

    @Test
    public void testFindTransformAndMakeItWork() {
        ServiceLoader<SchemaTransformProvider> serviceLoader = ServiceLoader.load(SchemaTransformProvider.class);
        List providers = StreamSupport.stream(serviceLoader.spliterator(), false).filter(provider -> provider.getClass() == KafkaReadSchemaTransformProvider.class).collect(Collectors.toList());
        SchemaTransformProvider kafkaProvider = (SchemaTransformProvider)providers.get(0);
        Assert.assertEquals((Object)kafkaProvider.outputCollectionNames(), (Object)Lists.newArrayList((Object[])new String[]{"output", "errors"}));
        Assert.assertEquals((Object)kafkaProvider.inputCollectionNames(), (Object)Lists.newArrayList());
        Assert.assertEquals((Object)Sets.newHashSet((Object[])new String[]{"bootstrapServers", "topic", "schema", "autoOffsetResetConfig", "consumerConfigUpdates", "format", "confluentSchemaRegistrySubject", "confluentSchemaRegistryUrl", "errorHandling", "fileDescriptorPath", "messageName"}), kafkaProvider.configurationSchema().getFields().stream().map(field -> field.getName()).collect(Collectors.toSet()));
    }

    @Test
    public void testBuildTransformWithAvroSchema() {
        ServiceLoader<SchemaTransformProvider> serviceLoader = ServiceLoader.load(SchemaTransformProvider.class);
        List providers = StreamSupport.stream(serviceLoader.spliterator(), false).filter(provider -> provider.getClass() == KafkaReadSchemaTransformProvider.class).collect(Collectors.toList());
        KafkaReadSchemaTransformProvider kafkaProvider = (KafkaReadSchemaTransformProvider)providers.get(0);
        kafkaProvider.from(KafkaReadSchemaTransformConfiguration.builder().setFormat("AVRO").setTopic("anytopic").setBootstrapServers("anybootstrap").setSchema(AVRO_SCHEMA).build());
    }

    @Test
    public void testBuildTransformWithJsonSchema() throws IOException {
        ServiceLoader<SchemaTransformProvider> serviceLoader = ServiceLoader.load(SchemaTransformProvider.class);
        List providers = StreamSupport.stream(serviceLoader.spliterator(), false).filter(provider -> provider.getClass() == KafkaReadSchemaTransformProvider.class).collect(Collectors.toList());
        KafkaReadSchemaTransformProvider kafkaProvider = (KafkaReadSchemaTransformProvider)providers.get(0);
        kafkaProvider.from(KafkaReadSchemaTransformConfiguration.builder().setTopic("anytopic").setBootstrapServers("anybootstrap").setFormat("JSON").setSchema(new String(ByteStreams.toByteArray((InputStream)Objects.requireNonNull(this.getClass().getResourceAsStream("/json-schema/basic_json_schema.json"))), StandardCharsets.UTF_8)).build());
    }

    @Test
    public void testBuildTransformWithRawFormat() {
        ServiceLoader<SchemaTransformProvider> serviceLoader = ServiceLoader.load(SchemaTransformProvider.class);
        List providers = StreamSupport.stream(serviceLoader.spliterator(), false).filter(provider -> provider.getClass() == KafkaReadSchemaTransformProvider.class).collect(Collectors.toList());
        KafkaReadSchemaTransformProvider kafkaProvider = (KafkaReadSchemaTransformProvider)providers.get(0);
        kafkaProvider.from(KafkaReadSchemaTransformConfiguration.builder().setTopic("anytopic").setBootstrapServers("anybootstrap").setFormat("RAW").build());
    }

    @Test
    public void testBuildTransformWithProtoFormat() {
        ServiceLoader<SchemaTransformProvider> serviceLoader = ServiceLoader.load(SchemaTransformProvider.class);
        List providers = StreamSupport.stream(serviceLoader.spliterator(), false).filter(provider -> provider.getClass() == KafkaReadSchemaTransformProvider.class).collect(Collectors.toList());
        KafkaReadSchemaTransformProvider kafkaProvider = (KafkaReadSchemaTransformProvider)providers.get(0);
        kafkaProvider.from(KafkaReadSchemaTransformConfiguration.builder().setTopic("anytopic").setBootstrapServers("anybootstrap").setFormat("PROTO").setMessageName("MyMessage").setFileDescriptorPath(Objects.requireNonNull(this.getClass().getResource("/proto_byte/file_descriptor/proto_byte_utils.pb")).getPath()).build());
    }

    @Test
    public void testBuildTransformWithProtoFormatWrongMessageName() {
        ServiceLoader<SchemaTransformProvider> serviceLoader = ServiceLoader.load(SchemaTransformProvider.class);
        List providers = StreamSupport.stream(serviceLoader.spliterator(), false).filter(provider -> provider.getClass() == KafkaReadSchemaTransformProvider.class).collect(Collectors.toList());
        KafkaReadSchemaTransformProvider kafkaProvider = (KafkaReadSchemaTransformProvider)providers.get(0);
        Assert.assertThrows(NullPointerException.class, () -> kafkaProvider.from(KafkaReadSchemaTransformConfiguration.builder().setTopic("anytopic").setBootstrapServers("anybootstrap").setFormat("PROTO").setMessageName("MyOtherMessage").setFileDescriptorPath(Objects.requireNonNull(this.getClass().getResource("/proto_byte/file_descriptor/proto_byte_utils.pb")).getPath()).build()));
    }

    @Test
    public void testBuildTransformWithProtoSchemaFormat() {
        ServiceLoader<SchemaTransformProvider> serviceLoader = ServiceLoader.load(SchemaTransformProvider.class);
        List providers = StreamSupport.stream(serviceLoader.spliterator(), false).filter(provider -> provider.getClass() == KafkaReadSchemaTransformProvider.class).collect(Collectors.toList());
        KafkaReadSchemaTransformProvider kafkaProvider = (KafkaReadSchemaTransformProvider)providers.get(0);
        kafkaProvider.from(KafkaReadSchemaTransformConfiguration.builder().setTopic("anytopic").setBootstrapServers("anybootstrap").setFormat("PROTO").setMessageName("MyMessage").setSchema(PROTO_SCHEMA).build());
    }

    @Test
    public void testBuildTransformWithoutProtoSchemaFormat() {
        ServiceLoader<SchemaTransformProvider> serviceLoader = ServiceLoader.load(SchemaTransformProvider.class);
        List providers = StreamSupport.stream(serviceLoader.spliterator(), false).filter(provider -> provider.getClass() == KafkaReadSchemaTransformProvider.class).collect(Collectors.toList());
        KafkaReadSchemaTransformProvider kafkaProvider = (KafkaReadSchemaTransformProvider)providers.get(0);
        Assert.assertThrows(NullPointerException.class, () -> kafkaProvider.from(KafkaReadSchemaTransformConfiguration.builder().setTopic("anytopic").setBootstrapServers("anybootstrap").setFormat("PROTO").setMessageName("MyMessage").build()));
    }
}

