package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import lombok.NonNull;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.class */
public class ParquetWriteStrategy extends AbstractWriteStrategy {
    private final Map<String, ParquetWriter<GenericRecord>> beingWrittenWriter;

    public ParquetWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
        super(textFileSinkConfig);
        this.beingWrittenWriter = new HashMap();
    }

    @Override // org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy
    public void write(@NonNull SeaTunnelRow seaTunnelRow) {
        if (seaTunnelRow == null) {
            throw new NullPointerException("seaTunnelRow is marked @NonNull but is null");
        }
        String orCreateFilePathBeingWritten = getOrCreateFilePathBeingWritten(seaTunnelRow);
        ParquetWriter<GenericRecord> orCreateWriter = getOrCreateWriter(orCreateFilePathBeingWritten);
        GenericRecordBuilder genericRecordBuilder = new GenericRecordBuilder(buildSchemaWithRowType());
        this.sinkColumnsIndexInRow.forEach(num -> {
            genericRecordBuilder.set(this.seaTunnelRowType.getFieldName(num.intValue()), seaTunnelRow.getField(num.intValue()));
        });
        try {
            orCreateWriter.write(genericRecordBuilder.build());
        } catch (IOException e) {
            throw new RuntimeException(String.format("Write data to file [%s] error", orCreateFilePathBeingWritten), e);
        }
    }

    @Override // org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy
    public void finishAndCloseFile() {
        this.beingWrittenWriter.forEach((str, parquetWriter) -> {
            try {
                parquetWriter.close();
                this.needMoveFiles.put(str, getTargetLocation(str));
            } catch (IOException e) {
                throw new RuntimeException(String.format("Close file [%s] parquet writer failed, error msg: [%s]", str, e.getMessage()), e);
            }
        });
    }

    private ParquetWriter<GenericRecord> getOrCreateWriter(@NonNull String str) {
        if (str == null) {
            throw new NullPointerException("filePath is marked @NonNull but is null");
        }
        ParquetWriter<GenericRecord> parquetWriter = this.beingWrittenWriter.get(str);
        if (parquetWriter != null) {
            return parquetWriter;
        }
        try {
            ParquetWriter<GenericRecord> build = AvroParquetWriter.builder(HadoopOutputFile.fromPath(new Path(str), getConfiguration(this.hadoopConf))).withWriteMode(ParquetFileWriter.Mode.OVERWRITE).withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0).withCompressionCodec(CompressionCodecName.SNAPPY).withSchema(buildSchemaWithRowType()).build();
            this.beingWrittenWriter.put(str, build);
            return build;
        } catch (IOException e) {
            throw new RuntimeException(String.format("Get parquet writer for file [%s] error", str), e);
        }
    }

    private Schema buildSchemaWithRowType() {
        ArrayList arrayList = new ArrayList();
        SeaTunnelDataType[] fieldTypes = this.seaTunnelRowType.getFieldTypes();
        String[] fieldNames = this.seaTunnelRowType.getFieldNames();
        this.sinkColumnsIndexInRow.forEach(num -> {
            if (BasicType.BOOLEAN_TYPE.equals(fieldTypes[num.intValue()])) {
                arrayList.add(new Schema.Field(fieldNames[num.intValue()], Schema.create(Schema.Type.BOOLEAN), (String) null, (Object) null));
                return;
            }
            if (BasicType.SHORT_TYPE.equals(fieldTypes[num.intValue()]) || BasicType.INT_TYPE.equals(fieldTypes[num.intValue()])) {
                arrayList.add(new Schema.Field(fieldNames[num.intValue()], Schema.create(Schema.Type.INT), (String) null, (Object) null));
                return;
            }
            if (BasicType.LONG_TYPE.equals(fieldTypes[num.intValue()])) {
                arrayList.add(new Schema.Field(fieldNames[num.intValue()], Schema.create(Schema.Type.LONG), (String) null, (Object) null));
                return;
            }
            if (BasicType.FLOAT_TYPE.equals(fieldTypes[num.intValue()])) {
                arrayList.add(new Schema.Field(fieldNames[num.intValue()], Schema.create(Schema.Type.FLOAT), (String) null, (Object) null));
                return;
            }
            if (BasicType.DOUBLE_TYPE.equals(fieldTypes[num.intValue()])) {
                arrayList.add(new Schema.Field(fieldNames[num.intValue()], Schema.create(Schema.Type.DOUBLE), (String) null, (Object) null));
                return;
            }
            if (BasicType.STRING_TYPE.equals(fieldTypes[num.intValue()])) {
                arrayList.add(new Schema.Field(fieldNames[num.intValue()], Schema.create(Schema.Type.STRING), (String) null, (Object) null));
            } else if (BasicType.BYTE_TYPE.equals(fieldTypes[num.intValue()])) {
                arrayList.add(new Schema.Field(fieldNames[num.intValue()], Schema.create(Schema.Type.BYTES), (String) null, (Object) null));
            } else if (BasicType.VOID_TYPE.equals(fieldTypes[num.intValue()])) {
                arrayList.add(new Schema.Field(fieldNames[num.intValue()], Schema.create(Schema.Type.NULL), (String) null, (Object) null));
            }
        });
        return Schema.createRecord("SeatunnelRecord", "The record generated by seatunnel file connector", "org.apache.parquet.avro", false, arrayList);
    }
}
