package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;

import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.KafkaMessages;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableProtoTest.class */
public class BeamKafkaTableProtoTest extends BeamKafkaTableTest {
    private static final Schema TEST_SCHEMA = Schema.builder().addInt64Field("f_long").addInt32Field("f_int").addDoubleField("f_double").addStringField("f_string").addArrayField("f_float_array", Schema.FieldType.FLOAT).build();
    private static final Schema SHUFFLED_SCHEMA = Schema.builder().addStringField("f_string").addInt32Field("f_int").addArrayField("f_float_array", Schema.FieldType.FLOAT).addDoubleField("f_double").addInt64Field("f_long").build();

    @Test
    public void testWithShuffledSchema() {
        BeamKafkaProtoTable beamKafkaProtoTable = new BeamKafkaProtoTable(SHUFFLED_SCHEMA, "", ImmutableList.of(), KafkaMessages.TestMessage.class);
        PAssert.that(this.pipeline.apply(Create.of(shuffledRow(1), new Row[]{shuffledRow(2)})).apply(beamKafkaProtoTable.getPTransformForOutput()).apply(beamKafkaProtoTable.getPTransformForInput())).containsInAnyOrder(new Row[]{generateRow(1), generateRow(2)});
        this.pipeline.run();
    }

    @Test
    public void testSchemasDoNotMatch() {
        Schema build = Schema.builder().addStringField("non_existing_field").build();
        MatcherAssert.assertThat(((IllegalArgumentException) Assert.assertThrows(IllegalArgumentException.class, () -> {
            new BeamKafkaProtoTable(build, "", ImmutableList.of(), KafkaMessages.TestMessage.class);
        })).getMessage(), Matchers.containsString("does not match schema inferred from protobuf class.\nProtobuf class: "));
    }

    @Override // org.apache.beam.sdk.extensions.sql.meta.provider.kafka.BeamKafkaTableTest
    protected BeamKafkaTable getBeamKafkaTable() {
        return new BeamKafkaProtoTable(TEST_SCHEMA, "", ImmutableList.of(), KafkaMessages.TestMessage.class);
    }

    @Override // org.apache.beam.sdk.extensions.sql.meta.provider.kafka.BeamKafkaTableTest
    protected Row generateRow(int i) {
        return Row.withSchema(TEST_SCHEMA).addValues(ImmutableList.of(Long.valueOf(i), Integer.valueOf(i), Double.valueOf(i), "proto_value" + i, ImmutableList.of(Float.valueOf(i)))).build();
    }

    @Override // org.apache.beam.sdk.extensions.sql.meta.provider.kafka.BeamKafkaTableTest
    protected byte[] generateEncodedPayload(int i) {
        return KafkaMessages.TestMessage.newBuilder().setFLong(i).setFInt(i).setFDouble(i).setFString("proto_value" + i).addFFloatArray(i).build().toByteArray();
    }

    private Row shuffledRow(int i) {
        return Row.withSchema(SHUFFLED_SCHEMA).addValues(ImmutableList.of("proto_value" + i, Integer.valueOf(i), ImmutableList.of(Float.valueOf(i)), Double.valueOf(i), Long.valueOf(i))).build();
    }
}
