package co.cask.cdap.logging.save;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.table.Row;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.data2.transaction.Transactions;
import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.tephra.TransactionAware;
import org.apache.tephra.TransactionExecutor;
import org.apache.tephra.TransactionExecutorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/logging/save/CheckpointManager.class */
/**
 * Saves and reads per-partition {@link Checkpoint}s in the log saver meta table.
 *
 * <p>Each partition is stored in its own row. The row key is the concatenation of the
 * manager's prefix (the bytes of the {@code int} and the {@code String} given to the
 * constructor) and the bytes of the partition number. A row carries two columns:
 * {@code nextOffset} and {@code maxEventTime}.
 *
 * <p>All table access runs through a transaction executor created for the meta table.
 * The most recently saved checkpoint map is remembered so that an identical save request
 * can be skipped. This class performs no synchronization of its own.
 */
public final class CheckpointManager {
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointManager.class);
    private static final byte[] OFFSET_COLNAME = Bytes.toBytes("nextOffset");
    private static final byte[] MAX_TIME_COLNAME = Bytes.toBytes("maxEventTime");
    private final byte[] rowKeyPrefix;
    private final LogSaverTableUtil tableUtil;
    private final TransactionExecutorFactory transactionExecutorFactory;
    // Last checkpoint map that was written successfully; used to skip redundant writes.
    private Map<Integer, Checkpoint> lastCheckpoint = new HashMap<>();

    /**
     * Creates a checkpoint manager whose rows are keyed under the prefix
     * {@code bytes(i) + bytes(str)}.
     *
     * @param logSaverTableUtil provides the meta table and its name
     * @param transactionExecutorFactory factory used to build transaction executors for the meta table
     * @param str string component of the row key prefix (presumably a topic or source name;
     *            confirm against callers)
     * @param i integer component of the row key prefix, written before {@code str}
     */
    public CheckpointManager(LogSaverTableUtil logSaverTableUtil, TransactionExecutorFactory transactionExecutorFactory, String str, int i) {
        this.rowKeyPrefix = Bytes.add(Bytes.toBytes(i), Bytes.toBytes(str));
        this.tableUtil = logSaverTableUtil;
        this.transactionExecutorFactory = transactionExecutorFactory;
    }

    /**
     * Runs {@code function} against the meta table inside a transaction and returns its result.
     *
     * @throws RuntimeException wrapping any failure, including a meta table that is not
     *                          {@link TransactionAware}
     */
    private <T> T execute(TransactionExecutor.Function<Table, T> function) {
        try {
            Table metaTable = this.tableUtil.getMetaTable();
            return createExecutor(metaTable).execute(function, metaTable);
        } catch (Exception e) {
            throw accessFailure(e);
        }
    }

    /**
     * Runs {@code procedure} against the meta table inside a transaction.
     *
     * @throws RuntimeException wrapping any failure, including a meta table that is not
     *                          {@link TransactionAware}
     */
    private void execute(TransactionExecutor.Procedure<Table> procedure) {
        try {
            Table metaTable = this.tableUtil.getMetaTable();
            createExecutor(metaTable).execute(procedure, metaTable);
        } catch (Exception e) {
            throw accessFailure(e);
        }
    }

    /** Builds a transaction executor for {@code metaTable}, rejecting tables that cannot join a transaction. */
    private TransactionExecutor createExecutor(Table metaTable) {
        if (!(metaTable instanceof TransactionAware)) {
            throw new RuntimeException(String.format("Table %s is not TransactionAware, Exception while trying to cast it to TransactionAware. Please check why the table is not TransactionAware", metaTable));
        }
        return Transactions.createTransactionExecutor(this.transactionExecutorFactory, (TransactionAware) metaTable);
    }

    /** Wraps {@code cause} in the uniform "Error accessing ... table" exception, preserving interrupt status. */
    private RuntimeException accessFailure(Exception cause) {
        if (cause instanceof InterruptedException) {
            // Do not lose the interruption just because it is being converted to an unchecked exception.
            Thread.currentThread().interrupt();
        }
        return new RuntimeException(String.format("Error accessing %s table", this.tableUtil.getMetaTableName()), cause);
    }

    /** Returns the row key for {@code partition}: the manager's prefix followed by the partition bytes. */
    private byte[] rowKeyFor(int partition) {
        return Bytes.add(this.rowKeyPrefix, Bytes.toBytes(partition));
    }

    /**
     * Reads the checkpoint row of {@code partition} from {@code table}. {@code -1} is passed
     * as the default for both columns, so a partition with nothing stored presumably yields
     * a checkpoint of {@code (-1, -1)}.
     */
    private Checkpoint readCheckpoint(Table table, int partition) {
        Row row = table.get(rowKeyFor(partition));
        return new Checkpoint(row.getLong(OFFSET_COLNAME, -1L), row.getLong(MAX_TIME_COLNAME, -1L));
    }

    /**
     * Writes the checkpoint of every partition in {@code map} to the meta table in one
     * transaction. Nothing is written when {@code map} equals the last successfully saved map.
     *
     * @param map checkpoints keyed by partition number
     * @throws RuntimeException if the meta table cannot be accessed or the transaction fails
     */
    public void saveCheckpoint(final Map<Integer, Checkpoint> map) throws Exception {
        if (this.lastCheckpoint.equals(map)) {
            return;
        }
        execute(new TransactionExecutor.Procedure<Table>() {
            @Override
            public void apply(Table table) throws Exception {
                for (Map.Entry<Integer, Checkpoint> entry : map.entrySet()) {
                    byte[] rowKey = rowKeyFor(entry.getKey());
                    Checkpoint checkpoint = entry.getValue();
                    table.put(rowKey, OFFSET_COLNAME, Bytes.toBytes(checkpoint.getNextOffset()));
                    table.put(rowKey, MAX_TIME_COLNAME, Bytes.toBytes(checkpoint.getMaxEventTime()));
                }
            }
        });
        // Remember the map only after the transaction went through; updating the cache inside
        // the procedure would mark a failed save as done and suppress the retry.
        this.lastCheckpoint = ImmutableMap.copyOf(map);
        LOG.trace("Saving checkpoints for partitions {}", map);
    }

    /**
     * Reads the checkpoints of the given partitions in one transaction.
     *
     * @param set partition numbers to look up
     * @return a new map with one entry per requested partition
     * @throws RuntimeException if the meta table cannot be accessed or the transaction fails
     */
    public Map<Integer, Checkpoint> getCheckpoint(final Set<Integer> set) throws Exception {
        return execute(new TransactionExecutor.Function<Table, Map<Integer, Checkpoint>>() {
            @Override
            public Map<Integer, Checkpoint> apply(Table table) throws Exception {
                Map<Integer, Checkpoint> checkpoints = new HashMap<>();
                for (Integer partition : set) {
                    checkpoints.put(partition, readCheckpoint(table, partition));
                }
                return checkpoints;
            }
        });
    }

    /**
     * Reads the checkpoint of a single partition.
     *
     * @param i partition number
     * @return the checkpoint built from the partition's row
     * @throws RuntimeException if the meta table cannot be accessed or the transaction fails
     */
    public Checkpoint getCheckpoint(final int i) throws Exception {
        Checkpoint checkpoint = execute(new TransactionExecutor.Function<Table, Checkpoint>() {
            @Override
            public Checkpoint apply(Table table) throws Exception {
                return readCheckpoint(table, i);
            }
        });
        LOG.trace("Read checkpoint {} for partition {}", checkpoint, i);
        return checkpoint;
    }
}
