package gobblin.runtime.job_monitor;

import com.codahale.metrics.Counter;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValue;
import gobblin.metrics.GobblinTrackingEvent;
import gobblin.metrics.reporter.util.NoopSchemaVersionWriter;
import gobblin.metrics.reporter.util.SchemaVersionWriter;
import gobblin.runtime.api.GobblinInstanceDriver;
import gobblin.runtime.api.JobSpec;
import gobblin.runtime.api.JobSpecMonitor;
import gobblin.runtime.api.JobSpecMonitorFactory;
import gobblin.runtime.api.MutableJobCatalog;
import gobblin.runtime.metrics.RuntimeMetrics;
import gobblin.util.Either;
import gobblin.util.PathUtils;
import gobblin.util.reflection.GobblinConstructorUtils;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.Path;

/* loaded from: input_file:gobblin/runtime/job_monitor/SLAEventKafkaJobMonitor.class */
public class SLAEventKafkaJobMonitor extends KafkaAvroJobMonitor<GobblinTrackingEvent> {
    public static final String CONFIG_PREFIX = "gobblin.jobMonitor.slaEvent";
    public static final String DATASET_URN_FILTER_KEY = "filter.urn";
    public static final String EVENT_NAME_FILTER_KEY = "filter.name";
    public static final String TEMPLATE_KEY = "job_template";
    public static final String EXTRACT_KEYS = "extract_keys";
    public static final String TOPIC_KEY = "topic";
    private final Optional<Pattern> urnFilter;
    private final Optional<Pattern> nameFilter;
    private final URI baseURI;
    private final URI template;
    private final Map<String, String> extractKeys;
    private Counter rejectedEvents;
    public static final String BASE_URI_KEY = "baseUri";
    public static final String SCHEMA_VERSION_READER_CLASS = "versionReaderClass";
    private static final Config DEFAULTS = ConfigFactory.parseMap(ImmutableMap.of(BASE_URI_KEY, SLAEventKafkaJobMonitor.class.getSimpleName(), SCHEMA_VERSION_READER_CLASS, NoopSchemaVersionWriter.class.getName()));

    /* loaded from: input_file:gobblin/runtime/job_monitor/SLAEventKafkaJobMonitor$Factory.class */
    public static class Factory implements JobSpecMonitorFactory {
        @Override // gobblin.runtime.api.JobSpecMonitorFactory
        public JobSpecMonitor forJobCatalog(GobblinInstanceDriver gobblinInstanceDriver, MutableJobCatalog mutableJobCatalog) throws IOException {
            return forConfig(gobblinInstanceDriver.getSysConfig().getConfig().getConfig(SLAEventKafkaJobMonitor.CONFIG_PREFIX).withFallback(SLAEventKafkaJobMonitor.DEFAULTS), mutableJobCatalog);
        }

        public JobSpecMonitor forConfig(Config config, MutableJobCatalog mutableJobCatalog) throws IOException {
            Preconditions.checkArgument(config.hasPath(SLAEventKafkaJobMonitor.TEMPLATE_KEY));
            Preconditions.checkArgument(config.hasPath("topic"));
            String string = config.getString("topic");
            try {
                URI uri = new URI(config.getString(SLAEventKafkaJobMonitor.BASE_URI_KEY));
                String string2 = config.getString(SLAEventKafkaJobMonitor.TEMPLATE_KEY);
                try {
                    URI uri2 = new URI(string2);
                    ImmutableMap.Builder builder = ImmutableMap.builder();
                    if (config.hasPath(SLAEventKafkaJobMonitor.EXTRACT_KEYS)) {
                        for (Map.Entry entry : config.getConfig(SLAEventKafkaJobMonitor.EXTRACT_KEYS).entrySet()) {
                            Object unwrapped = ((ConfigValue) entry.getValue()).unwrapped();
                            if (unwrapped instanceof String) {
                                builder.put(entry.getKey(), (String) unwrapped);
                            }
                        }
                    }
                    try {
                        return new SLAEventKafkaJobMonitor(string, mutableJobCatalog, uri, config, (SchemaVersionWriter) GobblinConstructorUtils.invokeLongestConstructor(Class.forName(config.getString(SLAEventKafkaJobMonitor.SCHEMA_VERSION_READER_CLASS)), new Object[]{config}), config.hasPath(SLAEventKafkaJobMonitor.DATASET_URN_FILTER_KEY) ? Optional.of(Pattern.compile(config.getString(SLAEventKafkaJobMonitor.DATASET_URN_FILTER_KEY))) : Optional.absent(), config.hasPath(SLAEventKafkaJobMonitor.EVENT_NAME_FILTER_KEY) ? Optional.of(Pattern.compile(config.getString(SLAEventKafkaJobMonitor.EVENT_NAME_FILTER_KEY))) : Optional.absent(), uri2, builder.build());
                    } catch (ReflectiveOperationException e) {
                        throw new IllegalArgumentException(e);
                    }
                } catch (URISyntaxException e2) {
                    throw new IOException("Invalid template URI " + string2);
                }
            } catch (URISyntaxException e3) {
                throw new IOException("Invalid base URI " + config.getString(SLAEventKafkaJobMonitor.BASE_URI_KEY), e3);
            }
        }
    }

    protected SLAEventKafkaJobMonitor(String str, MutableJobCatalog mutableJobCatalog, URI uri, Config config, SchemaVersionWriter<?> schemaVersionWriter, Optional<Pattern> optional, Optional<Pattern> optional2, URI uri2, Map<String, String> map) throws IOException {
        super(str, mutableJobCatalog, config, GobblinTrackingEvent.SCHEMA$, schemaVersionWriter);
        this.baseURI = uri;
        this.urnFilter = optional;
        this.nameFilter = optional2;
        this.template = uri2;
        this.extractKeys = map;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // gobblin.runtime.job_monitor.KafkaAvroJobMonitor, gobblin.runtime.job_monitor.KafkaJobMonitor, gobblin.runtime.kafka.HighLevelConsumer
    public void createMetrics() {
        super.createMetrics();
        this.rejectedEvents = getMetricContext().counter(RuntimeMetrics.GOBBLIN_JOB_MONITOR_SLAEVENT_REJECTEDEVENTS);
    }

    @Override // gobblin.runtime.job_monitor.KafkaAvroJobMonitor
    public Collection<Either<JobSpec, URI>> parseJobSpec(GobblinTrackingEvent gobblinTrackingEvent) {
        if (!acceptEvent(gobblinTrackingEvent)) {
            this.rejectedEvents.inc();
            return Lists.newArrayList();
        }
        URI uri = PathUtils.mergePaths(new Path(this.baseURI), new Path((String) gobblinTrackingEvent.getMetadata().get("event.sla.datasetUrn"))).toUri();
        HashMap newHashMap = Maps.newHashMap();
        for (Map.Entry<String, String> entry : this.extractKeys.entrySet()) {
            if (gobblinTrackingEvent.getMetadata().containsKey(entry.getKey())) {
                newHashMap.put(entry.getValue(), gobblinTrackingEvent.getMetadata().get(entry.getKey()));
            }
        }
        return Lists.newArrayList(new Either[]{Either.left(JobSpec.builder(uri).withTemplate(this.template).withConfig(ConfigFactory.parseMap(newHashMap)).build())});
    }

    protected boolean acceptEvent(GobblinTrackingEvent gobblinTrackingEvent) {
        if (!gobblinTrackingEvent.getMetadata().containsKey("event.sla.datasetUrn")) {
            return false;
        }
        String str = (String) gobblinTrackingEvent.getMetadata().get("event.sla.datasetUrn");
        if (!this.urnFilter.isPresent() || ((Pattern) this.urnFilter.get()).matcher(str).find()) {
            return !this.nameFilter.isPresent() || ((Pattern) this.nameFilter.get()).matcher(gobblinTrackingEvent.getName()).find();
        }
        return false;
    }

    public Optional<Pattern> getUrnFilter() {
        return this.urnFilter;
    }

    public Optional<Pattern> getNameFilter() {
        return this.nameFilter;
    }

    public URI getBaseURI() {
        return this.baseURI;
    }

    public URI getTemplate() {
        return this.template;
    }

    public Map<String, String> getExtractKeys() {
        return this.extractKeys;
    }

    public Counter getRejectedEvents() {
        return this.rejectedEvents;
    }
}
