package org.apache.gobblin.service.monitoring;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.class */
public class SpecStoreChangeMonitor extends HighLevelConsumer {
    private static final Logger log = LoggerFactory.getLogger(SpecStoreChangeMonitor.class);
    public static final String SPEC_STORE_CHANGE_MONITOR_PREFIX = "specStoreChangeMonitor";
    private ContextAwareMeter successfullyAddedSpecs;
    private ContextAwareMeter messageProcessedMeter;
    private ContextAwareMeter failedAddedSpecs;
    private ContextAwareMeter deletedSpecs;
    private ContextAwareMeter unexpectedErrors;
    private ContextAwareGauge produceToConsumeDelayMillis;
    private volatile Long produceToConsumeDelayValue;
    protected CacheLoader<String, String> cacheLoader;
    protected LoadingCache<String, String> specChangesSeenCache;
    protected FlowCatalog flowCatalog;
    protected GobblinServiceJobScheduler scheduler;

    public SpecStoreChangeMonitor(String str, Config config, FlowCatalog flowCatalog, GobblinServiceJobScheduler gobblinServiceJobScheduler, int i) {
        super(str, config.withValue("group.id", ConfigValueFactory.fromAnyRef(SPEC_STORE_CHANGE_MONITOR_PREFIX + UUID.randomUUID().toString())), i);
        this.produceToConsumeDelayValue = -1L;
        this.cacheLoader = new CacheLoader<String, String>() { // from class: org.apache.gobblin.service.monitoring.SpecStoreChangeMonitor.1
            public String load(String str2) throws Exception {
                return str2;
            }
        };
        this.specChangesSeenCache = CacheBuilder.newBuilder().expireAfterWrite(10L, TimeUnit.MINUTES).build(this.cacheLoader);
        this.flowCatalog = flowCatalog;
        this.scheduler = gobblinServiceJobScheduler;
    }

    protected void assignTopicPartitions() {
    }

    protected void processMessage(DecodeableKafkaRecord decodeableKafkaRecord) {
        this.messageProcessedMeter.mark();
        String str = (String) decodeableKafkaRecord.getKey();
        GenericStoreChangeEvent genericStoreChangeEvent = (GenericStoreChangeEvent) decodeableKafkaRecord.getValue();
        String txId = genericStoreChangeEvent.getTxId();
        Long produceTimestampMillis = genericStoreChangeEvent.getProduceTimestampMillis();
        String name = genericStoreChangeEvent.getOperationType().name();
        this.produceToConsumeDelayValue = calcMillisSince(produceTimestampMillis);
        log.debug("Processing message where specUri is {} tid: {} operation: {} delay: {}", new Object[]{str, txId, name, this.produceToConsumeDelayValue});
        String str2 = txId + str;
        if (ChangeMonitorUtils.shouldProcessMessage(str2, this.specChangesSeenCache, name, produceTimestampMillis.toString())) {
            try {
                URI uri = new URI(str);
                Spec specWrapper = !name.equals("DELETE") ? this.flowCatalog.getSpecWrapper(uri) : null;
                try {
                    if (name.equals("INSERT") || name.equals("UPDATE")) {
                        AddSpecResponse onAddSpec = this.scheduler.onAddSpec(specWrapper);
                        if (onAddSpec == null || !FlowCatalog.isCompileSuccessful((String) onAddSpec.getValue())) {
                            log.warn("Failed to add spec {} to scheduler due to compile error. The flow graph changed recently to invalidate the earlier compilation. Examine changes to locate error. Response is {}", specWrapper, onAddSpec);
                            this.failedAddedSpecs.mark();
                        } else {
                            log.info("Successfully added spec {} to scheduler response {}", specWrapper, StringEscapeUtils.escapeJson(onAddSpec.getValue().toString()));
                            this.successfullyAddedSpecs.mark();
                        }
                    } else if (!name.equals("DELETE")) {
                        log.warn("Received unsupported change type of operation {}. Expected values to be in [INSERT, UPDATE, DELETE, HEARTBEAT]. Look for issue with kafka event consumer or emitter", name);
                        this.unexpectedErrors.mark();
                        return;
                    } else {
                        log.info("Deleting spec {} after receiving spec store change event", uri);
                        this.scheduler.onDeleteSpec(uri, "");
                        this.deletedSpecs.mark();
                    }
                    this.specChangesSeenCache.put(str2, str2);
                } catch (Exception e) {
                    log.warn("Ran into unexpected error processing SpecStore changes. Reexamine scheduler. Error: {}", e);
                    this.unexpectedErrors.mark();
                }
            } catch (URISyntaxException e2) {
                log.warn("Could not create URI object for specUri {} due to error {}", str, e2.getMessage());
                this.unexpectedErrors.mark();
            }
        }
    }

    protected void createMetrics() {
        super.createMetrics();
        this.successfullyAddedSpecs = getMetricContext().contextAwareMeter("GobblinService.specStoreMonitor.successful.added.specs");
        this.failedAddedSpecs = getMetricContext().contextAwareMeter("GobblinService.specStoreMonitor.failed.added.specs");
        this.deletedSpecs = getMetricContext().contextAwareMeter("GobblinService.specStoreMonitor.deleted.specs");
        this.unexpectedErrors = getMetricContext().contextAwareMeter("GobblinService.specStoreMonitor.unexpected.errors");
        this.messageProcessedMeter = getMetricContext().contextAwareMeter("GobblinService.specStoreMonitor.message.processed");
        this.produceToConsumeDelayMillis = getMetricContext().newContextAwareGauge("GobblinService.specstoreMonitor.produce.to.consume.delay", () -> {
            return this.produceToConsumeDelayValue;
        });
        getMetricContext().register(this.produceToConsumeDelayMillis);
    }
}
