/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.reflect.Nullable;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Charsets;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.MoreObjects;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.collect.ImmutableList;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.collect.ImmutableMap;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.collect.Iterables;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.collect.Iterators;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.collect.Lists;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.collect.Maps;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.AvroGeneratedUser;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.DefaultFilenamePolicy;
import org.apache.beam.sdk.io.DynamicAvroDestinations;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.SerializableAvroCodecFactory;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Requirements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.TimestampedValue;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class AvroIOTest
implements Serializable {
    /** Pipeline used for the write half of the write-then-read tests. */
    @Rule
    public transient TestPipeline writePipeline = TestPipeline.create();
    /** Pipeline used for the read-back and assertion half of the tests. */
    @Rule
    public transient TestPipeline readPipeline = TestPipeline.create();
    /** Separate pipeline used by the windowed-write test helper. */
    @Rule
    public transient TestPipeline windowedAvroWritePipeline = TestPipeline.create();
    /** Scratch directory that holds every file the tests write. */
    @Rule
    public transient TemporaryFolder tmpFolder = new TemporaryFolder();
    @Rule
    public transient ExpectedException expectedException = ExpectedException.none();
    /** JSON Avro schema of the {@code AvroGeneratedUser} record. */
    private static final String SCHEMA_STRING = "{\"namespace\": \"example.avro\",\n \"type\": \"record\",\n \"name\": \"AvroGeneratedUser\",\n \"fields\": [\n     {\"name\": \"name\", \"type\": \"string\"},\n     {\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]},\n     {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n ]\n}";
    /**
     * Parsed form of {@link #SCHEMA_STRING}. Parsing the constant (rather than a second copy of
     * the literal) keeps the two definitions from drifting apart.
     */
    private static final Schema SCHEMA = new Schema.Parser().parse(SCHEMA_STRING);
    /**
     * Schema template in which every {@code $$} is replaced by a per-destination prefix; see
     * {@code schemaFromPrefix}.
     */
    private static final String SCHEMA_TEMPLATE_STRING = "{\"namespace\": \"example.avro\",\n \"type\": \"record\",\n \"name\": \"TestTemplateSchema$$\",\n \"fields\": [\n     {\"name\": \"$$full\", \"type\": \"string\"},\n     {\"name\": \"$$suffix\", \"type\": [\"string\", \"null\"]}\n ]\n}";

    /** Checks the default transform names reported by the AvroIO read and write transforms. */
    @Test
    public void testAvroIOGetName() {
        String readName = AvroIO.read(String.class).from("/tmp/foo*/baz").getName();
        Assert.assertEquals((Object)"AvroIO.Read", (Object)readName);

        String writeName = AvroIO.write(String.class).to("/tmp/foo/baz").getName();
        Assert.assertEquals((Object)"AvroIO.Write", (Object)writeName);
    }

    /**
     * Writes two {@code GenericClass} values to a single unsharded Avro file and reads them back,
     * passing the file path through pipeline-provided value providers on both sides.
     */
    @Test
    @Category(value={NeedsRunner.class})
    public void testWriteThenReadJavaClass() throws Throwable {
        ImmutableList<GenericClass> expected = ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
        File outputFile = this.tmpFolder.newFile("output.avro");
        String outputPath = outputFile.getAbsolutePath();

        // Write phase.
        PCollection toWrite = (PCollection)this.writePipeline.apply((PTransform)Create.of(expected));
        toWrite.apply((PTransform)AvroIO.write(GenericClass.class).to(this.writePipeline.newProvider((Object)outputPath)).withoutSharding());
        this.writePipeline.run();

        // Read phase.
        PCollection readBack = (PCollection)this.readPipeline.apply("Read", (PTransform)AvroIO.read(GenericClass.class).from(this.readPipeline.newProvider((Object)outputPath)));
        PAssert.that((PCollection)readBack).containsInAnyOrder(expected);
        this.readPipeline.run();
    }

    /**
     * Writes {@code Long} values through {@code AvroIO.writeCustomType()} using
     * {@code CreateGenericClass} as the format function, reads the file back as
     * {@code GenericClass} records, and checks that mapping each record to its {@code intField}
     * yields the original values.
     */
    @Test
    @Category(value={NeedsRunner.class})
    public void testWriteThenReadCustomType() throws Throwable {
        List<Long> values = Arrays.asList(0L, 1L, 2L);
        File outputFile = this.tmpFolder.newFile("output.avro");
        ((PCollection)this.writePipeline.apply((PTransform)Create.of(values))).apply((PTransform)AvroIO.writeCustomType().to(this.writePipeline.newProvider((Object)outputFile.getAbsolutePath())).withFormatFunction((SerializableFunction)new CreateGenericClass()).withSchema(ReflectData.get().getSchema(GenericClass.class)).withoutSharding());
        this.writePipeline.run();

        PCollection records = (PCollection)this.readPipeline.apply("Read", (PTransform)AvroIO.read(GenericClass.class).from(this.readPipeline.newProvider((Object)outputFile.getAbsolutePath())));
        PCollection intFields = (PCollection)records.apply((PTransform)MapElements.via((SimpleFunction)new SimpleFunction<GenericClass, Long>(){

            @Override
            public Long apply(GenericClass input) {
                // Box explicitly: an int expression cannot be returned where Long is expected.
                // NOTE(review): assumes intField is an int, as the GenericClass(int, String)
                // constructor calls in this file suggest; Long.valueOf also accepts long/Long.
                return Long.valueOf(input.intField);
            }
        }));
        PAssert.that((PCollection)intFields).containsInAnyOrder(values);
        this.readPipeline.run();
    }

    /**
     * Shared body for the generated-class round-trip tests: writes three
     * {@code AvroGeneratedUser} records with {@code writeTransform} to a single unsharded file
     * and asserts that {@code readTransform} reads the same records back.
     *
     * @param writeTransform write transform to point at the temporary output file
     * @param readTransform read transform to point at the same file
     */
    private <T extends GenericRecord> void testWriteThenReadGeneratedClass(AvroIO.Write<T> writeTransform, AvroIO.Read<T> readTransform) throws Exception {
        File outputFile = this.tmpFolder.newFile("output.avro");
        String outputPath = outputFile.getAbsolutePath();
        ImmutableList<AvroGeneratedUser> users = ImmutableList.of(new AvroGeneratedUser("Bob", 256, null), new AvroGeneratedUser("Alice", 128, null), new AvroGeneratedUser("Ted", null, "white"));

        PCollection toWrite = (PCollection)this.writePipeline.apply((PTransform)Create.of(users));
        toWrite.apply((PTransform)writeTransform.to(this.writePipeline.newProvider((Object)outputPath)).withoutSharding());
        this.writePipeline.run();

        PCollection readBack = (PCollection)this.readPipeline.apply("Read", (PTransform)readTransform.from(this.readPipeline.newProvider((Object)outputPath)));
        PAssert.that((PCollection)readBack).containsInAnyOrder(users);
        this.readPipeline.run();
    }

    /** Round-trips generated-class records with transforms configured from the Java class. */
    @Test
    @Category(value={NeedsRunner.class})
    public void testWriteThenReadGeneratedClassWithClass() throws Throwable {
        AvroIO.Write<AvroGeneratedUser> writer = AvroIO.write(AvroGeneratedUser.class);
        AvroIO.Read<AvroGeneratedUser> reader = AvroIO.read(AvroGeneratedUser.class);
        this.testWriteThenReadGeneratedClass(writer, reader);
    }

    /** Round-trips generated-class records with transforms configured from the parsed schema. */
    @Test
    @Category(value={NeedsRunner.class})
    public void testWriteThenReadGeneratedClassWithSchema() throws Throwable {
        AvroIO.Write<GenericRecord> writer = AvroIO.writeGenericRecords((Schema)SCHEMA);
        AvroIO.Read<GenericRecord> reader = AvroIO.readGenericRecords((Schema)SCHEMA);
        this.testWriteThenReadGeneratedClass(writer, reader);
    }

    /** Round-trips generated-class records with transforms configured from the schema's JSON. */
    @Test
    @Category(value={NeedsRunner.class})
    public void testWriteThenReadGeneratedClassWithSchemaString() throws Throwable {
        AvroIO.Write<GenericRecord> writer = AvroIO.writeGenericRecords((String)SCHEMA.toString());
        AvroIO.Read<GenericRecord> reader = AvroIO.readGenericRecords((String)SCHEMA.toString());
        this.testWriteThenReadGeneratedClass(writer, reader);
    }

    /**
     * Writes one unsharded file, then checks that every read entry point exercised here
     * (read, read with the many-files hint, readAll, parse, parse with the hint, parseAll)
     * produces the same elements.
     */
    @Test
    @Category(value={NeedsRunner.class})
    public void testWriteSingleFileThenReadUsingAllMethods() throws Throwable {
        ImmutableList<GenericClass> expected = ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
        File outputFile = this.tmpFolder.newFile("output.avro");
        String outputPath = outputFile.getAbsolutePath();

        PCollection toWrite = (PCollection)this.writePipeline.apply((PTransform)Create.of(expected));
        toWrite.apply((PTransform)AvroIO.write(GenericClass.class).to(outputPath).withoutSharding());
        this.writePipeline.run();

        // Single-element collection holding the file path, consumed by the *All variants.
        PCollection path = (PCollection)this.readPipeline.apply("Create path", (PTransform)Create.of((Object)outputPath, (Object[])new String[0]));

        PCollection viaRead = (PCollection)this.readPipeline.apply("Read", (PTransform)AvroIO.read(GenericClass.class).from(outputPath));
        PAssert.that((PCollection)viaRead).containsInAnyOrder(expected);

        PCollection viaReadHint = (PCollection)this.readPipeline.apply("Read withHintMatchesManyFiles", (PTransform)AvroIO.read(GenericClass.class).from(outputPath).withHintMatchesManyFiles());
        PAssert.that((PCollection)viaReadHint).containsInAnyOrder(expected);

        PCollection viaReadAll = (PCollection)path.apply("ReadAll", (PTransform)AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10L));
        PAssert.that((PCollection)viaReadAll).containsInAnyOrder(expected);

        PCollection viaParse = (PCollection)this.readPipeline.apply("Parse", (PTransform)AvroIO.parseGenericRecords((SerializableFunction)new ParseGenericClass()).from(outputPath).withCoder((Coder)AvroCoder.of(GenericClass.class)));
        PAssert.that((PCollection)viaParse).containsInAnyOrder(expected);

        PCollection viaParseHint = (PCollection)this.readPipeline.apply("Parse withHintMatchesManyFiles", (PTransform)AvroIO.parseGenericRecords((SerializableFunction)new ParseGenericClass()).from(outputPath).withCoder((Coder)AvroCoder.of(GenericClass.class)).withHintMatchesManyFiles());
        PAssert.that((PCollection)viaParseHint).containsInAnyOrder(expected);

        PCollection viaParseAll = (PCollection)path.apply("ParseAll", (PTransform)AvroIO.parseAllGenericRecords((SerializableFunction)new ParseGenericClass()).withCoder((Coder)AvroCoder.of(GenericClass.class)).withDesiredBundleSizeBytes(10L));
        PAssert.that((PCollection)viaParseAll).containsInAnyOrder(expected);

        this.readPipeline.run();
    }

    /**
     * Writes two element sets under the "first" and "second" prefixes (2 and 3 shards), then
     * checks that readAll and parseAll over both glob patterns return the union of the two sets.
     */
    @Test
    @Category(value={NeedsRunner.class})
    public void testWriteThenReadMultipleFilepatterns() throws Throwable {
        ArrayList<GenericClass> firstValues = Lists.newArrayList();
        ArrayList<GenericClass> secondValues = Lists.newArrayList();
        int index = 0;
        while (index < 10) {
            firstValues.add(new GenericClass(index, "a" + index));
            secondValues.add(new GenericClass(index, "b" + index));
            ++index;
        }

        PCollection firstInput = (PCollection)this.writePipeline.apply("Create first", (PTransform)Create.of(firstValues));
        firstInput.apply("Write first", (PTransform)AvroIO.write(GenericClass.class).to(this.tmpFolder.getRoot().getAbsolutePath() + "/first").withNumShards(2));
        PCollection secondInput = (PCollection)this.writePipeline.apply("Create second", (PTransform)Create.of(secondValues));
        secondInput.apply("Write second", (PTransform)AvroIO.write(GenericClass.class).to(this.tmpFolder.getRoot().getAbsolutePath() + "/second").withNumShards(3));
        this.writePipeline.run();

        String firstPattern = this.tmpFolder.getRoot().getAbsolutePath() + "/first*";
        String secondPattern = this.tmpFolder.getRoot().getAbsolutePath() + "/second*";
        PCollection paths = (PCollection)this.readPipeline.apply("Create paths", (PTransform)Create.of((Object)firstPattern, (Object[])new String[]{secondPattern}));

        PCollection viaReadAll = (PCollection)paths.apply("Read all", (PTransform)AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10L));
        PAssert.that((PCollection)viaReadAll).containsInAnyOrder(Iterables.concat(firstValues, secondValues));

        PCollection viaParseAll = (PCollection)paths.apply("Parse all", (PTransform)AvroIO.parseAllGenericRecords((SerializableFunction)new ParseGenericClass()).withCoder((Coder)AvroCoder.of(GenericClass.class)).withDesiredBundleSizeBytes(10L));
        PAssert.that((PCollection)viaParseAll).containsInAnyOrder(Iterables.concat(firstValues, secondValues));

        this.readPipeline.run();
    }

    /**
     * Writes two rate-limited sequences (0-2 under "first", 3-6 under "second") with windowed
     * writes while, in the same pipeline, read/parse/readAll/parseAll watch the output patterns
     * for new files. Each watcher polls every 100 ms and stops 3 s after its last new output.
     */
    @Test
    @Category(value={NeedsRunner.class})
    public void testContinuouslyWriteAndReadMultipleFilepatterns() throws Throwable {
        CreateGenericClass mapFn = new CreateGenericClass();
        List<GenericClass> firstValues = Lists.newArrayList();
        List<GenericClass> secondValues = Lists.newArrayList();
        for (int i = 0; i < 7; ++i) {
            // mapFn is applied to Long elements elsewhere in this file (Create.of(List<Long>),
            // GenerateSequence), so the int loop index must be boxed to Long explicitly.
            (i < 3 ? firstValues : secondValues).add((GenericClass)mapFn.apply(Long.valueOf(i)));
        }
        String root = this.tmpFolder.getRoot().getAbsolutePath();
        String firstPattern = root + "/first*";
        String secondPattern = root + "/second*";
        // Fire a pane per element so files appear incrementally while the readers are watching.
        Window window = Window.into((WindowFn)FixedWindows.of((Duration)Duration.millis((long)100L))).withAllowedLateness(Duration.ZERO).triggering((Trigger)Repeatedly.forever((Trigger)AfterPane.elementCountAtLeast((int)1))).discardingFiredPanes();
        ((PCollection)((PCollection)((PCollection)this.readPipeline.apply("Sequence first", (PTransform)GenerateSequence.from((long)0L).to(3L).withRate(1L, Duration.millis((long)300L)))).apply("Window first", (PTransform)window)).apply("Map first", (PTransform)MapElements.via((SimpleFunction)mapFn))).apply("Write first", (PTransform)AvroIO.write(GenericClass.class).to(root + "/first").withNumShards(2).withWindowedWrites());
        ((PCollection)((PCollection)((PCollection)this.readPipeline.apply("Sequence second", (PTransform)GenerateSequence.from((long)3L).to(7L).withRate(1L, Duration.millis((long)300L)))).apply("Window second", (PTransform)window)).apply("Map second", (PTransform)MapElements.via((SimpleFunction)mapFn))).apply("Write second", (PTransform)AvroIO.write(GenericClass.class).to(root + "/second").withNumShards(3).withWindowedWrites());
        PAssert.that((PCollection)((PCollection)this.readPipeline.apply("Read", (PTransform)AvroIO.read(GenericClass.class).from(firstPattern).watchForNewFiles(Duration.millis((long)100L), (Watch.Growth.TerminationCondition)Watch.Growth.afterTimeSinceNewOutput((ReadableDuration)Duration.standardSeconds((long)3L)))))).containsInAnyOrder(firstValues);
        PAssert.that((PCollection)((PCollection)this.readPipeline.apply("Parse", (PTransform)AvroIO.parseGenericRecords((SerializableFunction)new ParseGenericClass()).from(firstPattern).watchForNewFiles(Duration.millis((long)100L), (Watch.Growth.TerminationCondition)Watch.Growth.afterTimeSinceNewOutput((ReadableDuration)Duration.standardSeconds((long)3L)))))).containsInAnyOrder(firstValues);
        PCollection paths = (PCollection)this.readPipeline.apply("Create paths", (PTransform)Create.of((Object)firstPattern, (Object[])new String[]{secondPattern}));
        PAssert.that((PCollection)((PCollection)paths.apply("Read all", (PTransform)AvroIO.readAll(GenericClass.class).watchForNewFiles(Duration.millis((long)100L), (Watch.Growth.TerminationCondition)Watch.Growth.afterTimeSinceNewOutput((ReadableDuration)Duration.standardSeconds((long)3L))).withDesiredBundleSizeBytes(10L)))).containsInAnyOrder(Iterables.concat(firstValues, secondValues));
        PAssert.that((PCollection)((PCollection)paths.apply("Parse all", (PTransform)AvroIO.parseAllGenericRecords((SerializableFunction)new ParseGenericClass()).withCoder((Coder)AvroCoder.of(GenericClass.class)).watchForNewFiles(Duration.millis((long)100L), (Watch.Growth.TerminationCondition)Watch.Growth.afterTimeSinceNewOutput((ReadableDuration)Duration.standardSeconds((long)3L))).withDesiredBundleSizeBytes(10L)))).containsInAnyOrder(Iterables.concat(firstValues, secondValues));
        this.readPipeline.run();
    }

    /**
     * Writes a single file with the deflate codec (level 9), reads it back, and then inspects
     * the file's {@code avro.codec} metadata to confirm the codec was applied.
     */
    @Test
    @Category(value={NeedsRunner.class})
    public void testCompressedWriteAndReadASingleFile() throws Throwable {
        ImmutableList<GenericClass> expected = ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
        File outputFile = this.tmpFolder.newFile("output.avro");

        PCollection toWrite = (PCollection)this.writePipeline.apply((PTransform)Create.of(expected));
        toWrite.apply((PTransform)AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding().withCodec(CodecFactory.deflateCodec((int)9)));
        this.writePipeline.run();

        PCollection readBack = (PCollection)this.readPipeline.apply((PTransform)AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()));
        PAssert.that((PCollection)readBack).containsInAnyOrder(expected);
        this.readPipeline.run();

        // Check the codec recorded in the file header.
        try (DataFileStream header = new DataFileStream((InputStream)new FileInputStream(outputFile), (DatumReader)new GenericDatumReader());){
            Object codecName = header.getMetaString("avro.codec");
            Assert.assertEquals((Object)"deflate", codecName);
        }
    }

    /**
     * Writes a single file with the null codec, reads it back, and then inspects the file's
     * {@code avro.codec} metadata to confirm it is recorded as {@code "null"}.
     */
    @Test
    @Category(value={NeedsRunner.class})
    public void testWriteThenReadASingleFileWithNullCodec() throws Throwable {
        ImmutableList<GenericClass> expected = ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
        File outputFile = this.tmpFolder.newFile("output.avro");

        PCollection toWrite = (PCollection)this.writePipeline.apply((PTransform)Create.of(expected));
        toWrite.apply((PTransform)AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding().withCodec(CodecFactory.nullCodec()));
        this.writePipeline.run();

        PCollection readBack = (PCollection)this.readPipeline.apply((PTransform)AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()));
        PAssert.that((PCollection)readBack).containsInAnyOrder(expected);
        this.readPipeline.run();

        // Check the codec recorded in the file header.
        try (DataFileStream header = new DataFileStream((InputStream)new FileInputStream(outputFile), (DatumReader)new GenericDatumReader());){
            Object codecName = header.getMetaString("avro.codec");
            Assert.assertEquals((Object)"null", codecName);
        }
    }

    /**
     * Writes {@code GenericClass} records and reads them back as {@code GenericClassV2},
     * expecting the third constructor argument of each V2 record to be {@code null}.
     */
    @Test
    @Category(value={NeedsRunner.class})
    public void testWriteThenReadSchemaUpgrade() throws Throwable {
        ImmutableList<GenericClass> written = ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
        File outputFile = this.tmpFolder.newFile("output.avro");

        PCollection toWrite = (PCollection)this.writePipeline.apply((PTransform)Create.of(written));
        toWrite.apply((PTransform)AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding());
        this.writePipeline.run();

        ImmutableList<GenericClassV2> upgraded = ImmutableList.of(new GenericClassV2(3, "hi", null), new GenericClassV2(5, "bar", null));
        PCollection readBack = (PCollection)this.readPipeline.apply((PTransform)AvroIO.read(GenericClassV2.class).from(outputFile.getAbsolutePath()));
        PAssert.that((PCollection)readBack).containsInAnyOrder(upgraded);
        this.readPipeline.run();
    }

    /** Runs the windowed-write scenario through the {@code AvroIO.write} code path. */
    @Test
    @Category(value={NeedsRunner.class, UsesTestStream.class})
    public void testWriteWindowed() throws Throwable {
        WriteMethod method = WriteMethod.AVROIO_WRITE;
        this.testWindowedAvroIOWriteUsingMethod(method);
    }

    /** Runs the windowed-write scenario through the {@code FileIO.write} + {@code AvroIO.sink} path. */
    @Test
    @Category(value={NeedsRunner.class, UsesTestStream.class})
    public void testWindowedAvroIOWriteViaSink() throws Throwable {
        WriteMethod method = WriteMethod.AVROIO_SINK;
        this.testWindowedAvroIOWriteUsingMethod(method);
    }

    /**
     * Writes 200 {@code GenericClass} elements spread over two one-minute fixed windows and
     * verifies the output files.
     *
     * <p>A {@code TestStream} emits elements 0-99 with timestamps in the first minute, advances
     * the watermark to one minute, emits elements 100-199 with timestamps in the second minute,
     * then advances the watermark to infinity. The write uses 2 shards, so 2 shards x 2 windows
     * of files are expected. Every expected file must exist, and the union of their contents
     * must equal all 200 elements.
     *
     * @param method which write path to exercise
     * @throws IOException if the temporary directory or an output file cannot be accessed
     * @throws UnsupportedOperationException if {@code method} is not a handled write method
     */
    void testWindowedAvroIOWriteUsingMethod(WriteMethod method) throws IOException {
        // Declared as PTransform because the two branches build different transform classes:
        // the AVROIO_WRITE branch builds an AvroIO write, not a FileIO.Write.
        PTransform write;
        Path baseDir = Files.createTempDirectory(this.tmpFolder.getRoot().toPath(), "testwrite", new FileAttribute[0]);
        String baseFilename = baseDir.resolve("prefix").toString();
        Instant base = new Instant(0L);
        ArrayList<GenericClass> allElements = new ArrayList<GenericClass>();
        ArrayList<TimestampedValue> firstWindowElements = new ArrayList<TimestampedValue>();
        ArrayList<Instant> firstWindowTimestamps = Lists.newArrayList(base.plus((ReadableDuration)Duration.ZERO), base.plus((ReadableDuration)Duration.standardSeconds((long)10L)), base.plus((ReadableDuration)Duration.standardSeconds((long)20L)), base.plus((ReadableDuration)Duration.standardSeconds((long)30L)));
        // Timestamps are picked at random within each window; the assertions below do not
        // depend on which timestamp an element received.
        Random random = new Random();
        for (int i = 0; i < 100; ++i) {
            GenericClass item = new GenericClass(i, String.valueOf(i));
            allElements.add(item);
            firstWindowElements.add(TimestampedValue.of((Object)item, (Instant)firstWindowTimestamps.get(random.nextInt(firstWindowTimestamps.size()))));
        }
        ArrayList<TimestampedValue> secondWindowElements = new ArrayList<TimestampedValue>();
        ArrayList<Instant> secondWindowTimestamps = Lists.newArrayList(base.plus((ReadableDuration)Duration.standardSeconds((long)60L)), base.plus((ReadableDuration)Duration.standardSeconds((long)70L)), base.plus((ReadableDuration)Duration.standardSeconds((long)80L)), base.plus((ReadableDuration)Duration.standardSeconds((long)90L)));
        for (int i = 100; i < 200; ++i) {
            GenericClass item = new GenericClass(i, String.valueOf(i));
            // Reuse the instance, as the first loop does, instead of allocating a duplicate.
            allElements.add(item);
            secondWindowElements.add(TimestampedValue.of((Object)item, (Instant)secondWindowTimestamps.get(random.nextInt(secondWindowTimestamps.size()))));
        }
        TimestampedValue[] firstWindowArray = firstWindowElements.toArray(new TimestampedValue[100]);
        TimestampedValue[] secondWindowArray = secondWindowElements.toArray(new TimestampedValue[100]);
        TestStream values = TestStream.create((Coder)AvroCoder.of(GenericClass.class)).advanceWatermarkTo(new Instant(0L)).addElements(firstWindowArray[0], Arrays.copyOfRange(firstWindowArray, 1, firstWindowArray.length)).advanceWatermarkTo(new Instant(0L).plus((ReadableDuration)Duration.standardMinutes((long)1L))).addElements(secondWindowArray[0], Arrays.copyOfRange(secondWindowArray, 1, secondWindowArray.length)).advanceWatermarkToInfinity();
        switch (method) {
            case AVROIO_WRITE: {
                WindowedFilenamePolicy policy = new WindowedFilenamePolicy(FileBasedSink.convertToFileResourceIfPossible((String)baseFilename));
                write = AvroIO.write(GenericClass.class).to((FileBasedSink.FilenamePolicy)policy).withTempDirectory((ValueProvider)ValueProvider.StaticValueProvider.of((Object)FileSystems.matchNewResource((String)baseDir.toString(), (boolean)true))).withWindowedWrites().withNumShards(2).withOutputFilenames();
                break;
            }
            case AVROIO_SINK: {
                write = FileIO.write().via((FileIO.Sink)AvroIO.sink(GenericClass.class)).to(baseDir.toString()).withPrefix("prefix").withSuffix(".avro").withTempDirectory(baseDir.toString()).withNumShards(2);
                break;
            }
            default: {
                throw new UnsupportedOperationException();
            }
        }
        ((PCollection)((PCollection)this.windowedAvroWritePipeline.apply((PTransform)values)).apply((PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardMinutes((long)1L))))).apply(write);
        this.windowedAvroWritePipeline.run();
        // Build the expected file name for every (shard, window) pair; the name format differs
        // between the two write paths.
        ArrayList<File> expectedFiles = new ArrayList<File>();
        for (int shard = 0; shard < 2; ++shard) {
            for (int window = 0; window < 2; ++window) {
                Instant windowStart = new Instant(0L).plus((ReadableDuration)Duration.standardMinutes((long)window));
                IntervalWindow iw = new IntervalWindow(windowStart, (ReadableDuration)Duration.standardMinutes((long)1L));
                String baseAndWindow = baseFilename + "-" + iw.start() + "-" + iw.end();
                switch (method) {
                    case AVROIO_WRITE: {
                        expectedFiles.add(new File(baseAndWindow + "-" + shard + "-of-2-pane-0-last.avro"));
                        break;
                    }
                    case AVROIO_SINK: {
                        expectedFiles.add(new File(baseAndWindow + "-0000" + shard + "-of-00002.avro"));
                        break;
                    }
                }
            }
        }
        ArrayList actualElements = new ArrayList();
        for (File outputFile : expectedFiles) {
            Assert.assertTrue((String)("Expected output file " + outputFile.getAbsolutePath()), (boolean)outputFile.exists());
            try (DataFileReader reader = new DataFileReader(outputFile, (DatumReader)new ReflectDatumReader(ReflectData.get().getSchema(GenericClass.class)));){
                Iterators.addAll(actualElements, reader);
            }
            outputFile.delete();
        }
        Assert.assertThat(actualElements, (Matcher)Matchers.containsInAnyOrder((Object[])allElements.toArray()));
    }

    /**
     * Builds a JSON schema from the schema template by substituting {@code prefix} for every
     * {@code $$} placeholder.
     *
     * @param prefix text inserted into the record name and both field names
     * @return the template with all placeholders replaced
     */
    private static String schemaFromPrefix(String prefix) {
        String template = SCHEMA_TEMPLATE_STRING;
        return template.replace("$$", prefix);
    }

    /**
     * Creates a generic record for {@code schema} whose {@code <prefix>full} field holds the
     * whole input string and whose {@code <prefix>suffix} field holds the input without its
     * first character.
     *
     * @param record the element text to store
     * @param prefix field-name prefix matching the one used to build {@code schema}
     * @param schema schema of the record to create
     * @return the populated record
     */
    private static GenericRecord createRecord(String record, String prefix, Schema schema) {
        GenericData.Record result = new GenericData.Record(schema);
        String fullField = prefix + "full";
        result.put(fullField, (Object)record);
        String suffixField = prefix + "suffix";
        result.put(suffixField, (Object)record.substring(1));
        return result;
    }

    /**
     * Writes six strings to per-prefix destinations (the first character of each string is its
     * destination), each destination having its own schema supplied through a map side input,
     * then reads each destination's files back and checks their contents.
     *
     * @param writeMethod which write path to exercise
     * @param sharding how the write is sharded; also determines the expected file-name pattern
     * @throws UnsupportedOperationException if {@code writeMethod} is not a handled write method
     * @throws IllegalArgumentException if {@code sharding} is not a handled sharding mode
     */
    private void testDynamicDestinationsUnwindowedWithSharding(WriteMethod writeMethod, Sharding sharding) throws Exception {
        ResourceId baseDir = FileSystems.matchNewResource((String)Files.createTempDirectory(this.tmpFolder.getRoot().toPath(), "testDynamicDestinations", new FileAttribute[0]).toString(), (boolean)true);
        ArrayList<String> elements = Lists.newArrayList("aaaa", "aaab", "baaa", "baab", "caaa", "caab");
        ArrayListMultimap<String, GenericRecord> expectedElements = ArrayListMultimap.create();
        HashMap<String, String> schemaMap = Maps.newHashMap();
        // Build the prefix -> schema map and the expected records per prefix.
        for (String element2 : elements) {
            String prefix = element2.substring(0, 1);
            String jsonSchema = AvroIOTest.schemaFromPrefix(prefix);
            schemaMap.put(prefix, jsonSchema);
            expectedElements.put(prefix, AvroIOTest.createRecord(element2, prefix, new Schema.Parser().parse(jsonSchema)));
        }
        PCollectionView schemaView = (PCollectionView)((PCollection)this.writePipeline.apply("createSchemaView", (PTransform)Create.of(schemaMap))).apply((PTransform)View.asMap());
        PCollection input = (PCollection)this.writePipeline.apply("createInput", (PTransform)Create.of(elements).withCoder((Coder)StringUtf8Coder.of()));
        switch (writeMethod) {
            case AVROIO_WRITE: {
                AvroIO.TypedWrite write = AvroIO.writeCustomTypeToGenericRecords().to((DynamicAvroDestinations)new TestDynamicDestinations(baseDir, (PCollectionView<Map<String, String>>)schemaView)).withTempDirectory(baseDir);
                switch (sharding) {
                    case RUNNER_DETERMINED: {
                        break;
                    }
                    case WITHOUT_SHARDING: {
                        write = write.withoutSharding();
                        break;
                    }
                    case FIXED_3_SHARDS: {
                        write = write.withNumShards(3);
                        break;
                    }
                    default: {
                        throw new IllegalArgumentException("Unknown sharding " + sharding);
                    }
                }
                input.apply((PTransform)write);
                break;
            }
            case AVROIO_SINK: {
                AvroIO.RecordFormatter & Serializable formatter = (AvroIO.RecordFormatter & Serializable)(element, schema) -> {
                    String prefix = element.substring(0, 1);
                    GenericData.Record record = new GenericData.Record(schema);
                    record.put(prefix + "full", element);
                    record.put(prefix + "suffix", (Object)element.substring(1));
                    return record;
                };
                FileIO.Write write = FileIO.writeDynamic().by(Contextful.fn((Contextful.Fn & Serializable)(element, c) -> {
                    c.sideInput(schemaView);
                    return element.substring(0, 1);
                }, (Requirements)Requirements.requiresSideInputs((PCollectionView[])new PCollectionView[]{schemaView}))).via(Contextful.fn((Contextful.Fn & Serializable)(dest, c) -> {
                    Schema schema = new Schema.Parser().parse((String)((Map)c.sideInput(schemaView)).get(dest));
                    return AvroIO.sinkViaGenericRecords((Schema)schema, (AvroIO.RecordFormatter)formatter);
                }, (Requirements)Requirements.requiresSideInputs((PCollectionView[])new PCollectionView[]{schemaView}))).to(baseDir.toString()).withNaming(Contextful.fn((Contextful.Fn & Serializable)(dest, c) -> {
                    c.sideInput(schemaView);
                    return FileIO.Write.defaultNaming((String)("file_" + dest), (String)".avro");
                }, (Requirements)Requirements.requiresSideInputs((PCollectionView[])new PCollectionView[]{schemaView}))).withTempDirectory(baseDir.toString()).withDestinationCoder((Coder)StringUtf8Coder.of()).withIgnoreWindowing();
                switch (sharding) {
                    case RUNNER_DETERMINED: {
                        break;
                    }
                    case WITHOUT_SHARDING: {
                        // This path expresses "no sharding" as exactly one shard.
                        write = write.withNumShards(1);
                        break;
                    }
                    case FIXED_3_SHARDS: {
                        write = write.withNumShards(3);
                        break;
                    }
                    default: {
                        throw new IllegalArgumentException("Unknown sharding " + sharding);
                    }
                }
                input.apply((PTransform)write);
                break;
            }
            default: {
                // Fail fast instead of running a pipeline that writes nothing, matching the
                // handling of unknown write methods in testWindowedAvroIOWriteUsingMethod.
                throw new UnsupportedOperationException("Unknown write method " + writeMethod);
            }
        }
        this.writePipeline.run();
        // Read each destination back through a glob that matches the sharding mode in use.
        for (String prefix : expectedElements.keySet()) {
            String shardPattern;
            switch (sharding) {
                case RUNNER_DETERMINED: {
                    shardPattern = "-*";
                    break;
                }
                case WITHOUT_SHARDING: {
                    shardPattern = "-00000-of-00001";
                    break;
                }
                case FIXED_3_SHARDS: {
                    shardPattern = "-*-of-00003";
                    break;
                }
                default: {
                    throw new IllegalArgumentException("Unknown sharding " + sharding);
                }
            }
            String expectedFilepattern = baseDir.resolve("file_" + prefix + shardPattern + ".avro", (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString();
            PCollection records = (PCollection)this.readPipeline.apply("read_" + prefix, (PTransform)AvroIO.readGenericRecords((String)AvroIOTest.schemaFromPrefix(prefix)).from(expectedFilepattern));
            PAssert.that((PCollection)records).containsInAnyOrder(expectedElements.get(prefix));
        }
        this.readPipeline.run();
    }

    /**
     * Runs the shared dynamic-destinations scenario with {@code WriteMethod.AVROIO_WRITE} and
     * {@code Sharding.RUNNER_DETERMINED}.
     */
    @Test
    @Category(NeedsRunner.class)
    public void testDynamicDestinationsRunnerDeterminedSharding() throws Exception {
        WriteMethod method = WriteMethod.AVROIO_WRITE;
        Sharding sharding = Sharding.RUNNER_DETERMINED;
        testDynamicDestinationsUnwindowedWithSharding(method, sharding);
    }

    /**
     * Runs the shared dynamic-destinations scenario with {@code WriteMethod.AVROIO_WRITE} and
     * {@code Sharding.WITHOUT_SHARDING}.
     */
    @Test
    @Category(NeedsRunner.class)
    public void testDynamicDestinationsWithoutSharding() throws Exception {
        WriteMethod method = WriteMethod.AVROIO_WRITE;
        Sharding sharding = Sharding.WITHOUT_SHARDING;
        testDynamicDestinationsUnwindowedWithSharding(method, sharding);
    }

    /**
     * Runs the shared dynamic-destinations scenario with {@code WriteMethod.AVROIO_WRITE} and
     * {@code Sharding.FIXED_3_SHARDS}.
     */
    @Test
    @Category(NeedsRunner.class)
    public void testDynamicDestinationsWithNumShards() throws Exception {
        WriteMethod method = WriteMethod.AVROIO_WRITE;
        Sharding sharding = Sharding.FIXED_3_SHARDS;
        testDynamicDestinationsUnwindowedWithSharding(method, sharding);
    }

    /**
     * Runs the shared dynamic-destinations scenario with {@code WriteMethod.AVROIO_SINK} and
     * {@code Sharding.RUNNER_DETERMINED}.
     */
    @Test
    @Category(NeedsRunner.class)
    public void testDynamicDestinationsViaSinkRunnerDeterminedSharding() throws Exception {
        WriteMethod method = WriteMethod.AVROIO_SINK;
        Sharding sharding = Sharding.RUNNER_DETERMINED;
        testDynamicDestinationsUnwindowedWithSharding(method, sharding);
    }

    /**
     * Runs the shared dynamic-destinations scenario with {@code WriteMethod.AVROIO_SINK} and
     * {@code Sharding.WITHOUT_SHARDING}.
     */
    @Test
    @Category(NeedsRunner.class)
    public void testDynamicDestinationsViaSinkWithoutSharding() throws Exception {
        WriteMethod method = WriteMethod.AVROIO_SINK;
        Sharding sharding = Sharding.WITHOUT_SHARDING;
        testDynamicDestinationsUnwindowedWithSharding(method, sharding);
    }

    /**
     * Runs the shared dynamic-destinations scenario with {@code WriteMethod.AVROIO_SINK} and
     * {@code Sharding.FIXED_3_SHARDS}.
     */
    @Test
    @Category(NeedsRunner.class)
    public void testDynamicDestinationsViaSinkWithNumShards() throws Exception {
        WriteMethod method = WriteMethod.AVROIO_SINK;
        Sharding sharding = Sharding.FIXED_3_SHARDS;
        testDynamicDestinationsUnwindowedWithSharding(method, sharding);
    }

    /**
     * Checks that a write built without an explicit codec reports the same codec string as
     * {@link CodecFactory#snappyCodec()}.
     */
    @Test
    public void testWriteWithDefaultCodec() throws Exception {
        AvroIO.Write write = AvroIO.write(String.class).to("/tmp/foo/baz");
        String expectedCodec = CodecFactory.snappyCodec().toString();
        String actualCodec = write.inner.getCodec().toString();
        Assert.assertEquals(expectedCodec, actualCodec);
    }

    /**
     * Checks that a codec supplied through {@code withCodec} is the one reported by the write; the
     * snappy codec is expected to render as {@code "snappy"}.
     */
    @Test
    public void testWriteWithCustomCodec() throws Exception {
        CodecFactory codec = CodecFactory.snappyCodec();
        AvroIO.Write write = AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(codec);
        String actualCodec = write.inner.getCodec().toString();
        Assert.assertEquals("snappy", actualCodec);
    }

    /**
     * Checks that a level-9 deflate codec still renders identically after the write's codec holder
     * has been cloned via {@code SerializableUtils.clone}.
     */
    @Test
    public void testWriteWithSerDeCustomDeflateCodec() throws Exception {
        AvroIO.Write write = AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(CodecFactory.deflateCodec(9));
        String expectedCodec = CodecFactory.deflateCodec(9).toString();
        SerializableAvroCodecFactory cloned = (SerializableAvroCodecFactory)SerializableUtils.clone((Serializable)write.inner.getCodec());
        Assert.assertEquals(expectedCodec, cloned.getCodec().toString());
    }

    /**
     * Checks that a level-9 XZ codec still renders identically after the write's codec holder has
     * been cloned via {@code SerializableUtils.clone}.
     */
    @Test
    public void testWriteWithSerDeCustomXZCodec() throws Exception {
        AvroIO.Write write = AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(CodecFactory.xzCodec(9));
        String expectedCodec = CodecFactory.xzCodec(9).toString();
        SerializableAvroCodecFactory cloned = (SerializableAvroCodecFactory)SerializableUtils.clone((Serializable)write.inner.getCodec());
        Assert.assertEquals(expectedCodec, cloned.getCodec().toString());
    }

    /**
     * Writes two {@code GenericClass} values to a single unsharded Avro file with string, long and
     * byte-array metadata entries, then reopens the file and asserts each entry reads back unchanged.
     */
    @Test
    @Category(NeedsRunner.class)
    public void testMetadata() throws Exception {
        ImmutableList<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
        File outputFile = this.tmpFolder.newFile("output.avro");

        PCollection input = (PCollection)this.writePipeline.apply((PTransform)Create.of(values));
        Map<String, Object> metadata = ImmutableMap.<String, Object>of(
            "stringKey", "stringValue",
            "longKey", 100L,
            "bytesKey", "bytesValue".getBytes(Charsets.UTF_8));
        AvroIO.Write write = AvroIO.write(GenericClass.class)
            .to(outputFile.getAbsolutePath())
            .withoutSharding()
            .withMetadata(metadata);
        input.apply((PTransform)write);
        this.writePipeline.run();

        // Read the metadata straight from the Avro container file rather than through the pipeline.
        try (DataFileStream stream = new DataFileStream(new FileInputStream(outputFile), new GenericDatumReader())) {
            Assert.assertEquals("stringValue", stream.getMetaString("stringKey"));
            Assert.assertEquals(100L, stream.getMetaLong("longKey"));
            Assert.assertArrayEquals("bytesValue".getBytes(Charsets.UTF_8), stream.getMeta("bytesKey"));
        }
    }

    /**
     * Writes {@code expectedElements} with {@code AvroIO.write} to files named with the prefix
     * {@code "prefix"} under the temporary folder, then verifies the produced files with
     * {@link #assertTestOutputs}.
     *
     * @param expectedElements the strings to write and expect back
     * @param numShards shard count; values greater than one use {@code withNumShards}, anything else
     *     uses {@code withoutSharding}
     * @throws IOException if verifying the output files fails on I/O
     */
    private void runTestWrite(String[] expectedElements, int numShards) throws IOException {
        String outputFilePrefix = new File(this.tmpFolder.getRoot(), "prefix").getAbsolutePath();
        AvroIO.Write write = AvroIO.write(String.class).to(outputFilePrefix).withSuffix(".avro");
        if (numShards > 1) {
            write = write.withNumShards(numShards);
        } else {
            write = write.withoutSharding();
        }

        PCollection input = (PCollection)this.writePipeline.apply((PTransform)Create.of(ImmutableList.copyOf(expectedElements)));
        input.apply((PTransform)write);
        this.writePipeline.run();

        // Fall back to the standard shard template when the write does not carry its own.
        String shardNameTemplate = MoreObjects.firstNonNull(write.inner.getShardTemplate(), "-SSSSS-of-NNNNN");
        AvroIOTest.assertTestOutputs(expectedElements, numShards, outputFilePrefix, shardNameTemplate);
    }

    /**
     * Asserts that every expected shard file exists and that, taken together, the shard files
     * contain exactly {@code expectedElements} in any order.
     *
     * <p>The expected file names are derived with {@code DefaultFilenamePolicy.constructName} from
     * the prefix, the shard template and the fixed {@code ".avro"} suffix.
     *
     * @param expectedElements the strings that must appear across all shard files
     * @param numShards number of shard files expected
     * @param outputFilePrefix file prefix the write was configured with
     * @param shardNameTemplate shard name template used to build each expected file name
     * @throws IOException if a shard file cannot be opened or read
     */
    public static void assertTestOutputs(String[] expectedElements, int numShards, String outputFilePrefix, String shardNameTemplate) throws IOException {
        List<File> expectedFiles = new ArrayList<>();
        for (int shard = 0; shard < numShards; ++shard) {
            expectedFiles.add(new File(DefaultFilenamePolicy.constructName(FileBasedSink.convertToFileResourceIfPossible(outputFilePrefix), shardNameTemplate, ".avro", shard, numShards, null, null).toString()));
        }

        // The reader schema is the same for every shard, so resolve it once.
        Schema stringSchema = ReflectData.get().getSchema(String.class);
        List<String> actualElements = new ArrayList<>();
        for (File outputFile : expectedFiles) {
            Assert.assertTrue("Expected output file " + outputFile.getName(), outputFile.exists());
            try (DataFileReader<String> reader = new DataFileReader<>(outputFile, new ReflectDatumReader<String>(stringSchema))) {
                Iterators.addAll(actualElements, reader);
            }
        }
        Assert.assertThat(actualElements, Matchers.containsInAnyOrder(expectedElements));
    }

    /** Writes three strings through {@code runTestWrite} with a shard count of one. */
    @Test
    @Category(NeedsRunner.class)
    public void testAvroSinkWrite() throws Exception {
        runTestWrite(new String[]{"first", "second", "third"}, 1);
    }

    /** Writes five strings through {@code runTestWrite} with a shard count of four. */
    @Test
    @Category(NeedsRunner.class)
    public void testAvroSinkShardedWrite() throws Exception {
        runTestWrite(new String[]{"first", "second", "third", "fourth", "fifth"}, 4);
    }

    /** Checks that a read's display data exposes the configured file pattern as {@code filePattern}. */
    @Test
    public void testReadDisplayData() {
        String filePattern = "/foo.*";
        AvroIO.Read read = AvroIO.read(String.class).from(filePattern);
        DisplayData displayData = DisplayData.from((HasDisplayData)read);
        Assert.assertThat(displayData, DisplayDataMatchers.hasDisplayItem("filePattern", "/foo.*"));
    }

    /**
     * Checks that the display data collected for the primitive source transforms of a generic-record
     * read contains an item keyed {@code filePattern}.
     */
    @Test
    @Category(ValidatesRunner.class)
    public void testPrimitiveReadDisplayData() {
        DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
        Schema stringSchema = Schema.create(Schema.Type.STRING);
        AvroIO.Read read = AvroIO.readGenericRecords(stringSchema).from("/foo.*");
        Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms((PTransform<? super PBegin, ? extends POutput>)read);
        String failureMessage = "AvroIO.Read should include the file pattern in its primitive transform";
        Assert.assertThat(failureMessage, displayData, (Matcher)Matchers.hasItem(DisplayDataMatchers.hasDisplayItem("filePattern")));
    }

    /**
     * Checks that a fully configured write exposes its prefix, shard template, suffix, schema, shard
     * count and codec as display data items.
     */
    @Test
    public void testWriteDisplayData() {
        AvroIO.Write write = AvroIO.write(GenericClass.class)
            .to("/foo")
            .withShardNameTemplate("-SS-of-NN-")
            .withSuffix("bar")
            .withNumShards(100)
            .withCodec(CodecFactory.deflateCodec(6));
        DisplayData displayData = DisplayData.from((HasDisplayData)write);

        String expectedSchema = "{\"type\":\"record\",\"name\":\"GenericClass\",\"namespace\":\"org.apache.beam.sdk.io.AvroIOTest$\",\"fields\":[{\"name\":\"intField\",\"type\":\"int\"},{\"name\":\"stringField\",\"type\":\"string\"}]}";

        Assert.assertThat(displayData, DisplayDataMatchers.hasDisplayItem("filePrefix", "/foo"));
        Assert.assertThat(displayData, DisplayDataMatchers.hasDisplayItem("shardNameTemplate", "-SS-of-NN-"));
        Assert.assertThat(displayData, DisplayDataMatchers.hasDisplayItem("fileSuffix", "bar"));
        Assert.assertThat(displayData, DisplayDataMatchers.hasDisplayItem("schema", expectedSchema));
        Assert.assertThat(displayData, DisplayDataMatchers.hasDisplayItem("numShards", 100L));
        String expectedCodec = CodecFactory.deflateCodec(6).toString();
        Assert.assertThat(displayData, DisplayDataMatchers.hasDisplayItem("codec", expectedCodec));
    }

    /**
     * Dynamic destinations used by the tests: each element is routed by its first character, and the
     * Avro schema for a destination is looked up in a side-input map keyed by that character.
     */
    private static class TestDynamicDestinations
    extends DynamicAvroDestinations<String, String, GenericRecord> {
        ResourceId baseDir;
        PCollectionView<Map<String, String>> schemaView;

        /**
         * @param baseDir directory under which per-destination files are resolved
         * @param schemaView side input mapping a destination to its schema JSON
         */
        TestDynamicDestinations(ResourceId baseDir, PCollectionView<Map<String, String>> schemaView) {
            this.baseDir = baseDir;
            this.schemaView = schemaView;
        }

        /** Parses the schema JSON stored in the side input under {@code destination}. */
        public Schema getSchema(String destination) {
            Map<String, String> schemasByDestination = this.sideInput(this.schemaView);
            String schemaJson = schemasByDestination.get(destination);
            Schema.Parser parser = new Schema.Parser();
            return parser.parse(schemaJson);
        }

        /** Declares the schema view as the only side input. */
        public List<PCollectionView<?>> getSideInputs() {
            return ImmutableList.<PCollectionView<?>>of(this.schemaView);
        }

        /** Builds a record for {@code record} using the schema of its one-character prefix. */
        public GenericRecord formatRecord(String record) {
            String destination = record.substring(0, 1);
            Schema schema = this.getSchema(destination);
            return AvroIOTest.createRecord(record, destination, schema);
        }

        /** Returns the first character of {@code element} as its destination. */
        public String getDestination(String element) {
            return element.substring(0, 1);
        }

        /** Returns the empty string as the default destination. */
        public String getDefaultDestination() {
            return "";
        }

        /**
         * Names files {@code file_<destination>} under the base directory, with the
         * {@code -SSSSS-of-NNNNN} shard template and an {@code .avro} suffix.
         */
        public FileBasedSink.FilenamePolicy getFilenamePolicy(String destination) {
            ResourceId filePrefix = this.baseDir.resolve("file_" + destination, (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
            return DefaultFilenamePolicy.fromStandardParameters(ValueProvider.StaticValueProvider.of(filePrefix), "-SSSSS-of-NNNNN", ".avro", false);
        }
    }

    /**
     * Filename policy for windowed writes only: file names embed the window bounds, shard numbers
     * and pane index; unwindowed naming is rejected.
     */
    private static class WindowedFilenamePolicy
    extends FileBasedSink.FilenamePolicy {
        final ResourceId outputFilePrefix;

        /** @param outputFilePrefix prefix whose directory and file name seed every output name */
        WindowedFilenamePolicy(ResourceId outputFilePrefix) {
            this.outputFilePrefix = outputFilePrefix;
        }

        /**
         * Builds {@code <prefix>-<start>-<end>-<shard>-of-<numShards>-pane-<index>[-last]<suffix>.avro}
         * in the prefix's current directory. The window is cast to {@code IntervalWindow}.
         */
        public ResourceId windowedFilename(int shardNumber, int numShards, BoundedWindow window, PaneInfo paneInfo, FileBasedSink.OutputFileHints outputFileHints) {
            String filenamePrefix;
            if (this.outputFilePrefix.isDirectory()) {
                // A directory prefix contributes no file-name part.
                filenamePrefix = "";
            } else {
                filenamePrefix = MoreObjects.firstNonNull(this.outputFilePrefix.getFilename(), "");
            }

            IntervalWindow interval = (IntervalWindow)window;
            String windowStr = String.format("%s-%s", interval.start().toString(), interval.end().toString());

            String lastMarker = paneInfo.isLast() ? "-last" : "";
            String filename = String.format(
                "%s-%s-%s-of-%s-pane-%s%s%s.avro",
                filenamePrefix,
                windowStr,
                shardNumber,
                numShards,
                paneInfo.getIndex(),
                lastMarker,
                outputFileHints.getSuggestedFilenameSuffix());

            ResourceId directory = this.outputFilePrefix.getCurrentDirectory();
            return directory.resolve(filename, (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
        }

        /** Always throws: this policy only supports windowed outputs. */
        public ResourceId unwindowedFilename(int shardNumber, int numShards, FileBasedSink.OutputFileHints outputFileHints) {
            throw new UnsupportedOperationException("Expecting windowed outputs only");
        }

        /** Adds the output prefix to the display data under {@code fileNamePrefix}. */
        public void populateDisplayData(DisplayData.Builder builder) {
            String prefix = this.outputFilePrefix.toString();
            builder.add(DisplayData.item("fileNamePrefix", prefix).withLabel("File Name Prefix"));
        }
    }

    /**
     * Three-field value class (int, string, nullable string) coded with {@code AvroCoder} by default,
     * with value-based {@code equals}, {@code hashCode} and {@code toString}.
     */
    @DefaultCoder(AvroCoder.class)
    static class GenericClassV2 {
        int intField;
        String stringField;
        @Nullable
        String nullableField;

        /** Creates an instance with all fields at their Java defaults. */
        public GenericClassV2() {
        }

        /** Creates an instance holding the three given values. */
        public GenericClassV2(int intValue, String stringValue, String nullableValue) {
            this.intField = intValue;
            this.stringField = stringValue;
            this.nullableField = nullableValue;
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this.getClass())
                .add("intField", this.intField)
                .add("stringField", this.stringField)
                .add("nullableField", this.nullableField)
                .toString();
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.intField, this.stringField, this.nullableField);
        }

        @Override
        public boolean equals(Object other) {
            // instanceof is already false for null, so no separate null check is needed.
            if (!(other instanceof GenericClassV2)) {
                return false;
            }
            GenericClassV2 that = (GenericClassV2)other;
            return this.intField == that.intField
                && Objects.equals(this.stringField, that.stringField)
                && Objects.equals(this.nullableField, that.nullableField);
        }
    }

    /** Maps a {@code Long} to a {@code GenericClass} whose string field is {@code "value" + i}. */
    private static class CreateGenericClass
    extends SimpleFunction<Long, GenericClass> {
        private CreateGenericClass() {
        }

        /** Uses the int value of {@code i} as the int field and {@code "value" + i} as the string field. */
        public GenericClass apply(Long i) {
            int intField = i.intValue();
            String stringField = "value" + i;
            return new GenericClass(intField, stringField);
        }
    }

    private static enum WriteMethod {
        AVROIO_WRITE,
        AVROIO_SINK;

    }

    private static enum Sharding {
        RUNNER_DETERMINED,
        WITHOUT_SHARDING,
        FIXED_3_SHARDS;

    }

    /** Converts a {@code GenericRecord} into a {@code GenericClass} by reading its two fields. */
    private static class ParseGenericClass
    implements SerializableFunction<GenericRecord, GenericClass> {
        private ParseGenericClass() {
        }

        /** Reads {@code intField} as an {@code Integer} and {@code stringField} via {@code toString()}. */
        public GenericClass apply(GenericRecord input) {
            Integer intField = (Integer)input.get("intField");
            String stringField = input.get("stringField").toString();
            return new GenericClass(intField, stringField);
        }
    }

    /**
     * Two-field value class (int, string) coded with {@code AvroCoder} by default, with value-based
     * {@code equals}, {@code hashCode} and {@code toString}.
     */
    @DefaultCoder(AvroCoder.class)
    static class GenericClass {
        int intField;
        String stringField;

        /** Creates an instance with both fields at their Java defaults. */
        public GenericClass() {
        }

        /** Creates an instance holding the two given values. */
        public GenericClass(int intField, String stringField) {
            this.intField = intField;
            this.stringField = stringField;
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this.getClass())
                .add("intField", this.intField)
                .add("stringField", this.stringField)
                .toString();
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.intField, this.stringField);
        }

        @Override
        public boolean equals(Object other) {
            // instanceof is already false for null, so no separate null check is needed.
            if (!(other instanceof GenericClass)) {
                return false;
            }
            GenericClass that = (GenericClass)other;
            return this.intField == that.intField && Objects.equals(this.stringField, that.stringField);
        }
    }
}

