package org.apache.gobblin.service.monitoring;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
import java.io.IOException;
import java.sql.SQLException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.class */
public class DagActionStoreChangeMonitor extends HighLevelConsumer {
    private static final Logger log = LoggerFactory.getLogger(DagActionStoreChangeMonitor.class);
    public static final String DAG_ACTION_CHANGE_MONITOR_PREFIX = "dagActionChangeStore";
    private ContextAwareMeter killsInvoked;
    private ContextAwareMeter resumesInvoked;
    private ContextAwareMeter unexpectedErrors;
    private ContextAwareMeter messageProcessedMeter;
    private ContextAwareGauge produceToConsumeDelayMillis;
    private volatile Long produceToConsumeDelayValue;
    protected CacheLoader<String, String> cacheLoader;
    protected LoadingCache<String, String> dagActionsSeenCache;
    protected DagActionStore dagActionStore;
    protected DagManager dagManager;

    public DagActionStoreChangeMonitor(String str, Config config, DagActionStore dagActionStore, DagManager dagManager, int i) {
        super(str, config.withValue("group.id", ConfigValueFactory.fromAnyRef(DAG_ACTION_CHANGE_MONITOR_PREFIX + UUID.randomUUID().toString())), i);
        this.produceToConsumeDelayValue = -1L;
        this.cacheLoader = new CacheLoader<String, String>() { // from class: org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor.1
            public String load(String str2) throws Exception {
                return str2;
            }
        };
        this.dagActionsSeenCache = CacheBuilder.newBuilder().expireAfterWrite(10L, TimeUnit.MINUTES).build(this.cacheLoader);
        this.dagActionStore = dagActionStore;
        this.dagManager = dagManager;
    }

    protected void assignTopicPartitions() {
    }

    protected void processMessage(DecodeableKafkaRecord decodeableKafkaRecord) {
        this.messageProcessedMeter.mark();
        String str = (String) decodeableKafkaRecord.getKey();
        DagActionStoreChangeEvent dagActionStoreChangeEvent = (DagActionStoreChangeEvent) decodeableKafkaRecord.getValue();
        String txId = dagActionStoreChangeEvent.getChangeEventIdentifier().getTxId();
        Long produceTimestampMillis = dagActionStoreChangeEvent.getChangeEventIdentifier().getProduceTimestampMillis();
        String name = dagActionStoreChangeEvent.getChangeEventIdentifier().getOperationType().name();
        String flowGroup = dagActionStoreChangeEvent.getFlowGroup();
        String flowName = dagActionStoreChangeEvent.getFlowName();
        String flowExecutionId = dagActionStoreChangeEvent.getFlowExecutionId();
        this.produceToConsumeDelayValue = calcMillisSince(produceTimestampMillis);
        log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} tid: {} operation: {} lag: {}", new Object[]{flowGroup, flowName, flowExecutionId, txId, name, this.produceToConsumeDelayValue});
        String str2 = txId + str;
        if (ChangeMonitorUtils.shouldProcessMessage(str2, this.dagActionsSeenCache, name, produceTimestampMillis.toString())) {
            DagActionStore.DagActionValue dagActionValue = null;
            if (!name.equals("DELETE")) {
                try {
                    dagActionValue = this.dagActionStore.getDagAction(flowGroup, flowName, flowExecutionId).getDagActionValue();
                } catch (IOException e) {
                    log.error("Encountered IOException trying to retrieve dagAction for flow group: {} name: {} executionId: {}. Exception: {}", new Object[]{flowGroup, flowName, flowExecutionId, e});
                    this.unexpectedErrors.mark();
                    return;
                } catch (SQLException e2) {
                    log.error("Encountered SQLException trying to retrieve dagAction for flow group: {} name: {} executionId: {}. Exception: {}", new Object[]{flowGroup, flowName, flowExecutionId, e2});
                    return;
                } catch (SpecNotFoundException e3) {
                    log.error("DagAction not found for flow group: {} name: {} executionId: {} Exception: {}", new Object[]{flowGroup, flowName, flowExecutionId, e3});
                    this.unexpectedErrors.mark();
                    return;
                }
            }
            try {
                if (name.equals("INSERT")) {
                    if (dagActionValue.equals(DagActionStore.DagActionValue.RESUME)) {
                        log.info("Received insert dag action and about to send resume flow request");
                        this.dagManager.handleResumeFlowRequest(flowGroup, flowName, Long.parseLong(flowExecutionId));
                        this.resumesInvoked.mark();
                    } else if (!dagActionValue.equals(DagActionStore.DagActionValue.KILL)) {
                        log.warn("Received unsupported dagAction {}. Expected to be a KILL or RESUME", dagActionValue);
                        this.unexpectedErrors.mark();
                        return;
                    } else {
                        log.info("Received insert dag action and about to send kill flow request");
                        this.dagManager.handleKillFlowRequest(flowGroup, flowName, Long.parseLong(flowExecutionId));
                        this.killsInvoked.mark();
                    }
                } else if (name.equals("UPDATE")) {
                    log.warn("Received an UPDATE action to the DagActionStore when values in this store are never supposed to be updated. Flow group: {} name {} executionId {} were updated to action {}", new Object[]{flowGroup, flowName, flowExecutionId, dagActionValue});
                    this.unexpectedErrors.mark();
                } else {
                    if (!name.equals("DELETE")) {
                        log.warn("Received unsupported change type of operation {}. Expected values to be in [INSERT, UPDATE, DELETE]", name);
                        this.unexpectedErrors.mark();
                        return;
                    }
                    log.debug("Deleted flow group: {} name: {} executionId {} from DagActionStore", new Object[]{flowGroup, flowName, flowExecutionId});
                }
            } catch (Exception e4) {
                log.warn("Ran into unexpected error processing DagActionStore changes: {}", e4);
                this.unexpectedErrors.mark();
            }
            this.dagActionsSeenCache.put(str2, str2);
        }
    }

    protected void createMetrics() {
        super.createMetrics();
        this.killsInvoked = getMetricContext().contextAwareMeter("GobblinService.dagActionStoreMonitor.kills.invoked");
        this.resumesInvoked = getMetricContext().contextAwareMeter("GobblinService.dagActionStoreMonitor.resumes.invoked");
        this.unexpectedErrors = getMetricContext().contextAwareMeter("GobblinService.dagActionStoreMonitor.unexpected.errors");
        this.messageProcessedMeter = getMetricContext().contextAwareMeter("GobblinService.dagActionStoreMonitor.message.processed");
        this.produceToConsumeDelayMillis = getMetricContext().newContextAwareGauge("GobblinService.dagActionStoreMonitor.produce.to.consume.delay", () -> {
            return this.produceToConsumeDelayValue;
        });
        getMetricContext().register(this.produceToConsumeDelayMillis);
    }
}
