package org.apache.iceberg.mr.hive;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hadoop {@code mapred} {@link OutputCommitter} that commits the data files produced by
 * {@link HiveIcebergRecordWriter}s into Iceberg tables.
 *
 * <p>The protocol has two phases:
 * <ul>
 *   <li>Task commit serializes, for every output table, the task's {@link DataFile} array into a
 *       {@code task-<taskId>.forCommit} file under {@code <tableLocation>/temp/<queryId>-<jobId>}.</li>
 *   <li>Job commit reads every task's {@code .forCommit} file back, appends all listed files to the
 *       table in one {@link AppendFiles} commit per table, and then recursively deletes the per-job
 *       temp directories.</li>
 * </ul>
 * Job abort reads the same files on a best-effort basis and deletes the data files they list.
 */
public class HiveIcebergOutputCommitter extends OutputCommitter {
    private static final String FOR_COMMIT_EXTENSION = ".forCommit";
    private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergOutputCommitter.class);

    /** Intentionally a no-op; no job-level setup is required. */
    public void setupJob(JobContext jobContext) {
        // nothing to set up
    }

    /** Intentionally a no-op; no task-level setup is required. */
    public void setupTask(TaskAttemptContext taskAttemptContext) {
        // nothing to set up
    }

    /**
     * Decides whether {@link #commitTask} must run for the given attempt.
     *
     * @return {@code true} for reduce tasks, and for every task of a job with zero reducers
     */
    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
        boolean isReducer =
            TaskType.REDUCE.equals(taskAttemptContext.getTaskAttemptID().getTaskID().getTaskType());
        boolean isMapOnlyJob = taskAttemptContext.getJobConf().getNumReduceTasks() == 0;
        return isReducer || isMapOnlyJob;
    }

    /**
     * Persists, for each output table, the data files written by this task attempt into a
     * {@code .forCommit} file so that {@link #commitJob} can collect them.
     *
     * <p>Tables without a serialized table in the job config are skipped. Tables without a writer for
     * this attempt get an empty file list. Each table is retried up to 3 times. The attempt's writers
     * are released only after every table has been processed successfully.
     *
     * @param taskAttemptContext the framework-provided attempt context
     * @throws IOException if a {@code .forCommit} file cannot be written
     */
    public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
        TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(taskAttemptContext);
        TaskAttemptID attemptID = context.getTaskAttemptID();
        JobConf jobConf = context.getJobConf();
        Collection<String> outputTables = HiveIcebergStorageHandler.outputTables(jobConf);

        Map<String, HiveIcebergRecordWriter> writers =
            Optional.ofNullable(HiveIcebergRecordWriter.getWriters(attemptID)).orElseGet(() -> {
                LOG.info("CommitTask found no writers for output tables: {}, attemptID: {}", outputTables, attemptID);
                return ImmutableMap.of();
            });

        ExecutorService tableExecutor = tableExecutor(jobConf, outputTables.size());
        try {
            Tasks.foreach(outputTables)
                .retry(3)
                .stopOnFailure()
                .throwFailureWhenFinished()
                .executeWith(tableExecutor)
                .run(output -> {
                    Table table = HiveIcebergStorageHandler.table(jobConf, output);
                    if (table == null) {
                        LOG.info("CommitTask found no serialized table in config for table: {}.", output);
                        return;
                    }

                    HiveIcebergRecordWriter writer = writers.get(output);
                    DataFile[] closedFiles;
                    if (writer != null) {
                        closedFiles = writer.dataFiles();
                    } else {
                        LOG.info("CommitTask found no writer for specific table: {}, attemptID: {}", output, attemptID);
                        closedFiles = new DataFile[0];
                    }

                    // Write a file even when empty, so the job-level reader finds one file per task id.
                    String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
                        attemptID.getJobID(), attemptID.getTaskID().getId());
                    createFileForCommit(closedFiles, fileForCommitLocation, table.io());
                }, IOException.class);
        } finally {
            if (tableExecutor != null) {
                tableExecutor.shutdown();
            }
        }

        // Release the writer references held for this attempt.
        HiveIcebergRecordWriter.removeWriters(attemptID);
    }

    /**
     * Removes this attempt's writers from the registry and closes each one with {@code close(true)}.
     * The {@code true} flag presumably means "abort", i.e. discard written files; confirm this in
     * {@link HiveIcebergRecordWriter}.
     *
     * @throws IOException if closing a writer fails
     */
    public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
        TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(taskAttemptContext);
        Map<String, HiveIcebergRecordWriter> writers =
            HiveIcebergRecordWriter.removeWriters(context.getTaskAttemptID());
        if (writers == null) {
            return;
        }
        for (HiveIcebergRecordWriter writer : writers.values()) {
            writer.close(true);
        }
    }

    /**
     * Commits every output table of the job and then removes the per-job temp directories.
     *
     * <p>Tables are committed in parallel when there is more than one. The first table failure stops
     * the remaining tables and is rethrown. Tables without a serialized table in the config are
     * skipped.
     *
     * @throws IOException if temp-directory cleanup fails in a way that is not suppressed
     */
    public void commitJob(JobContext jobContext) throws IOException {
        JobContext context = TezUtil.enrichContextWithVertexId(jobContext);
        JobConf jobConf = context.getJobConf();
        long startTime = System.currentTimeMillis();
        LOG.info("Committing job {} has started", context.getJobID());

        Collection<String> outputTables = HiveIcebergStorageHandler.outputTables(jobConf);
        // Concurrent collection: tables may be processed on several executor threads.
        Collection<String> jobLocations = new ConcurrentLinkedQueue<>();

        ExecutorService fileExecutor = fileExecutor(jobConf);
        ExecutorService tableExecutor = tableExecutor(jobConf, outputTables.size());
        try {
            Tasks.foreach(outputTables)
                .throwFailureWhenFinished()
                .stopOnFailure()
                .executeWith(tableExecutor)
                .run(output -> {
                    Table table = HiveIcebergStorageHandler.table(jobConf, output);
                    if (table == null) {
                        LOG.info("CommitJob found no serialized table in config for table: {}. Skipping job commit.",
                            output);
                        return;
                    }
                    String catalogName = HiveIcebergStorageHandler.catalogName(jobConf, output);
                    jobLocations.add(generateJobLocation(table.location(), jobConf, context.getJobID()));
                    commitTable(table.io(), fileExecutor, context, output, table.location(), catalogName);
                });
        } finally {
            fileExecutor.shutdown();
            if (tableExecutor != null) {
                tableExecutor.shutdown();
            }
        }

        LOG.info("Commit took {} ms for job {}", System.currentTimeMillis() - startTime, context.getJobID());
        cleanup(context, jobLocations);
    }

    /**
     * Best-effort removal of everything the job wrote. For each output table, deletes the data files
     * listed in the readable {@code .forCommit} files, then deletes the per-job temp directories.
     * Per-table and per-file failures are logged and suppressed.
     *
     * @param jobContext the framework-provided job context
     * @param status the job state reported by the framework (not used here)
     * @throws IOException if temp-directory cleanup fails in a way that is not suppressed
     */
    public void abortJob(JobContext jobContext, int status) throws IOException {
        JobContext context = TezUtil.enrichContextWithVertexId(jobContext);
        JobConf jobConf = context.getJobConf();
        LOG.info("Job {} is aborted. Data file cleaning started", context.getJobID());

        Collection<String> outputTables = HiveIcebergStorageHandler.outputTables(jobConf);
        // Concurrent collection: tables may be processed on several executor threads.
        Collection<String> jobLocations = new ConcurrentLinkedQueue<>();

        ExecutorService fileExecutor = fileExecutor(jobConf);
        ExecutorService tableExecutor = tableExecutor(jobConf, outputTables.size());
        try {
            Tasks.foreach(outputTables)
                .suppressFailureWhenFinished()
                .executeWith(tableExecutor)
                .onFailure((output, exc) -> LOG.warn("Failed cleanup table {} on abort job", output, exc))
                .run(output -> {
                    LOG.info("Cleaning table {} with job id {}", output, context.getJobID());
                    Table table = HiveIcebergStorageHandler.table(jobConf, output);
                    if (table == null) {
                        // Same guard as commitTask/commitJob; without it this path would throw an NPE.
                        LOG.info("AbortJob found no serialized table in config for table: {}. Skipping cleanup.",
                            output);
                        return;
                    }
                    jobLocations.add(generateJobLocation(table.location(), jobConf, context.getJobID()));

                    // Unreadable .forCommit files are tolerated here (throwOnFailure = false).
                    Collection<DataFile> dataFiles =
                        dataFiles(fileExecutor, table.location(), context, table.io(), false);
                    if (!dataFiles.isEmpty()) {
                        Tasks.foreach(dataFiles)
                            .retry(3)
                            .suppressFailureWhenFinished()
                            .executeWith(fileExecutor)
                            .onFailure((file, exc) ->
                                LOG.warn("Failed to remove data file {} on abort job", file.path(), exc))
                            .run(file -> table.io().deleteFile(file.path().toString()));
                    }
                });
        } finally {
            fileExecutor.shutdown();
            if (tableExecutor != null) {
                tableExecutor.shutdown();
            }
        }

        LOG.info("Job {} is aborted. Data file cleaning finished", context.getJobID());
        cleanup(context, jobLocations);
    }

    /**
     * Appends every data file listed in the job's {@code .forCommit} files for one table, using a
     * single {@link AppendFiles} commit. Does nothing if no files were written.
     *
     * @param io file IO used to read the {@code .forCommit} files
     * @param executor pool used to read the {@code .forCommit} files in parallel
     * @param jobContext the (enriched) job context
     * @param name table identifier, passed to {@link Catalogs#loadTable} as {@link Catalogs#NAME}
     * @param location table location, under which the job temp directory lives
     * @param catalogName optional catalog name; omitted from the properties when {@code null}
     */
    private void commitTable(FileIO io, ExecutorService executor, JobContext jobContext, String name,
                             String location, String catalogName) {
        JobConf conf = jobContext.getJobConf();
        Properties catalogProperties = new Properties();
        catalogProperties.put(Catalogs.NAME, name);
        catalogProperties.put("location", location);
        if (catalogName != null) {
            catalogProperties.put(InputFormatConfig.CATALOG_NAME, catalogName);
        }
        // Reload through the catalog instead of reusing the config-serialized table; presumably so the
        // append is committed against the table's current metadata (confirm).
        Table table = Catalogs.loadTable(conf, catalogProperties);

        long startTime = System.currentTimeMillis();
        LOG.info("Committing job has started for table: {}, using location: {}", table,
            generateJobLocation(location, conf, jobContext.getJobID()));

        // Every task's .forCommit file must be readable here (throwOnFailure = true).
        Collection<DataFile> dataFiles = dataFiles(executor, location, jobContext, io, true);
        if (dataFiles.isEmpty()) {
            LOG.info("Commit took {} ms for table: {} with no new files",
                System.currentTimeMillis() - startTime, table);
            return;
        }

        AppendFiles append = table.newAppend();
        dataFiles.forEach(append::appendFile);
        append.commit();
        LOG.info("Commit took {} ms for table: {} with {} file(s)",
            System.currentTimeMillis() - startTime, table, dataFiles.size());
        LOG.debug("Added files {}", dataFiles);
    }

    /**
     * Recursively deletes the given job temp directories. Each deletion is retried up to 3 times.
     * Remaining failures are logged at debug level and suppressed.
     *
     * @param jobContext the (enriched) job context, used for its configuration and job id
     * @param jobLocations directories produced by {@link #generateJobLocation}
     * @throws IOException declared because the per-location delete may throw it
     */
    private void cleanup(JobContext jobContext, Collection<String> jobLocations) throws IOException {
        JobConf jobConf = jobContext.getJobConf();
        LOG.info("Cleaning for job {} started", jobContext.getJobID());

        Tasks.foreach(jobLocations)
            .retry(3)
            .suppressFailureWhenFinished()
            .onFailure((location, exc) -> LOG.debug("Failed to remove directory {} on job cleanup", location, exc))
            .run(location -> {
                LOG.info("Cleaning location: {}", location);
                Path toDelete = new Path(location);
                Util.getFs(toDelete, jobConf).delete(toDelete, true);
            }, IOException.class);

        LOG.info("Cleaning for job {} finished", jobContext.getJobID());
    }

    /**
     * Creates the pool used for per-file work (reading {@code .forCommit} files, deleting data files).
     * Its size comes from {@link InputFormatConfig#COMMIT_FILE_THREAD_POOL_SIZE}, defaulting to 10.
     * Threads are daemon threads.
     */
    private static ExecutorService fileExecutor(Configuration conf) {
        int size = conf.getInt(InputFormatConfig.COMMIT_FILE_THREAD_POOL_SIZE, 10);
        return Executors.newFixedThreadPool(size, new ThreadFactoryBuilder()
            .setDaemon(true)
            .setPriority(Thread.NORM_PRIORITY)
            .setNameFormat("iceberg-commit-file-pool-%d")
            .build());
    }

    /**
     * Creates the pool used to process output tables in parallel. Its size is the smaller of
     * {@code maxThreadNum} and {@link InputFormatConfig#COMMIT_TABLE_THREAD_POOL_SIZE} (default 10).
     *
     * @return the pool, or {@code null} when the computed size is 1 or less. Callers pass the result
     *     straight to {@code Tasks.executeWith}, which is expected to run on the calling thread when
     *     given {@code null}.
     */
    private static ExecutorService tableExecutor(Configuration conf, int maxThreadNum) {
        int size = Math.min(maxThreadNum, conf.getInt(InputFormatConfig.COMMIT_TABLE_THREAD_POOL_SIZE, 10));
        if (size <= 1) {
            return null;
        }
        return Executors.newFixedThreadPool(size, new ThreadFactoryBuilder()
            .setDaemon(true)
            .setPriority(Thread.NORM_PRIORITY)
            .setNameFormat("iceberg-commit-table-pool-%d")
            .build());
    }

    /**
     * Reads and concatenates the {@link DataFile}s from every task's {@code .forCommit} file for one
     * table. Tasks with ids {@code 0..n-1} are read, where {@code n} is the number of reducers, or the
     * number of mappers for a map-only job. These are the tasks for which
     * {@link #needsTaskCommit} is true.
     *
     * @param throwOnFailure whether a file that still cannot be read after 3 retries fails the whole call
     * @return the collected data files; a thread-safe collection because reads run on {@code executor}
     */
    private static Collection<DataFile> dataFiles(ExecutorService executor, String location, JobContext jobContext,
                                                  FileIO io, boolean throwOnFailure) {
        JobConf conf = jobContext.getJobConf();
        int expectedFiles = conf.getNumReduceTasks() > 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks();

        Collection<DataFile> dataFiles = new ConcurrentLinkedQueue<>();
        Tasks.range(expectedFiles)
            .throwFailureWhenFinished(throwOnFailure)
            .executeWith(executor)
            .retry(3)
            .run(taskId -> {
                String taskFileName = generateFileForCommitLocation(location, conf, jobContext.getJobID(), taskId);
                dataFiles.addAll(Arrays.asList(readFileForCommit(taskFileName, io)));
            });
        return dataFiles;
    }

    /**
     * Returns the per-job temp directory under a table location:
     * {@code <location>/temp/<hive query id>-<jobId>}.
     */
    @VisibleForTesting
    static String generateJobLocation(String location, Configuration conf, JobID jobId) {
        String queryId = conf.get(HiveConf.ConfVars.HIVEQUERYID.varname);
        return location + "/temp/" + queryId + "-" + jobId;
    }

    /**
     * Returns the location of the {@code .forCommit} file written by a given task:
     * {@code <jobLocation>/task-<taskId>.forCommit}.
     */
    private static String generateFileForCommitLocation(String location, Configuration conf, JobID jobId, int taskId) {
        return generateJobLocation(location, conf, jobId) + "/task-" + taskId + FOR_COMMIT_EXTENSION;
    }

    /**
     * Java-serializes {@code closedFiles} into a new (or overwritten) file at {@code location}.
     *
     * @throws IOException if the file cannot be created or written
     */
    private static void createFileForCommit(DataFile[] closedFiles, String location, FileIO io) throws IOException {
        OutputFile fileForCommit = io.newOutputFile(location);
        // The raw stream is a separate resource so it is closed even if the ObjectOutputStream
        // constructor fails while writing its stream header.
        try (OutputStream out = fileForCommit.createOrOverwrite();
             ObjectOutputStream objectStream = new ObjectOutputStream(out)) {
            objectStream.writeObject(closedFiles);
        }
        LOG.debug("Iceberg committed file is created {}", fileForCommit);
    }

    /**
     * Reads back a {@link DataFile} array written by {@link #createFileForCommit}.
     *
     * <p>NOTE(review): this performs Java native deserialization of a file stored under the table
     * location. It is only safe if that location is writable solely by trusted jobs.
     *
     * @throws NotFoundException if the file cannot be read or deserialized; the original exception is
     *     attached as the cause
     */
    private static DataFile[] readFileForCommit(String location, FileIO io) {
        // The raw stream is a separate resource so it is closed even if the ObjectInputStream
        // constructor fails while reading the stream header (e.g. a truncated file).
        try (InputStream in = io.newInputFile(location).newStream();
             ObjectInputStream objectStream = new ObjectInputStream(in)) {
            return (DataFile[]) objectStream.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new NotFoundException(e, "Can not read or parse committed file: %s", location);
        }
    }
}
