package org.apache.seatunnel.connectors.seatunnel.file.source.reader;

import io.airlift.compress.lzo.LzopCodec;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.format.text.TextDeserializationSchema;
import org.apache.seatunnel.format.text.constant.TextFormatConstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.class */
/**
 * Read strategy that parses line-oriented text files into {@link SeaTunnelRow}s.
 *
 * <p>Each line of the file is handed to a {@link TextDeserializationSchema} configured with the
 * field delimiter and the date / datetime / time formatters taken from the plugin config. When
 * {@code readColumns} is non-empty the deserialized row is projected onto those columns, and when
 * {@code isMergePartition} is set the partition values parsed from the file path are appended
 * after the data fields.
 */
public class TextReadStrategy extends AbstractReadStrategy {
    private static final Logger log = LoggerFactory.getLogger(TextReadStrategy.class);

    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
    private String fieldDelimiter = BaseSourceConfigOptions.FIELD_DELIMITER.defaultValue();
    private DateUtils.Formatter dateFormat = BaseSourceConfigOptions.DATE_FORMAT.defaultValue();
    private DateTimeUtils.Formatter datetimeFormat = BaseSourceConfigOptions.DATETIME_FORMAT.defaultValue();
    private TimeUtils.Formatter timeFormat = BaseSourceConfigOptions.TIME_FORMAT.defaultValue();
    private CompressFormat compressFormat = BaseSourceConfigOptions.COMPRESS_CODEC.defaultValue();
    /** Positions of the projected columns inside the full row type; only set when columns are projected. */
    private int[] indexes;

    /**
     * Reads one file line by line, skipping {@code skipHeaderNumber} leading lines, and emits a row
     * per remaining line.
     *
     * @param path location of the file to read
     * @param tableId table identifier stamped on every emitted row
     * @param output collector receiving the rows
     * @throws FileConnectorException if a line cannot be deserialized
     * @throws IOException if the file cannot be opened or closed
     */
    @Override // org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy
    public void read(String path, String tableId, Collector<SeaTunnelRow> output) throws FileConnectorException, IOException {
        Map<String, String> partitionsMap = parsePartitionsByPath(path);
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(openInputStream(path), StandardCharsets.UTF_8))) {
            reader.lines().skip(this.skipHeaderNumber).forEach(line -> {
                try {
                    // NOTE(review): getBytes() uses the platform default charset while the reader
                    // above decodes UTF-8. Left unchanged because the charset used by the
                    // deserialization schema to decode these bytes is not visible here - confirm
                    // both sides agree before switching to an explicit charset.
                    SeaTunnelRow row = this.deserializationSchema.deserialize(line.getBytes());
                    if (!this.readColumns.isEmpty()) {
                        row = projectColumns(row, partitionsMap.size());
                    }
                    if (this.isMergePartition) {
                        // Partition values go right after the data fields.
                        int position = this.seaTunnelRowType.getTotalFields();
                        for (String partitionValue : partitionsMap.values()) {
                            row.setField(position++, partitionValue);
                        }
                    }
                    row.setTableId(tableId);
                    output.collect(row);
                } catch (IOException e) {
                    throw new FileConnectorException(FileConnectorErrorCode.DATA_DESERIALIZE_FAILED, String.format("Deserialize this data [%s] failed, please check the origin data", line), e);
                }
            });
        }
    }

    /**
     * Derives the row type when the user supplied no schema: a simple text schema is used and the
     * whole line is deserialized with the placeholder delimiter.
     *
     * @param path unused by this implementation
     * @return the value of {@code getActualSeaTunnelRowTypeInfo()}
     * @throws FileConnectorException if column projection is configured, which is unsupported
     *     without a user-defined schema
     */
    @Override // org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy
    public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) {
        this.seaTunnelRowType = CatalogTableUtil.buildSimpleTextSchema();
        this.seaTunnelRowTypeWithPartition = mergePartitionTypes(this.fileNames.get(0), this.seaTunnelRowType);
        initFormatter();
        if (this.pluginConfig.hasPath(BaseSourceConfigOptions.READ_COLUMNS.key())) {
            throw new FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, "When reading json/text/csv files, if user has not specified schema information, SeaTunnel will not support column projection");
        }
        this.deserializationSchema = buildDeserializationSchema(TextFormatConstant.PLACEHOLDER, this.seaTunnelRowType, this.seaTunnelRowTypeWithPartition);
        return getActualSeaTunnelRowTypeInfo();
    }

    /**
     * Applies a user-defined row type: resolves the field delimiter, builds the deserialization
     * schema for the full row type, and, when column projection is configured, narrows
     * {@code seaTunnelRowType} to the projected columns.
     *
     * @param seaTunnelRowType the user-defined schema of the file
     */
    @Override // org.apache.seatunnel.connectors.seatunnel.file.source.reader.AbstractReadStrategy, org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy
    public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
        SeaTunnelRowType rowTypeWithPartition = mergePartitionTypes(this.fileNames.get(0), seaTunnelRowType);
        Optional<String> configuredDelimiter = ReadonlyConfig.fromConfig(this.pluginConfig).getOptional(BaseSourceConfigOptions.FIELD_DELIMITER);
        if (configuredDelimiter.isPresent()) {
            this.fieldDelimiter = configuredDelimiter.get();
        } else {
            // No explicit delimiter: CSV files fall back to a comma, everything else keeps the default.
            String formatName = this.pluginConfig.getString(BaseSourceConfigOptions.FILE_FORMAT_TYPE.key());
            if (FileFormat.valueOf(formatName.toUpperCase(Locale.ROOT)) == FileFormat.CSV) {
                this.fieldDelimiter = ",";
            }
        }
        initFormatter();
        this.deserializationSchema = buildDeserializationSchema(this.fieldDelimiter, seaTunnelRowType, rowTypeWithPartition);
        if (!this.pluginConfig.hasPath(BaseSourceConfigOptions.READ_COLUMNS.key())) {
            this.seaTunnelRowType = seaTunnelRowType;
            this.seaTunnelRowTypeWithPartition = rowTypeWithPartition;
            return;
        }
        // Column projection: remember where each requested column sits in the full schema.
        int projectedCount = this.readColumns.size();
        this.indexes = new int[projectedCount];
        String[] projectedNames = new String[projectedCount];
        SeaTunnelDataType<?>[] projectedTypes = new SeaTunnelDataType<?>[projectedCount];
        for (int i = 0; i < projectedCount; i++) {
            int sourceIndex = seaTunnelRowType.indexOf(this.readColumns.get(i));
            this.indexes[i] = sourceIndex;
            projectedNames[i] = seaTunnelRowType.getFieldName(sourceIndex);
            projectedTypes[i] = seaTunnelRowType.getFieldType(sourceIndex);
        }
        this.seaTunnelRowType = new SeaTunnelRowType(projectedNames, projectedTypes);
        this.seaTunnelRowTypeWithPartition = mergePartitionTypes(this.fileNames.get(0), this.seaTunnelRowType);
    }

    /**
     * Opens the file and, for LZO, wraps it in a decompressing stream. Unsupported codecs are
     * logged and the file is read as-is.
     */
    private InputStream openInputStream(String path) throws IOException {
        InputStream raw = this.hadoopFileSystemProxy.getInputStream(path);
        switch (this.compressFormat) {
            case LZO:
                try {
                    return new LzopCodec().createInputStream(raw);
                } catch (IOException | RuntimeException e) {
                    // The codec never took ownership of the stream, so release it here.
                    try {
                        raw.close();
                    } catch (IOException closeFailure) {
                        e.addSuppressed(closeFailure);
                    }
                    throw e;
                }
            case NONE:
                return raw;
            default:
                log.warn("Text file does not support this compress type: {}", this.compressFormat.getCompressCodec());
                return raw;
        }
    }

    /**
     * Copies the projected columns of {@code fullRow} into a new row, leaving room at the end for
     * {@code partitionCount} partition values when partitions are merged.
     */
    private SeaTunnelRow projectColumns(SeaTunnelRow fullRow, int partitionCount) {
        int width = this.readColumns.size();
        if (this.isMergePartition) {
            width += partitionCount;
        }
        Object[] projected = new Object[width];
        for (int i = 0; i < this.indexes.length; i++) {
            projected[i] = fullRow.getField(this.indexes[i]);
        }
        return new SeaTunnelRow(projected);
    }

    /**
     * Builds the text deserialization schema with the current formatters, using the
     * partition-extended row type when partitions are merged.
     */
    private DeserializationSchema<SeaTunnelRow> buildDeserializationSchema(String delimiter, SeaTunnelRowType rowType, SeaTunnelRowType rowTypeWithPartition) {
        TextDeserializationSchema.Builder builder = TextDeserializationSchema.builder()
                .delimiter(delimiter)
                .dateFormatter(this.dateFormat)
                .dateTimeFormatter(this.datetimeFormat)
                .timeFormatter(this.timeFormat);
        SeaTunnelRowType effectiveType = this.isMergePartition ? rowTypeWithPartition : rowType;
        return builder.seaTunnelRowType(effectiveType).build();
    }

    /** Overrides the default formatters and compression codec with any values present in the plugin config. */
    private void initFormatter() {
        if (this.pluginConfig.hasPath(BaseSourceConfigOptions.DATE_FORMAT.key())) {
            this.dateFormat = DateUtils.Formatter.parse(this.pluginConfig.getString(BaseSourceConfigOptions.DATE_FORMAT.key()));
        }
        if (this.pluginConfig.hasPath(BaseSourceConfigOptions.DATETIME_FORMAT.key())) {
            this.datetimeFormat = DateTimeUtils.Formatter.parse(this.pluginConfig.getString(BaseSourceConfigOptions.DATETIME_FORMAT.key()));
        }
        if (this.pluginConfig.hasPath(BaseSourceConfigOptions.TIME_FORMAT.key())) {
            this.timeFormat = TimeUtils.Formatter.parse(this.pluginConfig.getString(BaseSourceConfigOptions.TIME_FORMAT.key()));
        }
        if (this.pluginConfig.hasPath(BaseSourceConfigOptions.COMPRESS_CODEC.key())) {
            // Locale.ROOT keeps the enum lookup stable regardless of the JVM default locale.
            String codecName = this.pluginConfig.getString(BaseSourceConfigOptions.COMPRESS_CODEC.key());
            this.compressFormat = CompressFormat.valueOf(codecName.toUpperCase(Locale.ROOT));
        }
    }
}
