package org.apache.beam.sdk.io;

import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.io.AvroSchemaIOProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/AvroSchemaIOProviderTest.class */
public class AvroSchemaIOProviderTest {

    @Rule
    public TestPipeline writePipeline = TestPipeline.create();

    @Rule
    public TestPipeline readPipeline = TestPipeline.create();

    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();
    private static final Schema SCHEMA = Schema.builder().addInt64Field("age").addStringField("age_str").build();

    private Row createRow(long j) {
        return Row.withSchema(SCHEMA).addValues(Long.valueOf(j), Long.valueOf(j).toString()).build();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testWriteAndReadTable() {
        File file = new File(this.tempFolder.getRoot(), "person-info.avro");
        AvroSchemaIOProvider avroSchemaIOProvider = new AvroSchemaIOProvider();
        AvroSchemaIOProvider.AvroSchemaIO from = avroSchemaIOProvider.from(file.getAbsolutePath(), Row.withSchema(avroSchemaIOProvider.configurationSchema()).addValue(null).build(), SCHEMA);
        List asList = Arrays.asList(createRow(1L), createRow(3L), createRow(4L));
        ((PCollection) this.writePipeline.apply("Create", Create.of(asList).withCoder(RowCoder.of(SCHEMA)))).apply(from.buildWriter());
        this.writePipeline.run();
        PAssert.that((PCollection) this.readPipeline.begin().apply(from.buildReader())).containsInAnyOrder(asList);
        this.readPipeline.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testStreamingWriteDefault() throws Exception {
        File file = new File(this.tempFolder.getRoot(), "person-info");
        AvroSchemaIOProvider avroSchemaIOProvider = new AvroSchemaIOProvider();
        Row build = Row.withSchema(avroSchemaIOProvider.configurationSchema()).addValue(null).build();
        ((PCollection) this.writePipeline.apply("create", TestStream.create(RowCoder.of(SCHEMA)).addElements(TimestampedValue.of(createRow(1L), new Instant(1L)), new TimestampedValue[0]).addElements(TimestampedValue.of(createRow(2L), Instant.ofEpochSecond(120L)), new TimestampedValue[0]).advanceWatermarkToInfinity())).apply("write", avroSchemaIOProvider.from(file.getAbsolutePath(), build, SCHEMA).buildWriter());
        this.writePipeline.run();
        String str = file.getAbsolutePath() + "*";
        Assert.assertEquals(2L, FileSystems.match(str).metadata().size());
        PAssert.that((PCollection) this.readPipeline.begin().apply("read", avroSchemaIOProvider.from(str, build, SCHEMA).buildReader())).containsInAnyOrder(createRow(1L), createRow(2L));
        this.readPipeline.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testStreamingCustomWindowSize() throws Exception {
        File file = new File(this.tempFolder.getRoot(), "person-info");
        AvroSchemaIOProvider avroSchemaIOProvider = new AvroSchemaIOProvider();
        Row build = Row.withSchema(avroSchemaIOProvider.configurationSchema()).addValue(Long.valueOf(Duration.ofMinutes(4L).getSeconds())).build();
        ((PCollection) this.writePipeline.apply("create", TestStream.create(RowCoder.of(SCHEMA)).addElements(TimestampedValue.of(createRow(1L), new Instant(1L)), new TimestampedValue[0]).addElements(TimestampedValue.of(createRow(2L), Instant.ofEpochSecond(120L)), new TimestampedValue[0]).advanceWatermarkToInfinity())).apply("write", avroSchemaIOProvider.from(file.getAbsolutePath(), build, SCHEMA).buildWriter());
        this.writePipeline.run();
        String str = file.getAbsolutePath() + "*";
        Assert.assertEquals(1L, FileSystems.match(str).metadata().size());
        PAssert.that((PCollection) this.readPipeline.begin().apply("read", avroSchemaIOProvider.from(str, build, SCHEMA).buildReader())).containsInAnyOrder(createRow(1L), createRow(2L));
        this.readPipeline.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testBatchCustomWindowSize() throws Exception {
        File file = new File(this.tempFolder.getRoot(), "person-info");
        AvroSchemaIOProvider avroSchemaIOProvider = new AvroSchemaIOProvider();
        Row build = Row.withSchema(avroSchemaIOProvider.configurationSchema()).addValue(Long.valueOf(Duration.ofMinutes(4L).getSeconds())).build();
        AvroSchemaIOProvider.AvroSchemaIO from = avroSchemaIOProvider.from(file.getAbsolutePath(), build, SCHEMA);
        List asList = Arrays.asList(createRow(1L), createRow(3L), createRow(4L));
        ((PCollection) this.writePipeline.apply("Create", Create.of(asList).withCoder(RowCoder.of(SCHEMA)))).apply("write", from.buildWriter());
        this.writePipeline.run();
        String str = file.getAbsolutePath() + "*";
        Assert.assertEquals(1L, FileSystems.match(str).metadata().size());
        PAssert.that((PCollection) this.readPipeline.begin().apply("read", avroSchemaIOProvider.from(str, build, SCHEMA).buildReader())).containsInAnyOrder(asList);
        this.readPipeline.run();
    }
}
