package org.apache.gobblin.runtime.spec_catalog;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Service;
import com.typesafe.config.Config;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
import org.apache.gobblin.runtime.api.MutableSpecCatalog;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecCatalog;
import org.apache.gobblin.runtime.api.SpecCatalogListener;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.SpecSerDe;
import org.apache.gobblin.runtime.api.SpecStore;
import org.apache.gobblin.runtime.spec_store.FSSpecStore;
import org.apache.gobblin.util.ClassAliasResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SpecCatalog} / {@link MutableSpecCatalog} for flow specs that delegates persistence to a
 * {@link SpecStore} and fans out add/delete events to registered {@link SpecCatalogListener}s.
 *
 * <p>The backing store class is read from the {@code flowSpec.store.class} config key (class name or
 * alias) and defaults to {@link FSSpecStore}. The store is constructed reflectively with
 * {@code (Config, SpecSerDe)} arguments, this catalog acting as the {@link SpecSerDe}.
 *
 * <p>When instrumentation is disabled, {@link #metricContext} and {@link #metrics} are {@code null};
 * every metric update in this class is therefore guarded by a null check.
 */
@Alpha
public class FlowCatalog extends AbstractIdleService implements SpecCatalog, MutableSpecCatalog, SpecSerDe {
    /** Canonical class name of the spec store used when {@code flowSpec.store.class} is not configured. */
    public static final String DEFAULT_FLOWSPEC_STORE_CLASS = FSSpecStore.class.getCanonicalName();
    protected final SpecCatalogListenersList listeners;
    protected final Logger log;
    /** {@code null} when instrumentation is disabled. */
    protected final MetricContext metricContext;
    /** {@code null} when instrumentation is disabled. */
    protected final MutableSpecCatalog.MutableStandardMetrics metrics;
    protected final SpecStore specStore;
    private final ClassAliasResolver<SpecStore> aliasResolver;

    /**
     * Creates a catalog with a default logger, a default metric context and instrumentation enabled.
     *
     * @param config catalog and spec store configuration
     */
    public FlowCatalog(Config config) {
        this(config, Optional.<Logger>absent());
    }

    /**
     * Creates a catalog with a default metric context and instrumentation enabled.
     *
     * @param config catalog and spec store configuration
     * @param optional logger to use; when absent a logger named after the runtime class is created
     */
    public FlowCatalog(Config config, Optional<Logger> optional) {
        this(config, optional, Optional.<MetricContext>absent(), true);
    }

    /**
     * Creates a catalog taking logger, metric context and the instrumentation flag from an instance
     * environment.
     *
     * @param config catalog and spec store configuration
     * @param gobblinInstanceEnvironment source of the logger, metric context and instrumentation flag
     */
    public FlowCatalog(Config config, GobblinInstanceEnvironment gobblinInstanceEnvironment) {
        this(config, Optional.of(gobblinInstanceEnvironment.getLog()), Optional.of(gobblinInstanceEnvironment.getMetricContext()), gobblinInstanceEnvironment.isInstrumentationEnabled());
    }

    /**
     * Creates a catalog and reflectively instantiates its backing {@link SpecStore}.
     *
     * @param config catalog and spec store configuration
     * @param optional logger to use; when absent a logger named after the runtime class is created
     * @param optional2 parent metric context; when absent one is obtained from {@link Instrumented}
     * @param z whether instrumentation is enabled; when {@code false} no metric context or metrics
     *     object is created
     * @throws RuntimeException if the spec store class cannot be found or constructed
     */
    public FlowCatalog(Config config, Optional<Logger> optional, Optional<MetricContext> optional2, boolean z) {
        this.log = optional.isPresent() ? optional.get() : LoggerFactory.getLogger(getClass());
        this.listeners = new SpecCatalogListenersList(optional);
        if (z) {
            this.metricContext = optional2.or(Instrumented.getMetricContext(new State(), getClass()))
                .childBuilder(FlowCatalog.class.getSimpleName())
                .build();
            this.metrics = new MutableSpecCatalog.MutableStandardMetrics(this, Optional.of(config));
            addListener(this.metrics);
        } else {
            this.metricContext = null;
            this.metrics = null;
        }
        this.aliasResolver = new ClassAliasResolver<>(SpecStore.class);

        // The flow-specific directory key, when present, is copied onto the key the store is given.
        Config specStoreConfig = config.hasPath("flowSpec.store.dir")
            ? config.withValue("specStore.fs.dir", config.getValue("flowSpec.store.dir"))
            : config;
        String specStoreClassName = config.hasPath("flowSpec.store.class")
            ? config.getString("flowSpec.store.class")
            : DEFAULT_FLOWSPEC_STORE_CLASS;
        this.log.info("Using class name/alias [{}] for spec store", specStoreClassName);
        try {
            Class<?> specStoreClass = Class.forName(this.aliasResolver.resolve(specStoreClassName));
            this.specStore = (SpecStore) ConstructorUtils.invokeConstructor(specStoreClass, specStoreConfig, this);
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new RuntimeException("Cannot instantiate spec store class/alias: " + specStoreClassName, e);
        }
    }

    /** No start-up work is performed by this catalog. */
    @Override
    protected void startUp() throws Exception {
    }

    /** Closes the listener list. */
    @Override
    protected void shutDown() throws Exception {
        this.listeners.close();
    }

    /** Sends an add-spec notification to all listeners for every spec currently in the store. */
    protected void notifyAllListeners() {
        for (Spec spec : getSpecsWithTimeUpdate()) {
            this.listeners.onAddSpec(spec);
        }
    }

    /**
     * Registers a listener. If the service is already running, the new listener alone receives an
     * add-spec callback for every spec currently in the store.
     *
     * @param specCatalogListener listener to register; must not be {@code null}
     */
    @Override // org.apache.gobblin.runtime.api.SpecCatalogListenersContainer
    public void addListener(SpecCatalogListener specCatalogListener) {
        Preconditions.checkNotNull(specCatalogListener);
        this.listeners.addListener(specCatalogListener);
        if (state() != Service.State.RUNNING) {
            return;
        }
        for (Spec spec : getSpecsWithTimeUpdate()) {
            this.listeners.callbackOneListener(new SpecCatalogListener.AddSpecCallback(spec), specCatalogListener);
        }
    }

    /** Unregisters a listener. */
    @Override // org.apache.gobblin.runtime.api.SpecCatalogListenersContainer
    public void removeListener(SpecCatalogListener specCatalogListener) {
        this.listeners.removeListener(specCatalogListener);
    }

    /** Registers a listener through the listener list's weak-registration entry point. */
    @Override // org.apache.gobblin.runtime.api.SpecCatalogListenersContainer
    public void registerWeakSpecCatalogListener(SpecCatalogListener specCatalogListener) {
        this.listeners.registerWeakSpecCatalogListener(specCatalogListener);
    }

    /**
     * Returns this catalog's metric context.
     *
     * <p>NOTE(review): despite the {@code @Nonnull} annotation this returns {@code null} when the
     * catalog was constructed with instrumentation disabled; check {@link #isInstrumentationEnabled()}
     * first.
     */
    @Nonnull
    public MetricContext getMetricContext() {
        return this.metricContext;
    }

    /** Returns {@code true} when a metric context was created at construction time. */
    public boolean isInstrumentationEnabled() {
        return this.metricContext != null;
    }

    /** Returns an empty tag list; the state argument is not used. */
    public List<Tag<?>> generateTags(State state) {
        return Collections.emptyList();
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    public void switchMetricContext(List<Tag<?>> list) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    public void switchMetricContext(MetricContext metricContext) {
        throw new UnsupportedOperationException();
    }

    /** Returns the catalog metrics, or {@code null} when instrumentation is disabled. */
    @Override // org.apache.gobblin.runtime.api.SpecCatalog
    public SpecCatalog.StandardMetrics getMetrics() {
        return this.metrics;
    }

    /**
     * Returns all specs from the store without recording timing metrics.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the store cannot be read
     */
    @Override // org.apache.gobblin.runtime.api.SpecCatalog
    public Collection<Spec> getSpecs() {
        try {
            return this.specStore.getSpecs();
        } catch (IOException e) {
            throw new RuntimeException("Cannot retrieve Specs from Spec store", e);
        }
    }

    /**
     * Returns all specs from the store and, when instrumentation is enabled, reports the start time
     * of the read to the metrics object.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the store cannot be read
     */
    public Collection<Spec> getSpecsWithTimeUpdate() {
        try {
            long startTimeMillis = System.currentTimeMillis();
            Collection<Spec> specs = this.specStore.getSpecs();
            // metrics is null when instrumentation is disabled; skip rather than fail the read.
            if (this.metrics != null) {
                this.metrics.updateGetSpecTime(startTimeMillis);
            }
            return specs;
        } catch (IOException e) {
            throw new RuntimeException("Cannot retrieve Specs from Spec store", e);
        }
    }

    /**
     * Asks the store whether a spec exists for the given URI.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the store cannot be read
     */
    public boolean exists(URI uri) {
        try {
            return this.specStore.exists(uri);
        } catch (IOException e) {
            throw new RuntimeException("Cannot retrieve Spec from Spec store for URI: " + uri, e);
        }
    }

    /**
     * Fetches the spec stored under the given URI.
     *
     * @throws SpecNotFoundException propagated from the store
     * @throws RuntimeException wrapping the {@link IOException} if the store cannot be read
     */
    @Override // org.apache.gobblin.runtime.api.SpecCatalog
    public Spec getSpec(URI uri) throws SpecNotFoundException {
        try {
            return this.specStore.getSpec(uri);
        } catch (IOException e) {
            throw new RuntimeException("Cannot retrieve Spec from Spec store for URI: " + uri, e);
        }
    }

    /**
     * Adds a spec to the store and optionally notifies listeners.
     *
     * @param spec spec to add; must not be {@code null} and is cast to {@link FlowSpec} for logging
     * @param z whether listeners should receive an add-spec notification
     * @throws IllegalStateException if the service is not running
     * @throws RuntimeException wrapping the {@link IOException} if the store write fails
     */
    public void put(Spec spec, boolean z) {
        try {
            ensureRunning();
            Preconditions.checkNotNull(spec);
            long startTimeMillis = System.currentTimeMillis();
            this.log.info("Adding FlowSpec with URI: {} and Config: {}", spec.getUri(), ((FlowSpec) spec).getConfigAsProperties());
            this.specStore.addSpec(spec);
            // metrics is null when instrumentation is disabled; listeners must still be notified.
            if (this.metrics != null) {
                this.metrics.updatePutSpecTime(startTimeMillis);
            }
            if (z) {
                this.listeners.onAddSpec(spec);
            }
        } catch (IOException e) {
            throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
        }
    }

    /** Adds a spec to the store and notifies listeners. */
    @Override // org.apache.gobblin.runtime.api.MutableSpecCatalog
    public void put(Spec spec) {
        put(spec, true);
    }

    /** Removes the spec with the given URI, passing empty properties to listeners. */
    public void remove(URI uri) {
        remove(uri, new Properties());
    }

    /** Removes the spec with the given URI and notifies listeners with the given properties. */
    @Override // org.apache.gobblin.runtime.api.MutableSpecCatalog
    public void remove(URI uri, Properties properties) {
        remove(uri, properties, true);
    }

    /**
     * Deletes a spec from the store and optionally notifies listeners.
     *
     * @param uri URI of the spec to delete; must not be {@code null}
     * @param properties passed through to the listeners' delete notification
     * @param z whether listeners should receive a delete-spec notification
     * @throws IllegalStateException if the service is not running
     * @throws RuntimeException wrapping the {@link IOException} if the store delete fails
     */
    public void remove(URI uri, Properties properties, boolean z) {
        try {
            ensureRunning();
            Preconditions.checkNotNull(uri);
            long startTimeMillis = System.currentTimeMillis();
            this.log.info("Removing FlowSpec with URI: {}", uri);
            this.specStore.deleteSpec(uri);
            // metrics is null when instrumentation is disabled; listeners must still be notified.
            if (this.metrics != null) {
                this.metrics.updateRemoveSpecTime(startTimeMillis);
            }
            if (z) {
                this.listeners.onDeleteSpec(uri, FlowSpec.Builder.DEFAULT_VERSION, properties);
            }
        } catch (IOException e) {
            throw new RuntimeException("Cannot delete Spec from Spec store for URI: " + uri, e);
        }
    }

    /** Serializes a spec with Java-native serialization. */
    @Override // org.apache.gobblin.runtime.api.SpecSerDe
    public byte[] serialize(Spec spec) {
        return SerializationUtils.serialize(spec);
    }

    /**
     * Deserializes a spec with Java-native deserialization.
     *
     * <p>NOTE(review): Java-native deserialization is unsafe on untrusted bytes; confirm that callers
     * only pass data read back from the catalog's own spec store.
     */
    @Override // org.apache.gobblin.runtime.api.SpecSerDe
    public Spec deserialize(byte[] bArr) {
        return (Spec) SerializationUtils.deserialize(bArr);
    }

    /** Fails with {@link IllegalStateException} unless the service is in the RUNNING state. */
    private void ensureRunning() {
        Preconditions.checkState(state() == Service.State.RUNNING, String.format("%s is not running.", getClass().getName()));
    }
}
