/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.util.List;
import org.apache.beam.sdk.extensions.protobuf.PayloadMessages;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.BeamKafkaCSVTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.BeamKafkaTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.KafkaTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.NestedPayloadKafkaTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.Schemas;
import org.apache.beam.sdk.io.thrift.payloads.SimpleThriftMessage;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.thrift.TBase;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.junit.Assert;
import org.junit.Test;

public class KafkaTableProviderTest {
    private final KafkaTableProvider provider = new KafkaTableProvider();
    private static final String LOCATION_BROKER = "104.126.7.88:7743";
    private static final String LOCATION_TOPIC = "topic1";

    @Test
    public void testBuildBeamSqlCSVTable() {
        Table table = KafkaTableProviderTest.mockTable("hello");
        BeamSqlTable sqlTable = this.provider.buildBeamSqlTable(table);
        Assert.assertNotNull((Object)sqlTable);
        Assert.assertTrue((boolean)(sqlTable instanceof BeamKafkaCSVTable));
        BeamKafkaCSVTable kafkaTable = (BeamKafkaCSVTable)sqlTable;
        Assert.assertEquals((Object)LOCATION_BROKER, (Object)kafkaTable.getBootstrapServers());
        Assert.assertEquals((Object)ImmutableList.of((Object)LOCATION_TOPIC), (Object)kafkaTable.getTopics());
    }

    @Test
    public void testBuildWithExtraServers() {
        Table table = KafkaTableProviderTest.mockTableWithExtraServers("hello", (List<String>)ImmutableList.of((Object)"localhost:1111", (Object)"localhost:2222"));
        BeamSqlTable sqlTable = this.provider.buildBeamSqlTable(table);
        Assert.assertNotNull((Object)sqlTable);
        Assert.assertTrue((boolean)(sqlTable instanceof BeamKafkaCSVTable));
        BeamKafkaCSVTable kafkaTable = (BeamKafkaCSVTable)sqlTable;
        Assert.assertEquals((Object)"104.126.7.88:7743,localhost:1111,localhost:2222", (Object)kafkaTable.getBootstrapServers());
        Assert.assertEquals((Object)ImmutableList.of((Object)LOCATION_TOPIC), (Object)kafkaTable.getTopics());
    }

    @Test
    public void testBuildWithExtraTopics() {
        Table table = KafkaTableProviderTest.mockTableWithExtraTopics("hello", (List<String>)ImmutableList.of((Object)"topic2", (Object)"topic3"));
        BeamSqlTable sqlTable = this.provider.buildBeamSqlTable(table);
        Assert.assertNotNull((Object)sqlTable);
        Assert.assertTrue((boolean)(sqlTable instanceof BeamKafkaCSVTable));
        BeamKafkaCSVTable kafkaTable = (BeamKafkaCSVTable)sqlTable;
        Assert.assertEquals((Object)LOCATION_BROKER, (Object)kafkaTable.getBootstrapServers());
        Assert.assertEquals((Object)ImmutableList.of((Object)LOCATION_TOPIC, (Object)"topic2", (Object)"topic3"), (Object)kafkaTable.getTopics());
    }

    @Test
    public void testBuildBeamSqlAvroTable() {
        Table table = KafkaTableProviderTest.mockTable("hello", "avro");
        BeamSqlTable sqlTable = this.provider.buildBeamSqlTable(table);
        Assert.assertNotNull((Object)sqlTable);
        Assert.assertTrue((boolean)(sqlTable instanceof BeamKafkaTable));
        BeamKafkaTable kafkaTable = (BeamKafkaTable)sqlTable;
        Assert.assertEquals((Object)LOCATION_BROKER, (Object)kafkaTable.getBootstrapServers());
        Assert.assertEquals((Object)ImmutableList.of((Object)LOCATION_TOPIC), (Object)kafkaTable.getTopics());
    }

    @Test
    public void testBuildBeamSqlProtoTable() {
        Table table = KafkaTableProviderTest.mockProtoTable("hello", PayloadMessages.SimpleMessage.class);
        BeamSqlTable sqlTable = this.provider.buildBeamSqlTable(table);
        Assert.assertNotNull((Object)sqlTable);
        Assert.assertTrue((boolean)(sqlTable instanceof BeamKafkaTable));
        BeamKafkaTable kafkaTable = (BeamKafkaTable)sqlTable;
        Assert.assertEquals((Object)LOCATION_BROKER, (Object)kafkaTable.getBootstrapServers());
        Assert.assertEquals((Object)ImmutableList.of((Object)LOCATION_TOPIC), (Object)kafkaTable.getTopics());
    }

    @Test
    public void testBuildBeamSqlThriftTable() {
        Table table = KafkaTableProviderTest.mockThriftTable("hello", SimpleThriftMessage.class, TCompactProtocol.Factory.class);
        BeamSqlTable sqlTable = this.provider.buildBeamSqlTable(table);
        Assert.assertNotNull((Object)sqlTable);
        Assert.assertTrue((boolean)(sqlTable instanceof BeamKafkaTable));
        BeamKafkaTable kafkaTable = (BeamKafkaTable)sqlTable;
        Assert.assertEquals((Object)LOCATION_BROKER, (Object)kafkaTable.getBootstrapServers());
        Assert.assertEquals((Object)ImmutableList.of((Object)LOCATION_TOPIC), (Object)kafkaTable.getTopics());
    }

    @Test
    public void testBuildBeamSqlNestedBytesTable() {
        Table table = KafkaTableProviderTest.mockNestedBytesTable("hello");
        BeamSqlTable sqlTable = this.provider.buildBeamSqlTable(table);
        Assert.assertNotNull((Object)sqlTable);
        Assert.assertTrue((boolean)(sqlTable instanceof NestedPayloadKafkaTable));
        BeamKafkaTable kafkaTable = (BeamKafkaTable)sqlTable;
        Assert.assertEquals((Object)LOCATION_BROKER, (Object)kafkaTable.getBootstrapServers());
        Assert.assertEquals((Object)ImmutableList.of((Object)LOCATION_TOPIC), (Object)kafkaTable.getTopics());
    }

    @Test
    public void testBuildBeamSqlNestedThriftTable() {
        Table table = KafkaTableProviderTest.mockNestedThriftTable("hello", SimpleThriftMessage.class, TCompactProtocol.Factory.class);
        BeamSqlTable sqlTable = this.provider.buildBeamSqlTable(table);
        Assert.assertNotNull((Object)sqlTable);
        Assert.assertTrue((boolean)(sqlTable instanceof NestedPayloadKafkaTable));
        BeamKafkaTable kafkaTable = (BeamKafkaTable)sqlTable;
        Assert.assertEquals((Object)LOCATION_BROKER, (Object)kafkaTable.getBootstrapServers());
        Assert.assertEquals((Object)ImmutableList.of((Object)LOCATION_TOPIC), (Object)kafkaTable.getTopics());
    }

    @Test
    public void testGetTableType() {
        Assert.assertEquals((Object)"kafka", (Object)this.provider.getTableType());
    }

    private static Table mockTable(String name) {
        return KafkaTableProviderTest.mockTable(name, false, null, null, null, null, null, null);
    }

    private static Table mockTableWithExtraServers(String name, List<String> extraBootstrapServers) {
        return KafkaTableProviderTest.mockTable(name, false, extraBootstrapServers, null, null, null, null, null);
    }

    private static Table mockTableWithExtraTopics(String name, List<String> extraTopics) {
        return KafkaTableProviderTest.mockTable(name, false, null, extraTopics, null, null, null, null);
    }

    private static Table mockTable(String name, String payloadFormat) {
        return KafkaTableProviderTest.mockTable(name, false, null, null, payloadFormat, null, null, null);
    }

    private static Table mockProtoTable(String name, Class<?> protoClass) {
        return KafkaTableProviderTest.mockTable(name, false, null, null, "proto", protoClass, null, null);
    }

    private static Table mockThriftTable(String name, Class<? extends TBase<?, ?>> thriftClass, Class<? extends TProtocolFactory> thriftProtocolFactoryClass) {
        return KafkaTableProviderTest.mockTable(name, false, null, null, "thrift", null, thriftClass, thriftProtocolFactoryClass);
    }

    private static Table mockNestedBytesTable(String name) {
        return KafkaTableProviderTest.mockTable(name, true, null, null, null, null, null, null);
    }

    private static Table mockNestedThriftTable(String name, Class<? extends TBase<?, ?>> thriftClass, Class<? extends TProtocolFactory> thriftProtocolFactoryClass) {
        return KafkaTableProviderTest.mockTable(name, true, null, null, "thrift", null, thriftClass, thriftProtocolFactoryClass);
    }

    private static Table mockTable(String name, boolean isNested, @Nullable List<String> extraBootstrapServers, @Nullable List<String> extraTopics, @Nullable String payloadFormat, @Nullable Class<?> protoClass, @Nullable Class<? extends TBase<?, ?>> thriftClass, @Nullable Class<? extends TProtocolFactory> thriftProtocolFactoryClass) {
        Schema schema;
        JSONObject properties = new JSONObject();
        if (extraBootstrapServers != null) {
            JSONArray bootstrapServers = new JSONArray();
            bootstrapServers.addAll(extraBootstrapServers);
            properties.put("bootstrap_servers", (Object)bootstrapServers);
        }
        if (extraTopics != null) {
            JSONArray topics = new JSONArray();
            topics.addAll(extraTopics);
            properties.put("topics", (Object)topics);
        }
        if (payloadFormat != null) {
            properties.put("format", (Object)payloadFormat);
        }
        if (protoClass != null) {
            properties.put("protoClass", (Object)protoClass.getName());
        }
        if (thriftClass != null) {
            properties.put("thriftClass", (Object)thriftClass.getName());
        }
        if (thriftProtocolFactoryClass != null) {
            properties.put("thriftProtocolFactoryClass", (Object)thriftProtocolFactoryClass.getName());
        }
        Schema payloadSchema = Schema.builder().addInt32Field("id").addStringField("name").build();
        if (isNested) {
            Schema.Builder schemaBuilder = Schema.builder();
            schemaBuilder.addField("headers", Schemas.HEADERS_FIELD_TYPE);
            if (payloadFormat == null) {
                schemaBuilder.addByteArrayField("payload");
            } else {
                schemaBuilder.addRowField("payload", payloadSchema);
            }
            schema = schemaBuilder.build();
        } else {
            schema = payloadSchema;
        }
        return Table.builder().name(name).comment(name + " table").location("104.126.7.88:7743/topic1").schema(schema).type("kafka").properties(properties).build();
    }
}

