package org.apache.paimon.hive.mapred;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.hive.utils.HiveUtils;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/hive/mapred/PaimonOutputCommitter.class */
public class PaimonOutputCommitter extends OutputCommitter {
    private static final String PRE_COMMIT = ".preCommit";
    private static final Logger LOG = LoggerFactory.getLogger(PaimonOutputCommitter.class);

    public void setupJob(JobContext jobContext) throws IOException {
    }

    public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
    }

    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
        return TaskType.REDUCE.equals(taskAttemptContext.getTaskAttemptID().getTaskID().getTaskType()) || taskAttemptContext.getJobConf().getNumReduceTasks() == 0;
    }

    public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
        TaskAttemptID taskAttemptID = taskAttemptContext.getTaskAttemptID();
        FileStoreTable createFileStoreTable = HiveUtils.createFileStoreTable(taskAttemptContext.getJobConf());
        PaimonRecordWriter paimonRecordWriter = (PaimonRecordWriter) ((Map) Optional.ofNullable(PaimonRecordWriter.getWriters(taskAttemptID)).orElseGet(() -> {
            LOG.info("CommitTask found no writers for output table: {}, attemptID: {}", createFileStoreTable.name(), taskAttemptID);
            return ImmutableMap.of();
        })).get(createFileStoreTable.name());
        if (paimonRecordWriter != null) {
            try {
                BatchTableWrite batchTableWrite = paimonRecordWriter.batchTableWrite();
                Throwable th = null;
                try {
                    try {
                        createPreCommitFile(batchTableWrite.prepareCommit(), generatePreCommitFileLocation(createFileStoreTable.location().getPath(), taskAttemptID.getJobID(), taskAttemptID.getTaskID().getId()), createFileStoreTable.fileIO());
                        paimonRecordWriter.close(true);
                        if (batchTableWrite != null) {
                            if (0 != 0) {
                                try {
                                    batchTableWrite.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                batchTableWrite.close();
                            }
                        }
                    } finally {
                    }
                } finally {
                }
            } catch (Exception e) {
                LOG.error("CommitTask prepareCommit error for specific table: {}, attemptID: {}", createFileStoreTable.name(), taskAttemptID);
                throw new RuntimeException(e);
            }
        } else {
            LOG.info("CommitTask found no writer for specific table: {}, attemptID: {}", createFileStoreTable.name(), taskAttemptID);
        }
        PaimonRecordWriter.removeWriters(taskAttemptID);
    }

    public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
        Map<String, PaimonRecordWriter> removeWriters = PaimonRecordWriter.removeWriters(taskAttemptContext.getTaskAttemptID());
        if (removeWriters != null) {
            Iterator<PaimonRecordWriter> it = removeWriters.values().iterator();
            while (it.hasNext()) {
                it.next().close(true);
            }
        }
    }

    public void commitJob(JobContext jobContext) throws IOException {
        JobConf jobConf = jobContext.getJobConf();
        long currentTimeMillis = System.currentTimeMillis();
        LOG.info("CommitJob {} has started", jobContext.getJobID());
        FileStoreTable createFileStoreTable = HiveUtils.createFileStoreTable(jobConf);
        if (createFileStoreTable != null) {
            BatchWriteBuilder newBatchWriteBuilder = createFileStoreTable.newBatchWriteBuilder();
            List<CommitMessage> allPreCommitMessage = getAllPreCommitMessage(createFileStoreTable.location().getPath(), jobContext, createFileStoreTable.fileIO());
            try {
                BatchTableCommit newCommit = newBatchWriteBuilder.newCommit();
                Throwable th = null;
                try {
                    try {
                        newCommit.commit(allPreCommitMessage);
                        if (newCommit != null) {
                            if (0 != 0) {
                                try {
                                    newCommit.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                newCommit.close();
                            }
                        }
                        deleteTemporaryFile(jobContext, generateJobLocation(createFileStoreTable.location().getPath(), jobContext.getJobID()));
                    } finally {
                    }
                } finally {
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } else {
            LOG.info("CommitJob not found table, Skipping job commit.");
        }
        LOG.info("Commit took {} ms for job {}", Long.valueOf(System.currentTimeMillis() - currentTimeMillis), jobContext.getJobID());
    }

    public void abortJob(JobContext jobContext, int i) throws IOException {
        FileStoreTable createFileStoreTable = HiveUtils.createFileStoreTable(jobContext.getJobConf());
        if (createFileStoreTable != null) {
            LOG.info("AbortJob {} has started", jobContext.getJobID());
            List<CommitMessage> allPreCommitMessage = getAllPreCommitMessage(createFileStoreTable.location().getPath(), jobContext, createFileStoreTable.fileIO());
            try {
                BatchTableCommit newCommit = createFileStoreTable.newBatchWriteBuilder().newCommit();
                Throwable th = null;
                try {
                    try {
                        newCommit.abort(allPreCommitMessage);
                        if (newCommit != null) {
                            if (0 != 0) {
                                try {
                                    newCommit.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                newCommit.close();
                            }
                        }
                        deleteTemporaryFile(jobContext, generateJobLocation(createFileStoreTable.location().getPath(), jobContext.getJobID()));
                        LOG.info("Job {} is aborted. preCommit file has deleted", jobContext.getJobID());
                    } catch (Throwable th3) {
                        th = th3;
                        throw th3;
                    }
                } finally {
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    private void deleteTemporaryFile(JobContext jobContext, String str) throws IOException {
        JobConf jobConf = jobContext.getJobConf();
        LOG.info("Deleting temporary file for job {} started", jobContext.getJobID());
        LOG.info("The deleted file is located in : {}", str);
        try {
            Path path = new Path(str);
            path.getFileSystem(jobConf).delete(path, true);
        } catch (IOException e) {
            LOG.debug("Failed to delete directory {} ", str, e);
        }
        LOG.info("Deleting temporary file for job {} finished", jobContext.getJobID());
    }

    private static List<CommitMessage> getAllPreCommitMessage(String str, JobContext jobContext, FileIO fileIO) {
        JobConf jobConf = jobContext.getJobConf();
        int numReduceTasks = jobConf.getNumReduceTasks() > 0 ? jobConf.getNumReduceTasks() : jobConf.getNumMapTasks();
        List<CommitMessage> synchronizedList = Collections.synchronizedList(new ArrayList());
        for (int i = 0; i < numReduceTasks; i++) {
            synchronizedList.addAll(readPreCommitFile(generatePreCommitFileLocation(str, jobContext.getJobID(), i), fileIO));
        }
        return synchronizedList;
    }

    static String generateJobLocation(String str, JobID jobID) {
        return str + "/temp/" + jobID;
    }

    private static String generatePreCommitFileLocation(String str, JobID jobID, int i) {
        return generateJobLocation(str, jobID) + "/task_" + i + PRE_COMMIT;
    }

    private static void createPreCommitFile(List<CommitMessage> list, String str, FileIO fileIO) throws IOException {
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(fileIO.newOutputStream(new org.apache.paimon.fs.Path(str), true));
        Throwable th = null;
        try {
            try {
                objectOutputStream.writeObject(list);
                if (objectOutputStream != null) {
                    if (0 == 0) {
                        objectOutputStream.close();
                        return;
                    }
                    try {
                        objectOutputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (objectOutputStream != null) {
                if (th != null) {
                    try {
                        objectOutputStream.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    objectOutputStream.close();
                }
            }
            throw th4;
        }
    }

    private static List<CommitMessage> readPreCommitFile(String str, FileIO fileIO) {
        try {
            ObjectInputStream objectInputStream = new ObjectInputStream(fileIO.newInputStream(new org.apache.paimon.fs.Path(str)));
            Throwable th = null;
            try {
                try {
                    List<CommitMessage> list = (List) objectInputStream.readObject();
                    if (objectInputStream != null) {
                        if (0 != 0) {
                            try {
                                objectInputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            objectInputStream.close();
                        }
                    }
                    return list;
                } finally {
                }
            } catch (Throwable th3) {
                if (objectInputStream != null) {
                    if (th != null) {
                        try {
                            objectInputStream.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        objectInputStream.close();
                    }
                }
                throw th3;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(String.format("Can not read or parse CommitMessage file: %s", str));
        }
    }
}
