package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.util.List;
import org.apache.beam.sdk.extensions.protobuf.PayloadMessages;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.io.thrift.payloads.SimpleThriftMessage;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.thrift.TBase;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.class */
/**
 * Unit tests for {@link KafkaTableProvider}.
 *
 * <p>Each test builds a {@link Table} definition through one of the {@code mock*} helpers, hands
 * it to the provider, and checks the runtime type of the resulting table together with the
 * bootstrap servers and topics it reports.
 */
public class KafkaTableProviderTest {
    private static final String LOCATION_BROKER = "104.126.7.88:7743";
    private static final String LOCATION_TOPIC = "topic1";

    private final KafkaTableProvider provider = new KafkaTableProvider();

    @Test
    public void testBuildBeamSqlCSVTable() {
        // Hold the result as the provider's declared return type so the instanceof assertion
        // below is a real runtime check rather than a tautology.
        BeamSqlTable sqlTable = this.provider.buildBeamSqlTable(mockTable("hello"));
        Assert.assertNotNull(sqlTable);
        Assert.assertTrue(sqlTable instanceof BeamKafkaCSVTable);

        BeamKafkaCSVTable csvTable = (BeamKafkaCSVTable) sqlTable;
        Assert.assertEquals(LOCATION_BROKER, csvTable.getBootstrapServers());
        Assert.assertEquals(ImmutableList.of(LOCATION_TOPIC), csvTable.getTopics());
    }

    @Test
    public void testBuildWithExtraServers() {
        BeamSqlTable sqlTable =
                this.provider.buildBeamSqlTable(
                        mockTableWithExtraServers(
                                "hello", ImmutableList.of("localhost:1111", "localhost:2222")));
        Assert.assertNotNull(sqlTable);
        Assert.assertTrue(sqlTable instanceof BeamKafkaCSVTable);

        BeamKafkaCSVTable csvTable = (BeamKafkaCSVTable) sqlTable;
        // The broker from the location comes first, followed by the extra servers in order.
        Assert.assertEquals(
                "104.126.7.88:7743,localhost:1111,localhost:2222", csvTable.getBootstrapServers());
        Assert.assertEquals(ImmutableList.of(LOCATION_TOPIC), csvTable.getTopics());
    }

    @Test
    public void testBuildWithExtraTopics() {
        BeamSqlTable sqlTable =
                this.provider.buildBeamSqlTable(
                        mockTableWithExtraTopics("hello", ImmutableList.of("topic2", "topic3")));
        Assert.assertNotNull(sqlTable);
        Assert.assertTrue(sqlTable instanceof BeamKafkaCSVTable);

        BeamKafkaCSVTable csvTable = (BeamKafkaCSVTable) sqlTable;
        Assert.assertEquals(LOCATION_BROKER, csvTable.getBootstrapServers());
        // The topic from the location comes first, followed by the extra topics in order.
        Assert.assertEquals(
                ImmutableList.of(LOCATION_TOPIC, "topic2", "topic3"), csvTable.getTopics());
    }

    @Test
    public void testBuildBeamSqlAvroTable() {
        BeamSqlTable sqlTable = this.provider.buildBeamSqlTable(mockTable("hello", "avro"));
        Assert.assertNotNull(sqlTable);
        Assert.assertTrue(sqlTable instanceof BeamKafkaTable);

        BeamKafkaTable kafkaTable = (BeamKafkaTable) sqlTable;
        Assert.assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
        Assert.assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
    }

    @Test
    public void testBuildBeamSqlProtoTable() {
        BeamSqlTable sqlTable =
                this.provider.buildBeamSqlTable(
                        mockProtoTable("hello", PayloadMessages.SimpleMessage.class));
        Assert.assertNotNull(sqlTable);
        Assert.assertTrue(sqlTable instanceof BeamKafkaTable);

        BeamKafkaTable kafkaTable = (BeamKafkaTable) sqlTable;
        Assert.assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
        Assert.assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
    }

    @Test
    public void testBuildBeamSqlThriftTable() {
        BeamSqlTable sqlTable =
                this.provider.buildBeamSqlTable(
                        mockThriftTable(
                                "hello", SimpleThriftMessage.class, TCompactProtocol.Factory.class));
        Assert.assertNotNull(sqlTable);
        Assert.assertTrue(sqlTable instanceof BeamKafkaTable);

        BeamKafkaTable kafkaTable = (BeamKafkaTable) sqlTable;
        Assert.assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
        Assert.assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
    }

    @Test
    public void testBuildBeamSqlNestedBytesTable() {
        BeamSqlTable sqlTable = this.provider.buildBeamSqlTable(mockNestedBytesTable("hello"));
        Assert.assertNotNull(sqlTable);
        Assert.assertTrue(sqlTable instanceof NestedPayloadKafkaTable);

        BeamKafkaTable kafkaTable = (BeamKafkaTable) sqlTable;
        Assert.assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
        Assert.assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
    }

    @Test
    public void testBuildBeamSqlNestedThriftTable() {
        BeamSqlTable sqlTable =
                this.provider.buildBeamSqlTable(
                        mockNestedThriftTable(
                                "hello", SimpleThriftMessage.class, TCompactProtocol.Factory.class));
        Assert.assertNotNull(sqlTable);
        Assert.assertTrue(sqlTable instanceof NestedPayloadKafkaTable);

        BeamKafkaTable kafkaTable = (BeamKafkaTable) sqlTable;
        Assert.assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
        Assert.assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
    }

    @Test
    public void testGetTableType() {
        Assert.assertEquals("kafka", this.provider.getTableType());
    }

    /** Flat-schema table with no extra properties. */
    private static Table mockTable(String name) {
        return mockTable(name, false, null, null, null, null, null, null);
    }

    /** Flat-schema table whose properties carry additional {@code bootstrap_servers}. */
    private static Table mockTableWithExtraServers(String name, List<String> extraBootstrapServers) {
        return mockTable(name, false, extraBootstrapServers, null, null, null, null, null);
    }

    /** Flat-schema table whose properties carry additional {@code topics}. */
    private static Table mockTableWithExtraTopics(String name, List<String> extraTopics) {
        return mockTable(name, false, null, extraTopics, null, null, null, null);
    }

    /** Flat-schema table with an explicit {@code format} property. */
    private static Table mockTable(String name, String payloadFormat) {
        return mockTable(name, false, null, null, payloadFormat, null, null, null);
    }

    /** Flat-schema table with format {@code "proto"} and the given {@code protoClass}. */
    private static Table mockProtoTable(String name, Class<?> protoClass) {
        return mockTable(name, false, null, null, "proto", protoClass, null, null);
    }

    /** Flat-schema table with format {@code "thrift"} and the given Thrift class/protocol factory. */
    private static Table mockThriftTable(
            String name,
            Class<? extends TBase<?, ?>> thriftClass,
            Class<? extends TProtocolFactory> thriftProtocolFactoryClass) {
        return mockTable(
                name, false, null, null, "thrift", null, thriftClass, thriftProtocolFactoryClass);
    }

    /** Nested-schema table with no format, so its {@code payload} field is a byte array. */
    private static Table mockNestedBytesTable(String name) {
        return mockTable(name, true, null, null, null, null, null, null);
    }

    /** Nested-schema table with format {@code "thrift"}, so its {@code payload} field is a row. */
    private static Table mockNestedThriftTable(
            String name,
            Class<? extends TBase<?, ?>> thriftClass,
            Class<? extends TProtocolFactory> thriftProtocolFactoryClass) {
        return mockTable(
                name, true, null, null, "thrift", null, thriftClass, thriftProtocolFactoryClass);
    }

    /**
     * Builds a {@code kafka} {@link Table} definition located at
     * {@code LOCATION_BROKER/LOCATION_TOPIC}.
     *
     * <p>Every nullable argument is written to the table properties only when it is non-null.
     *
     * @param name table name; the comment is set to {@code name + " table"}
     * @param isNested when {@code true}, the schema is {@code headers} plus {@code payload}
     *     instead of the flat {@code id}/{@code name} schema
     * @param extraBootstrapServers value for the {@code bootstrap_servers} property, or null
     * @param extraTopics value for the {@code topics} property, or null
     * @param payloadFormat value for the {@code format} property, or null; for a nested schema it
     *     also decides whether {@code payload} is a byte array (null) or a row (non-null)
     * @param protoClass class whose name is stored under {@code protoClass}, or null
     * @param thriftClass class whose name is stored under {@code thriftClass}, or null
     * @param thriftProtocolFactoryClass class whose name is stored under
     *     {@code thriftProtocolFactoryClass}, or null
     * @return the assembled table definition
     */
    private static Table mockTable(
            String name,
            boolean isNested,
            List<String> extraBootstrapServers,
            List<String> extraTopics,
            String payloadFormat,
            Class<?> protoClass,
            Class<? extends TBase<?, ?>> thriftClass,
            Class<? extends TProtocolFactory> thriftProtocolFactoryClass) {
        JSONObject properties = new JSONObject();
        if (extraBootstrapServers != null) {
            JSONArray servers = new JSONArray();
            servers.addAll(extraBootstrapServers);
            properties.put("bootstrap_servers", servers);
        }
        if (extraTopics != null) {
            JSONArray topics = new JSONArray();
            topics.addAll(extraTopics);
            properties.put("topics", topics);
        }
        if (payloadFormat != null) {
            properties.put("format", payloadFormat);
        }
        if (protoClass != null) {
            properties.put("protoClass", protoClass.getName());
        }
        if (thriftClass != null) {
            properties.put("thriftClass", thriftClass.getName());
        }
        if (thriftProtocolFactoryClass != null) {
            properties.put("thriftProtocolFactoryClass", thriftProtocolFactoryClass.getName());
        }

        Schema flatSchema = Schema.builder().addInt32Field("id").addStringField("name").build();
        Schema tableSchema;
        if (isNested) {
            Schema.Builder nestedBuilder = Schema.builder();
            nestedBuilder.addField("headers", Schemas.HEADERS_FIELD_TYPE);
            if (payloadFormat == null) {
                nestedBuilder.addByteArrayField("payload");
            } else {
                nestedBuilder.addRowField("payload", flatSchema);
            }
            tableSchema = nestedBuilder.build();
        } else {
            tableSchema = flatSchema;
        }

        return Table.builder()
                .name(name)
                .comment(name + " table")
                // Derived from the constants the tests assert against, so fixture and
                // expectations cannot drift apart.
                .location(LOCATION_BROKER + "/" + LOCATION_TOPIC)
                .schema(tableSchema)
                .type("kafka")
                .properties(properties)
                .build();
    }
}
