package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;

import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.thrift.TestThriftMessage;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableThriftTest.class */
public class BeamKafkaTableThriftTest extends BeamKafkaTableTest {
    private final TProtocolFactory protocolFactory = new TCompactProtocol.Factory();
    private static final Schema TEST_SCHEMA = Schema.builder().addInt64Field("f_long").addInt32Field("f_int").addDoubleField("f_double").addStringField("f_string").addArrayField("f_double_array", Schema.FieldType.DOUBLE).build();
    private static final Schema SHUFFLED_SCHEMA = Schema.builder().addStringField("f_string").addInt32Field("f_int").addArrayField("f_double_array", Schema.FieldType.DOUBLE).addDoubleField("f_double").addInt64Field("f_long").build();

    @Test
    public void testWithShuffledSchema() {
        BeamKafkaThriftTable beamKafkaThriftTable = new BeamKafkaThriftTable(SHUFFLED_SCHEMA, "", ImmutableList.of(), TestThriftMessage.class, this.protocolFactory);
        PAssert.that(this.pipeline.apply(Create.of(shuffledRow(1), new Row[]{shuffledRow(2)})).apply(beamKafkaThriftTable.getPTransformForOutput()).apply(beamKafkaThriftTable.getPTransformForInput())).containsInAnyOrder(new Row[]{generateRow(1), generateRow(2)});
        this.pipeline.run();
    }

    @Test
    public void testSchemasDoNotMatch() {
        Schema build = Schema.builder().addStringField("non_existing_field").build();
        MatcherAssert.assertThat(((IllegalArgumentException) Assert.assertThrows(IllegalArgumentException.class, () -> {
            new BeamKafkaThriftTable(build, "", ImmutableList.of(), TestThriftMessage.class, this.protocolFactory);
        })).getMessage(), Matchers.containsString("does not match schema inferred from thrift class.\nThrift class: "));
    }

    @Override // org.apache.beam.sdk.extensions.sql.meta.provider.kafka.BeamKafkaTableTest
    protected BeamKafkaTable getBeamKafkaTable() {
        return new BeamKafkaThriftTable(TEST_SCHEMA, "", ImmutableList.of(), TestThriftMessage.class, this.protocolFactory);
    }

    @Override // org.apache.beam.sdk.extensions.sql.meta.provider.kafka.BeamKafkaTableTest
    protected Row generateRow(int i) {
        return Row.withSchema(TEST_SCHEMA).addValues(new Object[]{Long.valueOf(i), Integer.valueOf(i), Double.valueOf(i), "thrift_value" + i, ImmutableList.of(Double.valueOf(i))}).build();
    }

    @Override // org.apache.beam.sdk.extensions.sql.meta.provider.kafka.BeamKafkaTableTest
    protected byte[] generateEncodedPayload(int i) {
        TestThriftMessage fString = new TestThriftMessage().setFLong(i).setFInt(i).setFDouble(i).setFString("thrift_value" + i);
        fString.addToFDoubleArray(i);
        try {
            return new TSerializer(this.protocolFactory).serialize(fString);
        } catch (TException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    private Row shuffledRow(int i) {
        return Row.withSchema(SHUFFLED_SCHEMA).addValues(ImmutableList.of("thrift_value" + i, Integer.valueOf(i), ImmutableList.of(Double.valueOf(i)), Double.valueOf(i), Long.valueOf(i))).build();
    }
}
