/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.formats.avro;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
import org.apache.flink.connector.testframe.external.ExternalContextFactory;
import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroBulkFormatTestUtils;
import org.apache.flink.formats.avro.RowDataToAvroConverters;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.FileUtils;

class AvroBulkFormatITCase
extends SourceTestSuiteBase<RowData> {
    // Serializer for AvroBulkFormatTestUtils.ROW_TYPE; the external context below uses it to
    // turn generated rows into copied binary rows.
    private static final RowDataSerializer SERIALIZER = new RowDataSerializer(AvroBulkFormatTestUtils.ROW_TYPE);
    // Checkpointing modes exposed through @TestSemantics; only EXACTLY_ONCE is listed.
    // NOTE(review): presumably discovered by the connector test framework via the annotation.
    @TestSemantics
    CheckpointingMode[] semantics = new CheckpointingMode[]{CheckpointingMode.EXACTLY_ONCE};
    // Test environment exposed through @TestEnv: a MiniClusterTestEnvironment with default settings.
    @TestEnv
    MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
    // External-context factory whose contexts generate one three-row group per split.
    @TestContext
    AvroBulkFormatExternalContext.Factory oneBlockPerFile = new AvroBulkFormatExternalContext.Factory(1);
    // External-context factory whose contexts generate two three-row groups per split.
    @TestContext
    AvroBulkFormatExternalContext.Factory twoBlocksPerFile = new AvroBulkFormatExternalContext.Factory(2);

    // Explicit package-private no-argument constructor; it performs no work of its own.
    AvroBulkFormatITCase() {
    }

    /**
     * {@link ExternalSystemSplitDataWriter} that appends {@link RowData} records to one Avro
     * container file, converting every row to an Avro {@link GenericRecord} first.
     */
    private static class AvroBulkFormatExternalSystemSplitDataWriter
            implements ExternalSystemSplitDataWriter<RowData> {

        /** Avro schema derived from {@code AvroBulkFormatTestUtils.ROW_TYPE}. */
        private final Schema schema =
                AvroSchemaConverter.convertToSchema(AvroBulkFormatTestUtils.ROW_TYPE);

        /** Converter from Flink rows of {@code ROW_TYPE} to Avro objects. */
        private final RowDataToAvroConverters.RowDataToAvroConverter converter =
                RowDataToAvroConverters.createConverter(AvroBulkFormatTestUtils.ROW_TYPE);

        /** Avro file writer bound to the output stream handed to the constructor. */
        private final DataFileWriter<GenericRecord> dataFileWriter;

        /**
         * Opens an Avro data file on the given stream.
         *
         * @param out stream the Avro file is written to
         * @throws IOException if the Avro file cannot be created on {@code out}
         */
        private AvroBulkFormatExternalSystemSplitDataWriter(FileOutputStream out)
                throws IOException {
            DatumWriter<GenericRecord> datumWriter =
                    new GenericDatumWriter<GenericRecord>(this.schema);
            this.dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
            this.dataFileWriter.create(this.schema, out);
            // Deliberately tiny sync interval, applied after create() exactly as before.
            // NOTE(review): presumably chosen so the test rows spread over several Avro
            // blocks per file - confirm against the Avro DataFileWriter documentation.
            this.dataFileWriter.setSyncInterval(128);
        }

        /**
         * Converts each row to a {@link GenericRecord} and appends it to the Avro file.
         *
         * @param records rows to append, in iteration order
         * @throws RuntimeException wrapping the {@link IOException} of the first failed append;
         *     rows after the failing one are not written
         */
        public void writeRecords(List<RowData> records) {
            try {
                for (RowData row : records) {
                    GenericRecord avroRecord =
                            (GenericRecord) this.converter.convert(this.schema, row);
                    this.dataFileWriter.append(avroRecord);
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Closes the Avro file writer.
         *
         * @throws Exception if closing the writer fails
         */
        public void close() throws Exception {
            this.dataFileWriter.close();
        }
    }

    /**
     * External context that stages Avro files in a private temporary directory and hands the
     * tests a {@link FileSource} reading that directory.
     *
     * <p>Every split data writer created by this context writes to a new file named after a
     * running index. {@link #close()} deletes the directory.
     */
    public static class AvroBulkFormatExternalContext
            implements DataStreamSourceExternalContext<RowData> {

        /** Temporary directory that receives every file written through this context. */
        private final java.nio.file.Path tmpDir;

        /** Number of three-row groups {@link #generateTestData} produces per call. */
        private final int blocksPerFile;

        /** Name (as a decimal number) of the next file to create inside {@link #tmpDir}. */
        private int index;

        /**
         * Creates the context together with its temporary directory.
         *
         * @param blocksPerFile number of three-row groups to generate per split
         * @throws RuntimeException if the temporary directory cannot be created
         */
        private AvroBulkFormatExternalContext(int blocksPerFile) {
            try {
                this.tmpDir = Files.createTempDirectory("avro-bulk-format-it-case");
            } catch (IOException e) {
                throw new RuntimeException(
                        "Encountered exception when creating temp directory for tests", e);
            }
            this.blocksPerFile = blocksPerFile;
            this.index = 0;
        }

        /**
         * Builds a file source over the temporary directory using the testing Avro bulk format.
         *
         * @param sourceSettings settings requested by the test; only its boundedness is read
         * @return a source reading all files below the temporary directory
         * @throws UnsupportedOperationException if an unbounded source is requested
         */
        public Source<RowData, ?, ?> createSource(TestingSourceSettings sourceSettings) {
            if (sourceSettings.getBoundedness() == Boundedness.CONTINUOUS_UNBOUNDED) {
                throw new UnsupportedOperationException("Currently Avro format only supports running in bounded mode");
            }
            return FileSource.forBulkFileFormat(
                            new AvroBulkFormatTestUtils.TestingAvroBulkFormat(),
                            Path.fromLocalFile(this.tmpDir.toFile()))
                    .build();
        }

        /**
         * Creates a writer for a new Avro file in the temporary directory.
         *
         * <p>The file index only advances once the writer has been created successfully.
         *
         * @param sourceSettings unused
         * @return a writer appending rows to the newly created file
         * @throws RuntimeException wrapping the {@link IOException} raised while opening the
         *     file or initialising the Avro writer
         */
        public ExternalSystemSplitDataWriter<RowData> createSourceSplitDataWriter(TestingSourceSettings sourceSettings) {
            // FileOutputStream creates the file itself, so no separate createNewFile() is needed.
            File file = this.tmpDir.resolve(String.valueOf(this.index)).toFile();
            AvroBulkFormatExternalSystemSplitDataWriter writer;
            try {
                FileOutputStream out = new FileOutputStream(file);
                try {
                    writer = new AvroBulkFormatExternalSystemSplitDataWriter(out);
                } catch (IOException | RuntimeException e) {
                    // The writer never took ownership of the stream; close it so the file
                    // handle is not leaked, then report the original failure.
                    try {
                        out.close();
                    } catch (IOException closeFailure) {
                        e.addSuppressed(closeFailure);
                    }
                    throw e;
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            this.index++;
            return writer;
        }

        /**
         * Generates the rows for one split; the output depends only on {@code seed} and the
         * configured number of groups.
         *
         * <p>Each group consists of three rows of random lowercase strings: (4 chars, 8 chars),
         * (16 chars, {@code null}) and (32 chars, 1024 chars).
         *
         * @param sourceSettings unused
         * @param splitIndex unused
         * @param seed seed of the random generator
         * @return {@code 3 * blocksPerFile} rows
         */
        public List<RowData> generateTestData(TestingSourceSettings sourceSettings, int splitIndex, long seed) {
            Random random = new Random(seed);
            List<RowData> rows = new ArrayList<RowData>(3 * Math.max(this.blocksPerFile, 0));
            for (int group = 0; group < this.blocksPerFile; group++) {
                // Strings are drawn strictly in this order so a given seed yields the same
                // rows as before.
                StringData shortKey = this.randomString(4, random);
                StringData shortValue = this.randomString(8, random);
                rows.add(this.getBinaryRow(shortKey, shortValue));

                StringData nullValueKey = this.randomString(16, random);
                rows.add(this.getBinaryRow(nullValueKey, null));

                StringData longKey = this.randomString(32, random);
                StringData longValue = this.randomString(1024, random);
                rows.add(this.getBinaryRow(longKey, longValue));
            }
            return rows;
        }

        /** Returns generic type information for {@link RowData}. */
        public TypeInformation<RowData> getProducedType() {
            return TypeInformation.of(RowData.class);
        }

        /**
         * Deletes the temporary directory.
         *
         * @throws Exception if the directory cannot be deleted
         */
        public void close() throws Exception {
            FileUtils.deleteDirectory(this.tmpDir.toFile());
        }

        /**
         * Builds a string of {@code len} characters drawn uniformly from {@code 'a'..'z'}.
         *
         * @param len number of characters; values below one yield an empty string
         * @param random generator the characters are drawn from
         */
        private StringData randomString(int len, Random random) {
            StringBuilder text = new StringBuilder(Math.max(len, 0));
            for (int i = 0; i < len; i++) {
                text.append((char) ('a' + random.nextInt(26)));
            }
            return StringData.fromString(text.toString());
        }

        /**
         * Packs the given field values into a row and returns a copy of its binary form.
         *
         * @param values field values in column order; individual values may be {@code null}
         */
        private RowData getBinaryRow(Object... values) {
            GenericRowData genericRow = new GenericRowData(values.length);
            for (int pos = 0; pos < values.length; pos++) {
                genericRow.setField(pos, values[pos]);
            }
            // NOTE(review): the copy is presumably needed because the serializer reuses the
            // binary row it returns - confirm against RowDataSerializer.
            BinaryRowData binaryRow = SERIALIZER.toBinaryRow(genericRow);
            return SERIALIZER.copy(binaryRow);
        }

        /** Returns an empty list: this context supplies no connector jars. */
        public List<URL> getConnectorJarPaths() {
            return Collections.emptyList();
        }

        @Override
        public String toString() {
            return "AvroBulkFormatExternalContext{blocksPerFile=" + this.blocksPerFile + '}';
        }

        /** Factory producing contexts that all share the same number of groups per split. */
        public static class Factory
                implements ExternalContextFactory<AvroBulkFormatExternalContext> {

            /** Value passed to every context this factory creates. */
            private final int blocksPerFile;

            /**
             * @param blocksPerFile number of three-row groups each created context generates
             *     per split
             */
            public Factory(int blocksPerFile) {
                this.blocksPerFile = blocksPerFile;
            }

            /**
             * Creates a new context with its own temporary directory.
             *
             * @param testName unused
             */
            public AvroBulkFormatExternalContext createExternalContext(String testName) {
                return new AvroBulkFormatExternalContext(this.blocksPerFile);
            }
        }
    }
}

