package co.cask.cdap.internal.app.runtime.adapter;

import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.schedule.SchedulableProgramType;
import co.cask.cdap.api.schedule.Schedule;
import co.cask.cdap.api.schedule.ScheduleSpecification;
import co.cask.cdap.api.workflow.ScheduleProgramInfo;
import co.cask.cdap.api.workflow.WorkflowSpecification;
import co.cask.cdap.app.ApplicationSpecification;
import co.cask.cdap.app.deploy.ManagerFactory;
import co.cask.cdap.app.store.Store;
import co.cask.cdap.app.store.StoreFactory;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.config.PreferencesStore;
import co.cask.cdap.data.Namespace;
import co.cask.cdap.data2.datafabric.DefaultDatasetNamespace;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.DatasetManagementException;
import co.cask.cdap.data2.dataset2.NamespacedDatasetFramework;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.internal.app.deploy.ProgramTerminator;
import co.cask.cdap.internal.app.deploy.pipeline.ApplicationDeployScope;
import co.cask.cdap.internal.app.deploy.pipeline.ApplicationWithPrograms;
import co.cask.cdap.internal.app.deploy.pipeline.DeploymentInfo;
import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import co.cask.cdap.internal.app.runtime.schedule.Scheduler;
import co.cask.cdap.internal.app.runtime.schedule.Schedules;
import co.cask.cdap.proto.AdapterSpecification;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.Sink;
import co.cask.cdap.proto.Source;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.gson.Gson;
import com.google.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.twill.filesystem.LocationFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/adapter/AdapterService.class */
public class AdapterService extends AbstractIdleService {
    /** Logger for service lifecycle, dataset creation and adapter jar registration messages. */
    private static final Logger LOG = LoggerFactory.getLogger(AdapterService.class);
    /** Gson instance used to parse the JSON property maps read from adapter jar manifests. */
    private static final Gson GSON = new Gson();
    /** Reflective {@code Map<String, String>} type handed to Gson when parsing property maps. */
    private static final Type STRING_STRING_MAP_TYPE = new TypeToken<Map<String, String>>() { // from class: co.cask.cdap.internal.app.runtime.adapter.AdapterService.1
    }.getType();
    /** Key, looked up in the merged sink properties, whose value names the dataset class to create for a sink. */
    private static final String DATASET_CLASS = "dataset.class";
    /** Creates the archive location handed to the deployer when an adapter application is deployed. */
    private final LocationFactory locationFactory;
    /** Produces the manager that deploys an adapter type's application. */
    private final ManagerFactory<DeploymentInfo, ApplicationWithPrograms> managerFactory;
    /** Configuration; read for the adapter jar directory ("app.adapter.dir"). */
    private final CConfiguration configuration;
    /** Dataset framework wrapped (in the constructor) with the {@code Namespace.USER} dataset namespace. */
    private final DatasetFramework datasetFramework;
    /** Used to check that source streams exist before an adapter is created. */
    private final StreamAdmin streamAdmin;
    /** Scheduler on which adapter schedules are created, suspended, resumed and deleted. */
    private final Scheduler scheduler;
    /** Store holding adapter specifications, adapter statuses, application specifications and schedules. */
    private final Store store;
    /** Preferences store; written when an adapter is created. */
    private final PreferencesStore preferencesStore;
    /** Registered adapter types keyed by adapter type name; filled by {@code registerAdapters()}. */
    private Map<String, AdapterTypeInfo> adapterTypeInfos = Maps.newHashMap();
    /** Base archive path: the configured "app.output.dir" value with "/archive" appended. */
    private final String archiveDir;

    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/adapter/AdapterService$AdapterManifestAttributes.class */
    /**
     * Names of the main-section attributes that an adapter jar manifest may carry.
     * This is a constants holder only and is never instantiated.
     */
    private static final class AdapterManifestAttributes {
        // Attributes describing the adapter itself.
        private static final String ADAPTER_TYPE = "CDAP-Adapter-Type";
        private static final String ADAPTER_PROGRAM_TYPE = "CDAP-Adapter-Program-Type";
        private static final String ADAPTER_PROPERTIES = "CDAP-Adapter-Properties";

        // Attributes describing the adapter's source.
        private static final String SOURCE_TYPE = "CDAP-Source-Type";
        private static final String SOURCE_PROPERTIES = "CDAP-Source-Properties";

        // Attributes describing the adapter's sink.
        private static final String SINK_TYPE = "CDAP-Sink-Type";
        private static final String SINK_PROPERTIES = "CDAP-Sink-Properties";

        private AdapterManifestAttributes() {
            // Constants only; no instances.
        }
    }

    /**
     * Builds the service from its injected collaborators.
     *
     * @param cConfiguration   configuration; "app.output.dir" is read here to derive the archive directory
     * @param datasetFramework dataset framework, wrapped here with the {@code Namespace.USER} dataset namespace
     * @param scheduler        scheduler used for adapter schedules
     * @param streamAdmin      stream admin used to check for source streams
     * @param storeFactory     factory from which the store is created
     * @param locationFactory  factory for archive locations
     * @param managerFactory   factory for the application deployment manager
     * @param preferencesStore preferences store written when adapters are created
     */
    @Inject
    public AdapterService(CConfiguration cConfiguration, DatasetFramework datasetFramework, Scheduler scheduler, StreamAdmin streamAdmin, StoreFactory storeFactory, LocationFactory locationFactory, ManagerFactory<DeploymentInfo, ApplicationWithPrograms> managerFactory, PreferencesStore preferencesStore) {
        // Derived collaborators.
        DefaultDatasetNamespace userNamespace = new DefaultDatasetNamespace(cConfiguration, Namespace.USER);
        this.datasetFramework = new NamespacedDatasetFramework(datasetFramework, userNamespace);
        this.store = storeFactory.create();
        this.archiveDir = cConfiguration.get("app.output.dir") + "/archive";

        // Collaborators stored as given.
        this.configuration = cConfiguration;
        this.scheduler = scheduler;
        this.streamAdmin = streamAdmin;
        this.locationFactory = locationFactory;
        this.managerFactory = managerFactory;
        this.preferencesStore = preferencesStore;
    }

    /**
     * Service start hook: logs the start and registers the adapter types found in the adapter directory.
     *
     * @throws Exception declared by the overridden hook
     */
    @Override
    protected void startUp() throws Exception {
        LOG.info("Starting AdapterService");
        registerAdapters();
    }

    /**
     * Service stop hook: only logs the shutdown.
     *
     * @throws Exception declared by the overridden hook
     */
    @Override
    protected void shutDown() throws Exception {
        LOG.info("Shutting down AdapterService");
    }

    /**
     * Looks up a registered adapter type.
     *
     * @param str the adapter type name
     * @return the registered type information, or {@code null} if no such type is registered
     */
    @Nullable
    public AdapterTypeInfo getAdapterTypeInfo(String str) {
        AdapterTypeInfo registered = adapterTypeInfos.get(str);
        return registered;
    }

    /**
     * Fetches the specification of an adapter from the store.
     *
     * @param str  the namespace id
     * @param str2 the adapter name
     * @return the stored adapter specification
     * @throws AdapterNotFoundException if the store has no adapter with that name in the namespace
     */
    public AdapterSpecification getAdapter(String str, String str2) throws AdapterNotFoundException {
        Id.Namespace namespace = Id.Namespace.from(str);
        AdapterSpecification found = store.getAdapter(namespace, str2);
        if (found != null) {
            return found;
        }
        throw new AdapterNotFoundException(str2);
    }

    /**
     * Fetches the status of an adapter from the store.
     *
     * @param str  the namespace id
     * @param str2 the adapter name
     * @return the stored adapter status
     * @throws AdapterNotFoundException if the store returns no status for that adapter
     */
    public AdapterStatus getAdapterStatus(String str, String str2) throws AdapterNotFoundException {
        Id.Namespace namespace = Id.Namespace.from(str);
        AdapterStatus status = store.getAdapterStatus(namespace, str2);
        if (status != null) {
            return status;
        }
        throw new AdapterNotFoundException(str2);
    }

    /**
     * Writes a new status for an adapter to the store.
     *
     * @param str           the namespace id
     * @param str2          the adapter name
     * @param adapterStatus the status to set
     * @return the value returned by the store (presumably the previous status; confirm against the Store contract)
     * @throws AdapterNotFoundException if the store returns {@code null} for that adapter
     */
    public AdapterStatus setAdapterStatus(String str, String str2, AdapterStatus adapterStatus) throws AdapterNotFoundException {
        Id.Namespace namespace = Id.Namespace.from(str);
        AdapterStatus storeResult = store.setAdapterStatus(namespace, str2, adapterStatus);
        if (storeResult != null) {
            return storeResult;
        }
        throw new AdapterNotFoundException(str2);
    }

    /**
     * Lists every adapter stored in a namespace.
     *
     * @param str the namespace id
     * @return all adapter specifications the store holds for the namespace
     */
    public Collection<AdapterSpecification> getAdapters(String str) {
        Id.Namespace namespace = Id.Namespace.from(str);
        return store.getAllAdapters(namespace);
    }

    /**
     * Lists the adapters of a namespace that are of the given adapter type.
     *
     * @param str  the namespace id
     * @param str2 the adapter type to match
     * @return a new list holding the adapters whose type equals {@code str2}; empty if none match
     */
    public Collection<AdapterSpecification> getAdapters(String str, String str2) {
        // Typed list instead of a raw ArrayList, so the add and the return are checked by the compiler.
        List<AdapterSpecification> matching = Lists.newArrayList();
        for (AdapterSpecification candidate : getAdapters(str)) {
            if (candidate.getType().equals(str2)) {
                matching.add(candidate);
            }
        }
        return matching;
    }

    /**
     * Creates an adapter in a namespace.
     *
     * <p>Steps, in order: resolve the adapter type, reject a duplicate name, deploy (or reuse) the adapter type's
     * application, validate the sources, create the sinks, enable concurrent runs in the preferences store,
     * schedule the application's workflows, and finally record the adapter in the store.
     *
     * @param str                  the namespace id
     * @param adapterSpecification the adapter to create
     * @throws IllegalArgumentException      if the adapter type is not registered, or a source or sink is invalid
     * @throws AdapterAlreadyExistsException if the store already has an adapter with the same name in the namespace
     */
    public void createAdapter(String str, AdapterSpecification adapterSpecification) throws IllegalArgumentException, AdapterAlreadyExistsException {
        AdapterTypeInfo typeInfo = adapterTypeInfos.get(adapterSpecification.getType());
        Preconditions.checkArgument(typeInfo != null, "Adapter type %s not found", adapterSpecification.getType());

        String adapterName = adapterSpecification.getName();
        AdapterSpecification existing = store.getAdapter(Id.Namespace.from(str), adapterName);
        if (existing != null) {
            throw new AdapterAlreadyExistsException(adapterName);
        }

        ApplicationSpecification appSpec = deployApplication(str, typeInfo);

        validateSources(adapterName, adapterSpecification.getSources());
        createSinks(adapterSpecification.getSinks(), typeInfo);

        preferencesStore.setProperties(
            str,
            appSpec.getName(),
            ImmutableMap.of(ProgramOptionConstants.CONCURRENT_RUNS_ENABLED, "true"));

        schedule(str, appSpec, typeInfo, adapterSpecification);
        store.addAdapter(Id.Namespace.from(str), adapterSpecification);
    }

    /**
     * Removes an adapter: deletes its workflow schedules, then removes it from the store.
     *
     * <p>NOTE(review): the application specification and the adapter type info are passed on without a null
     * check, so an undeployed application or unregistered type surfaces as a {@link NullPointerException}.
     *
     * @param str  the namespace id
     * @param str2 the adapter name
     * @throws AdapterNotFoundException if the adapter does not exist in the namespace
     */
    public void removeAdapter(String str, String str2) throws AdapterNotFoundException {
        Id.Namespace namespace = Id.Namespace.from(str);
        AdapterSpecification adapterSpec = getAdapter(str, str2);

        Id.Application applicationId = Id.Application.from(namespace, adapterSpec.getType());
        ApplicationSpecification appSpec = store.getApplication(applicationId);
        AdapterTypeInfo typeInfo = adapterTypeInfos.get(adapterSpec.getType());

        unschedule(str, appSpec, typeInfo, adapterSpec);
        store.removeAdapter(namespace, str2);
    }

    /**
     * Stops an adapter by suspending the schedule of every workflow of its application, then marks it
     * {@code STOPPED}.
     *
     * <p>NOTE(review): assumes the adapter's type is registered and its application is in the store; otherwise
     * a {@link NullPointerException} results.
     *
     * @param str  the namespace id
     * @param str2 the adapter name
     * @throws AdapterNotFoundException         if the adapter does not exist in the namespace
     * @throws InvalidAdapterOperationException if the adapter is already stopped
     * @throws IllegalArgumentException         if the adapter type's program type is not a workflow
     */
    public void stopAdapter(String str, String str2) throws AdapterNotFoundException, InvalidAdapterOperationException {
        AdapterStatus currentStatus = getAdapterStatus(str, str2);
        if (AdapterStatus.STOPPED.equals(currentStatus)) {
            throw new InvalidAdapterOperationException("Adapter is already stopped.");
        }

        AdapterSpecification adapterSpec = getAdapter(str, str2);
        ApplicationSpecification appSpec = store.getApplication(Id.Application.from(str, adapterSpec.getType()));
        ProgramType programType = adapterTypeInfos.get(adapterSpec.getType()).getProgramType();
        Preconditions.checkArgument(programType.equals(ProgramType.WORKFLOW), String.format("Unsupported program type %s for adapter", programType.toString()));

        for (WorkflowSpecification workflow : appSpec.getWorkflows().values()) {
            Id.Program programId = Id.Program.from(str, appSpec.getName(), workflow.getName());
            String scheduleName = constructScheduleName(programId, str2);
            scheduler.suspendSchedule(programId, SchedulableProgramType.WORKFLOW, scheduleName);
        }

        setAdapterStatus(str, str2, AdapterStatus.STOPPED);
    }

    /**
     * Starts an adapter by resuming the schedule of every workflow of its application, then marks it
     * {@code STARTED}.
     *
     * <p>NOTE(review): assumes the adapter's type is registered and its application is in the store; otherwise
     * a {@link NullPointerException} results.
     *
     * @param str  the namespace id
     * @param str2 the adapter name
     * @throws AdapterNotFoundException         if the adapter does not exist in the namespace
     * @throws InvalidAdapterOperationException if the adapter is already started
     * @throws IllegalArgumentException         if the adapter type's program type is not a workflow
     */
    public void startAdapter(String str, String str2) throws AdapterNotFoundException, InvalidAdapterOperationException {
        AdapterStatus currentStatus = getAdapterStatus(str, str2);
        if (AdapterStatus.STARTED.equals(currentStatus)) {
            throw new InvalidAdapterOperationException("Adapter is already started.");
        }

        AdapterSpecification adapterSpec = getAdapter(str, str2);
        ApplicationSpecification appSpec = store.getApplication(Id.Application.from(str, adapterSpec.getType()));
        ProgramType programType = adapterTypeInfos.get(adapterSpec.getType()).getProgramType();
        Preconditions.checkArgument(programType.equals(ProgramType.WORKFLOW), String.format("Unsupported program type %s for adapter", programType.toString()));

        for (WorkflowSpecification workflow : appSpec.getWorkflows().values()) {
            Id.Program programId = Id.Program.from(str, appSpec.getName(), workflow.getName());
            String scheduleName = constructScheduleName(programId, str2);
            scheduler.resumeSchedule(programId, SchedulableProgramType.WORKFLOW, scheduleName);
        }

        setAdapterStatus(str, str2, AdapterStatus.STARTED);
    }

    /**
     * Returns the specification of the application backing an adapter type, deploying it first if the store
     * does not already have it.
     *
     * <p>The application id is the adapter type name. Deployment uses the adapter type's jar file, an archive
     * location of {@code archiveDir/<namespace>/<adapter type>}, and the {@code SYSTEM} deploy scope, and blocks
     * until the deployment future completes.
     *
     * @param str             the namespace id
     * @param adapterTypeInfo the adapter type whose application is needed
     * @return the existing or freshly deployed application specification
     * @throws RuntimeException wrapping any failure during lookup or deployment; if the wait for the deployment
     *                          is interrupted, the thread's interrupt flag is restored before throwing
     */
    private ApplicationSpecification deployApplication(String str, AdapterTypeInfo adapterTypeInfo) {
        try {
            ApplicationSpecification existing = this.store.getApplication(Id.Application.from(str, adapterTypeInfo.getType()));
            if (existing != null) {
                return existing;
            }
            // The terminator passed to the deployer deliberately does nothing when asked to stop a program.
            ProgramTerminator noOpTerminator = new ProgramTerminator() {
                @Override // co.cask.cdap.internal.app.deploy.ProgramTerminator
                public void stop(Id.Namespace namespace, Id.Program program, ProgramType programType) throws ExecutionException {
                }
            };
            return ((ApplicationWithPrograms) this.managerFactory.create(noOpTerminator).deploy(
                Id.Namespace.from(str),
                adapterTypeInfo.getType(),
                new DeploymentInfo(
                    adapterTypeInfo.getFile(),
                    this.locationFactory.create(this.archiveDir).append(str).append(adapterTypeInfo.getType()),
                    ApplicationDeployScope.SYSTEM)).get()).getSpecification();
        } catch (InterruptedException e) {
            // Keep the interruption visible to callers further up the stack instead of silently consuming it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Adds a schedule for every workflow of the adapter's application.
     *
     * @param str                      the namespace id
     * @param applicationSpecification the application whose workflows are scheduled
     * @param adapterTypeInfo          the adapter type; its program type must be a workflow
     * @param adapterSpecification     the adapter the schedules are created for
     * @throws IllegalArgumentException if the adapter type's program type is not a workflow
     */
    private void schedule(String str, ApplicationSpecification applicationSpecification, AdapterTypeInfo adapterTypeInfo, AdapterSpecification adapterSpecification) {
        ProgramType adapterProgramType = adapterTypeInfo.getProgramType();
        Preconditions.checkArgument(adapterProgramType.equals(ProgramType.WORKFLOW), String.format("Unsupported program type %s for adapter", adapterProgramType.toString()));

        for (WorkflowSpecification workflow : applicationSpecification.getWorkflows().values()) {
            Id.Program programId = Id.Program.from(str, applicationSpecification.getName(), workflow.getName());
            addSchedule(programId, SchedulableProgramType.WORKFLOW, adapterSpecification);
        }
    }

    /**
     * Deletes the adapter's schedule for every workflow of the adapter's application.
     *
     * <p>NOTE(review): the program id is built from the adapter type as the application id, whereas
     * {@code schedule} uses the application specification's name; presumably the two are equal. Confirm.
     *
     * @param str                      the namespace id
     * @param applicationSpecification the application whose workflows are unscheduled
     * @param adapterTypeInfo          the adapter type; its program type must be a workflow
     * @param adapterSpecification     the adapter whose schedules are deleted
     * @throws IllegalArgumentException if the adapter type's program type is not a workflow
     */
    private void unschedule(String str, ApplicationSpecification applicationSpecification, AdapterTypeInfo adapterTypeInfo, AdapterSpecification adapterSpecification) {
        ProgramType adapterProgramType = adapterTypeInfo.getProgramType();
        Preconditions.checkArgument(adapterProgramType.equals(ProgramType.WORKFLOW), String.format("Unsupported program type %s for adapter", adapterProgramType.toString()));

        for (WorkflowSpecification workflow : applicationSpecification.getWorkflows().values()) {
            Id.Program programId = Id.Program.from(str, adapterSpecification.getType(), workflow.getName());
            String scheduleName = constructScheduleName(programId, adapterSpecification.getName());
            deleteSchedule(programId, SchedulableProgramType.WORKFLOW, scheduleName);
        }
    }

    /**
     * Creates the adapter's schedule for one program: registers it with the scheduler, then records it in
     * the store.
     *
     * <p>The cron expression is derived from the adapter's "frequency" property; the adapter's properties are
     * attached to the schedule specification.
     *
     * @param program                the program to schedule
     * @param schedulableProgramType the schedulable type of the program
     * @param adapterSpecification   the adapter supplying the name, frequency and properties
     * @throws IllegalArgumentException if the adapter has no "frequency" property
     */
    private void addSchedule(Id.Program program, SchedulableProgramType schedulableProgramType, AdapterSpecification adapterSpecification) {
        String frequency = (String) adapterSpecification.getProperties().get("frequency");
        Preconditions.checkArgument(frequency != null, "Frequency of running the adapter is missing from adapter properties. Cannot schedule program.");

        String cron = Schedules.toCronExpr(frequency);
        String adapterName = adapterSpecification.getName();

        Schedule adapterSchedule = new Schedule(constructScheduleName(program, adapterName), getScheduleDescription(adapterName), cron);
        ScheduleProgramInfo programInfo = new ScheduleProgramInfo(schedulableProgramType, program.getId());
        ScheduleSpecification spec = new ScheduleSpecification(adapterSchedule, programInfo, adapterSpecification.getProperties());

        scheduler.schedule(program, spec.getProgram().getProgramType(), spec.getSchedule());
        store.addSchedule(program, spec);
    }

    /**
     * Deletes a schedule from the scheduler first and then from the store.
     *
     * @param program                the program the schedule belongs to
     * @param schedulableProgramType the schedulable type of the program
     * @param str                    the schedule name
     */
    private void deleteSchedule(Id.Program program, SchedulableProgramType schedulableProgramType, String str) {
        // Scheduler first, store second: same order as the original implementation.
        scheduler.deleteSchedule(program, schedulableProgramType, str);
        store.deleteSchedule(program, schedulableProgramType, str);
    }

    /**
     * Checks that every source is a stream and that the stream exists.
     *
     * @param str the adapter name, used only in the error message
     * @param set the sources to validate
     * @throws IllegalArgumentException if a source is not of type {@code STREAM} or its stream does not exist
     */
    private void validateSources(String str, Set<Source> set) throws IllegalArgumentException {
        for (Source candidate : set) {
            boolean isStream = Source.Type.STREAM.equals(candidate.getType());
            Preconditions.checkArgument(isStream, String.format("Unknown Source type: %s", candidate.getType()));

            boolean present = streamExists(candidate.getName());
            Preconditions.checkArgument(present, String.format("Stream %s must exist during create of adapter: %s", candidate.getName(), str));
        }
    }

    /**
     * Asks the stream admin whether a stream exists.
     *
     * @param str the stream name
     * @return the result reported by the stream admin
     * @throws RuntimeException wrapping any exception raised by the stream admin
     */
    private boolean streamExists(String str) {
        boolean exists;
        try {
            exists = streamAdmin.exists(str);
        } catch (Exception cause) {
            throw new RuntimeException(cause);
        }
        return exists;
    }

    /**
     * Creates the dataset behind every sink.
     *
     * <p>Dataset properties are the adapter type's default sink properties with the sink's own properties
     * added afterwards; the "dataset.class" entry of the merged properties names the dataset class.
     *
     * @param set             the sinks to create
     * @param adapterTypeInfo the adapter type supplying the default sink properties
     * @throws IllegalArgumentException if a sink is not of type {@code DATASET} or has no dataset class
     */
    private void createSinks(Set<Sink> set, AdapterTypeInfo adapterTypeInfo) {
        for (Sink target : set) {
            boolean isDataset = Sink.Type.DATASET.equals(target.getType());
            Preconditions.checkArgument(isDataset, String.format("Unknown Sink type: %s", target.getType()));

            DatasetProperties merged = DatasetProperties.builder()
                .addAll(adapterTypeInfo.getDefaultSinkProperties())
                .addAll(target.getProperties())
                .build();

            String datasetClass = (String) merged.getProperties().get(DATASET_CLASS);
            Preconditions.checkArgument(datasetClass != null, "Dataset class cannot be null");

            createDataset(target.getName(), datasetClass, merged);
        }
    }

    /**
     * Creates a dataset instance unless one with the same name already exists.
     *
     * @param str               the dataset instance name
     * @param str2              the dataset class (type) name
     * @param datasetProperties the properties to create the instance with
     * @throws RuntimeException wrapping a {@link DatasetManagementException} or {@link IOException} raised by
     *                          the dataset framework; the failure is logged first
     */
    private void createDataset(String str, String str2, DatasetProperties datasetProperties) {
        try {
            if (!datasetFramework.hasInstance(str)) {
                datasetFramework.addInstance(str2, str, datasetProperties);
                LOG.debug("Dataset instance {} created with properties: {}.", str, datasetProperties);
                return;
            }
            LOG.debug("Dataset instance {} already exists; not creating a new one.", str);
        } catch (DatasetManagementException managementFailure) {
            LOG.error("Error while creating dataset {}", str, managementFailure);
            throw new RuntimeException(managementFailure);
        } catch (IOException ioFailure) {
            LOG.error("Error while creating dataset {}", str, ioFailure);
            throw new RuntimeException(ioFailure);
        }
    }

    /**
     * Scans the configured adapter directory ("app.adapter.dir") recursively for jar files and registers the
     * adapter type described by each jar's manifest.
     *
     * <p>Registration is best-effort: a directory that cannot be listed, a jar that cannot be read, or a
     * manifest with invalid values is logged as a warning (with its cause) and skipped. A failure on one jar
     * does not prevent the remaining jars from being registered.
     */
    @VisibleForTesting
    void registerAdapters() {
        Collection<File> jars;
        try {
            File adapterDir = new File(this.configuration.get("app.adapter.dir"));
            jars = FileUtils.listFiles(adapterDir, new String[]{"jar"}, true);
        } catch (Exception e) {
            LOG.warn("Unable to read the plugins directory", e);
            return;
        }

        for (File file : jars) {
            try {
                Manifest manifest;
                // Close the jar as soon as the manifest has been read so the file handle is not leaked.
                try (JarFile jarFile = new JarFile(file.getAbsolutePath())) {
                    manifest = jarFile.getManifest();
                }
                AdapterTypeInfo typeInfo = createAdapterTypeInfo(file, manifest);
                if (typeInfo != null) {
                    this.adapterTypeInfos.put(typeInfo.getType(), typeInfo);
                } else {
                    LOG.warn("Missing required information to create adapter {}", file.getAbsolutePath());
                }
            } catch (IOException e) {
                LOG.warn("Unable to read adapter jar {}", file.getAbsolutePath(), e);
            } catch (RuntimeException e) {
                // E.g. an unknown source/sink/program type or malformed JSON properties in the manifest.
                LOG.warn("Invalid adapter information in jar {}", file.getAbsolutePath(), e);
            }
        }
    }

    /**
     * Builds the adapter type information described by a jar manifest.
     *
     * <p>The adapter type, program type, source type and sink type attributes are required. The three
     * properties attributes are optional JSON maps and default to empty maps. Type names are upper-cased with
     * {@link Locale#ROOT}, so the enum lookup does not depend on the JVM's default locale.
     *
     * @param file     the adapter jar the manifest was read from
     * @param manifest the jar's manifest; may be {@code null}
     * @return the adapter type information, or {@code null} if the manifest is {@code null} or lacks a
     *         required attribute
     * @throws IllegalArgumentException if the source, sink or program type is not a known enum constant
     */
    @Nullable
    private AdapterTypeInfo createAdapterTypeInfo(File file, Manifest manifest) {
        if (manifest == null) {
            return null;
        }
        Attributes mainAttributes = manifest.getMainAttributes();
        String adapterType = mainAttributes.getValue(AdapterManifestAttributes.ADAPTER_TYPE);
        String programType = mainAttributes.getValue(AdapterManifestAttributes.ADAPTER_PROGRAM_TYPE);
        String adapterProperties = mainAttributes.getValue(AdapterManifestAttributes.ADAPTER_PROPERTIES);
        String sourceType = mainAttributes.getValue(AdapterManifestAttributes.SOURCE_TYPE);
        String sinkType = mainAttributes.getValue(AdapterManifestAttributes.SINK_TYPE);
        String sourceProperties = mainAttributes.getValue(AdapterManifestAttributes.SOURCE_PROPERTIES);
        String sinkProperties = mainAttributes.getValue(AdapterManifestAttributes.SINK_PROPERTIES);

        if (adapterType == null || sourceType == null || sinkType == null || programType == null) {
            return null;
        }
        return new AdapterTypeInfo(
            file,
            adapterType,
            Source.Type.valueOf(sourceType.toUpperCase(Locale.ROOT)),
            Sink.Type.valueOf(sinkType.toUpperCase(Locale.ROOT)),
            propertiesFromString(sourceProperties),
            propertiesFromString(sinkProperties),
            propertiesFromString(adapterProperties),
            ProgramType.valueOf(programType.toUpperCase(Locale.ROOT)));
    }

    /**
     * Parses a JSON object of string keys and string values into a map.
     *
     * @param str the JSON text
     * @return the parsed map, or a new empty map when Gson yields {@code null} for the input
     */
    protected Map<String, String> propertiesFromString(String str) {
        Map<String, String> parsed = GSON.fromJson(str, STRING_STRING_MAP_TYPE);
        if (parsed != null) {
            return parsed;
        }
        return Maps.newHashMap();
    }

    /**
     * Builds the name of the schedule that ties an adapter to a program.
     *
     * @param program the scheduled program
     * @param str     the adapter name
     * @return the adapter name and the program id joined by a dot
     */
    public String constructScheduleName(Id.Program program, String str) {
        final String template = "%s.%s";
        return String.format(template, str, program.getId());
    }

    /**
     * Builds the human-readable description of an adapter's schedule.
     *
     * @param str the adapter name
     * @return the text "Schedule for adapter: " followed by the adapter name
     */
    public String getScheduleDescription(String str) {
        final String template = "Schedule for adapter: %s";
        return String.format(template, str);
    }
}
