/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;

import com.alibaba.fastjson.JSON;
import java.util.List;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.protobuf.PayloadMessages;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.BeamKafkaTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.BeamKafkaTableTest;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.KafkaTableProvider;
import org.apache.beam.sdk.io.kafka.KafkaRecordCoder;
import org.apache.beam.sdk.io.kafka.ProducerRecordCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class BeamKafkaTableProtoTest
extends BeamKafkaTableTest {
    private static final Schema TEST_SCHEMA = Schema.builder().addInt64Field("f_long").addInt32Field("f_int").addDoubleField("f_double").addStringField("f_string").addArrayField("f_float_array", Schema.FieldType.FLOAT).build();
    private static final Schema SHUFFLED_SCHEMA = Schema.builder().addStringField("f_string").addInt32Field("f_int").addArrayField("f_float_array", Schema.FieldType.FLOAT).addDoubleField("f_double").addInt64Field("f_long").build();

    @Test
    public void testWithShuffledSchema() throws Exception {
        BeamKafkaTable kafkaTable = BeamKafkaTableProtoTest.getBeamKafkaTable(SHUFFLED_SCHEMA);
        PCollection result = (PCollection)((PCollection)((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)this.shuffledRow(1), (Object[])new Row[]{this.shuffledRow(2)}))).apply(kafkaTable.getPTransformForOutput())).setCoder((Coder)ProducerRecordCoder.of((Coder)ByteArrayCoder.of(), (Coder)ByteArrayCoder.of())).apply((PTransform)MapElements.via((SimpleFunction)new BeamKafkaTableTest.ProducerToRecord()))).setCoder((Coder)KafkaRecordCoder.of((Coder)ByteArrayCoder.of(), (Coder)ByteArrayCoder.of())).apply(kafkaTable.getPTransformForInput());
        PAssert.that((PCollection)result).containsInAnyOrder((Object[])new Row[]{this.shuffledRow(1), this.shuffledRow(2)});
        this.pipeline.run();
    }

    @Test
    public void testSchemasDoNotMatch() {
        Schema schema = Schema.builder().addStringField("non_existing_field").build();
        IllegalArgumentException e = (IllegalArgumentException)Assert.assertThrows(IllegalArgumentException.class, () -> BeamKafkaTableProtoTest.getBeamKafkaTable(schema));
        MatcherAssert.assertThat((Object)e.getMessage(), (Matcher)Matchers.containsString((String)"does not match schema inferred from protobuf class.\nProtobuf class: "));
    }

    private static BeamKafkaTable getBeamKafkaTable(Schema schema) {
        return (BeamKafkaTable)new KafkaTableProvider().buildBeamSqlTable(Table.builder().name("kafka").type("kafka").schema(schema).location("localhost/mytopic").properties(JSON.parseObject((String)("{ \"format\": \"proto\", \"protoClass\": \"" + PayloadMessages.TestMessage.class.getName() + "\" }"))).build());
    }

    @Override
    protected BeamKafkaTable getBeamKafkaTable() {
        return BeamKafkaTableProtoTest.getBeamKafkaTable(TEST_SCHEMA);
    }

    @Override
    protected Row generateRow(int i) {
        ImmutableList values = ImmutableList.of((Object)i, (Object)i, (Object)i, (Object)("proto_value" + i), (Object)ImmutableList.of((Object)Float.valueOf(i)));
        return Row.withSchema((Schema)TEST_SCHEMA).addValues((List)values).build();
    }

    @Override
    protected byte[] generateEncodedPayload(int i) {
        PayloadMessages.TestMessage message = PayloadMessages.TestMessage.newBuilder().setFLong((long)i).setFInt(i).setFDouble((double)i).setFString("proto_value" + i).addFFloatArray((float)i).build();
        return message.toByteArray();
    }

    private Row shuffledRow(int i) {
        ImmutableList values = ImmutableList.of((Object)("proto_value" + i), (Object)i, (Object)ImmutableList.of((Object)Float.valueOf(i)), (Object)i, (Object)i);
        return Row.withSchema((Schema)SHUFFLED_SCHEMA).addValues((List)values).build();
    }
}

