package co.cask.cdap.internal.app.store;

import co.cask.cdap.api.app.ApplicationSpecification;
import co.cask.cdap.api.artifact.ArtifactId;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.data.stream.StreamSpecification;
import co.cask.cdap.api.dataset.DatasetManagementException;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.api.workflow.WorkflowToken;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.common.app.RunIds;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.utils.ProjectInfo;
import co.cask.cdap.data2.datafabric.dataset.DatasetsUtil;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.lib.table.MDSKey;
import co.cask.cdap.data2.dataset2.lib.table.MetadataStoreDataset;
import co.cask.cdap.internal.app.ApplicationSpecificationAdapter;
import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import co.cask.cdap.internal.app.runtime.SystemArguments;
import co.cask.cdap.internal.app.runtime.workflow.BasicWorkflowToken;
import co.cask.cdap.internal.app.store.RunRecordMeta;
import co.cask.cdap.proto.BasicThrowable;
import co.cask.cdap.proto.NamespaceMeta;
import co.cask.cdap.proto.ProgramRunCluster;
import co.cask.cdap.proto.ProgramRunClusterStatus;
import co.cask.cdap.proto.ProgramRunStatus;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.WorkflowNodeStateDetail;
import co.cask.cdap.proto.id.ApplicationId;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProfileId;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.proto.id.ProgramRunId;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.IOException;
import java.lang.reflect.Type;
import java.nio.BufferUnderflowException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.twill.api.RunId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/store/AppMetadataStore.class */
public class AppMetadataStore extends MetadataStoreDataset {
    // Record-type prefixes. TYPE_APP_META is used in this class as the leading part of application row keys.
    // NOTE(review): the other TYPE_* prefixes in this group are not referenced in the visible section;
    // presumably they prefix the row keys of their respective record kinds — confirm against the rest of the file.
    private static final String TYPE_APP_META = "appMeta";
    private static final String TYPE_STREAM = "stream";
    private static final String TYPE_WORKFLOW_NODE_STATE = "wns";
    private static final String TYPE_WORKFLOW_TOKEN = "wft";
    private static final String TYPE_NAMESPACE = "namespace";
    private static final String TYPE_MESSAGE = "msg";
    // NOTE(review): these two statics are not read or written in the visible section; presumably they cache the
    // result of hasUpgraded(). Only upgradeCompleted is volatile — confirm the intended threading model.
    private static volatile boolean upgradeCompleted;
    private static long lastUpgradeCompletedCheck;
    // Configuration handed to the constructor.
    private final CConfiguration cConf;
    // Identifier of the system-namespace dataset ("app.meta") that backs this store; see create().
    public static final DatasetId APP_META_INSTANCE_ID = NamespaceId.SYSTEM.dataset("app.meta");
    private static final Logger LOG = LoggerFactory.getLogger(AppMetadataStore.class);
    // Gson instance with the application-specification type adapters registered; used for all (de)serialization here.
    private static final Gson GSON = ApplicationSpecificationAdapter.addTypeAdapters(new GsonBuilder()).create();
    // Reflective type token for Map<String, String>, used when serializing runtime arguments to JSON.
    private static final Type MAP_STRING_STRING_TYPE = new TypeToken<Map<String, String>>() { // from class: co.cask.cdap.internal.app.store.AppMetadataStore.1
    }.getType();
    // Reflective type token for byte[].
    private static final Type BYTE_TYPE = new TypeToken<byte[]>() { // from class: co.cask.cdap.internal.app.store.AppMetadataStore.2
    }.getType();
    // NOTE(review): not referenced in the visible section; by name, the key under which upgrade state is stored.
    private static final byte[] APP_VERSION_UPGRADE_KEY = Bytes.toBytes("version.default.store");
    // Run-record type prefixes: a run record is stored under a key whose prefix reflects its lifecycle stage.
    private static final String TYPE_RUN_RECORD_STARTING = "runRecordStarting";
    private static final String TYPE_RUN_RECORD_STARTED = "runRecordStarted";
    private static final String TYPE_RUN_RECORD_SUSPENDED = "runRecordSuspended";
    private static final String TYPE_RUN_RECORD_COMPLETED = "runRecordCompleted";
    // Maps each program run status to the run-record type prefix used for records in that status:
    // PENDING/STARTING -> starting, RUNNING -> started, SUSPENDED -> suspended, COMPLETED/KILLED/FAILED -> completed.
    private static final Map<ProgramRunStatus, String> STATUS_TYPE_MAP = ImmutableMap.builder().put(ProgramRunStatus.PENDING, TYPE_RUN_RECORD_STARTING).put(ProgramRunStatus.STARTING, TYPE_RUN_RECORD_STARTING).put(ProgramRunStatus.RUNNING, TYPE_RUN_RECORD_STARTED).put(ProgramRunStatus.SUSPENDED, TYPE_RUN_RECORD_SUSPENDED).put(ProgramRunStatus.COMPLETED, TYPE_RUN_RECORD_COMPLETED).put(ProgramRunStatus.KILLED, TYPE_RUN_RECORD_COMPLETED).put(ProgramRunStatus.FAILED, TYPE_RUN_RECORD_COMPLETED).build();
    // One minute, expressed in milliseconds.
    private static final long UPGRADE_COMPLETED_CHECK_INTERVAL = TimeUnit.MINUTES.toMillis(1);

    /* renamed from: co.cask.cdap.internal.app.store.AppMetadataStore$3, reason: invalid class name */
    /* loaded from: input_file:co/cask/cdap/internal/app/store/AppMetadataStore$3.class */
    /**
     * Synthetic lookup table for a {@code switch} over {@link ProgramRunStatus} (decompiler artifact).
     *
     * <p>The array is indexed by enum ordinal and holds the case label assigned to each constant:
     * ALL=1, PENDING=2, STARTING=3, RUNNING=4, SUSPENDED=5. Constants not listed keep the default 0.
     * The switch statement that consumes this table is not part of the visible section.
     */
    static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$co$cask$cdap$proto$ProgramRunStatus = new int[ProgramRunStatus.values().length];

        static {
            // Each assignment is wrapped separately so that a NoSuchFieldError for one constant does not
            // prevent the remaining entries from being populated. The empty catch blocks are as emitted
            // by the decompiler.
            try {
                $SwitchMap$co$cask$cdap$proto$ProgramRunStatus[ProgramRunStatus.ALL.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$co$cask$cdap$proto$ProgramRunStatus[ProgramRunStatus.PENDING.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$co$cask$cdap$proto$ProgramRunStatus[ProgramRunStatus.STARTING.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$co$cask$cdap$proto$ProgramRunStatus[ProgramRunStatus.RUNNING.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$co$cask$cdap$proto$ProgramRunStatus[ProgramRunStatus.SUSPENDED.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/internal/app/store/AppMetadataStore$AppVersionPredicate.class */
    /**
     * Predicate over row keys that accepts only keys whose program id carries a given application version.
     */
    public static class AppVersionPredicate implements Predicate<MDSKey> {
        /** Version that a key's program id must carry for the key to be accepted. */
        private final String version;

        AppVersionPredicate(String str) {
            this.version = str;
        }

        /**
         * Extracts the program id from the key and compares its version with the expected one.
         *
         * @param mDSKey row key to inspect
         * @return {@code true} when the version decoded from the key equals the expected version
         */
        @Override // java.util.function.Predicate
        public boolean test(MDSKey mDSKey) {
            String keyVersion = AppMetadataStore.getProgramID(mDSKey).getVersion();
            return keyVersion.equals(this.version);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/internal/app/store/AppMetadataStore$ScanFunction.class */
    /**
     * Scan callback that collects run records matching a filter, while bounding the scan by elapsed time.
     *
     * <p>{@link #start()} must be called to start the stopwatch before the function is handed to a scan.
     * Every invocation of {@link #apply} returns {@code true} to keep scanning, or {@code false} once the
     * elapsed time exceeds the configured maximum.
     */
    public static class ScanFunction implements Function<MetadataStoreDataset.KeyValue<RunRecordMeta>, Boolean> {
        private final Predicate<RunRecordMeta> filter;
        private final Stopwatch stopwatch;
        private final long maxScanTimeMillis;
        /** Records accepted by the filter, in the order they were scanned. */
        private final List<RunRecordMeta> values = new ArrayList<>();
        /** Number of entries inspected, whether or not the filter accepted them. */
        private int numProcessed = 0;
        /** Key of the most recently inspected entry; {@code null} until the first entry is processed. */
        private MDSKey lastKey;

        /**
         * @param predicate filter deciding which scanned records are collected
         * @param ticker    time source for the internal stopwatch
         * @param j         maximum scan time in milliseconds
         */
        ScanFunction(Predicate<RunRecordMeta> predicate, Ticker ticker, long j) {
            this.filter = predicate;
            this.maxScanTimeMillis = j;
            this.stopwatch = new Stopwatch(ticker);
        }

        /** Starts the stopwatch used to enforce the scan time limit. */
        public void start() {
            this.stopwatch.start();
        }

        /** @return an unmodifiable view of the records accepted so far */
        public List<RunRecordMeta> getValues() {
            return Collections.unmodifiableList(this.values);
        }

        /** @return the number of entries inspected so far */
        public int getNumProcessed() {
            return this.numProcessed;
        }

        /** @return the key of the last inspected entry, or {@code null} if none was inspected */
        public MDSKey getLastKey() {
            return this.lastKey;
        }

        /**
         * Processes one scanned entry.
         *
         * @param keyValue the scanned key and its run record
         * @return {@code false} when the time budget is exhausted (the entry is then not counted or collected),
         *         {@code true} otherwise
         */
        /* JADX WARN: Multi-variable type inference failed */
        @Override // java.util.function.Function
        public Boolean apply(MetadataStoreDataset.KeyValue<RunRecordMeta> keyValue) {
            // The time check comes first so that an over-budget entry leaves numProcessed and lastKey untouched.
            if (this.stopwatch.elapsedMillis() > this.maxScanTimeMillis) {
                return false;
            }
            this.numProcessed++;
            this.lastKey = keyValue.getKey();
            if (this.filter.test(keyValue.getValue())) {
                this.values.add(keyValue.getValue());
            }
            return true;
        }
    }

    /**
     * Creates a store backed by the {@link #APP_META_INSTANCE_ID} table, obtaining the table through
     * {@code DatasetsUtil.getOrCreateDataset}.
     *
     * @param cConfiguration   configuration passed to the new store
     * @param datasetContext   context used to obtain the dataset
     * @param datasetFramework framework used to obtain the dataset
     * @return a new store over the table
     * @throws RuntimeException wrapping any {@link DatasetManagementException} or {@link IOException}
     */
    public static AppMetadataStore create(CConfiguration cConfiguration, DatasetContext datasetContext, DatasetFramework datasetFramework) {
        Table metaTable;
        try {
            metaTable = DatasetsUtil.getOrCreateDataset(datasetContext, datasetFramework, APP_META_INSTANCE_ID, Table.class.getName(), DatasetProperties.EMPTY);
        } catch (DatasetManagementException | IOException e) {
            throw new RuntimeException(e);
        }
        return new AppMetadataStore(metaTable, cConfiguration);
    }

    /**
     * Creates a store over the given table.
     *
     * @param table          table handed to the {@link MetadataStoreDataset} superclass
     * @param cConfiguration configuration retained in {@code cConf}
     */
    public AppMetadataStore(Table table, CConfiguration cConfiguration) {
        super(table);
        this.cConf = cConfiguration;
    }

    /**
     * Serializes a value as JSON (using this class's Gson instance) and returns the JSON text as bytes.
     *
     * @param t value to serialize
     * @return bytes of the JSON representation
     */
    protected <T> byte[] serialize(T t) {
        String json = GSON.toJson(t);
        return Bytes.toBytes(json);
    }

    /**
     * Deserializes a JSON value stored under the given key.
     *
     * <p>Run records get special treatment: after parsing, the record is rebuilt with a program run id
     * derived from the row key (program id) and the record's own pid.
     *
     * @param mDSKey row key the value was stored under
     * @param bArr   stored bytes (JSON text)
     * @param type   target type
     * @return the deserialized value
     */
    protected <T> T deserialize(MDSKey mDSKey, byte[] bArr, Type type) {
        if (RunRecordMeta.class.equals(type)) {
            RunRecordMeta stored = (RunRecordMeta) GSON.fromJson(Bytes.toString(bArr), RunRecordMeta.class);
            // The run id is reconstructed from the key's program id plus the pid held in the record.
            RunRecordMeta.Builder rebuilt = RunRecordMeta.builder(stored);
            return (T) rebuilt.setProgramRunId(getProgramID(mDSKey).run(stored.getPid())).build();
        }
        return (T) GSON.fromJson(Bytes.toString(bArr), type);
    }

    /**
     * Looks up the metadata of an application version.
     *
     * <p>The versioned key (namespace, application, version) is tried first. If nothing is found, the upgrade
     * has not completed and the requested version is {@code "-SNAPSHOT"}, the version-less key
     * (namespace, application) is tried as a fallback.
     *
     * @param applicationId application to look up
     * @return the application metadata, or {@code null} if none is found
     */
    @Nullable
    public ApplicationMeta getApplication(ApplicationId applicationId) {
        MDSKey versionedKey = new MDSKey.Builder().add(new String[]{TYPE_APP_META, applicationId.getNamespace(), applicationId.getApplication(), applicationId.getVersion()}).build();
        ApplicationMeta found = (ApplicationMeta) getFirst(versionedKey, ApplicationMeta.class);
        if (found != null) {
            return found;
        }
        // The version-less fallback applies only before the upgrade completes, and only for the default version.
        if (hasUpgraded() || !applicationId.getVersion().equals("-SNAPSHOT")) {
            return null;
        }
        MDSKey versionLessKey = new MDSKey.Builder().add(new String[]{TYPE_APP_META, applicationId.getNamespace(), applicationId.getApplication()}).build();
        return (ApplicationMeta) get(versionLessKey, ApplicationMeta.class);
    }

    /**
     * Convenience overload of {@link #getApplication(ApplicationId)}.
     *
     * @param str  namespace
     * @param str2 application name
     * @param str3 application version
     * @return the application metadata, or {@code null} if none is found
     */
    @Nullable
    public ApplicationMeta getApplication(String str, String str2, String str3) {
        ApplicationId appId = new ApplicationId(str, str2, str3);
        return getApplication(appId);
    }

    /**
     * Lists the application metadata stored under the given namespace.
     *
     * @param str namespace
     * @return metadata of every entry whose key starts with the application prefix and the namespace
     */
    public List<ApplicationMeta> getAllApplications(String str) {
        MDSKey namespacePrefix = new MDSKey.Builder().add(new String[]{TYPE_APP_META, str}).build();
        return list(namespacePrefix, ApplicationMeta.class);
    }

    /**
     * Lists the application metadata stored under the given namespace and application name.
     *
     * @param str  namespace
     * @param str2 application name
     * @return metadata of every entry whose key starts with the application prefix, namespace and name
     */
    public List<ApplicationMeta> getAllAppVersions(String str, String str2) {
        MDSKey appPrefix = new MDSKey.Builder().add(new String[]{TYPE_APP_META, str, str2}).build();
        return list(appPrefix, ApplicationMeta.class);
    }

    /**
     * Lists the ids of all stored versions of an application.
     *
     * <p>The version is decoded from each row key: the first three key parts are skipped and the fourth, if
     * present, is read as the version. Keys with no fourth part are reported with the version
     * {@code "-SNAPSHOT"}.
     *
     * @param str  namespace
     * @param str2 application name
     * @return one application id per matching row key
     */
    public List<ApplicationId> getAllAppVersionsAppIds(String str, String str2) {
        MDSKey appPrefix = new MDSKey.Builder().add(new String[]{TYPE_APP_META, str, str2}).build();
        List<ApplicationId> appIds = new ArrayList<>();
        for (MDSKey key : listKV(appPrefix, ApplicationMeta.class).keySet()) {
            MDSKey.Splitter split = key.split();
            // Skip the three leading key parts that precede the optional version part.
            split.skipBytes();
            split.skipBytes();
            split.skipBytes();
            String version = split.hasRemaining() ? split.getString() : "-SNAPSHOT";
            appIds.add(new NamespaceId(str).app(str2, version));
        }
        return appIds;
    }

    /**
     * Writes the metadata of an application version under the versioned key.
     *
     * <p>Before the upgrade has completed, writing the {@code "-SNAPSHOT"} version first removes any entry
     * stored under the version-less key for the same application.
     *
     * @param str                      namespace
     * @param str2                     application name
     * @param str3                     application version
     * @param applicationSpecification specification to store
     */
    public void writeApplication(String str, String str2, String str3, ApplicationSpecification applicationSpecification) {
        boolean mayHaveVersionLessEntry = !hasUpgraded() && str3.equals("-SNAPSHOT");
        if (mayHaveVersionLessEntry) {
            MDSKey versionLessKey = new MDSKey.Builder().add(new String[]{TYPE_APP_META, str, str2}).build();
            ApplicationMeta versionLessMeta = (ApplicationMeta) get(versionLessKey, ApplicationMeta.class);
            if (versionLessMeta != null) {
                delete(versionLessKey);
            }
        }
        MDSKey versionedKey = new MDSKey.Builder().add(new String[]{TYPE_APP_META, str, str2, str3}).build();
        write(versionedKey, new ApplicationMeta(str2, applicationSpecification));
    }

    /**
     * Deletes the metadata of an application version.
     *
     * <p>Before the upgrade has completed, deleting the {@code "-SNAPSHOT"} version also removes any entry
     * stored under the version-less key. Everything under the versioned key prefix is then deleted.
     *
     * @param str  namespace
     * @param str2 application name
     * @param str3 application version
     */
    public void deleteApplication(String str, String str2, String str3) {
        boolean mayHaveVersionLessEntry = !hasUpgraded() && str3.equals("-SNAPSHOT");
        if (mayHaveVersionLessEntry) {
            MDSKey versionLessKey = new MDSKey.Builder().add(new String[]{TYPE_APP_META, str, str2}).build();
            ApplicationMeta versionLessMeta = (ApplicationMeta) get(versionLessKey, ApplicationMeta.class);
            if (versionLessMeta != null) {
                delete(versionLessKey);
            }
        }
        MDSKey versionedPrefix = new MDSKey.Builder().add(new String[]{TYPE_APP_META, str, str2, str3}).build();
        deleteAll(versionedPrefix);
    }

    /**
     * Deletes every application metadata entry stored under the given namespace.
     *
     * @param str namespace
     */
    public void deleteApplications(String str) {
        MDSKey namespacePrefix = new MDSKey.Builder().add(new String[]{TYPE_APP_META, str}).build();
        deleteAll(namespacePrefix);
    }

    /**
     * Replaces the specification of an existing application version.
     *
     * <p>The existing metadata is read from the versioned key. If it is absent, the upgrade has not completed
     * and the version is {@code "-SNAPSHOT"}, it is read from the version-less key instead; in that case the
     * version-less entry is deleted and the updated metadata is written under the versioned key.
     *
     * @param str                      namespace
     * @param str2                     application name
     * @param str3                     application version
     * @param applicationSpecification new specification
     * @throws IllegalArgumentException if no metadata exists for the application
     */
    public void updateAppSpec(String str, String str2, String str3, ApplicationSpecification applicationSpecification) {
        // Serializing the whole spec is only worth it when trace logging is actually enabled.
        if (LOG.isTraceEnabled()) {
            LOG.trace("App spec to be updated: id: {}: spec: {}", str2, GSON.toJson(applicationSpecification));
        }
        MDSKey versionedKey = new MDSKey.Builder().add(new String[]{TYPE_APP_META, str, str2, str3}).build();
        MDSKey versionLessKey = null;
        ApplicationMeta existing = (ApplicationMeta) getFirst(versionedKey, ApplicationMeta.class);
        if (!hasUpgraded() && existing == null && str3.equals("-SNAPSHOT")) {
            versionLessKey = new MDSKey.Builder().add(new String[]{TYPE_APP_META, str, str2}).build();
            existing = (ApplicationMeta) get(versionLessKey, ApplicationMeta.class);
        }
        if (existing == null) {
            String message = String.format("No meta for namespace %s app %s exists", str, str2);
            LOG.error(message);
            throw new IllegalArgumentException(message);
        }
        ApplicationMeta updated = ApplicationMeta.updateSpec(existing, applicationSpecification);
        // Both placeholders now receive an argument (previously only one was supplied for two placeholders).
        LOG.trace("Application exists in mds: id: {}, spec: {}", str2, existing);
        if (versionLessKey != null) {
            // Migrate: drop the version-less entry; the updated metadata goes under the versioned key.
            delete(versionLessKey);
        }
        write(versionedKey, updated);
    }

    /**
     * Lists the node states recorded for a workflow run.
     *
     * <p>If nothing is found under the versioned key, the upgrade has not completed and the run's version is
     * {@code "-SNAPSHOT"}, the version-less key is listed instead.
     *
     * @param programRunId workflow run
     * @return the recorded node states (possibly empty)
     */
    public List<WorkflowNodeStateDetail> getWorkflowNodeStates(ProgramRunId programRunId) {
        MDSKey versionedKey = getProgramKeyBuilder(TYPE_WORKFLOW_NODE_STATE, programRunId).build();
        List<WorkflowNodeStateDetail> states = list(versionedKey, WorkflowNodeStateDetail.class);
        boolean useVersionLessKey = !hasUpgraded() && states.isEmpty() && programRunId.getVersion().equals("-SNAPSHOT");
        if (!useVersionLessKey) {
            return states;
        }
        MDSKey versionLessKey = getVersionLessProgramKeyBuilder(TYPE_WORKFLOW_NODE_STATE, programRunId).build();
        return list(versionLessKey, WorkflowNodeStateDetail.class);
    }

    /**
     * Stores the state of one workflow node, keyed by the workflow run and the node id.
     *
     * @param programRunId            workflow run the node belongs to
     * @param workflowNodeStateDetail node state to store
     */
    public void addWorkflowNodeState(ProgramRunId programRunId, WorkflowNodeStateDetail workflowNodeStateDetail) {
        MDSKey nodeKey = getProgramKeyBuilder(TYPE_WORKFLOW_NODE_STATE, programRunId).add(workflowNodeStateDetail.getNodeId()).build();
        write(nodeKey, workflowNodeStateDetail);
    }

    /**
     * Records the state of a program that runs as a node of a workflow.
     *
     * <p>The workflow name, workflow run id and node id are read from the given arguments. The node state is
     * written under the workflow run's key. In addition, if the workflow run has a record under the
     * "started" key, that record's properties are extended with a {@code nodeId -> program run id} entry and
     * its source id is updated.
     *
     * @param programRunId     run of the program executing as a workflow node
     * @param map              arguments carrying the workflow name, workflow run id and node id
     * @param programRunStatus status of the program run, converted to a node status
     * @param basicThrowable   failure cause, if any
     * @param bArr             source id to store on the workflow's run record
     */
    private void addWorkflowNodeState(ProgramRunId programRunId, Map<String, String> map, ProgramRunStatus programRunStatus, @Nullable BasicThrowable basicThrowable, byte[] bArr) {
        String nodeId = map.get(ProgramOptionConstants.WORKFLOW_NODE_ID);
        String workflowName = map.get(ProgramOptionConstants.WORKFLOW_NAME);
        String workflowRun = map.get(ProgramOptionConstants.WORKFLOW_RUN_ID);
        // program run -> program -> application, then down to the owning workflow's run.
        ProgramRunId workflowRunId = programRunId.getParent().getParent().workflow(workflowName).run(workflowRun);

        MDSKey nodeStateKey = getProgramKeyBuilder(TYPE_WORKFLOW_NODE_STATE, workflowRunId).add(nodeId).build();
        write(nodeStateKey, new WorkflowNodeStateDetail(nodeId, ProgramRunStatus.toNodeStatus(programRunStatus), programRunId.getRun(), basicThrowable));

        // Only a workflow record stored under the "started" key is updated; other record types are not consulted.
        MDSKey workflowRecordKey = getProgramKeyBuilder(TYPE_RUN_RECORD_STARTED, workflowRunId).build();
        RunRecordMeta workflowRecord = (RunRecordMeta) get(workflowRecordKey, RunRecordMeta.class);
        if (workflowRecord != null) {
            Map<String, String> properties = new HashMap<>(workflowRecord.getProperties());
            properties.put(nodeId, programRunId.getRun());
            write(workflowRecordKey, RunRecordMeta.builder(workflowRecord).setProperties(properties).setSourceId(bArr).build());
        }
    }

    /**
     * Creates the initial run record for a program run: run status PENDING, cluster status PROVISIONING.
     *
     * <p>The request is ignored (logged, {@code null} returned) when the run id carries no timestamp, when a
     * run record already exists for the run, or when no profile id can be derived from the system arguments.
     *
     * @param programRunId run to record
     * @param map          runtime arguments (stored as JSON in the record properties)
     * @param map2         system arguments (profile and principal are read from here)
     * @param bArr         source id of the triggering message
     * @param artifactId   artifact of the program, if known
     * @return the record written, or {@code null} if the request was ignored
     */
    @Nullable
    public RunRecordMeta recordProgramProvisioning(ProgramRunId programRunId, Map<String, String> map, Map<String, String> map2, byte[] bArr, @Nullable ArtifactId artifactId) {
        MDSKey build = getProgramKeyBuilder(TYPE_RUN_RECORD_STARTING, programRunId).build();
        long time = RunIds.getTime(programRunId.getRun(), TimeUnit.SECONDS);
        if (time == -1) {
            // The run id is now supplied for the placeholder; it was previously missing from this call.
            LOG.error("Ignoring unexpected request to record provisioning state for program run {} that does not have a timestamp in the run id.", programRunId);
            return null;
        }
        RunRecordMeta run = getRun(programRunId);
        if (run != null) {
            LOG.error("Ignoring unexpected request to record provisioning state for program run {} that has an existing run record in run state {} and cluster state {}.", new Object[]{programRunId, run.getStatus(), run.getCluster().getStatus()});
            return null;
        }
        Optional<ProfileId> profileIdFromArgs = SystemArguments.getProfileIdFromArgs(programRunId.getNamespaceId(), map2);
        if (!profileIdFromArgs.isPresent()) {
            LOG.error("Ignoring unexpected request to record provisioning state for program run {} that does not have a profile assigned to it.", programRunId);
            return null;
        }
        RunRecordMeta build2 = RunRecordMeta.builder().setProgramRunId(programRunId).setStartTime(time).setStatus(ProgramRunStatus.PENDING).setProperties(getRecordProperties(map2, map)).setSystemArgs(map2).setCluster(new ProgramRunCluster(ProgramRunClusterStatus.PROVISIONING, (Long) null, (Integer) null)).setProfileId(profileIdFromArgs.get()).setSourceId(bArr).setArtifactId(artifactId).setPrincipal(map2.get(ProgramOptionConstants.PRINCIPAL)).build();
        write(build, build2);
        LOG.trace("Recorded {} for program {}", ProgramRunClusterStatus.PROVISIONING, programRunId);
        return build2;
    }

    /**
     * Builds the properties map stored on a new run record.
     *
     * <p>The result always contains {@code "runtimeArgs"} (the second argument serialized as JSON). It also
     * contains {@code "workflowrunid"} when the first argument is non-null, has a workflow name entry and a
     * non-null workflow run id.
     *
     * @param map  system arguments; may be {@code null}
     * @param map2 runtime arguments to serialize
     * @return immutable properties map
     */
    private Map<String, String> getRecordProperties(Map<String, String> map, Map<String, String> map2) {
        String workflowRunId = null;
        if (map != null && map.containsKey(ProgramOptionConstants.WORKFLOW_NAME)) {
            workflowRunId = map.get(ProgramOptionConstants.WORKFLOW_RUN_ID);
        }
        ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
        properties.put("runtimeArgs", GSON.toJson(map2, MAP_STRING_STRING_TYPE));
        if (workflowRunId != null) {
            properties.put("workflowrunid", workflowRunId);
        }
        return properties.build();
    }

    /**
     * Moves an existing run record to cluster status PROVISIONED and stores the node count.
     *
     * <p>The run status is left as it is. The request is ignored when there is no existing record or when
     * {@code isValid} rejects the transition.
     *
     * @param programRunId run to update
     * @param i            number of nodes in the provisioned cluster
     * @param bArr         source id of the triggering message
     * @return the record written, or {@code null} if the request was ignored
     */
    @Nullable
    public RunRecordMeta recordProgramProvisioned(ProgramRunId programRunId, int i, byte[] bArr) {
        MDSKey startingKey = getProgramKeyBuilder(TYPE_RUN_RECORD_STARTING, programRunId).build();
        RunRecordMeta existing = getRun(programRunId);
        if (existing == null) {
            LOG.warn("Ignoring unexpected request to transition program run {} from non-existent state to cluster state {}.", programRunId, ProgramRunClusterStatus.PROVISIONED);
            return null;
        }
        boolean allowed = isValid(existing, existing.getStatus(), ProgramRunClusterStatus.PROVISIONED, bArr);
        if (!allowed) {
            return null;
        }
        ProgramRunCluster provisionedCluster = new ProgramRunCluster(ProgramRunClusterStatus.PROVISIONED, (Long) null, Integer.valueOf(i));
        RunRecordMeta updated = RunRecordMeta.builder(existing).setCluster(provisionedCluster).setSourceId(bArr).build();
        write(startingKey, updated);
        LOG.trace("Recorded {} for program {}", ProgramRunClusterStatus.PROVISIONED, programRunId);
        return updated;
    }

    /**
     * Moves an existing run record to cluster status DEPROVISIONING.
     *
     * <p>The existing record is deleted and the updated one is written under the "completed" key
     * (program, inverted start timestamp, run id). If the run was still PENDING with its cluster
     * PROVISIONING, the run status becomes FAILED; otherwise the run status is kept.
     *
     * @param programRunId run to update
     * @param bArr         source id of the triggering message
     * @return the record written, or {@code null} if there is no record or the transition is rejected
     */
    @Nullable
    public RunRecordMeta recordProgramDeprovisioning(ProgramRunId programRunId, byte[] bArr) {
        MDSKey.Builder programKeyBuilder = getProgramKeyBuilder(TYPE_RUN_RECORD_COMPLETED, programRunId.getParent());
        RunRecordMeta run = getRun(programRunId);
        if (run == null) {
            LOG.debug("Ignoring unexpected transition of program run {} to cluster state {} with no existing run record.", programRunId, ProgramRunClusterStatus.DEPROVISIONING);
            return null;
        }
        if (!isValid(run, run.getStatus(), ProgramRunClusterStatus.DEPROVISIONING, bArr)) {
            return null;
        }
        ProgramRunClusterStatus previousClusterStatus = run.getCluster().getStatus();
        delete(run);
        // Build the destination key once and reuse it for the write.
        MDSKey completedKey = programKeyBuilder.add(getInvertedTsKeyPart(run.getStartTs())).add(programRunId.getRun()).build();
        ProgramRunStatus newStatus = run.getStatus();
        if (previousClusterStatus == ProgramRunClusterStatus.PROVISIONING && newStatus == ProgramRunStatus.PENDING) {
            // The program never got past PENDING while its cluster was still being provisioned.
            newStatus = ProgramRunStatus.FAILED;
        }
        RunRecordMeta build = RunRecordMeta.builder(run).setCluster(new ProgramRunCluster(ProgramRunClusterStatus.DEPROVISIONING, (Long) null, run.getCluster().getNumNodes())).setSourceId(bArr).setStatus(newStatus).build();
        write(completedKey, build);
        LOG.trace("Recorded {} for program {}", ProgramRunClusterStatus.DEPROVISIONING, programRunId);
        return build;
    }

    /**
     * Moves an existing run record to cluster status DEPROVISIONED.
     *
     * <p>The existing record is deleted and the updated one is written under the "completed" key
     * (program, inverted start timestamp, run id). If the cluster was still PROVISIONING, the run status
     * becomes FAILED; otherwise the run status is kept.
     *
     * @param programRunId run to update
     * @param l            value passed as the second argument of the new {@link ProgramRunCluster}; may be {@code null}
     * @param bArr         source id of the triggering message
     * @return the record written, or {@code null} if there is no record or the transition is rejected
     */
    @Nullable
    public RunRecordMeta recordProgramDeprovisioned(ProgramRunId programRunId, @Nullable Long l, byte[] bArr) {
        MDSKey.Builder programKeyBuilder = getProgramKeyBuilder(TYPE_RUN_RECORD_COMPLETED, programRunId.getParent());
        RunRecordMeta run = getRun(programRunId);
        if (run == null) {
            LOG.debug("Ignoring unexpected transition of program run {} to cluster state {} with no existing run record.", programRunId, ProgramRunClusterStatus.DEPROVISIONED);
            return null;
        }
        if (!isValid(run, run.getStatus(), ProgramRunClusterStatus.DEPROVISIONED, bArr)) {
            return null;
        }
        ProgramRunClusterStatus previousClusterStatus = run.getCluster().getStatus();
        delete(run);
        // Build the destination key once and reuse it for the write.
        MDSKey completedKey = programKeyBuilder.add(getInvertedTsKeyPart(run.getStartTs())).add(programRunId.getRun()).build();
        ProgramRunStatus newStatus = previousClusterStatus == ProgramRunClusterStatus.PROVISIONING ? ProgramRunStatus.FAILED : run.getStatus();
        RunRecordMeta build = RunRecordMeta.builder(run).setCluster(new ProgramRunCluster(ProgramRunClusterStatus.DEPROVISIONED, l, run.getCluster().getNumNodes())).setSourceId(bArr).setStatus(newStatus).build();
        write(completedKey, build);
        LOG.trace("Recorded {} for program {}", ProgramRunClusterStatus.DEPROVISIONED, programRunId);
        return build;
    }

    /**
     * Moves an existing run record to cluster status ORPHANED.
     *
     * <p>The existing record is deleted and the updated one is written under the "completed" key
     * (program, inverted start timestamp, run id). If the cluster was still PROVISIONING, the run status
     * becomes FAILED; otherwise the run status is kept.
     *
     * @param programRunId run to update
     * @param j            value passed as the second argument of the new {@link ProgramRunCluster}
     * @param bArr         source id of the triggering message
     * @return the record written, or {@code null} if there is no record or the transition is rejected
     */
    @Nullable
    public RunRecordMeta recordProgramOrphaned(ProgramRunId programRunId, long j, byte[] bArr) {
        MDSKey.Builder programKeyBuilder = getProgramKeyBuilder(TYPE_RUN_RECORD_COMPLETED, programRunId.getParent());
        RunRecordMeta run = getRun(programRunId);
        if (run == null) {
            // Log the state this method actually targets (ORPHANED); the previous message named DEPROVISIONED.
            LOG.debug("Ignoring unexpected transition of program run {} to cluster state {} with no existing run record.", programRunId, ProgramRunClusterStatus.ORPHANED);
            return null;
        }
        if (!isValid(run, run.getStatus(), ProgramRunClusterStatus.ORPHANED, bArr)) {
            return null;
        }
        ProgramRunClusterStatus previousClusterStatus = run.getCluster().getStatus();
        delete(run);
        // Build the destination key once and reuse it for the write.
        MDSKey completedKey = programKeyBuilder.add(getInvertedTsKeyPart(run.getStartTs())).add(programRunId.getRun()).build();
        ProgramRunStatus newStatus = previousClusterStatus == ProgramRunClusterStatus.PROVISIONING ? ProgramRunStatus.FAILED : run.getStatus();
        RunRecordMeta build = RunRecordMeta.builder(run).setCluster(new ProgramRunCluster(ProgramRunClusterStatus.ORPHANED, Long.valueOf(j), run.getCluster().getNumNodes())).setSourceId(bArr).setStatus(newStatus).build();
        write(completedKey, build);
        LOG.trace("Recorded {} for program {}", ProgramRunClusterStatus.ORPHANED, programRunId);
        return build;
    }

    /**
     * Moves an existing run record to run status STARTING.
     *
     * <p>If the given arguments contain a workflow name, the workflow node state is recorded first — this
     * happens before, and regardless of, the checks on the existing record. The existing record is then
     * deleted and rewritten under the "starting" key with the new status, twill run id and source id.
     *
     * @param programRunId run to update
     * @param str          twill run id; may be {@code null}
     * @param map          system arguments
     * @param bArr         source id of the triggering message
     * @return the record written, or {@code null} if there is no record or the transition is rejected
     */
    @Nullable
    public RunRecordMeta recordProgramStart(ProgramRunId programRunId, @Nullable String str, Map<String, String> map, byte[] bArr) {
        MDSKey startingKey = getProgramKeyBuilder(TYPE_RUN_RECORD_STARTING, programRunId).build();
        RunRecordMeta existing = getRun(programRunId);
        boolean runsInsideWorkflow = map.containsKey(ProgramOptionConstants.WORKFLOW_NAME);
        if (runsInsideWorkflow) {
            addWorkflowNodeState(programRunId, map, ProgramRunStatus.STARTING, null, bArr);
        }
        if (existing == null) {
            LOG.warn("Ignoring unexpected transition of program run {} to program state {} with no existing run record.", programRunId, ProgramRunStatus.STARTING);
            return null;
        }
        boolean allowed = isValid(existing, ProgramRunStatus.STARTING, existing.getCluster().getStatus(), bArr);
        if (!allowed) {
            return null;
        }
        delete(existing);
        RunRecordMeta.Builder updatedBuilder = RunRecordMeta.builder(existing);
        RunRecordMeta updated = updatedBuilder.setStatus(ProgramRunStatus.STARTING).setTwillRunId(str).setSourceId(bArr).build();
        write(startingKey, updated);
        LOG.trace("Recorded {} for program {}", ProgramRunStatus.STARTING, programRunId);
        return updated;
    }

    /**
     * Moves an existing run record to run status RUNNING, writing it under the versioned "started" key.
     *
     * @param programRunId run to update
     * @param j            time at which the program started running
     * @param str          twill run id
     * @param bArr         source id of the triggering message
     * @return the record written, or {@code null} if there is no record or the transition is rejected
     */
    @Nullable
    public RunRecordMeta recordProgramRunning(ProgramRunId programRunId, long j, String str, byte[] bArr) {
        MDSKey startedKey = getProgramKeyBuilder(TYPE_RUN_RECORD_STARTED, programRunId).build();
        return recordProgramRunning(programRunId, j, str, startedKey, bArr);
    }

    /**
     * Test hook: like {@code recordProgramRunning}, but writes the record under the version-less
     * "started" key.
     *
     * @param programRunId run to update
     * @param j            time at which the program started running
     * @param str          twill run id
     * @param bArr         source id of the triggering message
     */
    @VisibleForTesting
    void recordProgramRunningOldFormat(ProgramRunId programRunId, long j, String str, byte[] bArr) {
        MDSKey versionLessStartedKey = getVersionLessProgramKeyBuilder(TYPE_RUN_RECORD_STARTED, programRunId).build();
        recordProgramRunning(programRunId, j, str, versionLessStartedKey, bArr);
    }

    /**
     * Moves an existing run record to run status RUNNING and writes it under the supplied key.
     *
     * <p>After validation, if the record's system arguments contain a workflow name, the workflow node
     * state is recorded. The existing record is then deleted and the updated one written.
     *
     * @param programRunId run to update
     * @param j            time at which the program started running
     * @param str          twill run id
     * @param mDSKey       key under which the updated record is written
     * @param bArr         source id of the triggering message
     * @return the record written, or {@code null} if there is no record or the transition is rejected
     */
    @Nullable
    private RunRecordMeta recordProgramRunning(ProgramRunId programRunId, long j, String str, MDSKey mDSKey, byte[] bArr) {
        RunRecordMeta existing = getRun(programRunId);
        if (existing == null) {
            LOG.warn("Ignoring unexpected transition of program run {} to program state {} with no existing run record.", programRunId, ProgramRunStatus.RUNNING);
            return null;
        }
        boolean allowed = isValid(existing, ProgramRunStatus.RUNNING, existing.getCluster().getStatus(), bArr);
        if (!allowed) {
            return null;
        }
        Map<String, String> recordedSystemArgs = existing.getSystemArgs();
        boolean runsInsideWorkflow = recordedSystemArgs != null && recordedSystemArgs.containsKey(ProgramOptionConstants.WORKFLOW_NAME);
        if (runsInsideWorkflow) {
            addWorkflowNodeState(programRunId, recordedSystemArgs, ProgramRunStatus.RUNNING, null, bArr);
        }
        delete(existing);
        RunRecordMeta.Builder updatedBuilder = RunRecordMeta.builder(existing);
        RunRecordMeta updated = updatedBuilder.setRunTime(Long.valueOf(j)).setStatus(ProgramRunStatus.RUNNING).setTwillRunId(str).setSourceId(bArr).build();
        write(mDSKey, updated);
        LOG.trace("Recorded {} for program {}", ProgramRunStatus.RUNNING, programRunId);
        return updated;
    }

    /**
     * Moves an existing run record to run status SUSPENDED.
     *
     * @param programRunId run to update
     * @param bArr         source id of the triggering message
     * @param j            suspend time, or {@code -1} to leave it unset
     * @return the record written, or {@code null} if there is no record or the transition is rejected
     */
    @Nullable
    public RunRecordMeta recordProgramSuspend(ProgramRunId programRunId, byte[] bArr, long j) {
        RunRecordMeta existing = getRun(programRunId);
        if (existing == null) {
            LOG.warn("Ignoring unexpected transition of program run {} to program state {} with no existing run record.", programRunId, ProgramRunStatus.SUSPENDED);
            return null;
        }
        boolean allowed = isValid(existing, ProgramRunStatus.SUSPENDED, existing.getCluster().getStatus(), bArr);
        if (!allowed) {
            return null;
        }
        return recordProgramSuspendResume(programRunId, bArr, existing, "suspend", j);
    }

    /**
     * Moves an existing (suspended) run record back to run status RUNNING.
     *
     * <p>After validation, if the upgrade has not completed and the run's version is {@code "-SNAPSHOT"},
     * the record to resume is re-read from the version-less "suspended" key; if that lookup finds nothing,
     * the request is logged and ignored.
     *
     * @param programRunId run to update
     * @param bArr         source id of the triggering message
     * @param j            resume time, or {@code -1} to leave it unset
     * @return the record written, or {@code null} if the request was ignored
     */
    @Nullable
    public RunRecordMeta recordProgramResumed(ProgramRunId programRunId, byte[] bArr, long j) {
        RunRecordMeta run = getRun(programRunId);
        if (run == null) {
            LOG.warn("Ignoring unexpected transition of program run {} to program state {} with no existing run record.", programRunId, ProgramRunStatus.RUNNING);
            return null;
        }
        if (!isValid(run, ProgramRunStatus.RUNNING, run.getCluster().getStatus(), bArr)) {
            return null;
        }
        if (!hasUpgraded() && programRunId.getVersion().equals("-SNAPSHOT")) {
            // Before the upgrade completes, the suspended record is looked up under the version-less key.
            run = (RunRecordMeta) get(getVersionLessProgramKeyBuilder(TYPE_RUN_RECORD_SUSPENDED, programRunId.getParent()).add(programRunId.getRun()).build(), RunRecordMeta.class);
        }
        if (run != null) {
            return recordProgramSuspendResume(programRunId, bArr, run, "resume", j);
        }
        // This is the resume path; the message previously said "suspend".
        LOG.error("No run record meta for program '{}' pid '{}' exists. Skip recording program resume.", programRunId.getParent(), programRunId.getRun());
        return null;
    }

    /**
     * Shared implementation of suspend and resume.
     *
     * <p>The existing record is deleted and rewritten with the new status: for {@code "resume"} under the
     * "started" key with status RUNNING, for any other action under the "suspended" key with status
     * SUSPENDED. When the timestamp is not {@code -1} it is stored as the resume or suspend time.
     *
     * @param programRunId  run to update
     * @param bArr          source id of the triggering message
     * @param runRecordMeta existing record to replace
     * @param str           action; {@code "resume"} selects the resume behaviour
     * @param j             time of the action, or {@code -1} to leave it unset
     * @return the record written
     */
    private RunRecordMeta recordProgramSuspendResume(ProgramRunId programRunId, byte[] bArr, RunRecordMeta runRecordMeta, String str, long j) {
        boolean resuming = str.equals("resume");
        String targetType = resuming ? TYPE_RUN_RECORD_STARTED : TYPE_RUN_RECORD_SUSPENDED;
        ProgramRunStatus targetStatus = resuming ? ProgramRunStatus.RUNNING : ProgramRunStatus.SUSPENDED;

        delete(runRecordMeta);
        MDSKey targetKey = getProgramKeyBuilder(targetType, programRunId).build();
        RunRecordMeta.Builder updatedBuilder = RunRecordMeta.builder(runRecordMeta).setStatus(targetStatus).setSourceId(bArr);
        if (j != -1) {
            Long actionTime = Long.valueOf(j);
            if (resuming) {
                updatedBuilder.setResumeTime(actionTime);
            } else {
                updatedBuilder.setSuspendTime(actionTime);
            }
        }
        RunRecordMeta updated = updatedBuilder.build();
        write(targetKey, updated);
        LOG.trace("Recorded {} for program {}", targetStatus, programRunId);
        return updated;
    }

    /**
     * Moves an existing run record to a terminal run status, writing it under the versioned "completed" key.
     *
     * @param programRunId     run to update
     * @param j                stop time
     * @param programRunStatus terminal status to record
     * @param basicThrowable   failure cause, if any
     * @param bArr             source id of the triggering message
     * @return the record written, or {@code null} if there is no record or the transition is rejected
     */
    @Nullable
    public RunRecordMeta recordProgramStop(ProgramRunId programRunId, long j, ProgramRunStatus programRunStatus, @Nullable BasicThrowable basicThrowable, byte[] bArr) {
        MDSKey.Builder completedKeyBuilder = getProgramKeyBuilder(TYPE_RUN_RECORD_COMPLETED, programRunId.getParent());
        return recordProgramStop(programRunId, j, programRunStatus, basicThrowable, completedKeyBuilder, bArr);
    }

    /**
     * Test hook: like {@code recordProgramStop}, but writes the record under the version-less
     * "completed" key.
     *
     * @param programRunId     run to update
     * @param j                stop time
     * @param programRunStatus terminal status to record
     * @param basicThrowable   failure cause, if any
     * @param bArr             source id of the triggering message
     */
    @VisibleForTesting
    void recordProgramStopOldFormat(ProgramRunId programRunId, long j, ProgramRunStatus programRunStatus, @Nullable BasicThrowable basicThrowable, byte[] bArr) {
        MDSKey.Builder versionLessKeyBuilder = getVersionLessProgramKeyBuilder(TYPE_RUN_RECORD_COMPLETED, programRunId.getParent());
        recordProgramStop(programRunId, j, programRunStatus, basicThrowable, versionLessKeyBuilder, bArr);
    }

    /**
     * Moves an existing run record to the given run status and writes it under a key derived from the
     * supplied builder, extended with the inverted start timestamp and the run id.
     *
     * <p>After validation the existing record is deleted; if its system arguments contain a workflow name,
     * the workflow node state is recorded as well.
     *
     * @param programRunId     run to update
     * @param j                stop time
     * @param programRunStatus status to record
     * @param basicThrowable   failure cause, if any
     * @param builder          key builder already positioned at the program's "completed" prefix
     * @param bArr             source id of the triggering message
     * @return the record written, or {@code null} if there is no record or the transition is rejected
     */
    @Nullable
    private RunRecordMeta recordProgramStop(ProgramRunId programRunId, long j, ProgramRunStatus programRunStatus, @Nullable BasicThrowable basicThrowable, MDSKey.Builder builder, byte[] bArr) {
        RunRecordMeta existing = getRun(programRunId);
        if (existing == null) {
            LOG.warn("Ignoring unexpected transition of program run {} to program state {} with no existing run record.", programRunId, programRunStatus);
            return null;
        }
        boolean allowed = isValid(existing, programRunStatus, existing.getCluster().getStatus(), bArr);
        if (!allowed) {
            return null;
        }
        delete(existing);
        Map<String, String> recordedSystemArgs = existing.getSystemArgs();
        boolean runsInsideWorkflow = recordedSystemArgs != null && recordedSystemArgs.containsKey(ProgramOptionConstants.WORKFLOW_NAME);
        if (runsInsideWorkflow) {
            addWorkflowNodeState(programRunId, recordedSystemArgs, programRunStatus, basicThrowable, bArr);
        }
        MDSKey completedKey = builder.add(getInvertedTsKeyPart(existing.getStartTs())).add(programRunId.getRun()).build();
        RunRecordMeta.Builder updatedBuilder = RunRecordMeta.builder(existing);
        RunRecordMeta updated = updatedBuilder.setStopTime(Long.valueOf(j)).setStatus(programRunStatus).setSourceId(bArr).build();
        write(completedKey, updated);
        LOG.trace("Recorded {} for program {}", programRunStatus, programRunId);
        return updated;
    }

    /**
     * Decides whether a state transition may be applied to an existing run record.
     *
     * <p>The transition is rejected when:
     * <ul>
     *   <li>the record has a source id and the new source id compares lower than it;</li>
     *   <li>the record is already in exactly the requested run and cluster state (rejected silently);</li>
     *   <li>the run status cannot transition to the requested run status;</li>
     *   <li>the cluster status cannot transition to the requested cluster status.</li>
     * </ul>
     *
     * @param runRecordMeta           existing record
     * @param programRunStatus        requested run status
     * @param programRunClusterStatus requested cluster status
     * @param bArr                    source id of the triggering message
     * @return {@code true} if the transition may be applied
     */
    private boolean isValid(RunRecordMeta runRecordMeta, ProgramRunStatus programRunStatus, ProgramRunClusterStatus programRunClusterStatus, byte[] bArr) {
        byte[] existingSourceId = runRecordMeta.getSourceId();
        boolean staleSource = existingSourceId != null && Bytes.compareTo(bArr, existingSourceId) < 0;
        if (staleSource) {
            LOG.debug("Current source id '{}' is not larger than the existing source id '{}' in the existing run record meta '{}'. Skip recording state transition to program state {} and cluster state {}.", new Object[]{Bytes.toHexString(bArr), Bytes.toHexString(existingSourceId), runRecordMeta, programRunStatus, programRunClusterStatus});
            return false;
        }
        // Already in the requested state: nothing to record.
        boolean alreadyThere = runRecordMeta.getStatus() == programRunStatus && runRecordMeta.getCluster().getStatus() == programRunClusterStatus;
        if (alreadyThere) {
            return false;
        }
        if (!runRecordMeta.getStatus().canTransitionTo(programRunStatus)) {
            LOG.warn("Ignoring unexpected transition of program run {} from run state {} to {}.", new Object[]{runRecordMeta.getProgramRunId(), runRecordMeta.getStatus(), programRunStatus});
            return false;
        }
        if (!runRecordMeta.getCluster().getStatus().canTransitionTo(programRunClusterStatus)) {
            LOG.warn("Ignoring unexpected transition of program run {} from cluster state {} to {}.", new Object[]{runRecordMeta.getProgramRunId(), runRecordMeta.getCluster().getStatus(), programRunClusterStatus});
            return false;
        }
        return true;
    }

    /**
     * Looks up the run records for the given run ids without limiting the number of results.
     *
     * @param set run ids to look up
     * @return the matching records keyed by run id
     */
    public Map<ProgramRunId, RunRecordMeta> getRuns(Set<ProgramRunId> set) {
        final int unlimited = Integer.MAX_VALUE;
        return getRuns(set, unlimited);
    }

    /**
     * Collects the starting, started and suspended run records of every given namespace.
     *
     * @param set namespaces to scan
     * @param predicate filter applied to each run record
     * @return matching records of all namespaces keyed by run id
     */
    public Map<ProgramRunId, RunRecordMeta> getActiveRuns(Set<NamespaceId> set, Predicate<RunRecordMeta> predicate) {
        return set.stream()
            .flatMap(namespaceId -> {
                Map<ProgramRunId, RunRecordMeta> active = new LinkedHashMap<>();
                for (String recordType : Arrays.asList(TYPE_RUN_RECORD_STARTING, TYPE_RUN_RECORD_STARTED, TYPE_RUN_RECORD_SUSPENDED)) {
                    MDSKey prefix = getNamespaceKeyBuilder(recordType, namespaceId).build();
                    active.putAll(getProgramRunIdMap(listKV(prefix, null, RunRecordMeta.class, Integer.MAX_VALUE, predicate)));
                }
                return active.entrySet().stream();
            })
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /**
     * Collects starting, started and suspended run records using a key built with a {@code null} namespace.
     *
     * @param predicate filter applied to each run record
     * @return matching records keyed by run id
     */
    public Map<ProgramRunId, RunRecordMeta> getActiveRuns(Predicate<RunRecordMeta> predicate) {
        Map<ProgramRunId, RunRecordMeta> active = new LinkedHashMap<>();
        for (String recordType : Arrays.asList(TYPE_RUN_RECORD_STARTING, TYPE_RUN_RECORD_STARTED, TYPE_RUN_RECORD_SUSPENDED)) {
            MDSKey prefix = getNamespaceKeyBuilder(recordType, null).build();
            active.putAll(getProgramRunIdMap(listKV(prefix, null, RunRecordMeta.class, Integer.MAX_VALUE, predicate)));
        }
        return active;
    }

    /**
     * Collects the starting, started and suspended run records of one namespace.
     *
     * @param namespaceId namespace to scan
     * @return matching records keyed by run id
     */
    public Map<ProgramRunId, RunRecordMeta> getActiveRuns(NamespaceId namespaceId) {
        // Start-time window of [0, Long.MAX_VALUE).
        Predicate<RunRecordMeta> anyStartTime = getTimeRangePredicate(0L, Long.MAX_VALUE);
        Map<ProgramRunId, RunRecordMeta> active = new LinkedHashMap<>();
        for (String recordType : Arrays.asList(TYPE_RUN_RECORD_STARTING, TYPE_RUN_RECORD_STARTED, TYPE_RUN_RECORD_SUSPENDED)) {
            MDSKey prefix = getNamespaceKeyBuilder(recordType, namespaceId).build();
            active.putAll(getProgramRunIdMap(listKV(prefix, null, RunRecordMeta.class, Integer.MAX_VALUE, anyStartTime)));
        }
        return active;
    }

    /**
     * Collects the starting, started and suspended run records of one application.
     *
     * @param applicationId application to scan
     * @return matching records keyed by run id
     */
    public Map<ProgramRunId, RunRecordMeta> getActiveRuns(ApplicationId applicationId) {
        // Start-time window of [0, Long.MAX_VALUE).
        Predicate<RunRecordMeta> anyStartTime = getTimeRangePredicate(0L, Long.MAX_VALUE);
        Map<ProgramRunId, RunRecordMeta> active = new LinkedHashMap<>();
        for (String recordType : Arrays.asList(TYPE_RUN_RECORD_STARTING, TYPE_RUN_RECORD_STARTED, TYPE_RUN_RECORD_SUSPENDED)) {
            MDSKey prefix = getApplicationKeyBuilder(recordType, applicationId).build();
            active.putAll(getProgramRunIdMap(listKV(prefix, null, RunRecordMeta.class, Integer.MAX_VALUE, anyStartTime)));
        }
        return active;
    }

    /**
     * Collects the starting, started and suspended run records of one program.
     *
     * @param programId program to scan
     * @return matching records keyed by run id
     */
    public Map<ProgramRunId, RunRecordMeta> getActiveRuns(ProgramId programId) {
        // Start-time window of [0, Long.MAX_VALUE).
        Predicate<RunRecordMeta> anyStartTime = getTimeRangePredicate(0L, Long.MAX_VALUE);
        Map<ProgramRunId, RunRecordMeta> active = new LinkedHashMap<>();
        for (String recordType : Arrays.asList(TYPE_RUN_RECORD_STARTING, TYPE_RUN_RECORD_STARTED, TYPE_RUN_RECORD_SUSPENDED)) {
            MDSKey prefix = getProgramKeyBuilder(recordType, programId).build();
            active.putAll(getProgramRunIdMap(listKV(prefix, null, RunRecordMeta.class, Integer.MAX_VALUE, anyStartTime)));
        }
        return active;
    }

    /**
     * Looks up run records for the given run ids across the starting, started, suspended and completed
     * record types, in that order.
     *
     * @param set run ids to look up
     * @param i overall limit; each lookup is given the limit minus the number of records collected so far
     * @return the matching records keyed by run id, in lookup order
     */
    private Map<ProgramRunId, RunRecordMeta> getRuns(Set<ProgramRunId> set, int i) {
        Map<ProgramRunId, RunRecordMeta> collected = new LinkedHashMap<>();
        for (String recordType : Arrays.asList(TYPE_RUN_RECORD_STARTING, TYPE_RUN_RECORD_STARTED, TYPE_RUN_RECORD_SUSPENDED, TYPE_RUN_RECORD_COMPLETED)) {
            int remaining = i - collected.size();
            collected.putAll(getRunsForRunIds(set, recordType, remaining));
        }
        return collected;
    }

    /**
     * Fetches run records for a program, choosing which record types to scan based on the requested status.
     *
     * <p>NOTE(review): the switch dispatches through a compiler-generated ordinal table
     * ({@code AnonymousClass3.$SwitchMap...}) that is not visible here, so the status behind each numeric
     * case is inferred from what the branch does; confirm against the original enum switch.
     *
     * @param programId program to scan, may be {@code null}
     * @param programRunStatus status selecting which records are returned
     * @param j lower bound passed to the underlying time-range scans
     * @param j2 upper bound passed to the underlying time-range scans
     * @param i maximum number of records to return
     * @param predicate optional additional filter on each record
     * @return matching records keyed by run id
     */
    public Map<ProgramRunId, RunRecordMeta> getRuns(@Nullable ProgramId programId, ProgramRunStatus programRunStatus, long j, long j2, int i, @Nullable Predicate<RunRecordMeta> predicate) {
        switch (AnonymousClass3.$SwitchMap$co$cask$cdap$proto$ProgramRunStatus[programRunStatus.ordinal()]) {
            case 1:
                // Scans every non-complete record type and then the historical records, shrinking the
                // limit by what has already been collected (presumably the "ALL" status - confirm).
                LinkedHashMap linkedHashMap = new LinkedHashMap();
                Iterator it = Arrays.asList(TYPE_RUN_RECORD_STARTING, TYPE_RUN_RECORD_STARTED, TYPE_RUN_RECORD_SUSPENDED).iterator();
                while (it.hasNext()) {
                    linkedHashMap.putAll(getNonCompleteRuns(programId, (String) it.next(), j, j2, i - linkedHashMap.size(), predicate));
                }
                linkedHashMap.putAll(getHistoricalRuns(programId, programRunStatus, j, j2, i - linkedHashMap.size(), predicate));
                return linkedHashMap;
            case 2:
            case 3:
                // Both cases read the "starting" record type, so the records are additionally filtered
                // on the exact requested status.
                Predicate<RunRecordMeta> predicate2 = runRecordMeta -> {
                    return runRecordMeta.getStatus() == programRunStatus;
                };
                if (predicate != null) {
                    predicate2 = predicate2.and(predicate);
                }
                return getNonCompleteRuns(programId, TYPE_RUN_RECORD_STARTING, j, j2, i, predicate2);
            case 4:
                return getNonCompleteRuns(programId, TYPE_RUN_RECORD_STARTED, j, j2, i, predicate);
            case 5:
                return getNonCompleteRuns(programId, TYPE_RUN_RECORD_SUSPENDED, j, j2, i, predicate);
            default:
                // Every remaining status is served from the historical records.
                return getHistoricalRuns(programId, programRunStatus, j, j2, i, predicate);
        }
    }

    /**
     * Finds the run record of a single run, whichever record type it is currently stored under.
     *
     * <p>Lookup order: started, starting, completed, then suspended.
     *
     * @param programRunId the run to look up
     * @return the record, or {@code null} if none of the lookups finds one
     */
    public RunRecordMeta getRun(ProgramRunId programRunId) {
        RunRecordMeta found = getUnfinishedRun(programRunId, TYPE_RUN_RECORD_STARTED);
        if (found == null) {
            found = getUnfinishedRun(programRunId, TYPE_RUN_RECORD_STARTING);
        }
        if (found == null) {
            found = getCompletedRun(programRunId);
        }
        if (found == null) {
            found = getUnfinishedRun(programRunId, TYPE_RUN_RECORD_SUSPENDED);
        }
        return found;
    }

    /**
     * Deletes everything stored under the key of the given run record.
     *
     * <p>The key is the program key for the record's status type followed by the run id; records in an
     * end state additionally carry the inverted start timestamp before the run id.
     *
     * @param runRecordMeta the record to remove
     */
    private void delete(RunRecordMeta runRecordMeta) {
        ProgramRunId runId = runRecordMeta.getProgramRunId();
        String recordType = STATUS_TYPE_MAP.get(runRecordMeta.getStatus());
        MDSKey.Builder key = getProgramKeyBuilder(recordType, runId.getParent());
        boolean ended = runRecordMeta.getStatus().isEndState();
        if (ended) {
            key.add(getInvertedTsKeyPart(runRecordMeta.getStartTs()));
        }
        key.add(runId.getRun());
        deleteAll(key.build());
    }

    /**
     * Reads a non-completed run record of the given record type.
     *
     * <p>If the upgrade has not completed, nothing was found under the versioned key and the run's version
     * is {@code "-SNAPSHOT"}, the version-less key is tried as a fallback.
     *
     * @param programRunId the run to look up
     * @param str record type prefix to read from
     * @return the record, or {@code null} if none is stored
     */
    private RunRecordMeta getUnfinishedRun(ProgramRunId programRunId, String str) {
        MDSKey versionedKey = getProgramKeyBuilder(str, programRunId.getParent()).add(programRunId.getRun()).build();
        RunRecordMeta record = (RunRecordMeta) get(versionedKey, RunRecordMeta.class);
        // hasUpgraded() is deliberately evaluated first, exactly as in the combined condition it replaces.
        if (hasUpgraded() || record != null || !programRunId.getVersion().equals("-SNAPSHOT")) {
            return record;
        }
        MDSKey versionLessKey = getVersionLessProgramKeyBuilder(str, programRunId.getParent()).add(programRunId.getRun()).build();
        return (RunRecordMeta) get(versionLessKey, RunRecordMeta.class);
    }

    /**
     * Reads the completed run record of a run.
     *
     * <p>If the upgrade has not completed, nothing was found under the versioned key and the run's version
     * is {@code "-SNAPSHOT"}, the version-less key prefix is tried as a fallback.
     *
     * @param programRunId the run to look up
     * @return the record, or {@code null} if none is stored
     */
    private RunRecordMeta getCompletedRun(ProgramRunId programRunId) {
        MDSKey versionedPrefix = getProgramKeyBuilder(TYPE_RUN_RECORD_COMPLETED, programRunId.getParent()).build();
        RunRecordMeta record = getCompletedRun(versionedPrefix, programRunId.getRun());
        // hasUpgraded() is deliberately evaluated first, exactly as in the combined condition it replaces.
        if (hasUpgraded() || record != null || !programRunId.getVersion().equals("-SNAPSHOT")) {
            return record;
        }
        MDSKey versionLessPrefix = getVersionLessProgramKeyBuilder(TYPE_RUN_RECORD_COMPLETED, programRunId.getParent()).build();
        return getCompletedRun(versionLessPrefix, programRunId.getRun());
    }

    /**
     * Reads a completed run record below the given key prefix.
     *
     * <p>When a time greater than -1 can be extracted from the run id, the exact key (prefix, inverted
     * time, run id) is read directly. Otherwise the whole prefix is scanned for the first record whose
     * pid equals the run id.
     *
     * @param mDSKey key prefix of the completed records of a program
     * @param str run id to look for
     * @return the record, or {@code null} if none is found
     */
    private RunRecordMeta getCompletedRun(MDSKey mDSKey, String str) {
        long startSecs = RunIds.getTime(RunIds.fromString(str), TimeUnit.SECONDS);
        if (startSecs > -1) {
            MDSKey exactKey = new MDSKey.Builder(mDSKey).add(getInvertedTsKeyPart(startSecs)).add(str).build();
            return (RunRecordMeta) get(exactKey, RunRecordMeta.class);
        }

        // No usable time in the run id: scan the full time range and match on the pid.
        MDSKey scanStart = new MDSKey.Builder(mDSKey).add(getInvertedTsScanKeyPart(Long.MAX_VALUE)).build();
        MDSKey scanStop = new MDSKey.Builder(mDSKey).add(getInvertedTsScanKeyPart(0L)).build();
        return (RunRecordMeta) Iterables.getFirst(list(scanStart, scanStop, RunRecordMeta.class, 1, runRecordMeta -> {
            return runRecordMeta.getPid().equals(str);
        }), (Object) null);
    }

    /**
     * Lists non-completed run records of one record type whose start time lies in {@code [j, j2)}.
     *
     * <p>For a program whose version is {@code "-SNAPSHOT"}, keys are additionally filtered with an
     * {@code AppVersionPredicate}, and if the limit is not exhausted and the upgrade has not completed,
     * the version-less keys are scanned as well.
     *
     * @param programId program to scan, may be {@code null}
     * @param str record type prefix
     * @param j inclusive lower bound on the start time
     * @param j2 exclusive upper bound on the start time
     * @param i maximum number of records
     * @param predicate optional additional record filter
     * @return matching records keyed by run id
     */
    private Map<ProgramRunId, RunRecordMeta> getNonCompleteRuns(@Nullable ProgramId programId, String str, long j, long j2, int i, Predicate<RunRecordMeta> predicate) {
        Predicate<RunRecordMeta> recordFilter = andPredicate(getTimeRangePredicate(j, j2), predicate);
        boolean defaultVersion = programId != null && programId.getVersion().equals("-SNAPSHOT");
        if (!defaultVersion) {
            return getProgramRunIdMap(listKV(getProgramKeyBuilder(str, programId).build(), null, RunRecordMeta.class, i, recordFilter));
        }

        AppVersionPredicate versionFilter = new AppVersionPredicate("-SNAPSHOT");
        Map<MDSKey, RunRecordMeta> records = listKV(getProgramKeyBuilder(str, programId).build(), null, RunRecordMeta.class, i, versionFilter, recordFilter);
        int remaining = i - records.size();
        if (remaining > 0 && !hasUpgraded()) {
            records.putAll(listKV(getVersionLessProgramKeyBuilder(str, programId).build(), null, RunRecordMeta.class, remaining, versionFilter, recordFilter));
        }
        return getProgramRunIdMap(records);
    }

    /**
     * Lists the run records of one record type that belong to the given run ids.
     *
     * <p>The scan covers the program key of every run id and, while the upgrade has not completed, also
     * the version-less program key of runs whose version is {@code "-SNAPSHOT"}.
     *
     * @param set run ids to look up
     * @param str record type prefix
     * @param i maximum number of records
     * @return matching records keyed by run id
     */
    private Map<ProgramRunId, RunRecordMeta> getRunsForRunIds(Set<ProgramRunId> set, String str, int i) {
        Set<MDSKey> prefixes = new HashSet<>();
        boolean includeVersionLess = !hasUpgraded();
        for (ProgramRunId runId : set) {
            prefixes.add(getProgramKeyBuilder(str, runId.getParent()).build());
            if (includeVersionLess && runId.getVersion().equals("-SNAPSHOT")) {
                prefixes.add(getVersionLessProgramKeyBuilder(str, runId.getParent()).build());
            }
        }
        // Keep only entries whose reconstructed run id is one of the requested ones.
        return getProgramRunIdMap(listKV(prefixes, RunRecordMeta.class, i, keyValue -> {
            return set.contains(getProgramID(keyValue.getKey()).run(((RunRecordMeta) keyValue.getValue()).getPid()));
        }));
    }

    /**
     * Re-keys a map of run records from their storage key to their run id.
     *
     * <p>The run id is the program id parsed from the storage key combined with the record's pid.
     * Iteration order of the input is preserved.
     *
     * @param map records keyed by storage key
     * @return the same records keyed by run id
     */
    private Map<ProgramRunId, RunRecordMeta> getProgramRunIdMap(Map<MDSKey, RunRecordMeta> map) {
        Map<ProgramRunId, RunRecordMeta> byRunId = new LinkedHashMap<>();
        map.forEach((key, record) -> byRunId.put(getProgramID(key).run(record.getPid()), record));
        return byRunId;
    }

    /**
     * Lists completed run records of a program for the given status and time range.
     *
     * <p>For a program whose version is {@code "-SNAPSHOT"}, keys are additionally filtered with an
     * {@code AppVersionPredicate}, and if the limit is not exhausted and the upgrade has not completed,
     * the version-less keys are scanned as well.
     *
     * @param programId program to scan, may be {@code null}
     * @param programRunStatus status selecting which completed records are returned
     * @param j start of the time range
     * @param j2 end of the time range
     * @param i maximum number of records
     * @param predicate optional additional record filter
     * @return matching records keyed by run id
     */
    private Map<ProgramRunId, RunRecordMeta> getHistoricalRuns(@Nullable ProgramId programId, ProgramRunStatus programRunStatus, long j, long j2, int i, @Nullable Predicate<RunRecordMeta> predicate) {
        boolean defaultVersion = programId != null && programId.getVersion().equals("-SNAPSHOT");
        if (!defaultVersion) {
            MDSKey prefix = getProgramKeyBuilder(TYPE_RUN_RECORD_COMPLETED, programId).build();
            return getHistoricalRuns(prefix, programRunStatus, j, j2, i, null, predicate);
        }

        AppVersionPredicate versionFilter = new AppVersionPredicate("-SNAPSHOT");
        MDSKey versionedPrefix = getProgramKeyBuilder(TYPE_RUN_RECORD_COMPLETED, programId).build();
        Map<ProgramRunId, RunRecordMeta> runs = getHistoricalRuns(versionedPrefix, programRunStatus, j, j2, i, versionFilter, predicate);
        int remaining = i - runs.size();
        if (remaining > 0 && !hasUpgraded()) {
            MDSKey versionLessPrefix = getVersionLessProgramKeyBuilder(TYPE_RUN_RECORD_COMPLETED, programId).build();
            runs.putAll(getHistoricalRuns(versionLessPrefix, programRunStatus, j, j2, remaining, versionFilter, predicate));
        }
        return runs;
    }

    /**
     * Lists completed run records of the given namespaces that overlap a time window.
     *
     * <p>A record is kept when it has a stop time that is at least {@code j} and its start time is below
     * {@code j2}. The limit is applied to each namespace scan separately.
     *
     * @param set namespaces to scan
     * @param j earliest stop time, inclusive
     * @param j2 latest start time, exclusive
     * @param i maximum number of records per namespace scan
     * @return matching records of all namespaces keyed by run id
     */
    public Map<ProgramRunId, RunRecordMeta> getHistoricalRuns(Set<NamespaceId> set, long j, long j2, int i) {
        MDSKey completedPrefix = new MDSKey.Builder().add(TYPE_RUN_RECORD_COMPLETED).build();
        return set.stream()
            .flatMap(namespaceId -> {
                MDSKey namespacePrefix = new MDSKey.Builder(completedPrefix).add(namespaceId.getNamespace()).build();
                return getProgramRunIdMap(listKV(namespacePrefix, null, RunRecordMeta.class, i,
                    mDSKey -> true,
                    runRecordMeta -> runRecordMeta.getStopTs() != null
                        && runRecordMeta.getStopTs().longValue() >= j
                        && runRecordMeta.getStartTs() < j2)).entrySet().stream();
            })
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /**
     * Scans completed run records below a key prefix for a time range and status.
     *
     * <p>The scan starts at the inverted-timestamp key of {@code j2} and stops at that of {@code j}.
     * Status {@code ALL} applies only the optional record filter; {@code COMPLETED} and {@code KILLED}
     * filter on the matching controller state; every other status filters on the {@code ERROR} state.
     *
     * @param mDSKey key prefix of the completed records
     * @param programRunStatus status selecting the records
     * @param j start of the time range
     * @param j2 end of the time range
     * @param i maximum number of records
     * @param predicate optional filter on the storage key
     * @param predicate2 optional filter on the record
     * @return matching records keyed by run id
     */
    private Map<ProgramRunId, RunRecordMeta> getHistoricalRuns(MDSKey mDSKey, ProgramRunStatus programRunStatus, long j, long j2, int i, @Nullable Predicate<MDSKey> predicate, @Nullable Predicate<RunRecordMeta> predicate2) {
        MDSKey scanStart = new MDSKey.Builder(mDSKey).add(getInvertedTsScanKeyPart(j2)).build();
        MDSKey scanStop = new MDSKey.Builder(mDSKey).add(getInvertedTsScanKeyPart(j)).build();

        Predicate<RunRecordMeta> recordFilter;
        if (programRunStatus.equals(ProgramRunStatus.ALL)) {
            if (predicate2 != null) {
                recordFilter = predicate2;
            } else {
                recordFilter = runRecordMeta -> true;
            }
        } else {
            ProgramController.State wantedState;
            if (programRunStatus.equals(ProgramRunStatus.COMPLETED)) {
                wantedState = ProgramController.State.COMPLETED;
            } else if (programRunStatus.equals(ProgramRunStatus.KILLED)) {
                wantedState = ProgramController.State.KILLED;
            } else {
                wantedState = ProgramController.State.ERROR;
            }
            recordFilter = andPredicate(getPredicate(wantedState), predicate2);
        }
        return getProgramRunIdMap(listKV(scanStart, scanStop, RunRecordMeta.class, i, predicate, recordFilter));
    }

    /**
     * Builds a filter accepting run records whose status equals the run status of the given controller state.
     *
     * @param state controller state to match
     * @return the record filter
     */
    private Predicate<RunRecordMeta> getPredicate(ProgramController.State state) {
        return runRecordMeta -> runRecordMeta.getStatus().equals(state.getRunStatus());
    }

    /**
     * Builds a filter accepting run records whose start time lies in {@code [j, j2)}.
     *
     * @param j inclusive lower bound on the start time
     * @param j2 exclusive upper bound on the start time
     * @return the record filter
     */
    private Predicate<RunRecordMeta> getTimeRangePredicate(long j, long j2) {
        return runRecordMeta -> {
            if (runRecordMeta.getStartTs() < j) {
                return false;
            }
            return runRecordMeta.getStartTs() < j2;
        };
    }

    /**
     * Combines two record filters with logical AND, tolerating a missing second filter.
     *
     * @param predicate mandatory filter
     * @param predicate2 optional filter; when {@code null} the first filter is returned as is
     * @return the combined filter
     */
    private Predicate<RunRecordMeta> andPredicate(Predicate<RunRecordMeta> predicate, @Nullable Predicate<RunRecordMeta> predicate2) {
        if (predicate2 == null) {
            return predicate;
        }
        return predicate.and(predicate2);
    }

    /**
     * Inverts a timestamp by subtracting it from {@link Long#MAX_VALUE}, so larger timestamps yield
     * smaller key parts.
     *
     * @param j the timestamp
     * @return {@code Long.MAX_VALUE - j}
     */
    private long getInvertedTsKeyPart(long j) {
        final long inverted = Long.MAX_VALUE - j;
        return inverted;
    }

    /**
     * Computes the inverted-timestamp key part used as a scan boundary: the inverted timestamp plus one,
     * unless that would exceed {@link Long#MAX_VALUE}.
     *
     * @param j the timestamp
     * @return the scan boundary key part
     */
    private long getInvertedTsScanKeyPart(long j) {
        long inverted = getInvertedTsKeyPart(j);
        if (inverted < Long.MAX_VALUE) {
            // Safe to bump by one without overflowing.
            return inverted + 1;
        }
        return inverted;
    }

    /**
     * Stores a stream specification under the stream type, the given id and the stream's name.
     *
     * @param str second key component (NOTE(review): presumably the namespace id - confirm with callers)
     * @param streamSpecification the specification to store
     */
    public void writeStream(String str, StreamSpecification streamSpecification) {
        String[] keyParts = new String[]{TYPE_STREAM, str, streamSpecification.getName()};
        MDSKey key = new MDSKey.Builder().add(keyParts).build();
        write(key, streamSpecification);
    }

    /**
     * Reads the first stream specification stored under the stream type and the two given key components.
     *
     * @param str second key component (NOTE(review): presumably the namespace id - confirm with callers)
     * @param str2 third key component; {@code writeStream} stores the stream name here
     * @return the value returned by {@code getFirst} for that key
     */
    public StreamSpecification getStream(String str, String str2) {
        String[] keyParts = new String[]{TYPE_STREAM, str, str2};
        MDSKey key = new MDSKey.Builder().add(keyParts).build();
        return (StreamSpecification) getFirst(key, StreamSpecification.class);
    }

    /**
     * Lists every stream specification stored under the stream type and the given key component.
     *
     * @param str second key component (NOTE(review): presumably the namespace id - confirm with callers)
     * @return the stored specifications
     */
    public List<StreamSpecification> getAllStreams(String str) {
        String[] keyParts = new String[]{TYPE_STREAM, str};
        MDSKey prefix = new MDSKey.Builder().add(keyParts).build();
        return list(prefix, StreamSpecification.class);
    }

    /**
     * Deletes every entry stored under the stream type and the given key component.
     *
     * @param str second key component (NOTE(review): presumably the namespace id - confirm with callers)
     */
    public void deleteAllStreams(String str) {
        String[] keyParts = new String[]{TYPE_STREAM, str};
        MDSKey prefix = new MDSKey.Builder().add(keyParts).build();
        deleteAll(prefix);
    }

    /**
     * Deletes the starting, started, completed and suspended run records stored under the given key components.
     *
     * <p>Normally the deletion uses the four-component prefix (type, {@code str}, {@code str2},
     * {@code str3}). While the upgrade has not completed and {@code str3} is {@code "-SNAPSHOT"}, the
     * three-component prefix is used instead, filtered by an {@code AppVersionPredicate}.
     *
     * @param str first key component after the record type
     * @param str2 second key component after the record type
     * @param str3 version component
     */
    public void deleteProgramHistory(String str, String str2, String str3) {
        // hasUpgraded() is evaluated first, matching the order of the original condition.
        boolean filterByVersion = !hasUpgraded() && str3.equals("-SNAPSHOT");
        String[] recordTypes = {TYPE_RUN_RECORD_STARTING, TYPE_RUN_RECORD_STARTED, TYPE_RUN_RECORD_COMPLETED, TYPE_RUN_RECORD_SUSPENDED};

        if (filterByVersion) {
            AppVersionPredicate versionFilter = new AppVersionPredicate("-SNAPSHOT");
            for (String recordType : recordTypes) {
                deleteAll(new MDSKey.Builder().add(new String[]{recordType, str, str2}).build(), versionFilter);
            }
            return;
        }

        for (String recordType : recordTypes) {
            deleteAll(new MDSKey.Builder().add(new String[]{recordType, str, str2, str3}).build());
        }
    }

    /**
     * Deletes all starting, started, completed and suspended run records stored under the given key component.
     *
     * @param str key component following the record type
     */
    public void deleteProgramHistory(String str) {
        String[] recordTypes = {TYPE_RUN_RECORD_STARTING, TYPE_RUN_RECORD_STARTED, TYPE_RUN_RECORD_COMPLETED, TYPE_RUN_RECORD_SUSPENDED};
        for (String recordType : recordTypes) {
            MDSKey prefix = new MDSKey.Builder().add(new String[]{recordType, str}).build();
            deleteAll(prefix);
        }
    }

    /**
     * Stores namespace metadata under the namespace key derived from its name.
     *
     * @param namespaceMeta the metadata to store
     */
    public void createNamespace(NamespaceMeta namespaceMeta) {
        MDSKey key = getNamespaceKey(namespaceMeta.getName());
        write(key, namespaceMeta);
    }

    /**
     * Builds the key for namespace metadata.
     *
     * @param str namespace name; when {@code null} the key consists of the namespace type only
     * @return the key
     */
    private MDSKey getNamespaceKey(@Nullable String str) {
        MDSKey.Builder key = new MDSKey.Builder().add(TYPE_NAMESPACE);
        if (str == null) {
            return key.build();
        }
        key.add(str);
        return key.build();
    }

    /**
     * Stores the workflow token of a workflow run.
     *
     * @param programRunId the workflow run
     * @param workflowToken the token to store
     * @throws IllegalArgumentException if the run does not belong to a program of type {@code WORKFLOW}
     */
    public void setWorkflowToken(ProgramRunId programRunId, WorkflowToken workflowToken) {
        boolean isWorkflow = programRunId.getType() == ProgramType.WORKFLOW;
        if (!isWorkflow) {
            throw new IllegalArgumentException("WorkflowToken can only be set for workflow execution: " + programRunId);
        }
        MDSKey key = getProgramKeyBuilder(TYPE_WORKFLOW_TOKEN, programRunId).build();
        write(key, workflowToken);
    }

    /**
     * Reads the workflow token of a workflow run.
     *
     * <p>If the upgrade has not completed, nothing was found under the versioned key and the program's
     * version is {@code "-SNAPSHOT"}, the version-less key is tried. When no token is found at all, a new
     * {@code BasicWorkflowToken(0)} is returned.
     *
     * @param programId the workflow program; must be of type {@code WORKFLOW}
     * @param str the run id
     * @return the stored token, or a newly created one if none is stored
     */
    public WorkflowToken getWorkflowToken(ProgramId programId, String str) {
        Preconditions.checkArgument(ProgramType.WORKFLOW == programId.getType());

        MDSKey versionedKey = getProgramKeyBuilder(TYPE_WORKFLOW_TOKEN, programId.run(str)).build();
        BasicWorkflowToken token = (BasicWorkflowToken) get(versionedKey, BasicWorkflowToken.class);

        // hasUpgraded() is evaluated first, matching the order of the original condition.
        boolean tryVersionLess = !hasUpgraded() && token == null && programId.getVersion().equals("-SNAPSHOT");
        if (tryVersionLess) {
            MDSKey versionLessKey = getVersionLessProgramKeyBuilder(TYPE_WORKFLOW_TOKEN, programId).add(str).build();
            token = (BasicWorkflowToken) get(versionLessKey, BasicWorkflowToken.class);
        }

        if (token == null) {
            LOG.debug("No workflow token available for workflow: {}, runId: {}", programId, str);
            return new BasicWorkflowToken(0);
        }
        return token;
    }

    /**
     * Collects the ids of runs that overlap the given time range, across the completed, suspended,
     * started and starting record types.
     *
     * <p>Each underlying scan is bounded by half of the {@code data.tx.timeout} setting (read in seconds,
     * converted to milliseconds).
     *
     * @param j start of the range
     * @param j2 end of the range
     * @return the ids of the overlapping runs
     */
    public Set<RunId> getRunningInRange(long j, long j2) {
        long scanTimeoutMillis = TimeUnit.SECONDS.toMillis(this.cConf.getLong("data.tx.timeout")) / 2;
        LOG.trace("Scan timeout = {}ms", scanTimeoutMillis);

        Set<RunId> runIds = new HashSet<>();
        for (String recordType : Arrays.asList(TYPE_RUN_RECORD_COMPLETED, TYPE_RUN_RECORD_SUSPENDED, TYPE_RUN_RECORD_STARTED, TYPE_RUN_RECORD_STARTING)) {
            Iterables.addAll(runIds, getRunningInRangeForStatus(recordType, j, j2, scanTimeoutMillis));
        }
        return runIds;
    }

    /**
     * Reports whether the stored upgrade version is at least the current project version.
     *
     * <p>A positive answer is remembered in {@code upgradeCompleted}. A negative answer is re-checked
     * against the store at most once per {@code UPGRADE_COMPLETED_CHECK_INTERVAL}; in between, the method
     * returns {@code false} without reading the store. The store read happens while holding the
     * {@code AppMetadataStore.class} monitor.
     *
     * @return {@code true} once the upgrade is known to be complete
     */
    public boolean hasUpgraded() {
        if (upgradeCompleted) {
            return true;
        }

        long now = System.currentTimeMillis();
        boolean checkedRecently = now - lastUpgradeCompletedCheck < UPGRADE_COMPLETED_CHECK_INTERVAL;
        if (checkedRecently) {
            return false;
        }

        synchronized (AppMetadataStore.class) {
            // The flag is re-read under the lock before the store is consulted.
            if (upgradeCompleted) {
                return true;
            }
            lastUpgradeCompletedCheck = now;

            MDSKey.Builder versionKey = new MDSKey.Builder();
            versionKey.add(APP_VERSION_UPGRADE_KEY);
            String storedVersion = (String) get(versionKey.build(), String.class);
            if (storedVersion == null) {
                return false;
            }

            boolean upToDate = new ProjectInfo.Version(storedVersion).compareTo(ProjectInfo.getVersion()) >= 0;
            upgradeCompleted = upToDate;
            return upToDate;
        }
    }

    /**
     * Records the current project version under the upgrade key and resets the time of the last
     * upgrade check to zero, so the next {@code hasUpgraded()} call is not throttled.
     */
    public void upgradeCompleted() {
        MDSKey.Builder versionKey = new MDSKey.Builder();
        versionKey.add(APP_VERSION_UPGRADE_KEY);
        String currentVersion = ProjectInfo.getVersion().toString();
        write(versionKey.build(), currentVersion);

        synchronized (AppMetadataStore.class) {
            lastUpgradeCompletedCheck = 0L;
        }
    }

    /**
     * Reads the state stored for a topic and subscriber.
     *
     * <p>The subscriber is part of the key only when it is neither {@code null} nor empty.
     *
     * @param str the topic
     * @param str2 the subscriber, may be {@code null} or empty
     * @return the stored state decoded as a string, or {@code null} if nothing is stored
     */
    @Nullable
    public String retrieveSubscriberState(String str, String str2) {
        MDSKey.Builder key = new MDSKey.Builder().add(TYPE_MESSAGE).add(str);
        boolean hasSubscriber = str2 != null && !str2.isEmpty();
        if (hasSubscriber) {
            key.add(str2);
        }
        byte[] state = (byte[]) get(key.build(), BYTE_TYPE);
        return state == null ? null : Bytes.toString(state);
    }

    /**
     * Stores the state for a topic and subscriber.
     *
     * <p>The subscriber is part of the key only when it is neither {@code null} nor empty.
     *
     * @param str the topic
     * @param str2 the subscriber, may be {@code null} or empty
     * @param str3 the state to store; written as bytes
     */
    public void persistSubscriberState(String str, String str2, String str3) {
        MDSKey.Builder key = new MDSKey.Builder().add(TYPE_MESSAGE).add(str);
        boolean hasSubscriber = str2 != null && !str2.isEmpty();
        if (hasSubscriber) {
            key.add(str2);
        }
        byte[] state = Bytes.toBytes(str3);
        write(key.build(), state);
    }

    /**
     * Deletes the state stored for a topic and subscriber.
     *
     * <p>The key is built the same way as in {@code retrieveSubscriberState} and
     * {@code persistSubscriberState}: the subscriber is appended only when it is neither {@code null}
     * nor empty. Appending it unconditionally would address a different key than the one those methods
     * use for a {@code null} or empty subscriber, so such state could never be deleted.
     *
     * @param str the topic
     * @param str2 the subscriber, may be {@code null} or empty
     */
    public void deleteSubscriberState(String str, String str2) {
        MDSKey.Builder key = new MDSKey.Builder().add(TYPE_MESSAGE).add(str);
        if (str2 != null && !str2.isEmpty()) {
            key.add(str2);
        }
        delete(key.build());
    }

    /**
     * Returns the ids of runs of one record type that overlap the time range, flattening the scan
     * batches into a single iterable and timing the scans with the system ticker.
     *
     * @param str record type prefix
     * @param j start of the range
     * @param j2 end of the range
     * @param j3 time budget handed to each scan
     * @return the ids of the overlapping runs
     */
    private Iterable<RunId> getRunningInRangeForStatus(String str, long j, long j2, long j3) {
        List<Iterable<RunId>> batches = getRunningInRangeForStatus(str, j, j2, j3, Ticker.systemTicker());
        return Iterables.concat(batches);
    }

    /**
     * Scans all run records of one record type in batches and returns, per batch, the ids of runs that
     * overlap the time range.
     *
     * <p>A run overlaps when its start time is below {@code j2} and it either has no stop time or a stop
     * time of at least {@code j}. Each batch is one {@code scan} call driven by a fresh
     * {@code ScanFunction}; the next batch starts right after the last key the previous one saw, and
     * scanning ends with the first batch that processes no entries.
     *
     * @param str record type prefix
     * @param j start of the range
     * @param j2 end of the range
     * @param j3 time budget handed to each {@code ScanFunction}
     * @param ticker time source handed to each {@code ScanFunction}
     * @return one iterable of run ids per non-empty batch
     */
    @VisibleForTesting
    List<Iterable<RunId>> getRunningInRangeForStatus(String str, long j, long j2, long j3, Ticker ticker) {
        Predicate<RunRecordMeta> overlapsRange = runRecordMeta -> {
            return runRecordMeta.getStartTs() < j2 && (runRecordMeta.getStopTs() == null || runRecordMeta.getStopTs().longValue() >= j);
        };

        List<Iterable<RunId>> batches = new ArrayList<>();
        MDSKey scanStart = new MDSKey.Builder().add(str).build();
        final MDSKey scanStop = new MDSKey(Bytes.stopKeyForPrefix(scanStart.getKey()));

        for (;;) {
            ScanFunction batch = new ScanFunction(overlapsRange, ticker, j3);
            batch.start();
            scan(scanStart, scanStop, RunRecordMeta.class, batch);
            if (batch.getNumProcessed() == 0) {
                return batches;
            }
            batches.add(Iterables.transform(batch.getValues(), runRecordMeta2 -> {
                return RunIds.fromString(runRecordMeta2.getPid());
            }));
            // Resume right after the last key seen by this batch.
            scanStart = new MDSKey(Bytes.stopKeyForPrefix(batch.getLastKey().getKey()));
        }
    }

    /**
     * Runs one upgrade pass over the app meta, completed run record, workflow node state and workflow
     * token record types.
     *
     * <p>All four passes always run; the results are combined without short-circuiting.
     *
     * @param i maximum number of keys each pass may upgrade
     * @return {@code true} only if every pass reported that nothing was left to upgrade
     */
    boolean upgradeVersionKeys(int i) {
        boolean done = upgradeVersionKeys(TYPE_APP_META, ApplicationMeta.class, i);
        done &= upgradeVersionKeys(TYPE_RUN_RECORD_COMPLETED, RunRecordMeta.class, i);
        done &= upgradeVersionKeys(TYPE_WORKFLOW_NODE_STATE, WorkflowNodeStateDetail.class, i);
        done &= upgradeVersionKeys(TYPE_WORKFLOW_TOKEN, BasicWorkflowToken.class, i);
        return done;
    }

    /**
     * Upgrades up to {@code i} keys of one record type to their versioned form.
     *
     * <p>Every entry whose key differs from its upgraded key is scheduled for deletion; its value is
     * rewritten under the upgraded key unless something is already stored there. Deletions are performed
     * before the writes.
     *
     * @param str record type prefix
     * @param type value type of the entries
     * @param i maximum number of old keys handled in this pass
     * @return {@code true} if no key needed upgrading, {@code false} if this pass changed something
     */
    private <T> boolean upgradeVersionKeys(String str, Type type, int i) {
        LOG.info("Checking upgrade for {}", str);
        Map<MDSKey, T> existing = listKV(new MDSKey.Builder().add(str).build(), type);
        Map<MDSKey, T> toWrite = new HashMap<>();
        Set<MDSKey> toDelete = new HashSet<>();

        for (Map.Entry<MDSKey, T> entry : existing.entrySet()) {
            MDSKey oldKey = entry.getKey();
            MDSKey upgradedKey = appendDefaultVersion(str, oldKey);
            if (upgradedKey.equals(oldKey)) {
                continue;
            }
            toDelete.add(oldKey);
            // Do not overwrite a value that already lives under the upgraded key.
            if (get(upgradedKey, type) == null) {
                toWrite.put(upgradedKey, entry.getValue());
            }
            if (toDelete.size() >= i) {
                break;
            }
        }

        if (toDelete.isEmpty()) {
            return true;
        }

        LOG.info("Upgrading {} entries, deleting {} entries of {}", toWrite.size(), toDelete.size(), str);
        for (MDSKey oldKey : toDelete) {
            delete(oldKey);
        }
        for (Map.Entry<MDSKey, T> entry : toWrite.entrySet()) {
            write(entry.getKey(), entry.getValue());
        }
        return false;
    }

    /**
     * Rebuilds an app meta key with four string components, using {@code "-SNAPSHOT"} as the fourth
     * component when the key has only three.
     *
     * @param mDSKey the stored key
     * @return the key with a fourth component
     */
    private static MDSKey getUpgradedAppMetaKey(MDSKey mDSKey) {
        MDSKey.Splitter parts = mDSKey.split();
        String first = parts.getString();
        String second = parts.getString();
        String third = parts.getString();

        String fourth;
        try {
            fourth = parts.getString();
        } catch (BufferUnderflowException e) {
            // The key ends after three components: fall back to the default fourth component.
            fourth = "-SNAPSHOT";
        }

        LOG.trace("Upgrade AppMeta key to {}.{}.{}.{}", first, second, third, fourth);
        return new MDSKey.Builder().add(first).add(second).add(third).add(fourth).build();
    }

    /**
     * Inserts {@code "-SNAPSHOT"} as the fourth component of a completed run record key that lacks it.
     *
     * <p>If the fourth component of the stored key parses as a {@code ProgramType}, the key is rebuilt
     * with {@code "-SNAPSHOT"} inserted before it. If an {@link IllegalArgumentException} is raised
     * while doing so, the original key is returned unchanged.
     *
     * @param mDSKey the stored key
     * @return the upgraded key, or {@code mDSKey} itself when no upgrade is needed
     */
    private static MDSKey getUpgradedCompletedRunRecordKey(MDSKey mDSKey) {
        MDSKey.Splitter parts = mDSKey.split();
        String first = parts.getString();
        String second = parts.getString();
        String third = parts.getString();
        String fourth = parts.getString();
        try {
            // Throws IllegalArgumentException when the fourth component is not a program type name.
            ProgramType.valueOf(fourth);
            String fifth = parts.getString();
            long timePart = parts.getLong();
            String last = parts.getString();
            LOG.trace("Upgrade completed RunRecord key to {}.{}.{}.{}.{}.{}.{}.{}", first, second, third, "-SNAPSHOT", fourth, fifth, Long.valueOf(timePart), last);
            return new MDSKey.Builder()
                .add(first)
                .add(second)
                .add(third)
                .add("-SNAPSHOT")
                .add(fourth)
                .add(fifth)
                .add(timePart)
                .add(last)
                .build();
        } catch (IllegalArgumentException e) {
            LOG.trace("No need to upgrade completed RunRecord key starting with {}.{}.{}.{}.{}", first, second, third, "-SNAPSHOT", fourth);
            return mDSKey;
        }
    }

    /**
     * Reads a workflow run key from the splitter and returns a builder for its versioned form.
     *
     * <p>When the fourth component is {@code "WORKFLOW"}, it is taken as the program type and
     * {@code "-SNAPSHOT"} is used as the version. Otherwise the fourth component is taken as the version
     * and the program type is read next. Two further components are then read and appended.
     *
     * <p>The splitter is left positioned after the seventh (or, for version-less keys, sixth) component
     * so callers can read anything that follows.
     *
     * @param splitter splitter positioned at the start of the key
     * @return a builder holding the seven components of the versioned key
     */
    private static MDSKey.Builder getUpgradedWorkflowRunKey(MDSKey.Splitter splitter) {
        String first = splitter.getString();
        String second = splitter.getString();
        String third = splitter.getString();
        String fourth = splitter.getString();

        String appVersion;
        String programType;
        if ("WORKFLOW".equals(fourth)) {
            appVersion = "-SNAPSHOT";
            programType = fourth;
        } else {
            LOG.trace("App version exists in workflow run id starting with {}.{}.{}.{}", first, second, third, fourth);
            appVersion = fourth;
            programType = splitter.getString();
        }

        String sixth = splitter.getString();
        String seventh = splitter.getString();
        // Log the version that is actually written into the key, not a hard-coded default.
        LOG.trace("Upgrade workflow run id to {}.{}.{}.{}.{}.{}.{}", first, second, third, appVersion, programType, sixth, seventh);
        return new MDSKey.Builder()
            .add(first)
            .add(second)
            .add(third)
            .add(appVersion)
            .add(programType)
            .add(sixth)
            .add(seventh);
    }

    /**
     * Upgrades a workflow node state key: the upgraded workflow run key followed by the one component
     * that comes after it in the stored key.
     *
     * @param mDSKey the stored key
     * @return the upgraded key
     */
    private static MDSKey getUpgradedWorkflowNodeStateRecordKey(MDSKey mDSKey) {
        MDSKey.Splitter parts = mDSKey.split();
        MDSKey.Builder upgraded = getUpgradedWorkflowRunKey(parts);
        // The run-key helper consumed its components; the next one is logged as the workflow node id.
        String nodeId = parts.getString();
        LOG.trace("Upgrade workflow node state record with WorkflowNodeId {}", nodeId);
        upgraded.add(nodeId);
        return upgraded.build();
    }

    /**
     * Upgrades a workflow token key, which consists of exactly the upgraded workflow run key.
     *
     * @param mDSKey the stored key
     * @return the upgraded key
     */
    private static MDSKey getUpgradedWorkflowTokenRecordKey(MDSKey mDSKey) {
        MDSKey.Splitter parts = mDSKey.split();
        MDSKey.Builder upgraded = getUpgradedWorkflowRunKey(parts);
        return upgraded.build();
    }

    /**
     * Dispatches to the key-upgrade routine matching the given record type.
     *
     * <p>This replaces a decompiled hash-code switch that was not valid Java (a {@code boolean} index
     * assigned integer values and duplicate {@code case true} labels). The mapping is unchanged:
     * app meta, completed run record, workflow node state and workflow token each go to their own
     * upgrade routine. Switching on the string requires the {@code TYPE_*} fields to be compile-time
     * constants, which the hash-code form of the decompiled switch implies.
     *
     * @param str record type of the key
     * @param mDSKey the stored key
     * @return the upgraded key as produced by the matching routine
     * @throws IllegalArgumentException if {@code str} is not one of the four supported record types
     */
    private static MDSKey appendDefaultVersion(String str, MDSKey mDSKey) {
        switch (str) {
            case TYPE_APP_META:
                return getUpgradedAppMetaKey(mDSKey);
            case TYPE_RUN_RECORD_COMPLETED:
                return getUpgradedCompletedRunRecordKey(mDSKey);
            case TYPE_WORKFLOW_NODE_STATE:
                return getUpgradedWorkflowNodeStateRecordKey(mDSKey);
            case TYPE_WORKFLOW_TOKEN:
                return getUpgradedWorkflowTokenRecordKey(mDSKey);
            default:
                throw new IllegalArgumentException(String.format("Invalid row key type '%s'", str));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reconstructs the program id encoded in a run record key.
     *
     * <p>The first three string components are the record type and the two arguments passed to
     * {@code ApplicationId}. All further components that can be read as strings are collected until the
     * splitter underflows. If exactly the expected number of components is collected (2 for completed
     * run records, 3 otherwise), the key carries no version and {@code "-SNAPSHOT"} is used; otherwise
     * the first collected component is the version. The next two components are the program type and
     * program name.
     *
     * @param mDSKey the stored key
     * @return the program id encoded in the key
     */
    public static ProgramId getProgramID(MDSKey mDSKey) {
        MDSKey.Splitter parts = mDSKey.split();
        String recordType = parts.getString();
        String namespace = parts.getString();
        String application = parts.getString();

        // Read string components until the splitter runs out of readable data.
        List<String> remaining = new ArrayList<>();
        while (true) {
            try {
                remaining.add(parts.getString());
            } catch (BufferUnderflowException e) {
                break;
            }
        }

        int versionLessSize = recordType.equals(TYPE_RUN_RECORD_COMPLETED) ? 2 : 3;
        String version;
        String programType;
        String programName;
        if (remaining.size() == versionLessSize) {
            version = "-SNAPSHOT";
            programType = remaining.get(0);
            programName = remaining.get(1);
        } else {
            version = remaining.get(0);
            programType = remaining.get(1);
            programName = remaining.get(2);
        }
        return new ApplicationId(namespace, application, version).program(ProgramType.valueOf(programType), programName);
    }
}
