/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.fileschematransform;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.io.AvroIO;
import org.apache.beam.sdk.extensions.avro.io.DynamicAvroDestinations;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.io.DefaultFilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans;
import org.apache.beam.sdk.io.fileschematransform.AvroReadSchemaTransformFormatProvider;
import org.apache.beam.sdk.io.fileschematransform.FileReadSchemaTransformConfiguration;
import org.apache.beam.sdk.io.fileschematransform.FileReadSchemaTransformFormatProviderTest;
import org.apache.beam.sdk.io.fileschematransform.FileReadSchemaTransformProvider;
import org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviderTestData;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.joda.time.Duration;
import org.junit.Test;

public class AvroReadSchemaTransformFormatProviderTest
extends FileReadSchemaTransformFormatProviderTest {
    @Override
    protected String getFormat() {
        return new AvroReadSchemaTransformFormatProvider().identifier();
    }

    @Override
    public String getStringSchemaFromBeamSchema(Schema beamSchema) {
        return AvroUtils.toAvroSchema((Schema)beamSchema).toString();
    }

    @Override
    public void runWriteAndReadTest(Schema schema, List<Row> rows, String filePath, String schemaFilePath) {
        org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema((Schema)schema);
        String stringSchema = Strings.isNullOrEmpty((String)schemaFilePath) ? avroSchema.toString() : schemaFilePath;
        ((PCollection)((PCollection)this.writePipeline.apply((PTransform)Create.of(rows).withRowSchema(schema))).apply((PTransform)MapElements.into((TypeDescriptor)TypeDescriptor.of(GenericRecord.class)).via(AvroUtils.getRowToGenericRecordFunction((org.apache.avro.Schema)avroSchema)))).setCoder((Coder)AvroCoder.of((org.apache.avro.Schema)avroSchema)).apply((PTransform)AvroIO.writeGenericRecords((org.apache.avro.Schema)avroSchema).to(filePath));
        this.writePipeline.run().waitUntilFinish();
        FileReadSchemaTransformConfiguration config = FileReadSchemaTransformConfiguration.builder().setFormat(this.getFormat()).setSchema(stringSchema).setFilepattern(filePath + "*").build();
        SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config);
        PCollectionRowTuple output = (PCollectionRowTuple)PCollectionRowTuple.empty((Pipeline)this.readPipeline).apply((PTransform)readTransform);
        PAssert.that((PCollection)output.get("output")).containsInAnyOrder(rows);
        this.readPipeline.run();
    }

    @Test
    public void testStreamingRead() {
        Schema schema = SchemaAwareJavaBeans.ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
        List<Row> rows = FileWriteSchemaTransformFormatProviderTestData.DATA.allPrimitiveDataTypesRows;
        String folder = this.getFolder();
        ResourceId dir = FileSystems.matchNewResource((String)folder, (boolean)true);
        org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema((Schema)schema);
        String stringSchema = avroSchema.toString();
        FileReadSchemaTransformConfiguration config = FileReadSchemaTransformConfiguration.builder().setFormat(this.getFormat()).setFilepattern(folder + "/test_*").setSchema(stringSchema).setPollIntervalMillis(Long.valueOf(100L)).setTerminateAfterSecondsSinceNewOutput(Long.valueOf(3L)).build();
        SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config);
        PCollectionRowTuple output = (PCollectionRowTuple)PCollectionRowTuple.empty((Pipeline)this.readPipeline).apply((PTransform)readTransform);
        ((PCollection)((PCollection)((PCollection)this.readPipeline.apply((PTransform)GenerateSequence.from((long)0L).to(3L).withRate(1L, Duration.millis((long)300L)))).apply((PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.millis((long)100L))).withAllowedLateness(Duration.ZERO).triggering((Trigger)Repeatedly.forever((Trigger)AfterPane.elementCountAtLeast((int)1))).discardingFiredPanes())).apply((PTransform)MapElements.via((SimpleFunction)new CreateAvroPrimitiveGenericRecord(schema)))).setCoder((Coder)AvroCoder.of((org.apache.avro.Schema)avroSchema)).apply((PTransform)AvroIO.writeGenericRecords((org.apache.avro.Schema)avroSchema).to((DynamicAvroDestinations)new TestDynamicDestinations(dir)).withTempDirectory(dir).withNumShards(1).withWindowedWrites());
        PAssert.that((PCollection)output.get("output")).containsInAnyOrder(rows);
        this.readPipeline.run();
    }

    @Test
    public void testReadWithPCollectionOfFilepatterns() {
        Schema schema = SchemaAwareJavaBeans.ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
        List<Row> rows = FileWriteSchemaTransformFormatProviderTestData.DATA.allPrimitiveDataTypesRows;
        String folder = this.getFolder();
        org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema((Schema)schema);
        ResourceId dir = FileSystems.matchNewResource((String)folder, (boolean)true);
        ((PCollection)((PCollection)this.writePipeline.apply((PTransform)Create.of(rows).withRowSchema(schema))).apply((PTransform)MapElements.into((TypeDescriptor)TypeDescriptor.of(GenericRecord.class)).via(AvroUtils.getRowToGenericRecordFunction((org.apache.avro.Schema)avroSchema)))).setCoder((Coder)AvroCoder.of((org.apache.avro.Schema)avroSchema)).apply((PTransform)AvroIO.writeGenericRecords((org.apache.avro.Schema)avroSchema).to((DynamicAvroDestinations)new TestDynamicDestinations(dir)).withTempDirectory(dir));
        this.writePipeline.run().waitUntilFinish();
        String stringSchema = avroSchema.toString();
        FileReadSchemaTransformConfiguration config = FileReadSchemaTransformConfiguration.builder().setFormat(this.getFormat()).setSchema(stringSchema).build();
        SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config);
        Schema patternSchema = this.getFilepatternSchema();
        PCollection filepatterns = ((PCollection)((PCollection)this.readPipeline.apply((PTransform)Create.of(Arrays.asList(folder + "/test_1-*", folder + "/test_2-*", folder + "/test_3-*")))).apply("Create Rows of filepatterns", (PTransform)MapElements.into((TypeDescriptor)TypeDescriptors.rows()).via((SerializableFunction & Serializable)pattern -> Row.withSchema((Schema)patternSchema).withFieldValue("filepattern", pattern).build()))).setRowSchema(patternSchema);
        PCollectionRowTuple output = (PCollectionRowTuple)PCollectionRowTuple.of((String)"input", (PCollection)filepatterns).apply((PTransform)readTransform);
        PAssert.that((PCollection)output.get("output")).containsInAnyOrder(rows);
        this.readPipeline.run();
    }

    private static class TestDynamicDestinations
    extends DynamicAvroDestinations<GenericRecord, String, GenericRecord> {
        final ResourceId baseDir;

        TestDynamicDestinations(ResourceId baseDir) {
            this.baseDir = baseDir;
        }

        public org.apache.avro.Schema getSchema(String destination) {
            return AvroUtils.toAvroSchema((Schema)SchemaAwareJavaBeans.ALL_PRIMITIVE_DATA_TYPES_SCHEMA);
        }

        public GenericRecord formatRecord(GenericRecord record) {
            return record;
        }

        public String getDestination(GenericRecord element) {
            return element.get("anInteger").toString();
        }

        public String getDefaultDestination() {
            return "";
        }

        public FileBasedSink.FilenamePolicy getFilenamePolicy(String destination) {
            return DefaultFilenamePolicy.fromStandardParameters((ValueProvider)ValueProvider.StaticValueProvider.of((Object)this.baseDir.resolve("test_" + destination, (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE)), (String)"-SSSSS-of-NNNNN", (String)".avro", (boolean)false);
        }
    }

    private static class CreateAvroPrimitiveGenericRecord
    extends SimpleFunction<Long, GenericRecord> {
        Schema schema;

        CreateAvroPrimitiveGenericRecord(Schema schema) {
            this.schema = schema;
        }

        public GenericRecord apply(Long l) {
            Row row = FileWriteSchemaTransformFormatProviderTestData.DATA.allPrimitiveDataTypesRows.get(l.intValue());
            return (GenericRecord)AvroUtils.getRowToGenericRecordFunction((org.apache.avro.Schema)AvroUtils.toAvroSchema((Schema)this.schema)).apply((Object)row);
        }
    }
}

