package org.apache.gobblin.service.monitoring;

import com.codahale.metrics.Meter;
import com.google.common.annotations.VisibleForTesting;
import com.typesafe.config.Config;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Properties;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistryFactory;
import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.util.ConfigUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.class */
public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
    private static final Logger log = LoggerFactory.getLogger(KafkaAvroJobStatusMonitor.class);
    private static final String JOB_STATUS_MONITOR_MESSAGE_PARSE_FAILURES = "jobStatusMonitor.messageParseFailures";
    private final ThreadLocal<SpecificDatumReader<GobblinTrackingEvent>> reader;
    private final ThreadLocal<BinaryDecoder> decoder;
    private final SchemaVersionWriter schemaVersionWriter;
    private Meter messageParseFailures;

    public KafkaAvroJobStatusMonitor(String str, Config config, int i, JobIssueEventHandler jobIssueEventHandler, GaaSObservabilityEventProducer gaaSObservabilityEventProducer) throws IOException, ReflectiveOperationException {
        super(str, config, i, jobIssueEventHandler, gaaSObservabilityEventProducer);
        if (ConfigUtils.getBoolean(config, "metrics.reporting.kafka.avro.use.schema.registry", false)) {
            this.schemaVersionWriter = new SchemaRegistryVersionWriter(new KafkaAvroSchemaRegistryFactory().create(ConfigUtils.configToProperties(config)), str, GobblinTrackingEvent.SCHEMA$);
        } else {
            this.schemaVersionWriter = new FixedSchemaVersionWriter();
        }
        this.decoder = ThreadLocal.withInitial(() -> {
            return DecoderFactory.get().binaryDecoder(new ByteArrayInputStream(new byte[0]), (BinaryDecoder) null);
        });
        this.reader = ThreadLocal.withInitial(() -> {
            return new SpecificDatumReader(GobblinTrackingEvent.SCHEMA$);
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor
    public void createMetrics() {
        super.createMetrics();
        this.messageParseFailures = getMetricContext().meter(JOB_STATUS_MONITOR_MESSAGE_PARSE_FAILURES);
    }

    @Override // org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor
    @VisibleForTesting
    public GobblinTrackingEvent deserializeEvent(DecodeableKafkaRecord<byte[], byte[]> decodeableKafkaRecord) {
        try {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream((byte[]) decodeableKafkaRecord.getValue());
            this.schemaVersionWriter.advanceInputStreamToRecord(new DataInputStream(byteArrayInputStream));
            return (GobblinTrackingEvent) this.reader.get().read((Object) null, DecoderFactory.get().binaryDecoder(byteArrayInputStream, this.decoder.get()));
        } catch (Exception e) {
            this.messageParseFailures.mark();
            if (this.messageParseFailures.getFiveMinuteRate() < 1.0d) {
                log.warn("Unable to decode input message at kafka offset" + decodeableKafkaRecord.getOffset(), e);
                return null;
            }
            log.warn("Unable to decode input message at kafka offset" + decodeableKafkaRecord.getOffset());
            return null;
        }
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:74:0x0239. Please report as an issue. */
    @Override // org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor
    @VisibleForTesting
    public State parseJobStatus(GobblinTrackingEvent gobblinTrackingEvent) {
        if (!acceptEvent(gobblinTrackingEvent)) {
            return null;
        }
        Properties properties = new Properties();
        properties.putAll(gobblinTrackingEvent.getMetadata());
        String name = gobblinTrackingEvent.getName();
        boolean z = -1;
        switch (name.hashCode()) {
            case -1937199736:
                if (name.equals("WorkUnitsCreationTimer")) {
                    z = true;
                    break;
                }
                break;
            case -1743506017:
                if (name.equals("FlowStartDeadlineExceeded")) {
                    z = 20;
                    break;
                }
                break;
            case -1563125761:
                if (name.equals("FlowCompiled")) {
                    z = false;
                    break;
                }
                break;
            case -1536541828:
                if (name.equals("JobSummaryTimer")) {
                    z = 4;
                    break;
                }
                break;
            case -1408633586:
                if (name.equals("JobCancelTimer")) {
                    z = 17;
                    break;
                }
                break;
            case -955774277:
                if (name.equals("JobPrepareTimer")) {
                    z = 10;
                    break;
                }
                break;
            case -442293446:
                if (name.equals("JobPending")) {
                    z = 6;
                    break;
                }
                break;
            case -413596665:
                if (name.equals("JobPendingResume")) {
                    z = 8;
                    break;
                }
                break;
            case -320510392:
                if (name.equals("FlowRunDeadlineExceeded")) {
                    z = 19;
                    break;
                }
                break;
            case 20979443:
                if (name.equals("jobCompletionPercentage")) {
                    z = 21;
                    break;
                }
                break;
            case 243401164:
                if (name.equals("WorkUnitsPreparationTimer")) {
                    z = 5;
                    break;
                }
                break;
            case 411321489:
                if (name.equals("FlowRunning")) {
                    z = 3;
                    break;
                }
                break;
            case 514175275:
                if (name.equals("JobFailedTimer")) {
                    z = 15;
                    break;
                }
                break;
            case 608625539:
                if (name.equals("JobOrchestrated")) {
                    z = 9;
                    break;
                }
                break;
            case 669838243:
                if (name.equals("FlowCancelled")) {
                    z = 16;
                    break;
                }
                break;
            case 854356386:
                if (name.equals("FlowCompileFailed")) {
                    z = 14;
                    break;
                }
                break;
            case 1138984115:
                if (name.equals("FlowSucceeded")) {
                    z = 11;
                    break;
                }
                break;
            case 1384153697:
                if (name.equals("JobSucceededTimer")) {
                    z = 12;
                    break;
                }
                break;
            case 1557158432:
                if (name.equals("jobSkippedTime")) {
                    z = 18;
                    break;
                }
                break;
            case 1608556416:
                if (name.equals("JobStartTimer")) {
                    z = 2;
                    break;
                }
                break;
            case 1918679542:
                if (name.equals("FlowPendingResume")) {
                    z = 7;
                    break;
                }
                break;
            case 2006401835:
                if (name.equals("FlowFailed")) {
                    z = 13;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                properties.put("eventName", ExecutionStatus.COMPILED.name());
                return new State(properties);
            case true:
                properties.put("workunitPlanStartTime", properties.getProperty("startTime"));
                properties.put("workunitPlanEndTime", properties.getProperty("endTime"));
                return new State(properties);
            case true:
            case true:
            case true:
            case true:
                properties.put("eventName", ExecutionStatus.RUNNING.name());
                return new State(properties);
            case true:
                properties.put("eventName", ExecutionStatus.PENDING.name());
                return new State(properties);
            case true:
            case true:
                properties.put("eventName", ExecutionStatus.PENDING_RESUME.name());
                return new State(properties);
            case true:
                properties.put("eventName", ExecutionStatus.ORCHESTRATED.name());
                properties.put("jobOrchestratedTime", properties.getProperty("endTime"));
                return new State(properties);
            case true:
                properties.put("eventName", ExecutionStatus.RUNNING.name());
                properties.put("jobStartTime", properties.getProperty("startTime"));
                return new State(properties);
            case true:
            case true:
                properties.put("eventName", ExecutionStatus.COMPLETE.name());
                properties.put("jobEndTime", properties.getProperty("endTime"));
                return new State(properties);
            case true:
            case true:
            case true:
                properties.put("eventName", ExecutionStatus.FAILED.name());
                properties.put("jobEndTime", properties.getProperty("endTime"));
                return new State(properties);
            case true:
            case true:
            case true:
                properties.put("eventName", ExecutionStatus.CANCELLED.name());
                properties.put("jobEndTime", properties.getProperty("endTime"));
                return new State(properties);
            case true:
            case true:
                properties.put("doesCancelledFlowMeritRetry", true);
                properties.put("eventName", ExecutionStatus.CANCELLED.name());
                properties.put("jobEndTime", properties.getProperty("endTime"));
                return new State(properties);
            case true:
                properties.put("jobLastProgressEventTime", properties.getProperty("endTime"));
                return new State(properties);
            default:
                return null;
        }
    }

    private boolean acceptEvent(GobblinTrackingEvent gobblinTrackingEvent) {
        return gobblinTrackingEvent.getMetadata().containsKey("flowGroup") && gobblinTrackingEvent.getMetadata().containsKey("flowName") && gobblinTrackingEvent.getMetadata().containsKey("flowExecutionId");
    }

    public Meter getMessageParseFailures() {
        return this.messageParseFailures;
    }
}
