package org.apache.flink.formats.avro;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.AbstractCollection;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.datagen.source.TestDataGenerators;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.generated.Address;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.UniqueBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.legacy.StreamingFileSink;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;
import org.assertj.core.api.Assertions;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

/* loaded from: input_file:org/apache/flink/formats/avro/AvroStreamingFileSinkITCase.class */
public class AvroStreamingFileSinkITCase extends AbstractTestBaseJUnit4 {

    @Rule
    public final Timeout timeoutPerTest = Timeout.seconds(20);

    /* loaded from: input_file:org/apache/flink/formats/avro/AvroStreamingFileSinkITCase$Datum.class */
    public static class Datum implements Serializable {
        public String a;
        public int b;

        public Datum() {
        }

        public Datum(String str, int i) {
            this.a = str;
            this.b = i;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            Datum datum = (Datum) obj;
            return this.b == datum.b && (this.a == null ? datum.a == null : this.a.equals(datum.a));
        }

        public int hashCode() {
            return (31 * (this.a != null ? this.a.hashCode() : 0)) + this.b;
        }
    }

    /* loaded from: input_file:org/apache/flink/formats/avro/AvroStreamingFileSinkITCase$GenericTestDataCollection.class */
    private static class GenericTestDataCollection extends AbstractCollection<GenericRecord> implements Serializable {
        private GenericTestDataCollection() {
        }

        @Override // java.util.AbstractCollection, java.util.Collection, java.lang.Iterable
        public Iterator<GenericRecord> iterator() {
            GenericRecord record = new GenericData.Record(Address.getClassSchema());
            record.put(0, 1);
            record.put(1, "a");
            record.put(2, "b");
            record.put(3, "c");
            record.put(4, "12345");
            GenericRecord record2 = new GenericData.Record(Address.getClassSchema());
            record2.put(0, 2);
            record2.put(1, "x");
            record2.put(2, "y");
            record2.put(3, "z");
            record2.put(4, "98765");
            return Arrays.asList(record, record2).iterator();
        }

        @Override // java.util.AbstractCollection, java.util.Collection
        public int size() {
            return 2;
        }
    }

    @Test
    public void testWriteAvroSpecific() throws Exception {
        File newFolder = TEMPORARY_FOLDER.newFolder();
        List asList = Arrays.asList(new Address(1, "a", "b", "c", "12345"), new Address(2, "p", "q", "r", "12345"), new Address(3, "x", "y", "z", "12345"));
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.enableCheckpointing(100L);
        executionEnvironment.fromSource(TestDataGenerators.fromDataWithSnapshotsLatch(asList, TypeInformation.of(Address.class)), WatermarkStrategy.noWatermarks(), "Test Source").addSink(StreamingFileSink.forBulkFormat(Path.fromLocalFile(newFolder), AvroWriters.forSpecificRecord(Address.class)).withBucketAssigner(new UniqueBucketAssigner("test")).build());
        executionEnvironment.execute();
        validateResults(newFolder, new SpecificDatumReader(Address.class), asList);
    }

    @Test
    public void testWriteAvroGeneric() throws Exception {
        File newFolder = TEMPORARY_FOLDER.newFolder();
        Schema classSchema = Address.getClassSchema();
        GenericTestDataCollection genericTestDataCollection = new GenericTestDataCollection();
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.enableCheckpointing(100L);
        executionEnvironment.fromSource(TestDataGenerators.fromDataWithSnapshotsLatch(genericTestDataCollection, new GenericRecordAvroTypeInfo(classSchema)), WatermarkStrategy.noWatermarks(), "Test Source").addSink(StreamingFileSink.forBulkFormat(Path.fromLocalFile(newFolder), AvroWriters.forGenericRecord(classSchema)).withBucketAssigner(new UniqueBucketAssigner("test")).build());
        executionEnvironment.execute();
        validateResults(newFolder, new GenericDatumReader(classSchema), new ArrayList(genericTestDataCollection));
    }

    @Test
    public void testWriteAvroReflect() throws Exception {
        File newFolder = TEMPORARY_FOLDER.newFolder();
        List asList = Arrays.asList(new Datum("a", 1), new Datum("b", 2), new Datum("c", 3));
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.enableCheckpointing(100L);
        executionEnvironment.fromSource(TestDataGenerators.fromDataWithSnapshotsLatch(asList, TypeInformation.of(Datum.class)), WatermarkStrategy.noWatermarks(), "Test Source").addSink(StreamingFileSink.forBulkFormat(Path.fromLocalFile(newFolder), AvroWriters.forReflectRecord(Datum.class)).withBucketAssigner(new UniqueBucketAssigner("test")).build());
        executionEnvironment.execute();
        validateResults(newFolder, new ReflectDatumReader(Datum.class), asList);
    }

    private static <T> void validateResults(File file, DatumReader<T> datumReader, List<T> list) throws Exception {
        File[] listFiles = file.listFiles();
        Assertions.assertThat(listFiles).hasSize(1);
        File[] listFiles2 = listFiles[0].listFiles();
        Assertions.assertThat(listFiles2).hasSize(2);
        for (File file2 : listFiles2) {
            Assertions.assertThat(file2).isNotEmpty();
            Assertions.assertThat(readAvroFile(file2, datumReader)).isEqualTo(list);
        }
    }

    private static <T> List<T> readAvroFile(File file, DatumReader<T> datumReader) throws IOException {
        ArrayList arrayList = new ArrayList();
        DataFileReader dataFileReader = new DataFileReader(file, datumReader);
        while (dataFileReader.hasNext()) {
            try {
                arrayList.add(dataFileReader.next());
            } catch (Throwable th) {
                try {
                    dataFileReader.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
                throw th;
            }
        }
        dataFileReader.close();
        return arrayList;
    }
}
