package gobblin.runtime.spec_catalog;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Service;
import com.typesafe.config.Config;
import gobblin.annotation.Alpha;
import gobblin.configuration.State;
import gobblin.instrumented.Instrumented;
import gobblin.metrics.MetricContext;
import gobblin.metrics.Tag;
import gobblin.runtime.api.GobblinInstanceEnvironment;
import gobblin.runtime.api.MutableSpecCatalog;
import gobblin.runtime.api.Spec;
import gobblin.runtime.api.SpecCatalog;
import gobblin.runtime.api.SpecCatalogListener;
import gobblin.runtime.api.SpecNotFoundException;
import gobblin.runtime.api.SpecSerDe;
import gobblin.runtime.api.SpecStore;
import gobblin.runtime.api.TopologySpec;
import gobblin.runtime.spec_store.FSSpecStore;
import gobblin.util.ClassAliasResolver;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Alpha
/* loaded from: input_file:gobblin/runtime/spec_catalog/TopologyCatalog.class */
public class TopologyCatalog extends AbstractIdleService implements SpecCatalog, MutableSpecCatalog, SpecSerDe {
    public static final String DEFAULT_TOPOLOGYSPEC_STORE_CLASS = FSSpecStore.class.getCanonicalName();
    protected final SpecCatalogListenersList listeners;
    protected final Logger log;
    protected final MetricContext metricContext;
    protected final SpecCatalog.StandardMetrics metrics;
    protected final SpecStore specStore;
    private final ClassAliasResolver<SpecStore> aliasResolver;

    public TopologyCatalog(Config config) {
        this(config, (Optional<Logger>) Optional.absent());
    }

    public TopologyCatalog(Config config, Optional<Logger> optional) {
        this(config, optional, Optional.absent(), true);
    }

    public TopologyCatalog(Config config, GobblinInstanceEnvironment gobblinInstanceEnvironment) {
        this(config, Optional.of(gobblinInstanceEnvironment.getLog()), Optional.of(gobblinInstanceEnvironment.getMetricContext()), gobblinInstanceEnvironment.isInstrumentationEnabled());
    }

    public TopologyCatalog(Config config, Optional<Logger> optional, Optional<MetricContext> optional2, boolean z) {
        this.log = optional.isPresent() ? (Logger) optional.get() : LoggerFactory.getLogger(getClass());
        this.listeners = new SpecCatalogListenersList(optional);
        if (z) {
            this.metricContext = ((MetricContext) optional2.or(Instrumented.getMetricContext(new State(), getClass()))).childBuilder(TopologyCatalog.class.getSimpleName()).build();
            this.metrics = new SpecCatalog.StandardMetrics(this);
        } else {
            this.metricContext = null;
            this.metrics = null;
        }
        this.aliasResolver = new ClassAliasResolver<>(SpecStore.class);
        try {
            Config withValue = config.hasPath("topologySpec.store.dir") ? config.withValue("specStore.fs.dir", config.getValue("topologySpec.store.dir")) : config;
            String string = config.hasPath("topologySpec.store.class") ? config.getString("topologySpec.store.class") : DEFAULT_TOPOLOGYSPEC_STORE_CLASS;
            this.log.info("Using SpecStore class name/alias " + string);
            this.specStore = (SpecStore) ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(string)), new Object[]{withValue, this});
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new RuntimeException(e);
        }
    }

    protected void startUp() throws Exception {
        notifyAllListeners();
    }

    protected void shutDown() throws Exception {
        this.listeners.close();
    }

    protected void notifyAllListeners() {
        Iterator<Spec> it = getSpecs().iterator();
        while (it.hasNext()) {
            this.listeners.onAddSpec(it.next());
        }
    }

    @Override // gobblin.runtime.api.SpecCatalogListenersContainer
    public void addListener(SpecCatalogListener specCatalogListener) {
        Preconditions.checkNotNull(specCatalogListener);
        this.listeners.addListener(specCatalogListener);
        if (state() == Service.State.RUNNING) {
            Iterator<Spec> it = getSpecs().iterator();
            while (it.hasNext()) {
                this.listeners.callbackOneListener(new SpecCatalogListener.AddSpecCallback(it.next()), specCatalogListener);
            }
        }
    }

    @Override // gobblin.runtime.api.SpecCatalogListenersContainer
    public void removeListener(SpecCatalogListener specCatalogListener) {
        this.listeners.removeListener(specCatalogListener);
    }

    @Override // gobblin.runtime.api.SpecCatalogListenersContainer
    public void registerWeakSpecCatalogListener(SpecCatalogListener specCatalogListener) {
        this.listeners.registerWeakSpecCatalogListener(specCatalogListener);
    }

    @Nonnull
    public MetricContext getMetricContext() {
        return this.metricContext;
    }

    public boolean isInstrumentationEnabled() {
        return null != this.metricContext;
    }

    public List<Tag<?>> generateTags(State state) {
        return Collections.emptyList();
    }

    public void switchMetricContext(List<Tag<?>> list) {
        throw new UnsupportedOperationException();
    }

    public void switchMetricContext(MetricContext metricContext) {
        throw new UnsupportedOperationException();
    }

    @Override // gobblin.runtime.api.SpecCatalog
    public SpecCatalog.StandardMetrics getMetrics() {
        return this.metrics;
    }

    @Override // gobblin.runtime.api.SpecCatalog
    public Collection<Spec> getSpecs() {
        try {
            return this.specStore.getSpecs();
        } catch (IOException e) {
            throw new RuntimeException("Cannot retrieve Specs from Spec store", e);
        }
    }

    @Override // gobblin.runtime.api.SpecCatalog
    public Spec getSpec(URI uri) throws SpecNotFoundException {
        try {
            return this.specStore.getSpec(uri);
        } catch (IOException e) {
            throw new RuntimeException("Cannot retrieve Spec from Spec store for URI: " + uri, e);
        }
    }

    @Override // gobblin.runtime.api.MutableSpecCatalog
    public void put(Spec spec) {
        try {
            Preconditions.checkState(state() == Service.State.RUNNING, String.format("%s is not running.", getClass().getName()));
            Preconditions.checkNotNull(spec);
            this.log.info(String.format("Adding TopologySpec with URI: %s and Config: %s", spec.getUri(), ((TopologySpec) spec).getConfigAsProperties()));
            if (this.specStore.exists(spec.getUri())) {
                this.specStore.updateSpec(spec);
                this.listeners.onUpdateSpec(spec);
            } else {
                this.specStore.addSpec(spec);
                this.listeners.onAddSpec(spec);
            }
        } catch (SpecNotFoundException | IOException e) {
            throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
        }
    }

    @Override // gobblin.runtime.api.MutableSpecCatalog
    public void remove(URI uri) {
        try {
            Preconditions.checkState(state() == Service.State.RUNNING, String.format("%s is not running.", getClass().getName()));
            Preconditions.checkNotNull(uri);
            this.log.info(String.format("Removing TopologySpec with URI: %s", uri));
            Spec spec = this.specStore.getSpec(uri);
            this.listeners.onDeleteSpec(spec.getUri(), spec.getVersion());
            this.specStore.deleteSpec(uri);
        } catch (SpecNotFoundException | IOException e) {
            throw new RuntimeException("Cannot delete Spec from Spec store for URI: " + uri, e);
        }
    }

    @Override // gobblin.runtime.api.SpecSerDe
    public byte[] serialize(Spec spec) {
        return SerializationUtils.serialize(spec);
    }

    @Override // gobblin.runtime.api.SpecSerDe
    public Spec deserialize(byte[] bArr) {
        return (Spec) SerializationUtils.deserialize(bArr);
    }
}
