package org.apache.gobblin.compaction.hive;

import com.google.common.base.Preconditions;
import com.google.common.io.Closer;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import org.apache.gobblin.compaction.Compactor;
import org.apache.gobblin.compaction.hive.AvroExternalTable;
import org.apache.gobblin.compaction.hive.HiveManagedTable;
import org.apache.gobblin.util.HiveJdbcConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/compaction/hive/SerialCompactor.class */
public class SerialCompactor implements Compactor {
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(SerialCompactor.class);

    // ---- Keys looked up in CompactionRunner.jobProperties ----

    /** Job property naming the Hive database to "use"; falls back to "default" when absent. */
    private static final String HIVE_DB_NAME = "hive.db.name";
    /** Job property naming the queue to submit to; falls back to "default" when absent. */
    private static final String HIVE_QUEUE_NAME = "hive.queue.name";
    /** Job property toggling map-join tuning; parsed as a boolean, "false" when absent. */
    private static final String HIVE_USE_MAPJOIN = "hive.use.mapjoin";
    /**
     * Serves both as the job property key for the small-table size threshold and as the
     * name of the Hive setting that receives that value in setHiveMapjoin().
     */
    private static final String HIVE_MAPJOIN_SMALLTABLE_FILESIZE = "hive.mapjoin.smalltable.filesize";
    /** Name of the Hive setting that setHiveMapjoin() sets to true. */
    private static final String HIVE_AUTO_CONVERT_JOIN = "hive.auto.convert.join";
    /** Job property whose value setHiveInputSplitSize() applies as mapred.min.split.size. */
    private static final String HIVE_INPUT_SPLIT_SIZE = "hive.input.split.size";
    /** Name of the setting written by setHiveInputSplitSize(). */
    private static final String MAPRED_MIN_SPLIT_SIZE = "mapred.min.split.size";
    /** Name of the setting written by setNumberOfReducers(). */
    private static final String MAPREDUCE_JOB_REDUCES = "mapreduce.job.reduces";
    /** Job property whose value setNumberOfReducers() applies as mapreduce.job.reduces. */
    private static final String MAPREDUCE_JOB_NUM_REDUCERS = "mapreduce.job.num.reducers";
    /** Name of the setting written by setHiveQueueName(). */
    private static final String MAPREDUCE_JOB_QUEUENAME = "mapreduce.job.queuename";

    // ---- Per-compaction state, copied from the Builder ----

    /** Snapshot table the deltas are applied on top of. */
    private final AvroExternalTable snapshot;
    /** Delta tables; they are merged in list order, and the last element becomes latestTable. */
    private final List<AvroExternalTable> deltas;
    /** Name given to the output table created by unionNotUpdatedRecordsAndDeltas(). */
    private final String outputTableName;
    /** Data location passed to the output table's builder. */
    private final String outputDataLocationInHdfs;
    /** Last element of deltas; its attributes, primary keys and schema location drive the output schema. */
    private final AvroExternalTable latestTable;
    /** Random UUID with '-' replaced by '_'; passed to every table create/drop/name call. */
    private final String jobId;
    /** Hive JDBC connector; only assigned inside compact(), so it is null before that call. */
    private HiveJdbcConnector conn;

    /* loaded from: input_file:org/apache/gobblin/compaction/hive/SerialCompactor$Builder.class */
    /**
     * Fluent builder for {@link SerialCompactor}.
     *
     * <p>Only the delta list is validated: it must be non-null and non-empty, because the
     * compactor's constructor reads its last element. The snapshot, output table name and
     * output data location are copied as given, without any check.
     */
    public static class Builder {
        private AvroExternalTable snapshot;
        private List<AvroExternalTable> deltas;
        private String outputTableName;
        private String outputDataLocationInHdfs;

        /**
         * Sets the snapshot table that the deltas are applied to.
         *
         * @param avroExternalTable the snapshot table
         * @return this builder
         */
        public Builder withSnapshot(AvroExternalTable avroExternalTable) {
            this.snapshot = avroExternalTable;
            return this;
        }

        /**
         * Sets the delta tables, in the order they should be merged.
         *
         * @param list the delta tables; must contain at least one element
         * @return this builder
         * @throws NullPointerException if {@code list} is null
         * @throws IllegalArgumentException if {@code list} is empty
         */
        public Builder withDeltas(List<AvroExternalTable> list) {
            // Same exception type as the bare list.size() dereference, but with a usable message.
            Preconditions.checkNotNull(list, "The list of delta tables must not be null");
            Preconditions.checkArgument(!list.isEmpty(), "Number of delta tables should be at least 1");
            this.deltas = list;
            return this;
        }

        /**
         * Sets the name of the output table.
         *
         * @param str the output table name
         * @return this builder
         */
        public Builder withOutputTableName(String str) {
            this.outputTableName = str;
            return this;
        }

        /**
         * Sets the data location used for the output table.
         *
         * @param str the output data location
         * @return this builder
         */
        public Builder withOutputDataLocationInHdfs(String str) {
            this.outputDataLocationInHdfs = str;
            return this;
        }

        /**
         * Creates the compactor from the values collected so far.
         *
         * @return a new {@link SerialCompactor}
         * @throws NullPointerException if {@link #withDeltas(List)} was never called
         */
        public SerialCompactor build() {
            // The constructor dereferences the delta list; fail here with an explanation instead.
            Preconditions.checkNotNull(this.deltas, "Delta tables must be set with withDeltas() before build()");
            return new SerialCompactor(this);
        }
    }

    private SerialCompactor(Builder builder) {
        this.snapshot = builder.snapshot;
        this.deltas = builder.deltas;
        this.outputTableName = builder.outputTableName;
        this.outputDataLocationInHdfs = builder.outputDataLocationInHdfs;
        this.latestTable = this.deltas.get(this.deltas.size() - 1);
        this.jobId = UUID.randomUUID().toString().replaceAll("-", "_");
    }

    /**
     * Runs the full compaction: verifies primary-key compatibility, opens a Hive connection,
     * applies the session settings, creates the input tables, merges the deltas, and writes
     * the union of the merged delta and the untouched snapshot records to the output table.
     *
     * <p>Cleanup always runs exactly once, whether or not the pipeline succeeded: temporary
     * files are deleted first, and the connector is closed even if that deletion throws.
     *
     * @throws RuntimeException if the pipeline fails; {@link SQLException} and
     *         {@link IOException} raised by the pipeline are wrapped, runtime exceptions are
     *         rethrown as-is
     * @throws IOException if deleting temporary files or closing the connector fails
     */
    @Override // org.apache.gobblin.compaction.Compactor
    public void compact() throws IOException {
        checkSchemaCompatibility();
        Closer closer = Closer.create();
        try {
            this.conn = closer.register(HiveJdbcConnector.newConnectorWithProps(CompactionRunner.properties));
            setHiveParameters();
            createTables();
            HiveTable mergedDelta = mergeDeltas();
            HiveManagedTable notUpdated = getNotUpdatedRecords(this.snapshot, mergedDelta);
            unionNotUpdatedRecordsAndDeltas(notUpdated, mergedDelta);
        } catch (SQLException e) {
            // Pass the exception itself so the stack trace reaches the log.
            LOG.error("SQLException during compaction: " + e.getMessage(), e);
            throw new RuntimeException(e);
        } catch (IOException e) {
            LOG.error("IOException during compaction: " + e.getMessage(), e);
            throw new RuntimeException(e);
        } catch (RuntimeException e) {
            LOG.error("Runtime Exception during compaction: " + e.getMessage(), e);
            throw e;
        } finally {
            try {
                deleteTmpFiles();
            } finally {
                // Nested finally: the connector must be released even when file cleanup fails.
                closer.close();
            }
        }
    }

    /**
     * Verifies that every delta table has the same primary key as the snapshot.
     *
     * @throws RuntimeException on the first delta whose primary key differs; the message
     *         carries the delta's 1-based position and is also logged at error level
     */
    private void checkSchemaCompatibility() {
        int deltaNumber = 0;
        for (AvroExternalTable delta : this.deltas) {
            deltaNumber++;
            if (this.snapshot.hasSamePrimaryKey(delta)) {
                continue;
            }
            String message = "Schema incompatible: the snapshot table and delta table #" + deltaNumber + " do not have the same primary key.";
            LOG.error(message);
            throw new RuntimeException(message);
        }
    }

    /**
     * Configures the Hive session by running the individual setters in a fixed order:
     * queue name, database, map-join tuning, input split size, number of reducers.
     *
     * @throws SQLException if any of the underlying statements fails
     */
    private void setHiveParameters() throws SQLException {
        setHiveQueueName();
        setHiveDbName();
        setHiveMapjoin();
        setHiveInputSplitSize();
        setNumberOfReducers();
    }

    /**
     * Points the session at the queue named by the {@code hive.queue.name} job property,
     * using "default" when the property is absent.
     *
     * @throws SQLException if the statement fails
     */
    private void setHiveQueueName() throws SQLException {
        String queueName = CompactionRunner.jobProperties.getProperty(HIVE_QUEUE_NAME, "default");
        // Use the declared constant for the setting name instead of repeating the literal.
        this.conn.executeStatements(new String[]{"set " + MAPREDUCE_JOB_QUEUENAME + "=" + queueName});
    }

    /**
     * Switches the session to the database named by the {@code hive.db.name} job property,
     * using "default" when the property is absent.
     *
     * @throws SQLException if the statement fails
     */
    private void setHiveDbName() throws SQLException {
        String database = CompactionRunner.jobProperties.getProperty(HIVE_DB_NAME, "default");
        String[] statements = {"use " + database};
        this.conn.executeStatements(statements);
    }

    /**
     * Enables automatic map-join conversion and sets the small-table size threshold.
     *
     * <p>Nothing is executed unless {@code hive.use.mapjoin} parses to {@code true} AND the
     * job properties contain {@code hive.mapjoin.smalltable.filesize}.
     *
     * @throws SQLException if either statement fails
     */
    private void setHiveMapjoin() throws SQLException {
        boolean useMapjoin = Boolean.parseBoolean(CompactionRunner.jobProperties.getProperty(HIVE_USE_MAPJOIN, "false"));
        boolean hasFileSize = CompactionRunner.jobProperties.containsKey(HIVE_MAPJOIN_SMALLTABLE_FILESIZE);
        if (!useMapjoin || !hasFileSize) {
            return;
        }
        // Setting names come from the declared constants rather than repeated literals.
        this.conn.executeStatements(new String[]{"set " + HIVE_AUTO_CONVERT_JOIN + "=true"});
        String fileSize = CompactionRunner.jobProperties.getProperty(HIVE_MAPJOIN_SMALLTABLE_FILESIZE);
        this.conn.executeStatements(new String[]{"set " + HIVE_MAPJOIN_SMALLTABLE_FILESIZE + "=" + fileSize});
    }

    /**
     * Applies the {@code hive.input.split.size} job property, when present, as the session's
     * {@code mapred.min.split.size}. Does nothing when the property is absent.
     *
     * @throws SQLException if the statement fails
     */
    private void setHiveInputSplitSize() throws SQLException {
        if (!CompactionRunner.jobProperties.containsKey(HIVE_INPUT_SPLIT_SIZE)) {
            return;
        }
        String splitSize = CompactionRunner.jobProperties.getProperty(HIVE_INPUT_SPLIT_SIZE);
        // Use the declared constant for the setting name instead of repeating the literal.
        this.conn.executeStatements(new String[]{"set " + MAPRED_MIN_SPLIT_SIZE + "=" + splitSize});
    }

    /**
     * Applies the {@code mapreduce.job.num.reducers} job property, when present, as the
     * session's {@code mapreduce.job.reduces}. Does nothing when the property is absent.
     *
     * @throws SQLException if the statement fails
     */
    private void setNumberOfReducers() throws SQLException {
        if (!CompactionRunner.jobProperties.containsKey(MAPREDUCE_JOB_NUM_REDUCERS)) {
            return;
        }
        String reducers = CompactionRunner.jobProperties.getProperty(MAPREDUCE_JOB_NUM_REDUCERS);
        // Use the declared constant for the setting name instead of repeating the literal.
        this.conn.executeStatements(new String[]{"set " + MAPREDUCE_JOB_REDUCES + "=" + reducers});
    }

    /**
     * Creates the Hive table for the snapshot first, then one for each delta in list order.
     *
     * @throws SQLException if any table creation fails
     */
    private void createTables() throws SQLException {
        this.snapshot.createTable(this.conn, this.jobId);
        for (AvroExternalTable delta : this.deltas) {
            delta.createTable(this.conn, this.jobId);
        }
    }

    /**
     * Collapses all delta tables into a single table.
     *
     * <p>With exactly one delta that delta is returned untouched. Otherwise a managed table
     * named "merged_delta" is seeded with the first delta, and each later delta is folded in
     * via {@link #mergeTwoDeltas}. Each delta is dropped once it has been consumed; note that
     * {@code mergeTwoDeltas} also drops the delta it receives, so deltas after the first are
     * dropped twice.
     *
     * @return the single delta, or the managed table holding the merged deltas
     * @throws SQLException if any Hive statement fails
     */
    private HiveTable mergeDeltas() throws SQLException {
        if (this.deltas.size() == 1) {
            LOG.info("Only one delta table: no need to merge delta");
            return this.deltas.get(0);
        }

        AvroExternalTable firstDelta = this.deltas.get(0);
        HiveManagedTable merged = new HiveManagedTable.Builder()
                .withName("merged_delta")
                .withAttributes(firstDelta.getAttributes())
                .withPrimaryKeys(firstDelta.getPrimaryKeys())
                .build();
        merged.createTable(this.conn, this.jobId);
        insertFirstDeltaIntoMergedDelta(merged);
        firstDelta.dropTable(this.conn, this.jobId);

        // Fold the remaining deltas in, oldest to newest.
        int mergedCount = 1;
        for (AvroExternalTable nextDelta : this.deltas.subList(1, this.deltas.size())) {
            merged = mergeTwoDeltas(merged, nextDelta);
            mergedCount++;
            LOG.info("Merged the first " + mergedCount + " delta tables");
            nextDelta.dropTable(this.conn, this.jobId);
        }
        return merged;
    }

    /**
     * Overwrites the given table with every row of the first delta table.
     *
     * @param hiveManagedTable destination table
     * @throws SQLException if the insert statement fails
     */
    private void insertFirstDeltaIntoMergedDelta(HiveManagedTable hiveManagedTable) throws SQLException {
        StringBuilder insert = new StringBuilder("INSERT OVERWRITE TABLE ");
        insert.append(hiveManagedTable.getNameWithJobId(this.jobId));
        insert.append(" SELECT * FROM ");
        insert.append(this.deltas.get(0).getNameWithJobId(this.jobId));
        this.conn.executeStatements(new String[]{insert.toString()});
    }

    /**
     * Merges a newer delta into the already-merged delta table.
     *
     * <p>Rows of the merged table whose key does not appear in the newer delta are kept
     * (computing them drops the old merged table, see {@link #getNotUpdatedRecords}); both
     * sides are brought to the latest table's schema and written with UNION ALL into a new
     * managed table that reuses the old merged table's name. The newer delta is dropped
     * afterwards.
     *
     * @param hiveManagedTable the deltas merged so far
     * @param avroExternalTable the next, newer delta
     * @return the new merged table
     * @throws SQLException if any Hive statement fails
     */
    private HiveManagedTable mergeTwoDeltas(HiveManagedTable hiveManagedTable, AvroExternalTable avroExternalTable) throws SQLException {
        HiveManagedTable notUpdated = getNotUpdatedRecords(hiveManagedTable, avroExternalTable);
        HiveTable keptRecords = notUpdated.addNewColumnsInSchema(this.conn, this.latestTable, this.jobId);
        HiveTable newerRecords = avroExternalTable.addNewColumnsInSchema(this.conn, this.latestTable, this.jobId);

        HiveManagedTable merged = new HiveManagedTable.Builder()
                .withName(hiveManagedTable.getName())
                .withAttributes(this.latestTable.getAttributes())
                .withPrimaryKeys(this.latestTable.getPrimaryKeys())
                .build();
        merged.createTable(this.conn, this.jobId);

        StringBuilder insert = new StringBuilder("INSERT OVERWRITE TABLE ");
        insert.append(merged.getNameWithJobId(this.jobId));
        insert.append(" SELECT ").append(getAttributesInNewSchema());
        insert.append(" FROM ").append(keptRecords.getNameWithJobId(this.jobId));
        insert.append(" UNION ALL SELECT ").append(getAttributesInNewSchema());
        insert.append(" FROM ").append(newerRecords.getNameWithJobId(this.jobId));
        this.conn.executeStatements(new String[]{insert.toString()});

        avroExternalTable.dropTable(this.conn, this.jobId);
        return merged;
    }

    /**
     * Builds the column list of the latest table for use in a SELECT clause.
     *
     * @return the attribute names of the latest table joined with ", " (empty string when
     *         there are no attributes)
     */
    private String getAttributesInNewSchema() {
        StringBuilder columns = new StringBuilder();
        String separator = "";
        for (int idx = 0; idx < this.latestTable.getAttributes().size(); idx++) {
            // The separator goes before every name except the first.
            columns.append(separator).append(this.latestTable.getAttributes().get(idx).name());
            separator = ", ";
        }
        return columns.toString();
    }

    /**
     * Extracts the rows of the first table whose primary key has no match in the second.
     *
     * <p>The result goes into a new managed table named "not_updated" that carries the first
     * table's primary keys and attributes. It is filled by a LEFT OUTER JOIN on the primary
     * keys, filtered to rows where the second table's keys are NULL. The first table is
     * dropped afterwards.
     *
     * @param hiveTable the older table to filter
     * @param hiveTable2 the newer table whose keys mark rows as updated
     * @return the table holding the rows that were not updated
     * @throws SQLException if any Hive statement fails
     */
    private HiveManagedTable getNotUpdatedRecords(HiveTable hiveTable, HiveTable hiveTable2) throws SQLException {
        LOG.info("Getting records in table " + hiveTable.getNameWithJobId(this.jobId) + " but not in table " + hiveTable2.getNameWithJobId(this.jobId));

        HiveManagedTable notUpdated = new HiveManagedTable.Builder()
                .withName("not_updated")
                .withPrimaryKeys(hiveTable.getPrimaryKeys())
                .withAttributes(hiveTable.getAttributes())
                .build();
        notUpdated.createTable(this.conn, this.jobId);

        StringBuilder insert = new StringBuilder("INSERT OVERWRITE TABLE ");
        insert.append(notUpdated.getNameWithJobId(this.jobId));
        insert.append(" SELECT ").append(hiveTable.getNameWithJobId(this.jobId)).append(".* FROM ");
        insert.append(hiveTable.getNameWithJobId(this.jobId));
        insert.append(" LEFT OUTER JOIN ").append(hiveTable2.getNameWithJobId(this.jobId));
        insert.append(" ON ").append(getJoinCondition(hiveTable, hiveTable2));
        insert.append(" WHERE ").append(getKeyIsNullPredicate(hiveTable2));
        this.conn.executeStatements(new String[]{insert.toString()});

        hiveTable.dropTable(this.conn, this.jobId);
        return notUpdated;
    }

    /**
     * Builds the ON clause that equates every primary-key column of the two tables.
     *
     * @param hiveTable left table of the join
     * @param hiveTable2 right table of the join
     * @return equality terms of the form {@code left.key = right.key}, joined with " AND "
     * @throws RuntimeException if the two tables do not report equal primary keys
     */
    private String getJoinCondition(HiveTable hiveTable, HiveTable hiveTable2) {
        boolean sameKeys = hiveTable.getPrimaryKeys().equals(hiveTable2.getPrimaryKeys());
        if (!sameKeys) {
            throw new RuntimeException("The primary keys of table " + hiveTable.getName() + " and table " + hiveTable2.getName() + " are different");
        }

        StringBuilder condition = new StringBuilder();
        String separator = "";
        for (String key : hiveTable.getPrimaryKeys()) {
            condition.append(separator);
            condition.append(hiveTable.getNameWithJobId(this.jobId)).append(".").append(key);
            condition.append(" = ");
            condition.append(hiveTable2.getNameWithJobId(this.jobId)).append(".").append(key);
            separator = " AND ";
        }
        return condition.toString();
    }

    /**
     * Builds the predicate that is true when every primary-key column of the table is NULL.
     *
     * @param hiveTable table whose primary-key columns are tested
     * @return terms of the form {@code table.key IS NULL}, joined with " AND "
     */
    private String getKeyIsNullPredicate(HiveTable hiveTable) {
        StringBuilder predicate = new StringBuilder();
        String separator = "";
        for (String key : hiveTable.getPrimaryKeys()) {
            predicate.append(separator);
            predicate.append(hiveTable.getNameWithJobId(this.jobId)).append(".").append(key).append(" IS NULL");
            separator = " AND ";
        }
        return predicate.toString();
    }

    /**
     * Produces the final output table: the snapshot rows that were not updated plus the
     * merged delta, both brought to the latest table's schema and combined with UNION ALL.
     *
     * <p>The output is an external Avro table built from the configured output name and data
     * location, using the latest table's primary keys and schema location. The two
     * schema-adjusted input tables are dropped afterwards.
     *
     * @param hiveManagedTable snapshot rows without a matching key in the merged delta
     * @param hiveTable the merged delta
     * @return the output table
     * @throws IOException declared by the signature; raised by the table helpers, if at all
     * @throws SQLException if any Hive statement fails
     */
    private AvroExternalTable unionNotUpdatedRecordsAndDeltas(HiveManagedTable hiveManagedTable, HiveTable hiveTable) throws IOException, SQLException {
        LOG.info("Taking union of table " + hiveManagedTable.getNameWithJobId(this.jobId) + "(records in snapshot but not in delta) and table " + hiveTable.getNameWithJobId(this.jobId) + "(merged delta)");

        HiveTable keptRecords = hiveManagedTable.addNewColumnsInSchema(this.conn, this.latestTable, this.jobId);
        HiveTable deltaRecords = hiveTable.addNewColumnsInSchema(this.conn, this.latestTable, this.jobId);

        AvroExternalTable output = new AvroExternalTable.Builder()
                .withName(this.outputTableName)
                .withPrimaryKeys(this.latestTable.getPrimaryKeys())
                .withSchemaLocation(this.latestTable.getSchemaLocationInHdfs())
                .withDataLocation(this.outputDataLocationInHdfs)
                .build();
        output.createTable(this.conn, this.jobId);

        StringBuilder insert = new StringBuilder("INSERT OVERWRITE TABLE ");
        insert.append(output.getNameWithJobId(this.jobId));
        insert.append(" SELECT ").append(getAttributesInNewSchema());
        insert.append(" FROM ").append(keptRecords.getNameWithJobId(this.jobId));
        insert.append(" UNION ALL SELECT ").append(getAttributesInNewSchema());
        insert.append(" FROM ").append(deltaRecords.getNameWithJobId(this.jobId));
        this.conn.executeStatements(new String[]{insert.toString()});

        keptRecords.dropTable(this.conn, this.jobId);
        deltaRecords.dropTable(this.conn, this.jobId);
        return output;
    }

    /**
     * Asks the snapshot and then each delta, in list order, to delete its temporary files.
     * The first failure aborts the remaining deletions.
     *
     * @throws IllegalArgumentException propagated from the tables' cleanup
     * @throws IOException propagated from the tables' cleanup
     */
    private void deleteTmpFiles() throws IllegalArgumentException, IOException {
        this.snapshot.deleteTmpFilesIfNeeded();
        for (AvroExternalTable delta : this.deltas) {
            delta.deleteTmpFilesIfNeeded();
        }
    }

    /**
     * Cancellation hook from {@link Compactor}. This implementation has an empty body, so
     * calling it has no effect on a running compaction.
     *
     * @throws IOException never thrown by this implementation; declared by the signature only
     */
    @Override // org.apache.gobblin.compaction.Compactor
    public void cancel() throws IOException {
    }
}
