package org.apache.iceberg.mr.hive;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.hive.writer.HiveIcebergWriter;
import org.apache.iceberg.mr.hive.writer.WriterRegistry;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hive {@link OutputCommitter} for Iceberg tables.
 *
 * <p>The commit protocol works in two phases:
 * <ul>
 *   <li>{@link #commitTask} gathers the data/delete files produced by the task attempt's writers.
 *       For every output table, it serializes them into
 *       {@code <table location>/temp/<query id>-<job id>/task-<task id>.forCommit}.</li>
 *   <li>{@link #commitJobs} reads every task's {@code .forCommit} file back and commits the collected
 *       files to the Iceberg table (append, row delta or overwrite). It then deletes the temporary job
 *       locations.</li>
 *   <li>{@link #abortJobs} reads the {@code .forCommit} files that exist and deletes the data/delete
 *       files they reference (best effort). It then deletes the temporary job locations.</li>
 * </ul>
 */
public class HiveIcebergOutputCommitter extends OutputCommitter {
    private static final String FOR_COMMIT_EXTENSION = ".forCommit";
    private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergOutputCommitter.class);

    /**
     * One (output table, job) pair that has to be committed or aborted.
     *
     * <p>Equality and hash code are based on the table name and the job id only.
     */
    public static class OutputTable {
        private final String catalogName;
        private final String tableName;
        private final Table table;
        private final JobContext jobContext;
        private final SessionStateUtil.CommitInfo commitInfo;

        private OutputTable(String catalogName, String tableName, Table table, JobContext jobContext,
                            SessionStateUtil.CommitInfo commitInfo) {
            this.catalogName = catalogName;
            this.tableName = tableName;
            this.table = table;
            this.jobContext = jobContext;
            this.commitInfo = commitInfo;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            OutputTable other = (OutputTable) obj;
            return Objects.equals(tableName, other.tableName)
                && Objects.equals(jobContext.getJobID(), other.jobContext.getJobID());
        }

        @Override
        public int hashCode() {
            return Objects.hash(tableName, jobContext.getJobID());
        }

        /** Human-readable form used in the commit/abort log messages. */
        @Override
        public String toString() {
            return "OutputTable{table=" + tableName + ", jobId=" + jobContext.getJobID() + "}";
        }

        /** Returns the commit info recorded in the session state for this job, if there is one. */
        public Optional<SessionStateUtil.CommitInfo> getCommitInfo() {
            return Optional.ofNullable(commitInfo);
        }
    }

    /** No job-level setup is needed. */
    @Override
    public void setupJob(JobContext jobContext) {
    }

    /** No task-level setup is needed. */
    @Override
    public void setupTask(TaskAttemptContext taskAttemptContext) {
    }

    /**
     * Only reducers need a task commit, or mappers when the job has no reducers.
     */
    @Override
    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
        boolean isReducer =
            TaskType.REDUCE.equals(taskAttemptContext.getTaskAttemptID().getTaskID().getTaskType());
        return isReducer || taskAttemptContext.getJobConf().getNumReduceTasks() == 0;
    }

    /**
     * Writes one {@code .forCommit} file per output table, listing the files produced by this task
     * attempt's writers.
     *
     * <p>A table for which no writer was registered gets an empty {@code .forCommit} file. The
     * writers are removed from the {@link WriterRegistry} only after every table was handled
     * successfully.
     *
     * @throws IOException if a {@code .forCommit} file could not be written (after retries)
     */
    @Override
    public void commitTask(TaskAttemptContext originalContext) throws IOException {
        TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);
        TaskAttemptID attemptId = context.getTaskAttemptID();
        JobConf jobConf = context.getJobConf();
        Set<String> outputTables = HiveIcebergStorageHandler.outputTables(jobConf);
        Map<String, List<HiveIcebergWriter>> writers = Optional.ofNullable(WriterRegistry.writers(attemptId))
            .orElseGet(() -> {
                LOG.info("CommitTask found no writers for output tables: {}, attemptID: {}", outputTables, attemptId);
                return ImmutableMap.of();
            });

        ExecutorService tableExecutor = tableExecutor(jobConf, outputTables.size());
        try {
            Tasks.foreach(outputTables)
                .retry(3)
                .stopOnFailure()
                .throwFailureWhenFinished()
                .executeWith(tableExecutor)
                .run(tableName -> writeForCommitFile(jobConf, attemptId, tableName, writers.get(tableName)),
                    IOException.class);
        } finally {
            shutdownIfPresent(tableExecutor);
        }

        // Reached only on success. On failure the writers stay registered, so abortTask can still
        // remove and close them.
        WriterRegistry.removeWriters(attemptId);
    }

    /**
     * Writes the {@code .forCommit} file of a single output table for a task attempt.
     *
     * @param tableWriters the writers registered for the table, or {@code null} if there were none
     */
    private static void writeForCommitFile(JobConf jobConf, TaskAttemptID attemptId, String tableName,
                                           List<HiveIcebergWriter> tableWriters) throws IOException {
        Table table = HiveIcebergStorageHandler.table(jobConf, tableName);
        if (table == null) {
            LOG.info("CommitTask found no serialized table in config for table: {}.", tableName);
            return;
        }

        String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
            attemptId.getJobID(), attemptId.getTaskID().getId());
        if (tableWriters == null) {
            LOG.info("CommitTask found no writer for specific table: {}, attemptID: {}", tableName, attemptId);
            createFileForCommit(FilesForCommit.empty(), fileForCommitLocation, table.io());
            return;
        }

        List<DataFile> dataFiles = Lists.newArrayList();
        List<DeleteFile> deleteFiles = Lists.newArrayList();
        for (HiveIcebergWriter writer : tableWriters) {
            FilesForCommit files = writer.files();
            dataFiles.addAll(files.dataFiles());
            deleteFiles.addAll(files.deleteFiles());
        }
        createFileForCommit(new FilesForCommit(dataFiles, deleteFiles), fileForCommitLocation, table.io());
    }

    /**
     * Removes this task attempt's writers from the {@link WriterRegistry} and closes each of them
     * with {@code close(true)}.
     *
     * <p>NOTE(review): {@code true} presumably means "abort"; confirm against {@code HiveIcebergWriter}.
     */
    @Override
    public void abortTask(TaskAttemptContext originalContext) throws IOException {
        TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);
        Map<String, List<HiveIcebergWriter>> writers = WriterRegistry.removeWriters(context.getTaskAttemptID());
        if (writers == null) {
            return;
        }
        for (List<HiveIcebergWriter> tableWriters : writers.values()) {
            for (HiveIcebergWriter writer : tableWriters) {
                writer.close(true);
            }
        }
    }

    /** Commits a single job; see {@link #commitJobs(List)}. */
    @Override
    public void commitJob(JobContext jobContext) throws IOException {
        commitJobs(Collections.singletonList(jobContext));
    }

    /**
     * Commits the output tables of every given job, then removes the temporary job locations.
     *
     * <p>The executors are sized from the configuration of the first job context. An empty list is
     * a no-op.
     *
     * @throws IOException if cleaning up the temporary job locations fails
     */
    public void commitJobs(List<JobContext> originalContextList) throws IOException {
        if (originalContextList.isEmpty()) {
            LOG.info("No job contexts were provided, nothing to commit");
            return;
        }
        List<JobContext> jobContexts = originalContextList.stream()
            .map(TezUtil::enrichContextWithVertexId)
            .collect(Collectors.toList());
        Set<OutputTable> outputs = collectOutputs(jobContexts);
        long startTime = System.currentTimeMillis();
        String ids = jobIds(jobContexts);
        LOG.info("Committing job(s) {} has started", ids);

        Collection<String> jobLocations = new ConcurrentLinkedQueue<>();
        JobConf firstConf = jobContexts.get(0).getJobConf();
        ExecutorService fileExecutor = fileExecutor(firstConf);
        ExecutorService tableExecutor = tableExecutor(firstConf, outputs.size());
        try {
            Tasks.foreach(outputs)
                .throwFailureWhenFinished()
                .stopOnFailure()
                .executeWith(tableExecutor)
                .run(output -> {
                    JobConf jobConf = output.jobContext.getJobConf();
                    Table table = output.table;
                    jobLocations.add(generateJobLocation(table.location(), jobConf, output.jobContext.getJobID()));
                    commitTable(table.io(), fileExecutor, output);
                });
        } finally {
            shutdownIfPresent(fileExecutor);
            shutdownIfPresent(tableExecutor);
        }

        LOG.info("Commit took {} ms for job(s) {}", System.currentTimeMillis() - startTime, ids);
        for (JobContext jobContext : jobContexts) {
            cleanup(jobContext, jobLocations);
        }
    }

    /**
     * Resolves the output tables of every job.
     *
     * <p>The table object is taken from the session state when present, otherwise from the job
     * configuration. Tables that cannot be resolved are skipped with a log message.
     */
    private Set<OutputTable> collectOutputs(List<JobContext> jobContexts) {
        Set<OutputTable> outputs = Sets.newHashSet();
        for (JobContext jobContext : jobContexts) {
            JobConf conf = jobContext.getJobConf();
            for (String tableName : HiveIcebergStorageHandler.outputTables(conf)) {
                Table table = SessionStateUtil.getResource(conf, tableName)
                    .filter(resource -> resource instanceof Table)
                    .map(resource -> (Table) resource)
                    .orElseGet(() -> HiveIcebergStorageHandler.table(conf, tableName));
                if (table == null) {
                    LOG.info("CommitJob found no table object in QueryState or conf for: {}. Skipping job commit.",
                        tableName);
                    continue;
                }

                String jobId = jobContext.getJobID().toString();
                SessionStateUtil.CommitInfo commitInfo = SessionStateUtil.getCommitInfo(conf, tableName)
                    .map(infoByJobId -> infoByJobId.get(jobId))
                    .orElse(null);
                outputs.add(new OutputTable(HiveIcebergStorageHandler.catalogName(conf, tableName), tableName,
                    table, jobContext, commitInfo));
            }
        }
        return outputs;
    }

    /** Aborts a single job; see {@link #abortJobs(List)}. */
    @Override
    public void abortJob(JobContext jobContext, int status) throws IOException {
        abortJobs(Collections.singletonList(jobContext));
    }

    /**
     * Deletes the files written by the given jobs (best effort, failures are logged), then removes
     * the temporary job locations.
     *
     * <p>An empty list is a no-op.
     *
     * @throws IOException if cleaning up the temporary job locations fails
     */
    public void abortJobs(List<JobContext> originalContextList) throws IOException {
        if (originalContextList.isEmpty()) {
            LOG.info("No job contexts were provided, nothing to abort");
            return;
        }
        List<JobContext> jobContexts = originalContextList.stream()
            .map(TezUtil::enrichContextWithVertexId)
            .collect(Collectors.toList());
        Set<OutputTable> outputs = collectOutputs(jobContexts);
        String ids = jobIds(jobContexts);
        LOG.info("Job(s) {} are aborted. Data file cleaning started", ids);

        Collection<String> jobLocations = new ConcurrentLinkedQueue<>();
        JobConf firstConf = jobContexts.get(0).getJobConf();
        ExecutorService fileExecutor = fileExecutor(firstConf);
        ExecutorService tableExecutor = tableExecutor(firstConf, outputs.size());
        try {
            Tasks.foreach(outputs)
                .suppressFailureWhenFinished()
                .executeWith(tableExecutor)
                .onFailure((output, exc) -> LOG.warn("Failed cleanup table {} on abort job", output, exc))
                .run(output -> abortTable(output, fileExecutor, jobLocations), IOException.class);
        } finally {
            shutdownIfPresent(fileExecutor);
            shutdownIfPresent(tableExecutor);
        }

        LOG.info("Job(s) {} are aborted. Data file cleaning finished", ids);
        for (JobContext jobContext : jobContexts) {
            cleanup(jobContext, jobLocations);
        }
    }

    /**
     * Deletes the data and delete files listed in one output table's {@code .forCommit} files.
     *
     * <p>The table's temporary job location is added to {@code jobLocations} for later cleanup.
     * Unreadable {@code .forCommit} files and failed deletes are tolerated.
     */
    private void abortTable(OutputTable output, ExecutorService fileExecutor, Collection<String> jobLocations)
            throws IOException {
        JobContext jobContext = output.jobContext;
        JobConf jobConf = jobContext.getJobConf();
        LOG.info("Cleaning job for jobID: {}, table: {}", jobContext.getJobID(), output);

        Table table = output.table;
        String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID());
        jobLocations.add(jobLocation);

        // The number of .forCommit files present approximates the number of tasks to read.
        int numTasks = listForCommits(jobConf, jobLocation).size();
        FilesForCommit results = collectResults(numTasks, fileExecutor, table.location(), jobContext, table.io(), false);
        List<String> paths = Stream.concat(
                results.dataFiles().stream().map(file -> file.path().toString()),
                results.deleteFiles().stream().map(file -> file.path().toString()))
            .collect(Collectors.toList());
        if (paths.isEmpty()) {
            return;
        }
        Tasks.foreach(paths)
            .retry(3)
            .suppressFailureWhenFinished()
            .executeWith(fileExecutor)
            .onFailure((path, exc) -> LOG.warn("Failed to remove data file {} on abort job", path, exc))
            .run(path -> table.io().deleteFile(path));
    }

    /**
     * Lists the regular files directly under {@code jobLocation} whose name ends with
     * {@code .forCommit}.
     *
     * @throws IOException if the location cannot be listed
     */
    private Set<FileStatus> listForCommits(JobConf jobConf, String jobLocation) throws IOException {
        Path path = new Path(jobLocation);
        LOG.debug("Listing job location to get forCommits for abort: {}", jobLocation);
        FileStatus[] children = path.getFileSystem(jobConf).listStatus(path);
        LOG.debug("Listing the job location: {} yielded these files: {}", jobLocation, Arrays.toString(children));
        return Arrays.stream(children)
            .filter(status -> !status.isDirectory() && status.getPath().getName().endsWith(FOR_COMMIT_EXTENSION))
            .collect(Collectors.toSet());
    }

    /**
     * Reloads the table from its catalog, collects the files of every task and commits them.
     *
     * <p>When {@code IS_OVERWRITE} is set, the files are committed as an overwrite. Otherwise they
     * are committed as an append or row delta, unless there are no files at all.
     */
    private void commitTable(FileIO io, ExecutorService executor, OutputTable outputTable) {
        String name = outputTable.tableName;
        JobContext jobContext = outputTable.jobContext;
        JobConf conf = jobContext.getJobConf();

        Properties catalogProperties = new Properties();
        catalogProperties.put(Catalogs.NAME, name);
        catalogProperties.put(Catalogs.LOCATION, outputTable.table.location());
        if (outputTable.catalogName != null) {
            catalogProperties.put(InputFormatConfig.CATALOG_NAME, outputTable.catalogName);
        }
        Table table = Catalogs.loadTable(conf, catalogProperties);

        long startTime = System.currentTimeMillis();
        LOG.info("Committing job has started for table: {}, using location: {}", table,
            generateJobLocation(outputTable.table.location(), conf, jobContext.getJobID()));

        int numTasks = outputTable.getCommitInfo()
            .map(SessionStateUtil.CommitInfo::getTaskNum)
            .orElseGet(() -> {
                LOG.info("Number of tasks not available in session state for jobID: {}, table: {}. " +
                    "Falling back to jobConf numReduceTasks/numMapTasks", jobContext.getJobID(), name);
                return conf.getNumReduceTasks() > 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks();
            });
        FilesForCommit writeResults = collectResults(numTasks, executor, outputTable.table.location(), jobContext,
            io, true);

        String branchName = conf.get(InputFormatConfig.OUTPUT_TABLE_SNAPSHOT_REF);
        if (conf.getBoolean(InputFormatConfig.IS_OVERWRITE, false)) {
            commitOverwrite(table, branchName, startTime, writeResults);
        } else if (writeResults.isEmpty()) {
            LOG.info("Not creating a new commit for table: {}, jobID: {}, operation: {}, since there were no new files to add",
                table, jobContext.getJobID(), HiveCustomStorageHandlerUtils.getWriteOperation(conf, name));
        } else {
            commitWrite(table, branchName, startTime, writeResults);
        }
    }

    /**
     * Commits the files to the table.
     *
     * <p>An append is used when there are no delete files; otherwise a row delta is used. The commit
     * targets {@code branchName} when it is non-empty.
     *
     * <p>NOTE(review): {@code toBranch2} looks like a decompiler rename of Iceberg's
     * {@code SnapshotUpdate.toBranch}; confirm against the Iceberg API on the classpath.
     */
    private void commitWrite(Table table, String branchName, long startTime, FilesForCommit results) {
        if (results.deleteFiles().isEmpty()) {
            AppendFiles write = table.newAppend();
            results.dataFiles().forEach(write::appendFile);
            if (StringUtils.isNotEmpty(branchName)) {
                write.toBranch2(HiveUtils.getTableSnapshotRef(branchName));
            }
            write.commit();
        } else {
            RowDelta write = table.newRowDelta();
            results.dataFiles().forEach(write::addRows);
            results.deleteFiles().forEach(write::addDeletes);
            if (StringUtils.isNotEmpty(branchName)) {
                write.toBranch2(HiveUtils.getTableSnapshotRef(branchName));
            }
            write.commit();
        }
        LOG.info("Write commit took {} ms for table: {} with {} data and {} delete file(s)",
            System.currentTimeMillis() - startTime, table, results.dataFiles().size(), results.deleteFiles().size());
        LOG.debug("Added files {}", results);
    }

    /**
     * Commits the data files as an overwrite.
     *
     * <p>When there are data files, the partitions they belong to are replaced. When there are none
     * and the table is unpartitioned, all rows are deleted. Otherwise nothing is committed.
     *
     * @throws IllegalArgumentException if {@code results} contains delete files
     */
    private void commitOverwrite(Table table, String branchName, long startTime, FilesForCommit results) {
        Preconditions.checkArgument(results.deleteFiles().isEmpty(), "Can not handle deletes with overwrite");
        if (!results.dataFiles().isEmpty()) {
            ReplacePartitions overwrite = table.newReplacePartitions();
            results.dataFiles().forEach(overwrite::addFile);
            if (StringUtils.isNotEmpty(branchName)) {
                overwrite.toBranch2(HiveUtils.getTableSnapshotRef(branchName));
            }
            overwrite.commit();
            LOG.info("Overwrite commit took {} ms for table: {} with {} file(s)",
                System.currentTimeMillis() - startTime, table, results.dataFiles().size());
        } else if (table.spec().isUnpartitioned()) {
            DeleteFiles deleteFiles = table.newDelete();
            deleteFiles.deleteFromRowFilter(Expressions.alwaysTrue());
            if (StringUtils.isNotEmpty(branchName)) {
                deleteFiles.toBranch2(HiveUtils.getTableSnapshotRef(branchName));
            }
            deleteFiles.commit();
            LOG.info("Cleared table contents as part of empty overwrite for unpartitioned table. Commit took {} ms for table: {}",
                System.currentTimeMillis() - startTime, table);
        }
        LOG.debug("Overwrote partitions with files {}", results);
    }

    /**
     * Recursively deletes every location in {@code jobLocations}.
     *
     * <p>Each delete is retried up to 3 times; failures are logged at debug level and suppressed.
     */
    private void cleanup(JobContext jobContext, Collection<String> jobLocations) throws IOException {
        JobConf conf = jobContext.getJobConf();
        LOG.info("Cleaning for job {} started", jobContext.getJobID());
        Tasks.foreach(jobLocations)
            .retry(3)
            .suppressFailureWhenFinished()
            .onFailure((location, exc) -> LOG.debug("Failed to remove directory {} on job cleanup", location, exc))
            .run(location -> {
                LOG.info("Cleaning location: {}", location);
                Path toDelete = new Path(location);
                Util.getFs(toDelete, conf).delete(toDelete, true);
            }, IOException.class);
        LOG.info("Cleaning for job {} finished", jobContext.getJobID());
    }

    /** Comma-separated job ids, used in log messages. */
    private static String jobIds(List<JobContext> jobContexts) {
        return jobContexts.stream()
            .map(jobContext -> jobContext.getJobID().toString())
            .collect(Collectors.joining(","));
    }

    /** Shuts down {@code executor} unless it is {@code null} ({@link #tableExecutor} may return null). */
    private static void shutdownIfPresent(ExecutorService executor) {
        if (executor != null) {
            executor.shutdown();
        }
    }

    /**
     * Creates the daemon pool used for per-file work.
     *
     * <p>The pool size comes from {@code COMMIT_FILE_THREAD_POOL_SIZE} and defaults to 10.
     */
    private static ExecutorService fileExecutor(Configuration conf) {
        int size = conf.getInt(InputFormatConfig.COMMIT_FILE_THREAD_POOL_SIZE, 10);
        return Executors.newFixedThreadPool(size, new ThreadFactoryBuilder()
            .setDaemon(true)
            .setPriority(Thread.NORM_PRIORITY)
            .setNameFormat("iceberg-commit-file-pool-%d")
            .build());
    }

    /**
     * Creates the daemon pool used for per-table work.
     *
     * <p>The pool size is the smaller of {@code outputTableCount} and
     * {@code COMMIT_TABLE_THREAD_POOL_SIZE} (default 10).
     *
     * @return the pool, or {@code null} when at most one thread would be used; the caller then runs
     *         the work on its own thread
     */
    private static ExecutorService tableExecutor(Configuration conf, int outputTableCount) {
        int size = Math.min(outputTableCount, conf.getInt(InputFormatConfig.COMMIT_TABLE_THREAD_POOL_SIZE, 10));
        if (size <= 1) {
            return null;
        }
        return Executors.newFixedThreadPool(size, new ThreadFactoryBuilder()
            .setDaemon(true)
            .setPriority(Thread.NORM_PRIORITY)
            .setNameFormat("iceberg-commit-table-pool-%d")
            .build());
    }

    /**
     * Reads {@code task-0 .. task-(numTasks-1)} {@code .forCommit} files of the job and merges their
     * contents.
     *
     * @param throwOnFailure whether an unreadable file (after 3 retries) fails the whole collection;
     *                       when false, such files are skipped
     */
    private static FilesForCommit collectResults(int numTasks, ExecutorService executor, String location,
                                                 JobContext jobContext, FileIO io, boolean throwOnFailure) {
        JobConf conf = jobContext.getJobConf();
        Collection<DataFile> dataFiles = new ConcurrentLinkedQueue<>();
        Collection<DeleteFile> deleteFiles = new ConcurrentLinkedQueue<>();
        Tasks.range(numTasks)
            .throwFailureWhenFinished(throwOnFailure)
            .executeWith(executor)
            .retry(3)
            .run(taskId -> {
                String taskFile = generateFileForCommitLocation(location, conf, jobContext.getJobID(), taskId);
                FilesForCommit files = readFileForCommit(taskFile, io);
                dataFiles.addAll(files.dataFiles());
                deleteFiles.addAll(files.deleteFiles());
            });
        return new FilesForCommit(dataFiles, deleteFiles);
    }

    /** Returns {@code <table location>/temp/<query id>-<job id>}. */
    @VisibleForTesting
    static String generateJobLocation(String location, Configuration conf, JobID jobId) {
        return location + "/temp/" + conf.get(HiveConf.ConfVars.HIVEQUERYID.varname) + "-" + jobId;
    }

    /** Returns {@code <job location>/task-<taskId>.forCommit}. */
    private static String generateFileForCommitLocation(String location, Configuration conf, JobID jobId,
                                                        int taskId) {
        return generateJobLocation(location, conf, jobId) + "/task-" + taskId + FOR_COMMIT_EXTENSION;
    }

    /**
     * Serializes {@code writeResult} with Java serialization into {@code location}.
     *
     * <p>Any existing file at that location is overwritten.
     */
    private static void createFileForCommit(FilesForCommit writeResult, String location, FileIO io)
            throws IOException {
        OutputFile fileForCommit = io.newOutputFile(location);
        try (ObjectOutputStream out = new ObjectOutputStream(fileForCommit.createOrOverwrite())) {
            out.writeObject(writeResult);
        }
        LOG.debug("Iceberg committed file is created {}", fileForCommit);
    }

    /**
     * Deserializes a {@link FilesForCommit} from {@code location}.
     *
     * <p>NOTE(review): this uses Java native deserialization. It trusts that only this job's tasks
     * can write under the table's temp location.
     *
     * @throws NotFoundException if the file cannot be read or parsed; the original error is kept as
     *                           the cause
     */
    private static FilesForCommit readFileForCommit(String location, FileIO io) {
        try (ObjectInputStream in = new ObjectInputStream(io.newInputFile(location).newStream())) {
            return (FilesForCommit) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new NotFoundException(e, "Can not read or parse committed file: %s", location);
        }
    }
}
