package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.BeamKafkaCSVTable;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.rel.type.RelDataType;
import org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.type.SqlTypeName;
import org.apache.commons.csv.CSVFormat;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTableTest.class */
/**
 * Tests for the CSV encoder and decoder transforms nested in {@link BeamKafkaCSVTable}.
 *
 * <p>Both tests build a small pipeline, push two records through the transform(s) under test and
 * assert that the output contains {@link #row1} and {@link #row2}, in any order.
 */
public class BeamKafkaCSVTableTest {

    @Rule
    public TestPipeline pipeline = TestPipeline.create();

    /** Expected record with values {@code (1L, 1, 1.0)}; assigned once in {@link #setUp()}. */
    public static BeamRecord row1;

    /** Expected record with values {@code (2L, 2, 2.0)}; assigned once in {@link #setUp()}. */
    public static BeamRecord row2;

    /**
     * Wraps each input string into a {@code KV} whose key is an empty byte array and whose value
     * is the string's bytes, which is the element type the decoder under test is applied to.
     */
    private static class String2KvBytes extends DoFn<String, KV<byte[], byte[]>> implements Serializable {

        /**
         * Emits one key/value pair per input line.
         *
         * @param c the process context supplying the input string and receiving the output pair
         */
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Encode with an explicit charset so the bytes do not depend on the JVM's
            // platform default encoding.
            c.output(KV.of(new byte[0], c.element().getBytes(StandardCharsets.UTF_8)));
        }
    }

    /** Builds the two expected records shared by all tests. */
    @BeforeClass
    public static void setUp() {
        row1 = new BeamRecord(genRowType(), new Object[] {1L, 1, 1.0d});
        row2 = new BeamRecord(genRowType(), new Object[] {2L, 2, 2.0d});
    }

    /**
     * Decodes two CSV lines (the first one with a double-quoted second field) and expects the two
     * reference records.
     */
    @Test
    public void testCsvRecorderDecoder() throws Exception {
        PAssert.that(
                pipeline
                    .apply(Create.of("1,\"1\",1.0", "2,2,2.0"))
                    .apply(ParDo.of(new String2KvBytes()))
                    .apply(new BeamKafkaCSVTable.CsvRecorderDecoder(genRowType(), CSVFormat.DEFAULT)))
            .containsInAnyOrder(row1, row2);

        pipeline.run();
    }

    /**
     * Round-trips the two reference records through the encoder followed by the decoder and
     * expects to get the same records back.
     */
    @Test
    public void testCsvRecorderEncoder() throws Exception {
        PAssert.that(
                pipeline
                    .apply(Create.of(row1, row2))
                    .apply(new BeamKafkaCSVTable.CsvRecorderEncoder(genRowType(), CSVFormat.DEFAULT))
                    .apply(new BeamKafkaCSVTable.CsvRecorderDecoder(genRowType(), CSVFormat.DEFAULT)))
            .containsInAnyOrder(row1, row2);

        pipeline.run();
    }

    /**
     * Creates the row type used by every record in this test: {@code order_id} (BIGINT),
     * {@code site_id} (INTEGER) and {@code price} (DOUBLE), in that order.
     *
     * @return a new Beam row type converted from the Calcite type definition
     */
    private static BeamRecordSqlType genRowType() {
        RelProtoDataType protoType =
            typeFactory ->
                typeFactory
                    .builder()
                    .add("order_id", SqlTypeName.BIGINT)
                    .add("site_id", SqlTypeName.INTEGER)
                    .add("price", SqlTypeName.DOUBLE)
                    .build();

        RelDataType calciteType = protoType.apply(BeamQueryPlanner.TYPE_FACTORY);
        return CalciteUtils.toBeamRowType(calciteType);
    }
}
