package org.apache.gobblin.service.monitoring;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.Retryer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.runtime.TaskContext;
import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
import org.apache.gobblin.runtime.retention.DatasetCleanerTask;
import org.apache.gobblin.runtime.troubleshooter.IssueEventBuilder;
import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.retry.RetryerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.class */
public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], byte[]> {
    static final String JOB_STATUS_MONITOR_TOPIC_KEY = "topic";
    static final String JOB_STATUS_MONITOR_NUM_THREADS_KEY = "numThreads";
    static final String JOB_STATUS_MONITOR_CLASS_KEY = "class";
    private final StateStore<State> stateStore;
    private final ScheduledExecutorService scheduledExecutorService;
    private final JobIssueEventHandler jobIssueEventHandler;
    private final Retryer<Void> persistJobStatusRetryer;
    private final GaaSObservabilityEventProducer eventProducer;
    private static final Logger log = LoggerFactory.getLogger(KafkaJobStatusMonitor.class);
    public static final String JOB_STATUS_MONITOR_PREFIX = "jobStatusMonitor";
    public static final String GET_AND_SET_JOB_STATUS = MetricRegistry.name("GobblinService", new String[]{JOB_STATUS_MONITOR_PREFIX, "getAndSetJobStatus"});
    private static final String PROCESS_JOB_ISSUE = MetricRegistry.name("GobblinService", new String[]{JOB_STATUS_MONITOR_PREFIX, "jobIssueProcessingTime"});
    static final String DEFAULT_JOB_STATUS_MONITOR_CLASS = KafkaAvroJobStatusMonitor.class.getName();
    private static final Config RETRYER_FALLBACK_CONFIG = ConfigFactory.parseMap(ImmutableMap.of("time_out_ms", Long.valueOf(TimeUnit.HOURS.toMillis(24)), "interval_ms", Long.valueOf(TimeUnit.MINUTES.toMillis(1)), "retry_type", RetryerFactory.RetryType.EXPONENTIAL.name()));
    private static final String KAFKA_AUTO_OFFSET_RESET_KEY = "auto.offset.reset";
    private static final String KAFKA_AUTO_OFFSET_RESET_SMALLEST = "smallest";
    private static final Config DEFAULTS = ConfigFactory.parseMap(ImmutableMap.of(KAFKA_AUTO_OFFSET_RESET_KEY, KAFKA_AUTO_OFFSET_RESET_SMALLEST));
    private static final List<ExecutionStatus> ORDERED_EXECUTION_STATUSES = ImmutableList.of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING, ExecutionStatus.PENDING_RESUME, ExecutionStatus.PENDING_RETRY, ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.COMPLETE, ExecutionStatus.FAILED, ExecutionStatus.CANCELLED);

    public KafkaJobStatusMonitor(String str, Config config, int i, JobIssueEventHandler jobIssueEventHandler, GaaSObservabilityEventProducer gaaSObservabilityEventProducer) throws ReflectiveOperationException {
        super(str, config.withFallback(DEFAULTS), i);
        this.stateStore = ((StateStore.Factory) Class.forName(ConfigUtils.getString(config, "state.store.factory.class", FileContextBasedFsStateStoreFactory.class.getName())).newInstance()).createStateStore(config, State.class);
        this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
        this.jobIssueEventHandler = jobIssueEventHandler;
        this.persistJobStatusRetryer = RetryerFactory.newInstance((config.hasPath(JOB_STATUS_MONITOR_PREFIX) ? config.getConfig(JOB_STATUS_MONITOR_PREFIX) : ConfigFactory.empty()).withFallback(RETRYER_FALLBACK_CONFIG), Optional.of(new RetryListener() { // from class: org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor.1
            public <V> void onRetry(Attempt<V> attempt) {
                if (attempt.hasException()) {
                    KafkaJobStatusMonitor.log.warn(String.format("(Likely retryable) failure adding job status to state store [attempt: %d; %s after start]", Long.valueOf(attempt.getAttemptNumber()), Duration.ofMillis(attempt.getDelaySinceFirstAttempt()).toString()), attempt.getExceptionCause());
                }
            }
        }));
        this.eventProducer = gaaSObservabilityEventProducer;
    }

    protected void startUp() {
        super.startUp();
        log.info("Scheduling state store cleaner..");
        State state = new State(ConfigUtils.configToProperties(this.config));
        state.setProp("job.id", "GobblinServiceJobStatusCleanerJob");
        state.setProp("task.id", "GobblinServiceJobStatusCleanerTask");
        this.scheduledExecutorService.scheduleAtFixedRate(new DatasetCleanerTask(new TaskContext(new WorkUnitState(WorkUnit.createEmpty(), state))), 300L, 86400L, TimeUnit.SECONDS);
    }

    public void shutDown() {
        super.shutDown();
        this.scheduledExecutorService.shutdown();
        try {
            this.scheduledExecutorService.awaitTermination(30L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.error("Exception encountered when shutting down state store cleaner", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void createMetrics() {
        super.createMetrics();
    }

    protected void processMessage(DecodeableKafkaRecord<byte[], byte[]> decodeableKafkaRecord) {
        Throwable th;
        GobblinTrackingEvent deserializeEvent = deserializeEvent(decodeableKafkaRecord);
        if (deserializeEvent == null) {
            return;
        }
        if (IssueEventBuilder.isIssueEvent(deserializeEvent)) {
            Timer.Context time = getMetricContext().timer(PROCESS_JOB_ISSUE).time();
            Throwable th2 = null;
            try {
                try {
                    this.jobIssueEventHandler.processEvent(deserializeEvent);
                    if (time != null) {
                        if (0 != 0) {
                            try {
                                time.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            time.close();
                        }
                    }
                } finally {
                }
            } catch (Throwable th4) {
                if (time != null) {
                    if (th2 != null) {
                        try {
                            time.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        time.close();
                    }
                }
                throw th4;
            }
        }
        try {
            this.persistJobStatusRetryer.call(() -> {
                State parseJobStatus = parseJobStatus(deserializeEvent);
                if (parseJobStatus == null) {
                    return null;
                }
                Timer.Context time2 = getMetricContext().timer(GET_AND_SET_JOB_STATUS).time();
                Throwable th6 = null;
                try {
                    try {
                        addJobStatusToStateStore(parseJobStatus, this.stateStore, this.eventProducer);
                        if (time2 == null) {
                            return null;
                        }
                        if (0 == 0) {
                            time2.close();
                            return null;
                        }
                        try {
                            time2.close();
                            return null;
                        } catch (Throwable th7) {
                            th6.addSuppressed(th7);
                            return null;
                        }
                    } catch (Throwable th8) {
                        th6 = th8;
                        throw th8;
                    }
                } catch (Throwable th9) {
                    if (time2 != null) {
                        if (th6 != null) {
                            try {
                                time2.close();
                            } catch (Throwable th10) {
                                th6.addSuppressed(th10);
                            }
                        } else {
                            time2.close();
                        }
                    }
                    throw th9;
                }
            });
        } catch (ExecutionException e) {
            String format = String.format("Failed to add job status to state store for kafka offset %d", Long.valueOf(decodeableKafkaRecord.getOffset()));
            log.warn(format, e);
            throw new RuntimeException(format, e.getCause());
        } catch (RetryException th6) {
            String format2 = String.format("Failed to add job status to state store for kafka offset %d (retried %d times%s)", Long.valueOf(decodeableKafkaRecord.getOffset()), Integer.valueOf(th6.getNumberOfFailedAttempts()), Thread.currentThread().isInterrupted() ? "... then interrupted" : "");
            if (th6.getLastFailedAttempt().hasException()) {
                th = th6.getLastFailedAttempt().getExceptionCause();
            }
            Throwable th7 = th;
            log.warn(format2, th7);
            throw new RuntimeException(format2, th7);
        }
    }

    @VisibleForTesting
    static void addJobStatusToStateStore(State state, StateStore stateStore, GaaSObservabilityEventProducer gaaSObservabilityEventProducer) throws IOException {
        try {
            if (!state.contains("jobName")) {
                state.setProp("jobName", "NA");
            }
            if (!state.contains("jobGroup")) {
                state.setProp("jobGroup", "NA");
            }
            String prop = state.getProp("flowName");
            String prop2 = state.getProp("flowGroup");
            String prop3 = state.getProp("flowExecutionId");
            String prop4 = state.getProp("jobName");
            String prop5 = state.getProp("jobGroup");
            String jobStatusStoreName = jobStatusStoreName(prop2, prop);
            String jobStatusTableName = jobStatusTableName(prop3, prop5, prop4);
            List all = stateStore.getAll(jobStatusStoreName, jobStatusTableName);
            if (all.size() > 0) {
                State state2 = (State) all.get(all.size() - 1);
                String prop6 = state2.getProp("eventName");
                String prop7 = state.getProp("eventName");
                int propAsInt = state2.getPropAsInt("currentGeneration", 1);
                int propAsInt2 = state.getPropAsInt("currentGeneration", propAsInt);
                int propAsInt3 = state2.getPropAsInt("currentAttempts", 1);
                int propAsInt4 = state.getPropAsInt("currentAttempts", propAsInt3);
                if (isFlowStatusAndPendingResume(prop4, prop5, prop7) || prop6 == null || prop7 == null || (propAsInt <= propAsInt2 && ((propAsInt != propAsInt2 || propAsInt3 <= propAsInt4) && !(propAsInt == propAsInt2 && propAsInt3 == propAsInt4 && ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(prop7)) < ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(prop6)))))) {
                    state = mergeState(state, (State) all.get(all.size() - 1));
                } else {
                    log.warn(String.format("Received status [generation.attempts] = %s [%s.%s] when already %s [%s.%s] for flow (%s, %s, %s), job (%s, %s)", prop7, Integer.valueOf(propAsInt2), Integer.valueOf(propAsInt4), prop6, Integer.valueOf(propAsInt), Integer.valueOf(propAsInt3), prop2, prop, prop3, prop5, prop4));
                    state = mergeState((State) all.get(all.size() - 1), state);
                }
            }
            modifyStateIfRetryRequired(state);
            stateStore.put(jobStatusStoreName, jobStatusTableName, state);
            if (isNewStateTransitionToFinal(state, all)) {
                gaaSObservabilityEventProducer.emitObservabilityEvent(state);
            }
        } catch (Exception e) {
            log.warn("Meet exception when adding jobStatus to state store at " + e.getStackTrace()[0].getClassName() + "line number: " + e.getStackTrace()[0].getLineNumber(), e);
            throw new IOException(e);
        }
    }

    private static boolean isFlowStatusAndPendingResume(String str, String str2, String str3) {
        return str != null && str2 != null && str.equals("NA") && str2.equals("NA") && str3.equals(ExecutionStatus.PENDING_RESUME.name());
    }

    private static void modifyStateIfRetryRequired(State state) {
        int propAsInt = state.getPropAsInt("maxAttempts", 1);
        int propAsInt2 = state.getPropAsInt("currentAttempts", 1);
        if (state.contains("eventName") && ((state.getProp("eventName").equals(ExecutionStatus.FAILED.name()) || state.getProp("eventName").equals(ExecutionStatus.PENDING_RETRY.name()) || (state.getProp("eventName").equals(ExecutionStatus.CANCELLED.name()) && state.contains("doesCancelledFlowMeritRetry"))) && propAsInt2 < propAsInt)) {
            state.setProp("shouldRetry", true);
            state.setProp("eventName", ExecutionStatus.PENDING_RETRY.name());
            state.removeProp("jobEndTime");
        }
        state.removeProp("doesCancelledFlowMeritRetry");
    }

    static boolean isNewStateTransitionToFinal(State state, List<State> list) {
        return list.size() == 0 ? FlowStatusGenerator.FINISHED_STATUSES.contains(state.getProp("eventName")) : state.contains("eventName") && FlowStatusGenerator.FINISHED_STATUSES.contains(state.getProp("eventName")) && !FlowStatusGenerator.FINISHED_STATUSES.contains(list.get(list.size() - 1).getProp("eventName"));
    }

    private static State mergeState(State state, State state2) {
        Properties properties = new Properties();
        properties.putAll(state2.getProperties());
        properties.putAll(state.getProperties());
        return new State(properties);
    }

    public static String jobStatusTableName(String str, String str2, String str3) {
        return Joiner.on(".").join(str, str2, new Object[]{str3, "gst"});
    }

    public static String jobStatusTableName(long j, String str, String str2) {
        return jobStatusTableName(String.valueOf(j), str, str2);
    }

    public static String jobStatusStoreName(String str, String str2) {
        return Joiner.on(".").join(str, str2, new Object[0]);
    }

    public static long getExecutionIdFromTableName(String str) {
        return Long.parseLong((String) Splitter.on(".").splitToList(str).get(0));
    }

    protected abstract GobblinTrackingEvent deserializeEvent(DecodeableKafkaRecord<byte[], byte[]> decodeableKafkaRecord);

    protected abstract State parseJobStatus(GobblinTrackingEvent gobblinTrackingEvent);

    public StateStore<State> getStateStore() {
        return this.stateStore;
    }
}
