package co.cask.cdap.data.tools;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.common.namespace.NamespacedLocationFactory;
import co.cask.cdap.common.utils.ProjectInfo;
import co.cask.cdap.data2.dataset2.lib.hbase.AbstractHBaseDataSetAdmin;
import co.cask.cdap.data2.transaction.queue.QueueEntryRow;
import co.cask.cdap.data2.util.TableId;
import co.cask.cdap.data2.util.hbase.HBaseTableUtil;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.twill.filesystem.LocationFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data/tools/AbstractQueueUpgrader.class */
public abstract class AbstractQueueUpgrader extends AbstractUpgrader {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractQueueUpgrader.class);
    protected final HBaseTableUtil tableUtil;
    protected final Configuration conf;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractQueueUpgrader(LocationFactory locationFactory, NamespacedLocationFactory namespacedLocationFactory, HBaseTableUtil hBaseTableUtil, Configuration configuration) {
        super(locationFactory, namespacedLocationFactory);
        this.tableUtil = hBaseTableUtil;
        this.conf = configuration;
    }

    protected abstract TableId getTableId();

    @Nullable
    protected abstract byte[] processRowKey(byte[] bArr);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Finally extract failed */
    @Override // co.cask.cdap.data.tools.AbstractUpgrader
    public void upgrade() throws Exception {
        TableId tableId = getTableId();
        if (!this.tableUtil.tableExists(new HBaseAdmin(this.conf), tableId)) {
            LOG.info("Table does not exist: {}. No upgrade necessary.", tableId);
            return;
        }
        HTable createHTable = this.tableUtil.createHTable(this.conf, tableId);
        ProjectInfo.Version version = AbstractHBaseDataSetAdmin.getVersion(createHTable.getTableDescriptor());
        if (ProjectInfo.getVersion().compareTo(version) <= 0) {
            LOG.info("Table {} has already been upgraded. Its version is: {}", tableId, version);
            return;
        }
        LOG.info("Starting upgrade for table {}", Bytes.toString(createHTable.getTableName()));
        try {
            try {
                Scan scan = new Scan();
                scan.setTimeRange(0L, Long.MAX_VALUE);
                scan.addFamily(QueueEntryRow.COLUMN_FAMILY);
                scan.setMaxVersions(1);
                ArrayList newArrayList = Lists.newArrayList();
                ResultScanner scanner = createHTable.getScanner(scan);
                while (true) {
                    try {
                        Result next = scanner.next();
                        if (next == null) {
                            scanner.close();
                            createHTable.batch(newArrayList);
                            LOG.info("Successfully completed upgrade for table {}", Bytes.toString(createHTable.getTableName()));
                            createHTable.close();
                            return;
                        }
                        byte[] row = next.getRow();
                        String bytes = Bytes.toString(row);
                        byte[] processRowKey = processRowKey(row);
                        NavigableMap familyMap = next.getFamilyMap(QueueEntryRow.COLUMN_FAMILY);
                        if (processRowKey != null) {
                            Put put = new Put(processRowKey);
                            for (Map.Entry entry : familyMap.entrySet()) {
                                LOG.debug("Adding entry {} -> {} for upgrade", Bytes.toString((byte[]) entry.getKey()), Bytes.toString((byte[]) entry.getValue()));
                                put.add(QueueEntryRow.COLUMN_FAMILY, (byte[]) entry.getKey(), (byte[]) entry.getValue());
                                newArrayList.add(put);
                            }
                            LOG.debug("Marking old key {} for deletion", bytes);
                            newArrayList.add(new Delete(row));
                        }
                        LOG.info("Finished processing row key {}", bytes);
                    } catch (Throwable th) {
                        scanner.close();
                        throw th;
                    }
                }
            } catch (Exception e) {
                LOG.error("Error while upgrading table: {}", tableId, e);
                throw Throwables.propagate(e);
            }
        } catch (Throwable th2) {
            createHTable.close();
            throw th2;
        }
    }
}
