package co.cask.cdap.internal.app.services;

import co.cask.cdap.api.ProgramSpecification;
import co.cask.cdap.api.app.ApplicationSpecification;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.dataset.DatasetManagementException;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.api.retry.RetryableException;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.data2.datafabric.dataset.DatasetsUtil;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import co.cask.cdap.internal.app.services.AbstractNotificationSubscriberService;
import co.cask.cdap.internal.app.store.AppMetadataStore;
import co.cask.cdap.internal.app.store.ApplicationMeta;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.proto.BasicThrowable;
import co.cask.cdap.proto.Notification;
import co.cask.cdap.proto.ProgramRunStatus;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.proto.id.ProgramRunId;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import com.google.gson.reflect.TypeToken;
import com.google.inject.Inject;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.tephra.TransactionSystemClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/services/ProgramNotificationSubscriberService.class */
public class ProgramNotificationSubscriberService extends AbstractNotificationSubscriberService {
    private static final Logger LOG = LoggerFactory.getLogger(ProgramNotificationSubscriberService.class);
    private static final Gson GSON = new Gson();
    private static final Type STRING_STRING_MAP = new TypeToken<Map<String, String>>() { // from class: co.cask.cdap.internal.app.services.ProgramNotificationSubscriberService.1
    }.getType();
    private static final DatasetId APP_META_INSTANCE_ID = NamespaceId.SYSTEM.dataset("app.meta");
    private static final byte[] APP_VERSION_UPGRADE_KEY = Bytes.toBytes("version.default.store");
    private final CConfiguration cConf;
    private final AtomicBoolean upgradeComplete;
    private final DatasetFramework datasetFramework;
    private final String recordedProgramStatusPublishTopic;
    private ExecutorService taskExecutorService;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: co.cask.cdap.internal.app.services.ProgramNotificationSubscriberService$2, reason: invalid class name */
    /* loaded from: input_file:co/cask/cdap/internal/app/services/ProgramNotificationSubscriberService$2.class */
    /**
     * Lookup table that translates a {@link ProgramRunStatus} ordinal into a small, dense case number.
     *
     * <p>The naming ({@code $SwitchMap$...}) and the synthetic markers indicate this is the helper a Java
     * compiler emits for a {@code switch} over an enum, surfaced here as a named class by the decompiler.
     * The array has one slot per enum constant. The static initializer fills in:
     * STARTING=1, RUNNING=2, SUSPENDED=3, RESUMING=4, COMPLETED=5, KILLED=6, FAILED=7.
     * Every other slot keeps the default value 0.
     */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$co$cask$cdap$proto$ProgramRunStatus = new int[ProgramRunStatus.values().length];

        static {
            // Each constant is assigned inside its own try block: if a constant is missing at run time
            // the resulting NoSuchFieldError is ignored and that constant simply gets no case number,
            // while the assignments for the remaining constants still run.
            try {
                $SwitchMap$co$cask$cdap$proto$ProgramRunStatus[ProgramRunStatus.STARTING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$co$cask$cdap$proto$ProgramRunStatus[ProgramRunStatus.RUNNING.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$co$cask$cdap$proto$ProgramRunStatus[ProgramRunStatus.SUSPENDED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$co$cask$cdap$proto$ProgramRunStatus[ProgramRunStatus.RESUMING.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$co$cask$cdap$proto$ProgramRunStatus[ProgramRunStatus.COMPLETED.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$co$cask$cdap$proto$ProgramRunStatus[ProgramRunStatus.KILLED.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$co$cask$cdap$proto$ProgramRunStatus[ProgramRunStatus.FAILED.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
        }
    }

    /* loaded from: input_file:co/cask/cdap/internal/app/services/ProgramNotificationSubscriberService$ProgramStatusSubscriberRunnable.class */
    private class ProgramStatusSubscriberRunnable extends AbstractNotificationSubscriberService.AbstractSubscriberRunnable {
        private final String topic;

        /**
         * Creates a subscriber runnable for the given topic.
         *
         * <p>The superclass receives the fixed name {@code "program.status"}, the topic, and the values of
         * the configuration keys {@code app.program.status.event.poll.delay.millis} and
         * {@code app.program.status.event.fetch.size}, followed by {@code false}.
         *
         * @param str name of the topic to subscribe to; it is also the key under which the subscriber
         *            state is read and persisted by this runnable
         */
        ProgramStatusSubscriberRunnable(String str) {
            super(
                "program.status",
                str,
                ProgramNotificationSubscriberService.this.cConf.getLong("app.program.status.event.poll.delay.millis"),
                ProgramNotificationSubscriberService.this.cConf.getInt("app.program.status.event.fetch.size"),
                false);
            this.topic = str;
        }

        /**
         * Looks up the subscriber state stored for this runnable's topic.
         *
         * @param datasetContext context used to obtain the app metadata store
         * @return the value the store returns for the topic, possibly {@code null}
         */
        @Override
        @Nullable
        protected String initialize(DatasetContext datasetContext) throws RetryableException {
            AppMetadataStore store = getAppMetadataStore(datasetContext);
            return store.retrieveSubscriberState(this.topic);
        }

        /**
         * Stores the given message id as the subscriber state of this runnable's topic.
         *
         * @param datasetContext context used to obtain the app metadata store
         * @param str message id to persist
         */
        @Override
        public void persistMessageId(DatasetContext datasetContext, String str) {
            AppMetadataStore store = getAppMetadataStore(datasetContext);
            store.persistSubscriberState(this.topic, str);
        }

        /**
         * Drains the iterator, handing each notification to {@code processNotification} together with the
         * bytes of the id of the message it came from.
         *
         * @param datasetContext context used to obtain the app metadata store, which is shared by all
         *                       notifications of this batch
         * @param notificationIterator source of the notifications and of their message ids
         * @throws Exception if processing any notification fails
         */
        @Override
        protected void processNotifications(DatasetContext datasetContext, AbstractNotificationSubscriberService.NotificationIterator notificationIterator) throws Exception {
            AppMetadataStore store = getAppMetadataStore(datasetContext);
            while (notificationIterator.hasNext()) {
                // next() has to run before getLastMessageId() so the id belongs to this notification.
                Notification current = (Notification) notificationIterator.next();
                // NOTE(review): if getLastMessageId() returns a String, getBytes() encodes with the
                // platform default charset - confirm that is intended.
                byte[] messageIdBytes = notificationIterator.getLastMessageId().getBytes();
                processNotification(store, current, messageIdBytes);
            }
        }

        /**
         * Applies one program status notification to the app metadata store and, when the store returns
         * a non-null status, republishes that status through {@code publishRecordedStatus}.
         *
         * <p>The notification is dropped with a warning when it:
         * <ul>
         *   <li>lacks the program run id or the program status,</li>
         *   <li>refers to an application or program that the store does not contain,</li>
         *   <li>lacks a field its status needs: user and system arguments plus start time for STARTING,
         *       logical start time for RUNNING, end time for COMPLETED, KILLED and FAILED,</li>
         *   <li>causes an {@link IllegalArgumentException}, e.g. an unknown status name or a time value
         *       that is not a number.</li>
         * </ul>
         * A status that has no case below is logged as an error and otherwise ignored.
         *
         * @param appMetadataStore store in which the state transition is recorded
         * @param notification notification whose properties describe the program run and its new status
         * @param bArr bytes of the id of the message that carried the notification; handed to the store
         *             unchanged
         * @throws Exception if the store or the publishing of the recorded status fails
         */
        private void processNotification(AppMetadataStore appMetadataStore, Notification notification, byte[] bArr) throws Exception {
            Map<String, String> properties = notification.getProperties();
            String programRunJson = properties.get(ProgramOptionConstants.PROGRAM_RUN_ID);
            String statusName = properties.get(ProgramOptionConstants.PROGRAM_STATUS);
            if (programRunJson == null || statusName == null) {
                ProgramNotificationSubscriberService.LOG.warn("Ignore notification that misses program run state information, {}", notification);
                return;
            }
            try {
                ProgramRunStatus status = ProgramRunStatus.valueOf(statusName);
                ProgramRunId programRunId = ProgramNotificationSubscriberService.GSON.fromJson(programRunJson, ProgramRunId.class);
                ProgramId programId = programRunId.getParent();
                ApplicationMeta appMeta = appMetadataStore.getApplication(programRunId.getNamespace(), programRunId.getApplication(), programRunId.getVersion());
                if (appMeta == null) {
                    ProgramNotificationSubscriberService.LOG.warn("Ignore notification for application {} that doesn't exist, {}", programRunId, notification);
                    return;
                }
                if (getProgramSpecFromApp(appMeta.getSpec(), programRunId) == null) {
                    ProgramNotificationSubscriberService.LOG.warn("Ignore notification for program {} that doesn't exist, {}", programRunId, notification);
                    return;
                }
                ProgramNotificationSubscriberService.LOG.trace("Processing program status notification: {}", notification);
                String runId = programRunId.getRun();
                String twillRunId = properties.get(ProgramOptionConstants.TWILL_RUN_ID);
                // Parsed for every status (as before), so a malformed end time rejects the notification
                // regardless of which status it carries.
                long endTimeSecs = getTimeSeconds(properties, ProgramOptionConstants.END_TIME);
                ProgramRunStatus recordedStatus;
                switch (status) {
                    case STARTING: {
                        long startTimeSecs = getTimeSeconds(properties, ProgramOptionConstants.START_TIME);
                        String userArgsJson = properties.get(ProgramOptionConstants.USER_OVERRIDES);
                        String systemArgsJson = properties.get(ProgramOptionConstants.SYSTEM_OVERRIDES);
                        if (userArgsJson == null || systemArgsJson == null) {
                            ProgramNotificationSubscriberService.LOG.warn(
                                "Ignore program starting notification for program {} without {} arguments, {}",
                                new Object[]{programRunId, userArgsJson == null ? "user" : "system", notification});
                            return;
                        }
                        if (startTimeSecs == -1) {
                            ProgramNotificationSubscriberService.LOG.warn("Ignore program starting notification for program {} without start time, {}", programRunId, notification);
                            return;
                        }
                        Map<String, String> userArgs = ProgramNotificationSubscriberService.GSON.fromJson(userArgsJson, ProgramNotificationSubscriberService.STRING_STRING_MAP);
                        Map<String, String> systemArgs = ProgramNotificationSubscriberService.GSON.fromJson(systemArgsJson, ProgramNotificationSubscriberService.STRING_STRING_MAP);
                        recordedStatus = appMetadataStore.recordProgramStart(programId, runId, startTimeSecs, twillRunId, userArgs, systemArgs, bArr);
                        break;
                    }
                    case RUNNING: {
                        long logicalStartTimeSecs = getTimeSeconds(properties, ProgramOptionConstants.LOGICAL_START_TIME);
                        if (logicalStartTimeSecs == -1) {
                            ProgramNotificationSubscriberService.LOG.warn(
                                "Ignore program running notification for program {} without {} specified, {}",
                                new Object[]{programRunId, ProgramOptionConstants.LOGICAL_START_TIME, notification});
                            return;
                        }
                        recordedStatus = appMetadataStore.recordProgramRunning(programId, runId, logicalStartTimeSecs, twillRunId, bArr);
                        break;
                    }
                    case SUSPENDED:
                        recordedStatus = appMetadataStore.recordProgramSuspend(programId, runId, bArr);
                        break;
                    case RESUMING:
                        recordedStatus = appMetadataStore.recordProgramResumed(programId, runId, bArr);
                        break;
                    case COMPLETED:
                    case KILLED:
                        if (endTimeSecs == -1) {
                            // Names the actual status; the message used to say "killed" for COMPLETED too.
                            ProgramNotificationSubscriberService.LOG.warn(
                                "Ignore program {} notification for program {} without end time specified, {}",
                                new Object[]{status, programRunId, notification});
                            return;
                        }
                        recordedStatus = appMetadataStore.recordProgramStop(programId, runId, endTimeSecs, status, null, bArr);
                        break;
                    case FAILED:
                        if (endTimeSecs == -1) {
                            ProgramNotificationSubscriberService.LOG.warn("Ignore program failed notification for program {} without end time specified, {}", programRunId, notification);
                            return;
                        }
                        recordedStatus = appMetadataStore.recordProgramStop(programId, runId, endTimeSecs, status, decodeBasicThrowable(properties.get(ProgramOptionConstants.PROGRAM_ERROR)), bArr);
                        break;
                    default:
                        ProgramNotificationSubscriberService.LOG.error("Unsupported program status {} for program {}, {}", new Object[]{status, programRunId, notification});
                        return;
                }
                if (recordedStatus != null) {
                    publishRecordedStatus(programRunId, recordedStatus);
                }
            } catch (IllegalArgumentException e) {
                // This handler covers the whole block, so besides an unknown status name it is also
                // reached by e.g. a NumberFormatException from time parsing. The exception is passed as
                // the trailing argument so the log shows the real cause.
                ProgramNotificationSubscriberService.LOG.warn(
                    "Ignore notification with invalid program run status {} for program {}, {}",
                    new Object[]{statusName, programRunJson, notification, e});
            }
        }

        /**
         * Publishes a PROGRAM_STATUS notification carrying the run id (as JSON) and the status name to
         * the recorded-status topic in the system namespace.
         *
         * @param programRunId run whose status was recorded
         * @param programRunStatus status to announce
         * @throws Exception if publishing fails
         */
        private void publishRecordedStatus(ProgramRunId programRunId, ProgramRunStatus programRunStatus) throws Exception {
            Map<String, String> recordedProperties = ImmutableMap.of(
                ProgramOptionConstants.PROGRAM_RUN_ID, ProgramNotificationSubscriberService.GSON.toJson(programRunId),
                ProgramOptionConstants.PROGRAM_STATUS, programRunStatus.name());
            Notification recorded = new Notification(Notification.Type.PROGRAM_STATUS, recordedProperties);
            String payload = ProgramNotificationSubscriberService.GSON.toJson(recorded);
            ProgramNotificationSubscriberService.this.getMessagingContext().getMessagePublisher().publish(
                NamespaceId.SYSTEM.getNamespace(),
                ProgramNotificationSubscriberService.this.recordedProgramStatusPublishTopic,
                new String[]{payload});
        }

        /**
         * Finds the specification of the program that a run belongs to.
         *
         * <p>Only FLOW, MAPREDUCE, SPARK, WORKFLOW, SERVICE and WORKER programs are looked up; each in the
         * application's map for that program type, keyed by program name.
         *
         * @param applicationSpecification specification of the application that should contain the program
         * @param programRunId run whose program type and name select the specification
         * @return the matching specification, or {@code null} when the type is not one of the above or the
         *         application has no program of that type with that name
         */
        @Nullable
        private ProgramSpecification getProgramSpecFromApp(ApplicationSpecification applicationSpecification, ProgramRunId programRunId) {
            String programName = programRunId.getProgram();
            ProgramType programType = programRunId.getType();
            // The type tests are mutually exclusive, so a single get() per type is enough: a missing
            // entry yields null, which is also the result when no type matches.
            if (programType == ProgramType.FLOW) {
                return (ProgramSpecification) applicationSpecification.getFlows().get(programName);
            } else if (programType == ProgramType.MAPREDUCE) {
                return (ProgramSpecification) applicationSpecification.getMapReduce().get(programName);
            } else if (programType == ProgramType.SPARK) {
                return (ProgramSpecification) applicationSpecification.getSpark().get(programName);
            } else if (programType == ProgramType.WORKFLOW) {
                return (ProgramSpecification) applicationSpecification.getWorkflows().get(programName);
            } else if (programType == ProgramType.SERVICE) {
                return (ProgramSpecification) applicationSpecification.getServices().get(programName);
            } else if (programType == ProgramType.WORKER) {
                return (ProgramSpecification) applicationSpecification.getWorkers().get(programName);
            } else {
                return null;
            }
        }

        /**
         * Reads a millisecond timestamp from the map and converts it to whole seconds.
         *
         * @param map properties to read from
         * @param str key of the timestamp
         * @return the timestamp in seconds, or {@code -1} when the key has no value
         * @throws NumberFormatException if the value is present but is not a valid long
         */
        private long getTimeSeconds(Map<String, String> map, String str) {
            String millisText = map.get(str);
            return millisText == null ? -1L : TimeUnit.MILLISECONDS.toSeconds(Long.parseLong(millisText));
        }

        /**
         * Decodes a JSON-encoded {@link BasicThrowable} on a best-effort basis.
         *
         * @param str JSON text to decode, may be {@code null}
         * @return the decoded throwable, or {@code null} when the input is {@code null} or is rejected by
         *         Gson with a {@link JsonSyntaxException}
         */
        @Nullable
        private BasicThrowable decodeBasicThrowable(@Nullable String str) {
            BasicThrowable decoded = null;
            if (str != null) {
                try {
                    decoded = ProgramNotificationSubscriberService.GSON.fromJson(str, BasicThrowable.class);
                } catch (JsonSyntaxException ignored) {
                    // Undecodable input is treated the same as no input: the result stays null.
                }
            }
            return decoded;
        }

        /**
         * Builds an {@link AppMetadataStore} on top of the {@code app.meta} table of the system namespace,
         * obtaining the table through {@code DatasetsUtil.getOrCreateDataset}.
         *
         * <p>While the service-wide upgrade flag is still unset, the store is asked whether the upgrade
         * has completed, and the flag is set once it reports so.
         *
         * @param datasetContext context the table is obtained from
         * @return a store backed by the table
         * @throws RuntimeException wrapping any {@link DatasetManagementException} or {@link IOException},
         *         as produced by {@code Throwables.propagate}
         */
        private AppMetadataStore getAppMetadataStore(DatasetContext datasetContext) {
            final ProgramNotificationSubscriberService service = ProgramNotificationSubscriberService.this;
            try {
                Table metaTable = DatasetsUtil.getOrCreateDataset(
                    datasetContext,
                    service.datasetFramework,
                    ProgramNotificationSubscriberService.APP_META_INSTANCE_ID,
                    Table.class.getName(),
                    DatasetProperties.EMPTY);
                AppMetadataStore store = new AppMetadataStore(metaTable, service.cConf, service.upgradeComplete);
                boolean upgradeStillPending = !service.upgradeComplete.get();
                if (upgradeStillPending && store.isUpgradeComplete(ProgramNotificationSubscriberService.APP_VERSION_UPGRADE_KEY)) {
                    service.upgradeComplete.set(true);
                }
                return store;
            } catch (DatasetManagementException | IOException e) {
                throw Throwables.propagate(e);
            }
        }
    }

    /**
     * Creates the service. All arguments are forwarded to the superclass; the configuration and the
     * dataset framework are additionally kept for use by the subscriber runnable.
     *
     * <p>The topic for recorded statuses is read from the configuration key
     * {@code program.status.record.event.topic}, and the upgrade flag starts out as {@code false}.
     */
    @Inject
    ProgramNotificationSubscriberService(MessagingService messagingService, CConfiguration cConfiguration, DatasetFramework datasetFramework, TransactionSystemClient transactionSystemClient, MetricsCollectionService metricsCollectionService) {
        super(messagingService, cConfiguration, datasetFramework, transactionSystemClient, metricsCollectionService);
        this.datasetFramework = datasetFramework;
        this.cConf = cConfiguration;
        this.recordedProgramStatusPublishTopic = cConfiguration.get("program.status.record.event.topic");
        this.upgradeComplete = new AtomicBoolean();
    }

    /**
     * Creates a single-thread executor whose thread is named {@code program-status-subscriber-task} and
     * submits to it a subscriber for the topic configured under {@code program.status.event.topic}.
     */
    protected void startUp() {
        LOG.info("Starting {}", getClass().getSimpleName());
        ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder().setNameFormat("program-status-subscriber-task");
        this.taskExecutorService = Executors.newSingleThreadExecutor(threadFactoryBuilder.build());
        String statusTopic = this.cConf.get("program.status.event.topic");
        ProgramStatusSubscriberRunnable subscriber = new ProgramStatusSubscriberRunnable(statusTopic);
        this.taskExecutorService.submit(subscriber);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Shuts the superclass down, then stops the subscriber executor.
     *
     * <p>The executor is asked to stop immediately and is given up to five seconds to terminate. If it
     * has not terminated by then, or the wait ends early for any reason, {@code shutdownNow()} is issued
     * once more. If the waiting thread is interrupted, its interrupt status is restored so callers can
     * still observe the interruption.
     */
    @Override // co.cask.cdap.internal.app.services.AbstractNotificationSubscriberService
    public void shutDown() {
        super.shutDown();
        try {
            this.taskExecutorService.shutdownNow();
            this.taskExecutorService.awaitTermination(5L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Do not swallow the interruption: restore the flag for whoever is shutting us down.
            Thread.currentThread().interrupt();
        } finally {
            // Runs on normal completion, on interruption and on any other failure alike.
            if (!this.taskExecutorService.isTerminated()) {
                this.taskExecutorService.shutdownNow();
            }
        }
        LOG.info("Stopped {}", getClass().getSimpleName());
    }
}
