package org.apache.beam.sdk.io;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Objects;
import java.util.Random;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.reflect.Nullable;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.repackaged.com.google.common.base.MoreObjects;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.repackaged.com.google.common.collect.Iterators;
import org.apache.beam.sdk.repackaged.com.google.common.collect.Lists;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/AvroIOTest.class */
public class AvroIOTest {

    @Rule
    public TestPipeline p = TestPipeline.create();

    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();

    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    @Rule
    public TestPipeline windowedAvroWritePipeline = TestPipeline.create();

    @DefaultCoder(AvroCoder.class)
    /* loaded from: input_file:org/apache/beam/sdk/io/AvroIOTest$GenericClass.class */
    static class GenericClass {
        int intField;
        String stringField;

        public GenericClass() {
        }

        public GenericClass(int i, String str) {
            this.intField = i;
            this.stringField = str;
        }

        public String toString() {
            return MoreObjects.toStringHelper(getClass()).add("intField", this.intField).add("stringField", this.stringField).toString();
        }

        public int hashCode() {
            return Objects.hash(Integer.valueOf(this.intField), this.stringField);
        }

        public boolean equals(Object obj) {
            if (obj == null || !(obj instanceof GenericClass)) {
                return false;
            }
            GenericClass genericClass = (GenericClass) obj;
            return Objects.equals(Integer.valueOf(this.intField), Integer.valueOf(genericClass.intField)) && Objects.equals(this.stringField, genericClass.stringField);
        }
    }

    @DefaultCoder(AvroCoder.class)
    /* loaded from: input_file:org/apache/beam/sdk/io/AvroIOTest$GenericClassV2.class */
    static class GenericClassV2 {
        int intField;
        String stringField;

        @Nullable
        String nullableField;

        public GenericClassV2() {
        }

        public GenericClassV2(int i, String str, String str2) {
            this.intField = i;
            this.stringField = str;
            this.nullableField = str2;
        }

        public String toString() {
            return MoreObjects.toStringHelper(getClass()).add("intField", this.intField).add("stringField", this.stringField).add("nullableField", this.nullableField).toString();
        }

        public int hashCode() {
            return Objects.hash(Integer.valueOf(this.intField), this.stringField, this.nullableField);
        }

        public boolean equals(Object obj) {
            if (obj == null || !(obj instanceof GenericClassV2)) {
                return false;
            }
            GenericClassV2 genericClassV2 = (GenericClassV2) obj;
            return Objects.equals(Integer.valueOf(this.intField), Integer.valueOf(genericClassV2.intField)) && Objects.equals(this.stringField, genericClassV2.stringField) && Objects.equals(this.nullableField, genericClassV2.nullableField);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/AvroIOTest$WindowedFilenamePolicy.class */
    private static class WindowedFilenamePolicy extends FileBasedSink.FilenamePolicy {
        final String outputFilePrefix;

        WindowedFilenamePolicy(String str) {
            this.outputFilePrefix = str;
        }

        public ResourceId windowedFilename(ResourceId resourceId, FileBasedSink.FilenamePolicy.WindowedContext windowedContext, String str) {
            Object[] objArr = new Object[7];
            objArr[0] = this.outputFilePrefix;
            objArr[1] = windowedContext.getWindow();
            objArr[2] = Integer.valueOf(windowedContext.getShardNumber());
            objArr[3] = Integer.valueOf(windowedContext.getNumShards() - 1);
            objArr[4] = Long.valueOf(windowedContext.getPaneInfo().getIndex());
            objArr[5] = windowedContext.getPaneInfo().isLast() ? "-final" : "";
            objArr[6] = str;
            return resourceId.resolve(String.format("%s-%s-%s-of-%s-pane-%s%s%s", objArr), ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
        }

        public ResourceId unwindowedFilename(ResourceId resourceId, FileBasedSink.FilenamePolicy.Context context, String str) {
            throw new UnsupportedOperationException("Expecting windowed outputs only");
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            builder.add(DisplayData.item("fileNamePrefix", this.outputFilePrefix).withLabel("File Name Prefix"));
        }
    }

    @Test
    public void testAvroIOGetName() {
        Assert.assertEquals("AvroIO.Read", AvroIO.read(String.class).from("/tmp/foo*/baz").getName());
        Assert.assertEquals("AvroIO.Write", AvroIO.write(String.class).to("/tmp/foo/baz").getName());
    }

    @Test
    @Category({NeedsRunner.class})
    public void testAvroIOWriteAndReadASingleFile() throws Throwable {
        ImmutableList of = ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
        File newFile = this.tmpFolder.newFile("output.avro");
        this.p.apply(Create.of(of)).apply(AvroIO.write(GenericClass.class).to(newFile.getAbsolutePath()).withoutSharding());
        this.p.run();
        PAssert.that(this.p.apply(AvroIO.read(GenericClass.class).from(newFile.getAbsolutePath()))).containsInAnyOrder(of);
        this.p.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testAvroIOCompressedWriteAndReadASingleFile() throws Throwable {
        ImmutableList of = ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
        File newFile = this.tmpFolder.newFile("output.avro");
        this.p.apply(Create.of(of)).apply(AvroIO.write(GenericClass.class).to(newFile.getAbsolutePath()).withoutSharding().withCodec(CodecFactory.deflateCodec(9)));
        this.p.run();
        PAssert.that(this.p.apply(AvroIO.read(GenericClass.class).from(newFile.getAbsolutePath()))).containsInAnyOrder(of);
        this.p.run();
        Assert.assertEquals("deflate", new DataFileStream(new FileInputStream(newFile), new GenericDatumReader()).getMetaString("avro.codec"));
    }

    @Test
    @Category({NeedsRunner.class})
    public void testAvroIONullCodecWriteAndReadASingleFile() throws Throwable {
        ImmutableList of = ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
        File newFile = this.tmpFolder.newFile("output.avro");
        this.p.apply(Create.of(of)).apply(AvroIO.write(GenericClass.class).to(newFile.getAbsolutePath()).withoutSharding().withCodec(CodecFactory.nullCodec()));
        this.p.run();
        PAssert.that(this.p.apply(AvroIO.read(GenericClass.class).from(newFile.getAbsolutePath()))).containsInAnyOrder(of);
        this.p.run();
        Assert.assertEquals("null", new DataFileStream(new FileInputStream(newFile), new GenericDatumReader()).getMetaString("avro.codec"));
    }

    @Test
    @Category({NeedsRunner.class})
    public void testAvroIOWriteAndReadSchemaUpgrade() throws Throwable {
        ImmutableList of = ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
        File newFile = this.tmpFolder.newFile("output.avro");
        this.p.apply(Create.of(of)).apply(AvroIO.write(GenericClass.class).to(newFile.getAbsolutePath()).withoutSharding());
        this.p.run();
        PAssert.that(this.p.apply(AvroIO.read(GenericClassV2.class).from(newFile.getAbsolutePath()))).containsInAnyOrder(ImmutableList.of(new GenericClassV2(3, "hi", null), new GenericClassV2(5, "bar", null)));
        this.p.run();
    }

    @Test
    @Category({ValidatesRunner.class, UsesTestStream.class})
    public void testWindowedAvroIOWrite() throws Throwable {
        String path = Files.createTempDirectory(this.tmpFolder.getRoot().toPath(), "testwrite", new FileAttribute[0]).resolve("prefix").toString();
        Instant instant = new Instant(0L);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ArrayList newArrayList = Lists.newArrayList(new Instant[]{instant.plus(Duration.standardSeconds(0L)), instant.plus(Duration.standardSeconds(10L)), instant.plus(Duration.standardSeconds(20L)), instant.plus(Duration.standardSeconds(30L))});
        Random random = new Random();
        for (int i = 0; i < 100; i++) {
            GenericClass genericClass = new GenericClass(i, String.valueOf(i));
            arrayList.add(genericClass);
            arrayList2.add(TimestampedValue.of(genericClass, (Instant) newArrayList.get(random.nextInt(newArrayList.size()))));
        }
        ArrayList arrayList3 = new ArrayList();
        ArrayList newArrayList2 = Lists.newArrayList(new Instant[]{instant.plus(Duration.standardSeconds(60L)), instant.plus(Duration.standardSeconds(70L)), instant.plus(Duration.standardSeconds(80L)), instant.plus(Duration.standardSeconds(90L))});
        for (int i2 = 100; i2 < 200; i2++) {
            GenericClass genericClass2 = new GenericClass(i2, String.valueOf(i2));
            arrayList.add(new GenericClass(i2, String.valueOf(i2)));
            arrayList3.add(TimestampedValue.of(genericClass2, (Instant) newArrayList2.get(random.nextInt(newArrayList2.size()))));
        }
        TimestampedValue[] timestampedValueArr = (TimestampedValue[]) arrayList2.toArray(new TimestampedValue[100]);
        TimestampedValue[] timestampedValueArr2 = (TimestampedValue[]) arrayList3.toArray(new TimestampedValue[100]);
        this.windowedAvroWritePipeline.apply(TestStream.create(AvroCoder.of(GenericClass.class)).advanceWatermarkTo(new Instant(0L)).addElements(timestampedValueArr[0], (TimestampedValue[]) Arrays.copyOfRange(timestampedValueArr, 1, timestampedValueArr.length)).advanceWatermarkTo(new Instant(0L).plus(Duration.standardMinutes(1L))).addElements(timestampedValueArr2[0], (TimestampedValue[]) Arrays.copyOfRange(timestampedValueArr2, 1, timestampedValueArr2.length)).advanceWatermarkToInfinity()).apply(Window.into(FixedWindows.of(Duration.standardMinutes(1L)))).apply(AvroIO.write(GenericClass.class).to(path).withFilenamePolicy(new WindowedFilenamePolicy(path)).withWindowedWrites().withNumShards(2));
        this.windowedAvroWritePipeline.run();
        ArrayList<File> arrayList4 = new ArrayList();
        for (int i3 = 0; i3 < 2; i3++) {
            for (int i4 = 0; i4 < 2; i4++) {
                arrayList4.add(new File(path + "-" + new IntervalWindow(new Instant(0L).plus(Duration.standardMinutes(i4)), Duration.standardMinutes(1L)).toString() + "-" + i3 + "-of-1-pane-0-final"));
            }
        }
        ArrayList arrayList5 = new ArrayList();
        for (File file : arrayList4) {
            Assert.assertTrue("Expected output file " + file.getAbsolutePath(), file.exists());
            DataFileReader dataFileReader = new DataFileReader(file, new ReflectDatumReader(ReflectData.get().getSchema(GenericClass.class)));
            Throwable th = null;
            try {
                try {
                    Iterators.addAll(arrayList5, dataFileReader);
                    if (dataFileReader != null) {
                        if (0 != 0) {
                            try {
                                dataFileReader.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            dataFileReader.close();
                        }
                    }
                    file.delete();
                } finally {
                }
            } catch (Throwable th3) {
                if (dataFileReader != null) {
                    if (th != null) {
                        try {
                            dataFileReader.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        dataFileReader.close();
                    }
                }
                throw th3;
            }
        }
        Assert.assertThat(arrayList5, Matchers.containsInAnyOrder(arrayList.toArray()));
    }

    @Test
    public void testWriteWithDefaultCodec() throws Exception {
        Assert.assertEquals(CodecFactory.deflateCodec(6).toString(), AvroIO.write(String.class).to("/tmp/foo/baz").getCodec().toString());
    }

    @Test
    public void testWriteWithCustomCodec() throws Exception {
        Assert.assertEquals("snappy", AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(CodecFactory.snappyCodec()).getCodec().toString());
    }

    @Test
    public void testWriteWithSerDeCustomDeflateCodec() throws Exception {
        Assert.assertEquals(CodecFactory.deflateCodec(9).toString(), SerializableUtils.clone(AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(CodecFactory.deflateCodec(9)).getCodec()).getCodec().toString());
    }

    @Test
    public void testWriteWithSerDeCustomXZCodec() throws Exception {
        Assert.assertEquals(CodecFactory.xzCodec(9).toString(), SerializableUtils.clone(AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(CodecFactory.xzCodec(9)).getCodec()).getCodec().toString());
    }

    @Test
    @Category({NeedsRunner.class})
    public void testMetadata() throws Exception {
        ImmutableList of = ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
        File newFile = this.tmpFolder.newFile("output.avro");
        this.p.apply(Create.of(of)).apply(AvroIO.write(GenericClass.class).to(newFile.getAbsolutePath()).withoutSharding().withMetadata(ImmutableMap.of("stringKey", "stringValue", "longKey", 100L, "bytesKey", "bytesValue".getBytes())));
        this.p.run();
        DataFileStream dataFileStream = new DataFileStream(new FileInputStream(newFile), new GenericDatumReader());
        Assert.assertEquals("stringValue", dataFileStream.getMetaString("stringKey"));
        Assert.assertEquals(100L, dataFileStream.getMetaLong("longKey"));
        Assert.assertArrayEquals("bytesValue".getBytes(), dataFileStream.getMeta("bytesKey"));
    }

    private void runTestWrite(String[] strArr, int i) throws IOException {
        AvroIO.Write withoutSharding;
        String absolutePath = new File(this.tmpFolder.getRoot(), "prefix").getAbsolutePath();
        AvroIO.Write write = AvroIO.write(String.class).to(absolutePath);
        if (i > 1) {
            System.out.println("NumShards " + i);
            withoutSharding = write.withNumShards(i);
        } else {
            System.out.println("no sharding");
            withoutSharding = write.withoutSharding();
        }
        this.p.apply(Create.of(ImmutableList.copyOf(strArr))).apply(withoutSharding);
        this.p.run();
        assertTestOutputs(strArr, i, absolutePath, (String) MoreObjects.firstNonNull(withoutSharding.getShardTemplate(), "-SSSSS-of-NNNNN"));
    }

    public static void assertTestOutputs(String[] strArr, int i, String str, String str2) throws IOException {
        ArrayList<File> arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(new File(DefaultFilenamePolicy.constructName(str, str2, "", i2, i)));
        }
        ArrayList arrayList2 = new ArrayList();
        for (File file : arrayList) {
            Assert.assertTrue("Expected output file " + file.getName(), file.exists());
            DataFileReader dataFileReader = new DataFileReader(file, new ReflectDatumReader(ReflectData.get().getSchema(String.class)));
            Throwable th = null;
            try {
                try {
                    Iterators.addAll(arrayList2, dataFileReader);
                    if (dataFileReader != null) {
                        if (0 != 0) {
                            try {
                                dataFileReader.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            dataFileReader.close();
                        }
                    }
                } finally {
                }
            } catch (Throwable th3) {
                if (dataFileReader != null) {
                    if (th != null) {
                        try {
                            dataFileReader.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        dataFileReader.close();
                    }
                }
                throw th3;
            }
        }
        Assert.assertThat(arrayList2, Matchers.containsInAnyOrder(strArr));
    }

    @Test
    @Category({NeedsRunner.class})
    public void testAvroSinkWrite() throws Exception {
        runTestWrite(new String[]{"first", "second", "third"}, 1);
    }

    @Test
    @Category({NeedsRunner.class})
    public void testAvroSinkShardedWrite() throws Exception {
        runTestWrite(new String[]{"first", "second", "third", "fourth", "fifth"}, 4);
    }

    @Test
    public void testReadDisplayData() {
        Assert.assertThat(DisplayData.from(AvroIO.read(String.class).from("/foo.*")), DisplayDataMatchers.hasDisplayItem("filePattern", "/foo.*"));
    }

    @Test
    @Category({ValidatesRunner.class})
    public void testPrimitiveReadDisplayData() {
        Assert.assertThat("AvroIO.Read should include the file pattern in its primitive transform", DisplayDataEvaluator.create().displayDataForPrimitiveSourceTransforms(AvroIO.readGenericRecords(Schema.create(Schema.Type.STRING)).from("/foo.*")), Matchers.hasItem(DisplayDataMatchers.hasDisplayItem("filePattern")));
    }

    @Test
    public void testWriteDisplayData() {
        DisplayData from = DisplayData.from(AvroIO.write(GenericClass.class).to("/foo").withShardNameTemplate("-SS-of-NN-").withSuffix("bar").withNumShards(100).withCodec(CodecFactory.snappyCodec()));
        Assert.assertThat(from, DisplayDataMatchers.hasDisplayItem("filePrefix", "/foo"));
        Assert.assertThat(from, DisplayDataMatchers.hasDisplayItem("shardNameTemplate", "-SS-of-NN-"));
        Assert.assertThat(from, DisplayDataMatchers.hasDisplayItem("fileSuffix", "bar"));
        Assert.assertThat(from, DisplayDataMatchers.hasDisplayItem("schema", (Class<?>) GenericClass.class));
        Assert.assertThat(from, DisplayDataMatchers.hasDisplayItem("numShards", 100L));
        Assert.assertThat(from, DisplayDataMatchers.hasDisplayItem("codec", CodecFactory.snappyCodec().toString()));
    }

    @Test
    public void testWindowedWriteRequiresFilenamePolicy() {
        PCollection apply = this.p.apply(Create.empty(StringUtf8Coder.of()));
        AvroIO.Write withWindowedWrites = AvroIO.write(String.class).to("/tmp/some/file").withWindowedWrites();
        this.expectedException.expect(IllegalStateException.class);
        this.expectedException.expectMessage("When using windowed writes, a filename policy must be set via withFilenamePolicy()");
        apply.apply(withWindowedWrites);
    }
}
