package org.apache.beam.sdk.io.parquet;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.reflect.ReflectData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.values.PCollection;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/parquet/ParquetIOTest.class */
public class ParquetIOTest implements Serializable {

    // Primary test pipeline; the round-trip tests use it for the write phase.
    @Rule
    public transient TestPipeline mainPipeline = TestPipeline.create();

    // Second test pipeline; the round-trip tests use it to read back what mainPipeline wrote.
    @Rule
    public transient TestPipeline readPipeline = TestPipeline.create();

    // Scratch directory whose root is used as the Parquet output location in the write tests.
    @Rule
    public transient TemporaryFolder temporaryFolder = new TemporaryFolder();
    // Avro record schema "testrecord" with two string fields: "name" and "id".
    private static final String SCHEMA_STRING = "{\"type\":\"record\", \"name\":\"testrecord\",\"fields\":[    {\"name\":\"name\",\"type\":\"string\"},    {\"name\":\"id\",\"type\":\"string\"}  ]}";
    private static final Schema SCHEMA = new Schema.Parser().parse(SCHEMA_STRING);
    // Same record, but "name" is a string/null union. Passed as the second argument to
    // withProjection(...) and used to build the expected projected records.
    private static final String REQUESTED_SCHEMA_ENCODER_STRING = "{\"type\":\"record\", \"name\":\"testrecord\",\"fields\":[    {\"name\":\"name\",\"type\":[\"string\",\"null\"]},    {\"name\":\"id\",\"type\":\"string\"}  ]}";
    private static final Schema REQUESTED_ENCODER_SCHEMA = new Schema.Parser().parse(REQUESTED_SCHEMA_ENCODER_STRING);
    // Record schema containing only the "id" field. Passed as the first argument to withProjection(...).
    private static final String REQUESTED_SCHEMA_STRING = "{\"type\":\"record\", \"name\":\"testrecord\",\"fields\":[    {\"name\":\"id\",\"type\":\"string\"}  ]}";
    private static final Schema REQUESTED_SCHEMA = new Schema.Parser().parse(REQUESTED_SCHEMA_STRING);
    // Values cycled through for the "name" field when generating test records.
    private static final String[] SCIENTISTS = {"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", "Faraday", "Newton", "Bohr", "Galilei", "Maxwell"};

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/parquet/ParquetIOTest$ParseGenericRecordAsJsonFn.class */
    public static class ParseGenericRecordAsJsonFn implements SerializableFunction<GenericRecord, String> {
        private ParseGenericRecordAsJsonFn() {
        }

        public static ParseGenericRecordAsJsonFn create() {
            return new ParseGenericRecordAsJsonFn();
        }

        public String apply(GenericRecord genericRecord) {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            try {
                JsonEncoder jsonEncoder = EncoderFactory.get().jsonEncoder(genericRecord.getSchema(), byteArrayOutputStream, true);
                new GenericDatumWriter(genericRecord.getSchema()).write(genericRecord, jsonEncoder);
                jsonEncoder.flush();
                return byteArrayOutputStream.toString();
            } catch (IOException e) {
                throw new RuntimeException("error converting record to JSON", e);
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/parquet/ParquetIOTest$TestRecord.class */
    /**
     * Minimal class whose Avro schema is derived reflectively in the "ReflectDataSchema" tests
     * via {@code ReflectData.get().getSchema(TestRecord.class)}.
     */
    public static class TestRecord {
        // Sole field of the class.
        String name;

        /** Creates a record holding the given name. */
        public TestRecord(String str) {
            this.name = str;
        }
    }

    /**
     * Writes 1000 full records, reads them back with a projection onto the "id" field, and
     * expects records built against the encoder schema with a null "name".
     */
    @Test
    public void testWriteAndReadWithProjection() {
        List<GenericRecord> expectedProjected = generateRequestedRecords(1000L);
        String outputDir = this.temporaryFolder.getRoot().getAbsolutePath();

        // The explicit <GenericRecord> witness is required: in a chained call the type argument
        // of FileIO.write() is not inferred from the sink passed to via(...).
        this.mainPipeline
                .apply(Create.of(generateGenericRecords(1000L)).withCoder(AvroCoder.of(SCHEMA)))
                .apply(FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to(outputDir));
        this.mainPipeline.run().waitUntilFinish();

        PCollection<GenericRecord> readBack = this.readPipeline.apply(
                ParquetIO.read(SCHEMA)
                        .from(outputDir + "/*")
                        .withProjection(REQUESTED_SCHEMA, REQUESTED_ENCODER_SCHEMA));
        PAssert.that(readBack).containsInAnyOrder(expectedProjected);
        this.readPipeline.run().waitUntilFinish();
    }

    /**
     * Checks the progress reported by a {@code BlockTracker} over the range [0, 1) before any
     * claim and after claiming offsets 0 and 1.
     */
    @Test
    public void testBlockTracker() {
        OffsetRange range = new OffsetRange(0L, 1L);
        ParquetIO.ReadFiles.BlockTracker tracker = new ParquetIO.ReadFiles.BlockTracker(range, 7L, 3L);

        // JUnit's signature is assertEquals(expected, actual, delta); keeping that order makes
        // failure messages report the right "expected" value.
        Assert.assertEquals(1.0d, tracker.getProgress().getWorkRemaining(), 0.01d);
        Assert.assertEquals(0.0d, tracker.getProgress().getWorkCompleted(), 0.01d);

        tracker.tryClaim(0L);
        tracker.tryClaim(1L);

        Assert.assertEquals(0.0d, tracker.getProgress().getWorkRemaining(), 0.01d);
        Assert.assertEquals(1.0d, tracker.getProgress().getWorkCompleted(), 0.01d);
    }

    /**
     * Exercises {@code SplitReadFn.splitBlockWithLimit}: an empty block list yields no ranges,
     * and six mocked blocks reporting 60 bytes each, split from index 1 with a limit of 200,
     * are expected to yield the ranges [1, 5) and [5, 6).
     */
    @Test
    public void testSplitBlockWithLimit() {
        ParquetIO.ReadFiles.SplitReadFn<GenericRecord> splitReadFn = new ParquetIO.ReadFiles.SplitReadFn<>(
                (GenericData) null,
                (Schema) null,
                ParquetIO.GenericRecordPassthroughFn.create(),
                (SerializableConfiguration) null);

        List<BlockMetaData> blocks = new ArrayList<>();
        BlockMetaData blockMetaData = Mockito.mock(BlockMetaData.class);
        Mockito.when(blockMetaData.getTotalByteSize()).thenReturn(60L);

        Assert.assertTrue(splitReadFn.splitBlockWithLimit(0L, blocks.size(), blocks, 200L).isEmpty());

        // The same mock instance is added six times; only its reported size is stubbed.
        for (int i = 0; i < 6; i++) {
            blocks.add(blockMetaData);
        }

        List<OffsetRange> ranges = splitReadFn.splitBlockWithLimit(1L, blocks.size(), blocks, 200L);

        // Check the count first so a wrong number of splits fails as an assertion rather than
        // as an IndexOutOfBoundsException from the get(...) calls below.
        Assert.assertEquals(2L, ranges.size());
        Assert.assertEquals(1L, ranges.get(0).getFrom());
        Assert.assertEquals(5L, ranges.get(0).getTo());
        Assert.assertEquals(5L, ranges.get(1).getFrom());
        Assert.assertEquals(6L, ranges.get(1).getTo());
    }

    /** Writes 1000 records as Parquet and checks that a plain read returns the same records. */
    @Test
    public void testWriteAndRead() {
        List<GenericRecord> records = generateGenericRecords(1000L);
        String outputDir = this.temporaryFolder.getRoot().getAbsolutePath();

        // The explicit <GenericRecord> witness is required: in a chained call the type argument
        // of FileIO.write() is not inferred from the sink passed to via(...).
        this.mainPipeline
                .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
                .apply(FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to(outputDir));
        this.mainPipeline.run().waitUntilFinish();

        PCollection<GenericRecord> readBack =
                this.readPipeline.apply(ParquetIO.read(SCHEMA).from(outputDir + "/*"));
        PAssert.that(readBack).containsInAnyOrder(records);
        this.readPipeline.run().waitUntilFinish();
    }

    /** Writes 1000 records as Parquet and checks that a read with {@code withSplit()} returns them. */
    @Test
    public void testWriteAndReadWithSplit() {
        List<GenericRecord> records = generateGenericRecords(1000L);
        String outputDir = this.temporaryFolder.getRoot().getAbsolutePath();

        // The explicit <GenericRecord> witness is required: in a chained call the type argument
        // of FileIO.write() is not inferred from the sink passed to via(...).
        this.mainPipeline
                .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
                .apply(FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to(outputDir));
        this.mainPipeline.run().waitUntilFinish();

        PCollection<GenericRecord> readBack =
                this.readPipeline.apply(ParquetIO.read(SCHEMA).from(outputDir + "/*").withSplit());
        PAssert.that(readBack).containsInAnyOrder(records);
        this.readPipeline.run().waitUntilFinish();
    }

    /**
     * Writes 1000 records as Parquet and checks that a read with {@code withBeamSchemas(true)}
     * returns the same records.
     */
    @Test
    public void testWriteAndReadWithBeamSchema() {
        List<GenericRecord> records = generateGenericRecords(1000L);
        String outputDir = this.temporaryFolder.getRoot().getAbsolutePath();

        // The explicit <GenericRecord> witness is required: in a chained call the type argument
        // of FileIO.write() is not inferred from the sink passed to via(...).
        this.mainPipeline
                .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
                .apply(FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to(outputDir));
        this.mainPipeline.run().waitUntilFinish();

        PCollection<GenericRecord> readBack = this.readPipeline.apply(
                ParquetIO.read(SCHEMA).from(outputDir + "/*").withBeamSchemas(true));
        PAssert.that(readBack).containsInAnyOrder(records);
        this.readPipeline.run().waitUntilFinish();
    }

    /**
     * Writes 1000 records as Parquet, then reads them with {@code parseGenericRecords(...)} and
     * {@code withSplit()}, expecting the JSON rendering of every written record.
     */
    @Test
    public void testWriteAndReadFilesAsJsonForWithSplitForUnknownSchema() {
        List<GenericRecord> records = generateGenericRecords(1000L);
        String outputDir = this.temporaryFolder.getRoot().getAbsolutePath();

        // The explicit <GenericRecord> witness is required: in a chained call the type argument
        // of FileIO.write() is not inferred from the sink passed to via(...).
        this.mainPipeline
                .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
                .apply(FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to(outputDir));
        this.mainPipeline.run().waitUntilFinish();

        PCollection<String> parsedJson = this.readPipeline.apply(
                ParquetIO.parseGenericRecords(ParseGenericRecordAsJsonFn.create())
                        .from(outputDir + "/*")
                        .withSplit());
        PAssert.that(parsedJson).containsInAnyOrder(convertRecordsToJson(records));
        this.readPipeline.run().waitUntilFinish();
    }

    /**
     * Within a single pipeline, writes 1000 records as Parquet, matches and reads the written
     * files via their output file names, and checks that {@code readFiles} returns the records.
     */
    @Test
    public void testWriteAndReadFiles() {
        List<GenericRecord> records = generateGenericRecords(1000L);
        String outputDir = this.temporaryFolder.getRoot().getAbsolutePath();

        // The explicit <GenericRecord> witness is required: in a chained call the type argument
        // of FileIO.write() is not inferred from the sink passed to via(...).
        PCollection<GenericRecord> readBack = this.mainPipeline
                .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
                .apply(FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to(outputDir))
                .getPerDestinationOutputFilenames()
                .apply(Values.create())
                .apply(FileIO.matchAll())
                .apply(FileIO.readMatches())
                .apply(ParquetIO.readFiles(SCHEMA));

        PAssert.that(readBack).containsInAnyOrder(records);
        this.mainPipeline.run().waitUntilFinish();
    }

    /**
     * Within a single pipeline, writes 1000 records as Parquet, reads the written files with
     * {@code parseFilesGenericRecords(...)}, and expects the JSON rendering of every record.
     */
    @Test
    public void testReadFilesAsJsonForUnknownSchemaFiles() {
        List<GenericRecord> records = generateGenericRecords(1000L);
        List<String> expectedJson = convertRecordsToJson(records);
        String outputDir = this.temporaryFolder.getRoot().getAbsolutePath();

        // The explicit <GenericRecord> witness is required: in a chained call the type argument
        // of FileIO.write() is not inferred from the sink passed to via(...).
        PCollection<String> parsedJson = this.mainPipeline
                .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
                .apply(FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to(outputDir))
                .getPerDestinationOutputFilenames()
                .apply(Values.create())
                .apply(FileIO.matchAll())
                .apply(FileIO.readMatches())
                .apply(ParquetIO.parseFilesGenericRecords(ParseGenericRecordAsJsonFn.create()));

        // Sanity check on the expected data itself before comparing pipeline output.
        Assert.assertEquals(1000L, expectedJson.size());
        PAssert.that(parsedJson).containsInAnyOrder(expectedJson);
        this.mainPipeline.run().waitUntilFinish();
    }

    /**
     * Expanding {@code parseFilesGenericRecords} with the GenericRecord pass-through function
     * must fail with an {@link IllegalArgumentException} carrying the expected message.
     */
    @Test
    public void testReadFilesUnknownSchemaFilesForGenericRecordThrowException() {
        IllegalArgumentException thrown = Assert.assertThrows(IllegalArgumentException.class, () -> {
            ParquetIO.parseFilesGenericRecords(ParquetIO.GenericRecordPassthroughFn.create()).expand((PCollection) null);
        });

        Assert.assertEquals("Parse can't be used for reading as GenericRecord.", thrown.getMessage());
    }

    /**
     * Builds {@code j} records against {@code SCHEMA}. Record {@code i} has "id" set to the
     * decimal string of {@code i} and "name" taken from {@code SCIENTISTS}, cycling by index.
     *
     * @param j number of records to generate
     * @return the generated records, in index order
     */
    private List<GenericRecord> generateGenericRecords(long j) {
        // Typed list instead of a raw ArrayList, so the return needs no unchecked conversion.
        List<GenericRecord> records = new ArrayList<>();
        GenericRecordBuilder recordBuilder = new GenericRecordBuilder(SCHEMA);
        for (int i = 0; i < j; i++) {
            String scientist = SCIENTISTS[i % SCIENTISTS.length];
            records.add(recordBuilder.set("name", scientist).set("id", Integer.toString(i)).build());
        }
        return records;
    }

    /**
     * Builds {@code j} records against {@code REQUESTED_ENCODER_SCHEMA}. Record {@code i} has
     * "id" set to the decimal string of {@code i} and "name" set to null.
     *
     * @param j number of records to generate
     * @return the generated records, in index order
     */
    private List<GenericRecord> generateRequestedRecords(long j) {
        // Typed list instead of a raw ArrayList; the unused per-iteration index into SCIENTISTS
        // was dropped because "name" is always null here.
        List<GenericRecord> records = new ArrayList<>();
        GenericRecordBuilder recordBuilder = new GenericRecordBuilder(REQUESTED_ENCODER_SCHEMA);
        for (int i = 0; i < j; i++) {
            records.add(recordBuilder.set("id", Integer.toString(i)).set("name", (Object) null).build());
        }
        return records;
    }

    /** The display data of a read transform must expose its file pattern as "filePattern". */
    @Test
    public void testReadDisplayData() {
        DisplayData displayData = DisplayData.from(ParquetIO.read(SCHEMA).from("foo.parquet"));

        MatcherAssert.assertThat(displayData, DisplayDataMatchers.hasDisplayItem("filePattern", "foo.parquet"));
    }

    /**
     * Writes records using the schema reflected from {@code TestRecord}, then reads them back
     * without an explicit Avro data model and expects the read pipeline to fail with a
     * {@code PipelineExecutionException}.
     */
    @Test
    public void testWriteAndReadUsingReflectDataSchemaWithoutDataModelThrowsException() {
        Schema schema = ReflectData.get().getSchema(TestRecord.class);
        List<GenericRecord> records = generateGenericRecords(1000L);
        String outputDir = this.temporaryFolder.getRoot().getAbsolutePath();

        // The explicit <GenericRecord> witness is required: in a chained call the type argument
        // of FileIO.write() is not inferred from the sink passed to via(...).
        this.mainPipeline
                .apply(Create.of(records).withCoder(AvroCoder.of(schema)))
                .apply(FileIO.<GenericRecord>write().via(ParquetIO.sink(schema)).to(outputDir));
        // The write phase must succeed; a failure here should fail the test, not satisfy it.
        this.mainPipeline.run().waitUntilFinish();

        PCollection<GenericRecord> readBack =
                this.readPipeline.apply(ParquetIO.read(schema).from(outputDir + "/*"));
        PAssert.that(readBack).containsInAnyOrder(records);

        // Only running the read pipeline is allowed to raise the expected exception.
        Assert.assertThrows(Pipeline.PipelineExecutionException.class, () -> {
            this.readPipeline.run().waitUntilFinish();
        });
    }

    /**
     * Writes records using the schema reflected from {@code TestRecord}, then reads them back
     * with {@code withSplit()} and no explicit Avro data model, expecting the read pipeline to
     * fail with a {@code PipelineExecutionException}.
     */
    @Test
    public void testWriteAndReadWithSplitUsingReflectDataSchemaWithoutDataModelThrowsException() {
        Schema schema = ReflectData.get().getSchema(TestRecord.class);
        List<GenericRecord> records = generateGenericRecords(1000L);
        String outputDir = this.temporaryFolder.getRoot().getAbsolutePath();

        // The explicit <GenericRecord> witness is required: in a chained call the type argument
        // of FileIO.write() is not inferred from the sink passed to via(...).
        this.mainPipeline
                .apply(Create.of(records).withCoder(AvroCoder.of(schema)))
                .apply(FileIO.<GenericRecord>write().via(ParquetIO.sink(schema)).to(outputDir));
        // The write phase must succeed; a failure here should fail the test, not satisfy it.
        this.mainPipeline.run().waitUntilFinish();

        PCollection<GenericRecord> readBack =
                this.readPipeline.apply(ParquetIO.read(schema).withSplit().from(outputDir + "/*"));
        PAssert.that(readBack).containsInAnyOrder(records);

        // Only running the read pipeline is allowed to raise the expected exception.
        Assert.assertThrows(Pipeline.PipelineExecutionException.class, () -> {
            this.readPipeline.run().waitUntilFinish();
        });
    }

    /**
     * Writes records using the schema reflected from {@code TestRecord}, then reads them back
     * with {@code GenericData.get()} as the Avro data model and expects the written records.
     */
    @Test
    public void testWriteAndReadUsingReflectDataSchemaWithDataModel() {
        Schema schema = ReflectData.get().getSchema(TestRecord.class);
        List<GenericRecord> records = generateGenericRecords(1000L);
        String outputDir = this.temporaryFolder.getRoot().getAbsolutePath();

        // The explicit <GenericRecord> witness is required: in a chained call the type argument
        // of FileIO.write() is not inferred from the sink passed to via(...).
        this.mainPipeline
                .apply(Create.of(records).withCoder(AvroCoder.of(schema)))
                .apply(FileIO.<GenericRecord>write().via(ParquetIO.sink(schema)).to(outputDir));
        this.mainPipeline.run().waitUntilFinish();

        PCollection<GenericRecord> readBack = this.readPipeline.apply(
                ParquetIO.read(schema).withAvroDataModel(GenericData.get()).from(outputDir + "/*"));
        PAssert.that(readBack).containsInAnyOrder(records);
        this.readPipeline.run().waitUntilFinish();
    }

    /**
     * Writes records using the schema reflected from {@code TestRecord}, then reads them back
     * with {@code withSplit()} and {@code GenericData.get()} as the Avro data model, expecting
     * the written records.
     */
    @Test
    public void testWriteAndReadwithSplitUsingReflectDataSchemaWithDataModel() {
        Schema schema = ReflectData.get().getSchema(TestRecord.class);
        List<GenericRecord> records = generateGenericRecords(1000L);
        String outputDir = this.temporaryFolder.getRoot().getAbsolutePath();

        // The explicit <GenericRecord> witness is required: in a chained call the type argument
        // of FileIO.write() is not inferred from the sink passed to via(...).
        this.mainPipeline
                .apply(Create.of(records).withCoder(AvroCoder.of(schema)))
                .apply(FileIO.<GenericRecord>write().via(ParquetIO.sink(schema)).to(outputDir));
        this.mainPipeline.run().waitUntilFinish();

        PCollection<GenericRecord> readBack = this.readPipeline.apply(
                ParquetIO.read(schema)
                        .withSplit()
                        .withAvroDataModel(GenericData.get())
                        .from(outputDir + "/*"));
        PAssert.that(readBack).containsInAnyOrder(records);
        this.readPipeline.run().waitUntilFinish();
    }

    /**
     * Renders every record as JSON with a single {@link ParseGenericRecordAsJsonFn} instance.
     *
     * @param list the records to convert
     * @return the JSON strings, in the same order as {@code list}
     */
    private static List<String> convertRecordsToJson(List<GenericRecord> list) {
        ParseGenericRecordAsJsonFn toJson = ParseGenericRecordAsJsonFn.create();
        return list.stream()
                .map(toJson::apply)
                .collect(Collectors.toList());
    }
}
