package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;

import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.poi.openxml4j.opc.PackagingURIHelper;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.utils.VariablesSubstitute;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.class */
public abstract class AbstractWriteStrategy implements WriteStrategy {
    protected final FileSinkConfig fileSinkConfig;
    protected final CompressFormat compressFormat;
    protected final List<Integer> sinkColumnsIndexInRow;
    protected String jobId;
    protected int subTaskIndex;
    protected HadoopConf hadoopConf;
    protected HadoopFileSystemProxy hadoopFileSystemProxy;
    protected String transactionId;
    protected String uuidPrefix;
    protected String transactionDirectory;
    protected LinkedHashMap<String, String> needMoveFiles;
    private LinkedHashMap<String, List<String>> partitionDirAndValuesMap;
    protected SeaTunnelRowType seaTunnelRowType;
    protected int batchSize;
    protected final Logger log = LoggerFactory.getLogger(getClass());
    protected LinkedHashMap<String, String> beingWrittenFile = new LinkedHashMap<>();
    protected Long checkpointId = 0L;
    protected int partId = 0;
    protected int currentBatchSize = 0;

    public AbstractWriteStrategy(FileSinkConfig fileSinkConfig) {
        this.fileSinkConfig = fileSinkConfig;
        this.sinkColumnsIndexInRow = fileSinkConfig.getSinkColumnsIndexInRow();
        this.batchSize = fileSinkConfig.getBatchSize();
        this.compressFormat = fileSinkConfig.getCompressFormat();
    }

    @Override // org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy
    public void init(HadoopConf hadoopConf, String str, String str2, int i) {
        this.hadoopConf = hadoopConf;
        this.hadoopFileSystemProxy = new HadoopFileSystemProxy(hadoopConf);
        this.jobId = str;
        this.subTaskIndex = i;
        this.uuidPrefix = str2;
    }

    @Override // org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy
    public void write(SeaTunnelRow seaTunnelRow) throws FileConnectorException {
        if (this.currentBatchSize >= this.batchSize) {
            newFilePart();
            this.currentBatchSize = 0;
        }
        this.currentBatchSize++;
    }

    public synchronized void newFilePart() {
        this.partId++;
        this.beingWrittenFile.clear();
        this.log.debug("new file part: {}", Integer.valueOf(this.partId));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public SeaTunnelRowType buildSchemaWithRowType(SeaTunnelRowType seaTunnelRowType, List<Integer> list) {
        SeaTunnelDataType[] fieldTypes = seaTunnelRowType.getFieldTypes();
        String[] fieldNames = seaTunnelRowType.getFieldNames();
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        list.forEach(num -> {
            arrayList.add(fieldNames[num.intValue()]);
            arrayList2.add(fieldTypes[num.intValue()]);
        });
        return new SeaTunnelRowType((String[]) arrayList.toArray(new String[0]), (SeaTunnelDataType[]) arrayList2.toArray(new SeaTunnelDataType[0]));
    }

    @Override // org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy
    public Configuration getConfiguration(HadoopConf hadoopConf) {
        Configuration configuration = hadoopConf.toConfiguration();
        this.hadoopConf.setExtraOptionsForConfiguration(configuration);
        return configuration;
    }

    @Override // org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy
    public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
        this.seaTunnelRowType = seaTunnelRowType;
    }

    @Override // org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy
    public LinkedHashMap<String, List<String>> generatorPartitionDir(SeaTunnelRow seaTunnelRow) {
        String substitute;
        List<Integer> partitionFieldsIndexInRow = this.fileSinkConfig.getPartitionFieldsIndexInRow();
        LinkedHashMap<String, List<String>> linkedHashMap = new LinkedHashMap<>(1);
        if (CollectionUtils.isEmpty(partitionFieldsIndexInRow)) {
            linkedHashMap.put(BaseSinkConfig.NON_PARTITION, null);
            return linkedHashMap;
        }
        List<String> partitionFieldList = this.fileSinkConfig.getPartitionFieldList();
        String partitionDirExpression = this.fileSinkConfig.getPartitionDirExpression();
        String[] strArr = new String[partitionFieldList.size()];
        String[] strArr2 = new String[partitionFieldList.size()];
        for (int i = 0; i < partitionFieldList.size(); i++) {
            strArr[i] = "k" + i;
            strArr2[i] = "v" + i;
        }
        ArrayList arrayList = new ArrayList(partitionFieldsIndexInRow.size());
        if (StringUtils.isBlank(partitionDirExpression)) {
            StringBuilder sb = new StringBuilder();
            for (int i2 = 0; i2 < partitionFieldsIndexInRow.size(); i2++) {
                sb.append(partitionFieldList.get(i2)).append("=").append(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i2).intValue()]);
                if (i2 < partitionFieldsIndexInRow.size() - 1) {
                    sb.append(PackagingURIHelper.FORWARD_SLASH_STRING);
                }
                arrayList.add(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i2).intValue()].toString());
            }
            substitute = sb.toString();
        } else {
            HashMap hashMap = new HashMap(partitionFieldList.size() * 2);
            for (int i3 = 0; i3 < partitionFieldsIndexInRow.size(); i3++) {
                hashMap.put(strArr[i3], partitionFieldList.get(i3));
                hashMap.put(strArr2[i3], seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i3).intValue()].toString());
                arrayList.add(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i3).intValue()].toString());
            }
            substitute = VariablesSubstitute.substitute(partitionDirExpression, hashMap);
        }
        linkedHashMap.put(substitute, arrayList);
        return linkedHashMap;
    }

    @Override // org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy
    public String generateFileName(String str) {
        String fileNameExpression = this.fileSinkConfig.getFileNameExpression();
        String str2 = this.compressFormat.getCompressCodec() + this.fileSinkConfig.getFileFormat().getSuffix();
        if (StringUtils.isBlank(fileNameExpression)) {
            return str + str2;
        }
        String fileNameTimeFormat = this.fileSinkConfig.getFileNameTimeFormat();
        String format = DateTimeFormatter.ofPattern(fileNameTimeFormat).format(ZonedDateTime.now());
        HashMap hashMap = new HashMap();
        hashMap.put("uuid", UUID.randomUUID().toString());
        hashMap.put("now", format);
        hashMap.put(fileNameTimeFormat, format);
        hashMap.put(BaseSinkConfig.TRANSACTION_EXPRESSION, str);
        return (VariablesSubstitute.substitute(fileNameExpression, hashMap) + BaseSinkConfig.TRANSACTION_ID_SPLIT + this.partId) + str2;
    }

    @Override // org.apache.seatunnel.connectors.seatunnel.file.sink.writer.Transaction
    public Optional<FileCommitInfo> prepareCommit() {
        finishAndCloseFile();
        return Optional.of(new FileCommitInfo(new LinkedHashMap(this.needMoveFiles), (LinkedHashMap) this.partitionDirAndValuesMap.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return new ArrayList((Collection) entry.getValue());
        }, (list, list2) -> {
            return list;
        }, LinkedHashMap::new)), this.transactionDirectory));
    }

    @Override // org.apache.seatunnel.connectors.seatunnel.file.sink.writer.Transaction
    public void abortPrepare() {
        abortPrepare(this.transactionId);
    }

    @Override // org.apache.seatunnel.connectors.seatunnel.file.sink.writer.Transaction
    public void abortPrepare(String str) {
        try {
            this.hadoopFileSystemProxy.deleteFile(getTransactionDir(str));
        } catch (IOException e) {
            throw new FileConnectorException(CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, "Abort transaction " + str + " error, delete transaction directory failed", e);
        }
    }

    @Override // org.apache.seatunnel.connectors.seatunnel.file.sink.writer.Transaction
    public void beginTransaction(Long l) {
        this.checkpointId = l;
        this.transactionId = getTransactionId(l);
        this.transactionDirectory = getTransactionDir(this.transactionId);
        this.needMoveFiles = new LinkedHashMap<>();
        this.partitionDirAndValuesMap = new LinkedHashMap<>();
    }

    private String getTransactionId(Long l) {
        return "T_" + this.jobId + BaseSinkConfig.TRANSACTION_ID_SPLIT + this.uuidPrefix + BaseSinkConfig.TRANSACTION_ID_SPLIT + this.subTaskIndex + BaseSinkConfig.TRANSACTION_ID_SPLIT + l;
    }

    @Override // org.apache.seatunnel.connectors.seatunnel.file.sink.writer.Transaction
    public List<FileSinkState> snapshotState(long j) {
        ArrayList newArrayList = Lists.newArrayList(new FileSinkState[]{new FileSinkState(this.transactionId, this.uuidPrefix, this.checkpointId, new LinkedHashMap(this.needMoveFiles), (LinkedHashMap) this.partitionDirAndValuesMap.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return new ArrayList((Collection) entry.getValue());
        }, (list, list2) -> {
            return list;
        }, LinkedHashMap::new)), getTransactionDir(this.transactionId))});
        this.beingWrittenFile.clear();
        beginTransaction(Long.valueOf(j + 1));
        return newArrayList;
    }

    private String getTransactionDir(@NonNull String str) {
        if (str == null) {
            throw new NullPointerException("transactionId is marked non-null but is null");
        }
        return String.join(File.separator, getTransactionDirPrefix(this.fileSinkConfig.getTmpPath(), this.jobId, this.uuidPrefix), str);
    }

    public static String getTransactionDirPrefix(String str, String str2, String str3) {
        return String.join(File.separator, str, BaseSinkConfig.SEATUNNEL, str2, str3);
    }

    public String getOrCreateFilePathBeingWritten(@NonNull SeaTunnelRow seaTunnelRow) {
        if (seaTunnelRow == null) {
            throw new NullPointerException("seaTunnelRow is marked non-null but is null");
        }
        LinkedHashMap<String, List<String>> generatorPartitionDir = generatorPartitionDir(seaTunnelRow);
        String obj = generatorPartitionDir.keySet().toArray()[0].toString();
        String str = this.beingWrittenFile.get(obj);
        if (str != null) {
            return str;
        }
        String join = String.join(File.separator, this.transactionDirectory, obj, generateFileName(this.transactionId));
        this.beingWrittenFile.put(obj, join);
        if (!BaseSinkConfig.NON_PARTITION.equals(generatorPartitionDir.keySet().toArray()[0].toString())) {
            this.partitionDirAndValuesMap.putAll(generatorPartitionDir);
        }
        return join;
    }

    public String getTargetLocation(@NonNull String str) {
        if (str == null) {
            throw new NullPointerException("seaTunnelFilePath is marked non-null but is null");
        }
        return str.replaceAll(Matcher.quoteReplacement(this.transactionDirectory), Matcher.quoteReplacement(this.fileSinkConfig.getPath())).replaceAll(BaseSinkConfig.NON_PARTITION + Matcher.quoteReplacement(File.separator), "");
    }

    @Override // org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy
    public long getCheckpointId() {
        return this.checkpointId.longValue();
    }

    @Override // org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy
    public FileSinkConfig getFileSinkConfig() {
        return this.fileSinkConfig;
    }

    @Override // org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy
    public HadoopFileSystemProxy getHadoopFileSystemProxy() {
        return this.hadoopFileSystemProxy;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            if (this.hadoopFileSystemProxy != null) {
                this.hadoopFileSystemProxy.close();
            }
        } catch (Exception e) {
        }
    }
}
