package co.cask.cdap.internal.app.runtime.schedule.queue;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.Dataset;
import co.cask.cdap.api.dataset.lib.AbstractCloseableIterator;
import co.cask.cdap.api.dataset.lib.AbstractDataset;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.dataset.module.EmbeddedDataset;
import co.cask.cdap.api.dataset.table.Put;
import co.cask.cdap.api.dataset.table.Row;
import co.cask.cdap.api.dataset.table.Scanner;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.api.schedule.Trigger;
import co.cask.cdap.internal.app.runtime.messaging.TopicMessageIdStore;
import co.cask.cdap.internal.app.runtime.schedule.ProgramSchedule;
import co.cask.cdap.internal.app.runtime.schedule.constraint.ConstraintCodec;
import co.cask.cdap.internal.app.runtime.schedule.queue.Job;
import co.cask.cdap.internal.app.runtime.schedule.trigger.SatisfiableTrigger;
import co.cask.cdap.internal.app.runtime.schedule.trigger.TriggerCodec;
import co.cask.cdap.internal.schedule.constraint.Constraint;
import co.cask.cdap.proto.Notification;
import co.cask.cdap.proto.id.ScheduleId;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.hash.Hashing;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.tephra.Transaction;

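/**
 * Dataset that implements {@link JobQueue} and {@link TopicMessageIdStore} on top of a single embedded
 * {@link Table}. Job rows are keyed as {@code J:<partition byte>:<schedule id parts joined by '.'>:<creation time>};
 * subscriber-state rows are keyed as {@code M:<key>}.
 *
 * <p>Decompiled source, loaded from: input_file:co/cask/cdap/internal/app/runtime/schedule/queue/JobQueueDataset.class
 */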
public class JobQueueDataset extends AbstractDataset implements JobQueue, TopicMessageIdStore {
    // Name of the embedded table; the same value appears in the constructor's @EmbeddedDataset annotation.
    static final String EMBEDDED_TABLE_NAME = "t";
    // Gson instance with type adapters for Trigger and Constraint; used to serialize jobs to and from JSON.
    private static final Gson GSON = new GsonBuilder().registerTypeAdapter(Trigger.class, new TriggerCodec()).registerTypeAdapter(Constraint.class, new ConstraintCodec()).create();
    // Column 'C' (ASCII 67): holds the JSON-serialized job on job rows and the state string on subscriber rows.
    private static final byte[] COL = {67};
    // Column 'D' (ASCII 68): long value written by markJobsForDeletion and read back in fromRow.
    private static final byte[] TO_DELETE_COL = {68};
    // Column 'O' (ASCII 79): long value read in fromRow. No writer is visible in this decompiled file;
    // presumably it is written by the addNotification overload that failed to decompile - TODO confirm.
    private static final byte[] IS_OBSOLETE_COL = {79};
    // Row-key prefix 'J' (ASCII 74) for job rows.
    private static final byte[] JOB_ROW_PREFIX = {74};
    // Separator ':' (ASCII 58) placed between the components of a row key.
    private static final byte[] ROW_KEY_SEPARATOR = {58};
    // Row-key prefix 'M' (ASCII 77) for subscriber-state rows.
    private static final byte[] MESSAGE_ID_ROW_PREFIX = {77};
    // Number of partitions; getPartition() maps a schedule id to a value below this, which becomes one byte of the row key.
    private static final int NUM_PARTITIONS = 16;
    // The embedded table that stores both job rows and subscriber-state rows.
    private final Table table;
    // Extra entries appended to the transaction change set in getTxChanges(); cleared in startTx().
    // No visible code adds to this collection - presumably the non-decompiled addNotification does; verify.
    private final Collection<byte[]> scheduleIds;

    /**
     * Creates the dataset on top of its embedded table.
     *
     * <p>The decompiler reports that it widened this constructor from package-private to public; the
     * visibility is left as found so that existing callers keep compiling.
     *
     * @param str name forwarded to {@link AbstractDataset} (presumably the dataset instance name)
     * @param table the embedded table, registered under {@link #EMBEDDED_TABLE_NAME}, that stores all rows
     */
    public JobQueueDataset(String str, @EmbeddedDataset(EMBEDDED_TABLE_NAME) Table table) {
        super(str, table, new Dataset[0]);
        this.table = table;
        // Typed list instead of the raw ArrayList the decompiler emitted.
        this.scheduleIds = new ArrayList<>();
    }

    /**
     * Starts a transaction on the parent dataset and resets the per-transaction list of extra change keys.
     *
     * @param transaction the transaction being started; forwarded unchanged to the superclass
     */
    @Override
    public void startTx(Transaction transaction) {
        super.startTx(transaction);
        // Entries collected for a previous transaction must not leak into this one.
        this.scheduleIds.clear();
    }

    /**
     * Returns the change set of the current transaction.
     *
     * @return the superclass change set as is when no extra keys were collected; otherwise a new list
     *         holding the superclass changes followed by the entries of {@code scheduleIds}
     */
    @Override
    public Collection<byte[]> getTxChanges() {
        Collection<byte[]> baseChanges = super.getTxChanges();
        if (this.scheduleIds.isEmpty()) {
            return baseChanges;
        }
        // Pre-sized, properly typed copy; the superclass collection itself is never mutated.
        List<byte[]> combined = new ArrayList<>(baseChanges.size() + this.scheduleIds.size());
        combined.addAll(baseChanges);
        combined.addAll(this.scheduleIds);
        return combined;
    }

    /**
     * Scans every job row whose key starts with the row-key prefix of the given schedule.
     *
     * @param scheduleId the schedule whose jobs are wanted
     * @return an iterator over the matching jobs; the caller should close it to release the scanner
     */
    @Override // co.cask.cdap.internal.app.runtime.schedule.queue.JobQueue
    public CloseableIterator<Job> getJobsForSchedule(ScheduleId scheduleId) {
        byte[] startKey = getRowKeyPrefix(scheduleId);
        byte[] stopKey = Bytes.stopKeyForPrefix(startKey);
        Scanner scanner = this.table.scan(startKey, stopKey);
        return createCloseableIterator(scanner);
    }

    /**
     * Looks up a single job by its key.
     *
     * @param jobKey supplies the schedule id and creation time that make up the row key
     * @return the stored job, or {@code null} when the row is empty
     */
    @Override // co.cask.cdap.internal.app.runtime.schedule.queue.JobQueue
    public Job getJob(JobKey jobKey) {
        byte[] rowKey = getRowKey(jobKey.getScheduleId(), jobKey.getCreationTime());
        Row row = this.table.get(rowKey);
        return row.isEmpty() ? null : fromRow(row);
    }

    /**
     * Writes the job to the table, under the row key derived from its schedule id and creation time.
     *
     * @param job the job to store
     */
    @Override // co.cask.cdap.internal.app.runtime.schedule.queue.JobQueue
    public void put(Job job) {
        Put jobPut = toPut(job);
        this.table.put(jobPut);
    }

    /**
     * Stores a copy of the job with a new state, after checking the transition against the current state.
     *
     * @param job the job to transition; it is not modified
     * @param state the target state
     * @return the newly stored job carrying {@code state}
     */
    @Override // co.cask.cdap.internal.app.runtime.schedule.queue.JobQueue
    public Job transitState(Job job, Job.State state) {
        Job.State currentState = job.getState();
        currentState.checkTransition(state);
        SimpleJob transitioned = new SimpleJob(
            job.getSchedule(),
            job.getCreationTime(),
            job.getNotifications(),
            state,
            job.getScheduleLastUpdatedTime());
        put(transitioned);
        return transitioned;
    }

    /*
     * NOTE(review): this method is a decompiler stub, NOT the real implementation. JADX could not
     * reconstruct the original body (it reports 421 skipped bytecode instructions), so as written
     * every call throws UnsupportedOperationException. The only recovered fragment, shown in the
     * JADX warning below, indicates that one branch of the original set a local flag to true and
     * delegated to the private addNotification(Job, Notification) overload with the second argument.
     * The body must be restored from the original sources or the instruction dump before this class
     * can be used.
     */
    /* JADX WARN: Code restructure failed: missing block: B:27:0x00fb, code lost:
    
        r14 = true;
        addNotification(r0, r13);
     */
    @Override // co.cask.cdap.internal.app.runtime.schedule.queue.JobQueue
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void addNotification(co.cask.cdap.internal.app.runtime.schedule.ProgramScheduleRecord r12, co.cask.cdap.proto.Notification r13) {
        /*
            Method dump skipped, instructions count: 421
            To view this dump add '--comments-level debug' option
        */
        // Placeholder emitted by the decompiler in place of the lost body.
        throw new UnsupportedOperationException("Method not decompiled: co.cask.cdap.internal.app.runtime.schedule.queue.JobQueueDataset.addNotification(co.cask.cdap.internal.app.runtime.schedule.ProgramScheduleRecord, co.cask.cdap.proto.Notification):void");
    }

    /**
     * Appends a notification to an existing job and stores the result.
     *
     * <p>If the schedule's trigger is satisfied by the extended notification list, the stored job moves
     * to {@link Job.State#PENDING_CONSTRAINT} (after the transition is checked against the job's current
     * state); otherwise the state is kept.
     *
     * @param job the existing job; it is not modified
     * @param notification the notification to append
     */
    private void addNotification(Job job, Notification notification) {
        // Copy so the job's own list is never mutated; typed instead of the raw ArrayList.
        List<Notification> notifications = new ArrayList<>(job.getNotifications());
        notifications.add(notification);

        Job.State newState = job.getState();
        if (isTriggerSatisfied(job.getSchedule(), notifications)) {
            newState = Job.State.PENDING_CONSTRAINT;
            job.getState().checkTransition(newState);
        }
        put(new SimpleJob(job.getSchedule(), job.getCreationTime(), notifications, newState, job.getScheduleLastUpdatedTime()));
    }

    /**
     * Asks the schedule's trigger whether the given notifications satisfy it.
     *
     * @param programSchedule the schedule whose trigger is consulted; the trigger is cast to {@link SatisfiableTrigger}
     * @param list the notifications collected so far
     * @return the trigger's verdict
     */
    private boolean isTriggerSatisfied(ProgramSchedule programSchedule, List<Notification> list) {
        SatisfiableTrigger trigger = (SatisfiableTrigger) programSchedule.getTrigger();
        return trigger.isSatisfied(programSchedule, list);
    }

    /**
     * Marks the jobs of a schedule for deletion by writing {@code j} into their to-delete column.
     *
     * <p>Jobs in state {@link Job.State#PENDING_LAUNCH} and rows that already have a to-delete value
     * are left untouched.
     *
     * @param scheduleId the schedule whose job rows are scanned
     * @param j the long value stored in the to-delete column of each marked row
     */
    @Override // co.cask.cdap.internal.app.runtime.schedule.queue.JobQueue
    public void markJobsForDeletion(ScheduleId scheduleId, long j) {
        byte[] keyPrefix = getRowKeyPrefix(scheduleId);
        // try-with-resources closes the scanner on every path, replacing the hand-unrolled close logic.
        try (Scanner scanner = this.table.scan(keyPrefix, Bytes.stopKeyForPrefix(keyPrefix))) {
            Row row;
            while ((row = scanner.next()) != null) {
                boolean pendingLaunch = fromRow(row).getState() == Job.State.PENDING_LAUNCH;
                // Keep the first mark: an existing to-delete value is never overwritten.
                if (!pendingLaunch && row.get(TO_DELETE_COL) == null) {
                    this.table.put(row.getRow(), TO_DELETE_COL, Bytes.toBytes(j));
                }
            }
        }
    }

    /**
     * Deletes the row that stores the given job.
     *
     * @param job the job to remove; its schedule id and creation time identify the row
     */
    @Override // co.cask.cdap.internal.app.runtime.schedule.queue.JobQueue
    public void deleteJob(Job job) {
        ScheduleId scheduleId = job.getSchedule().getScheduleId();
        byte[] rowKey = getRowKey(scheduleId, job.getCreationTime());
        this.table.delete(rowKey);
    }

    /**
     * @return the fixed number of partitions that job rows are spread over
     */
    @Override // co.cask.cdap.internal.app.runtime.schedule.queue.JobQueue
    public int getNumPartitions() {
        return NUM_PARTITIONS;
    }

    /**
     * Returns an iterator over the jobs of one partition, optionally resuming after a given job.
     *
     * @param i the partition whose rows are scanned
     * @param job if {@code null}, the scan starts at the partition prefix; otherwise it starts at the stop
     *            key of this job's row key, i.e. past this job's row
     * @return an iterator over the jobs found; the caller should close it to release the scanner
     * @throws IllegalArgumentException if {@code job} is given and its schedule does not map to partition {@code i}
     */
    @Override // co.cask.cdap.internal.app.runtime.schedule.queue.JobQueue
    public CloseableIterator<Job> getJobs(int i, @Nullable Job job) {
        byte[] partitionPrefix = getJobRowPrefix(i);
        byte[] startKey = partitionPrefix;
        if (job != null) {
            ScheduleId scheduleId = job.getSchedule().getScheduleId();
            Preconditions.checkArgument(i == getPartition(scheduleId), "Job is not from partition '%s': %s", i, job);
            startKey = Bytes.stopKeyForPrefix(getRowKey(scheduleId, job.getCreationTime()));
        }
        byte[] stopKey = Bytes.stopKeyForPrefix(partitionPrefix);
        return createCloseableIterator(this.table.scan(startKey, stopKey));
    }

    /**
     * Scans every job row, across all partitions.
     *
     * @return an iterator over all stored jobs; the caller should close it to release the scanner
     */
    public CloseableIterator<Job> fullScan() {
        byte[] stopKey = Bytes.stopKeyForPrefix(JOB_ROW_PREFIX);
        Scanner scanner = this.table.scan(JOB_ROW_PREFIX, stopKey);
        return createCloseableIterator(scanner);
    }

    /**
     * Adapts a table scanner into a closeable iterator of jobs.
     *
     * <p>Each row returned by the scanner is converted with {@link #fromRow(Row)}; iteration ends when
     * the scanner returns {@code null}. Closing the iterator closes the scanner.
     *
     * @param scanner the scanner to read rows from; ownership passes to the returned iterator
     * @return an iterator over the jobs decoded from the scanned rows
     */
    private CloseableIterator<Job> createCloseableIterator(final Scanner scanner) {
        return new AbstractCloseableIterator<Job>() {
            // The decompiler had renamed this method (m223computeNext), which left the abstract
            // computeNext() unimplemented; the original name and visibility are restored here.
            @Override
            protected Job computeNext() {
                Row row = scanner.next();
                if (row == null) {
                    return endOfData();
                }
                return fromRow(row);
            }

            @Override
            public void close() {
                scanner.close();
            }
        };
    }

    /**
     * Decodes a job from a table row.
     *
     * <p>The job itself is read as JSON from the job column. If the row also carries a to-delete and/or
     * an is-obsolete value, the smaller of the values present is passed to
     * {@code SimpleJob.setToBeDeleted}.
     *
     * <p>The decompiler reports that it widened this method from private; the visibility is left as found.
     *
     * @param row the row to decode
     * @return the decoded job
     */
    public Job fromRow(Row row) {
        String json = Bytes.toString(row.get(COL));
        SimpleJob job = GSON.fromJson(json, SimpleJob.class);

        Long toDeleteTime = row.getLong(TO_DELETE_COL);
        Long obsoleteTime = row.getLong(IS_OBSOLETE_COL);

        // Pick whichever value is present, or the earlier one when both are.
        Long deletionTime;
        if (toDeleteTime == null) {
            deletionTime = obsoleteTime;
        } else if (obsoleteTime == null) {
            deletionTime = toDeleteTime;
        } else {
            deletionTime = Math.min(obsoleteTime, toDeleteTime);
        }

        if (deletionTime != null) {
            job.setToBeDeleted(deletionTime);
        }
        return job;
    }

    /**
     * Builds the table write for a job: its JSON form goes into the job column of the job's row.
     *
     * @param job the job to serialize
     * @return a {@link Put} for the row key derived from the job's schedule id and creation time
     */
    private Put toPut(Job job) {
        ScheduleId scheduleId = job.getSchedule().getScheduleId();
        byte[] rowKey = getRowKey(scheduleId, job.getCreationTime());
        String json = GSON.toJson(job);
        return new Put(rowKey, COL, json);
    }

    /**
     * Builds the row-key prefix shared by all jobs of one partition: {@code J:<partition byte>:}.
     *
     * @param i the partition number; it is narrowed to a single byte
     * @return the concatenated prefix bytes
     */
    private byte[] getJobRowPrefix(int i) {
        // The decompiler emitted an invalid `(byte[][]) new byte[]{...}`; this is the well-typed form.
        byte[] partition = new byte[] {(byte) i};
        return Bytes.concat(new byte[][] {JOB_ROW_PREFIX, ROW_KEY_SEPARATOR, partition, ROW_KEY_SEPARATOR});
    }

    /**
     * Builds the row-key prefix shared by all jobs of one schedule:
     * the partition prefix, then the schedule's id parts joined with {@code '.'}, then the separator.
     *
     * @param scheduleId the schedule to build the prefix for
     * @return the concatenated prefix bytes
     */
    private byte[] getRowKeyPrefix(ScheduleId scheduleId) {
        byte[] partitionPrefix = getJobRowPrefix(getPartition(scheduleId));
        byte[] scheduleKey = Bytes.toBytes(Joiner.on(".").join(scheduleId.toIdParts()));
        // The decompiler emitted an invalid `(byte[][]) new byte[]{...}`; this is the well-typed form.
        return Bytes.concat(new byte[][] {partitionPrefix, scheduleKey, ROW_KEY_SEPARATOR});
    }

    /**
     * Maps a schedule id to its partition.
     *
     * <p>The namespace, application, version and schedule name are fed, in that order, into a 32-bit
     * murmur3 hash; the partition is the absolute value of that hash modulo the number of partitions.
     *
     * @param scheduleId the schedule to place
     * @return the partition number used in the schedule's row keys
     */
    @VisibleForTesting
    int getPartition(ScheduleId scheduleId) {
        int hash = Hashing.murmur3_32().newHasher()
            .putString(scheduleId.getNamespace())
            .putString(scheduleId.getApplication())
            .putString(scheduleId.getVersion())
            .putString(scheduleId.getSchedule())
            .hash()
            .asInt();
        // Kept exactly as Math.abs(...) % N: the result is part of persisted row keys.
        return Math.abs(hash) % NUM_PARTITIONS;
    }

    /**
     * Builds the full row key of a job: the schedule's row-key prefix followed by the creation time bytes.
     *
     * @param scheduleId the schedule the job belongs to
     * @param j the job's creation time
     * @return the row key bytes
     */
    private byte[] getRowKey(ScheduleId scheduleId, long j) {
        byte[] schedulePrefix = getRowKeyPrefix(scheduleId);
        byte[] creationTime = Bytes.toBytes(j);
        return Bytes.add(schedulePrefix, creationTime);
    }

    /**
     * Reads the subscriber state stored under the given key.
     *
     * @param str the key the state was stored under
     * @return the stored state, or {@code null} when the row has no value in the state column
     */
    @Override // co.cask.cdap.internal.app.runtime.messaging.TopicMessageIdStore
    public String retrieveSubscriberState(String str) {
        Row row = this.table.get(getRowKey(str));
        byte[] stateBytes = row.get(COL);
        return stateBytes == null ? null : Bytes.toString(stateBytes);
    }

    /**
     * Stores the subscriber state under the given key, replacing any previous value.
     *
     * @param str the key to store the state under
     * @param str2 the state to store
     */
    @Override // co.cask.cdap.internal.app.runtime.messaging.TopicMessageIdStore
    public void persistSubscriberState(String str, String str2) {
        byte[] rowKey = getRowKey(str);
        byte[] stateBytes = Bytes.toBytes(str2);
        this.table.put(rowKey, COL, stateBytes);
    }

    /**
     * Builds the row key of a subscriber-state row: {@code M:<key>}.
     *
     * @param str the key identifying the subscriber state
     * @return the row key bytes
     */
    private byte[] getRowKey(String str) {
        // The decompiler emitted an invalid `(byte[][]) new byte[]{...}`; this is the well-typed form.
        return Bytes.concat(new byte[][] {MESSAGE_ID_ROW_PREFIX, ROW_KEY_SEPARATOR, Bytes.toBytes(str)});
    }
}
