package gobblin.aws;

import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.google.common.eventbus.EventBus;
import com.typesafe.config.Config;
import gobblin.annotation.Alpha;
import gobblin.cluster.JobConfigurationManager;
import gobblin.util.ExecutorsUtils;
import gobblin.util.SchedulerUtils;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link JobConfigurationManager} that periodically downloads a zip archive of job
 * configuration files from the URI configured under
 * {@link GobblinAWSConfigurationKeys#JOB_CONF_S3_URI_KEY}, extracts it below the configured
 * job configuration directory, and posts every new or changed job configuration via
 * {@code postNewJobConfigArrival}.
 *
 * <p>The refresh runs on a dedicated single-thread scheduled executor. The interval is read
 * from {@link GobblinAWSConfigurationKeys#JOB_CONF_REFRESH_INTERVAL} and falls back to
 * {@value #DEFAULT_JOB_CONF_REFRESH_INTERVAL} seconds when that key is absent.
 */
@Alpha
public class AWSJobConfigurationManager extends JobConfigurationManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(AWSJobConfigurationManager.class);

    /** Refresh interval, in seconds, used when none is configured. */
    private static final long DEFAULT_JOB_CONF_REFRESH_INTERVAL = 60;

    /** URI of the job configuration zip; re-read from the config on every refresh. */
    private Optional<String> jobConfS3Uri;

    /**
     * Last posted configuration per job, keyed by the value of its {@code job.config.path}
     * property. Only {@link #fetchJobConf()} reads or writes this map.
     */
    private final Map<String, Properties> jobConfFiles;

    private final long refreshIntervalInSeconds;
    private final ScheduledExecutorService fetchJobConfExecutor;

    /**
     * Creates the manager and its refresh executor; no work is scheduled until
     * {@link #startUp()} runs.
     *
     * @param eventBus passed through to the superclass constructor
     * @param config configuration from which the refresh interval (and, on every refresh,
     *     the download URI and target directory) is read
     */
    public AWSJobConfigurationManager(EventBus eventBus, Config config) {
        super(eventBus, config);
        this.jobConfFiles = Maps.newHashMap();
        if (config.hasPath(GobblinAWSConfigurationKeys.JOB_CONF_REFRESH_INTERVAL)) {
            this.refreshIntervalInSeconds =
                config.getDuration(GobblinAWSConfigurationKeys.JOB_CONF_REFRESH_INTERVAL, TimeUnit.SECONDS);
        } else {
            this.refreshIntervalInSeconds = DEFAULT_JOB_CONF_REFRESH_INTERVAL;
        }
        this.fetchJobConfExecutor = Executors.newSingleThreadScheduledExecutor(
            ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), Optional.of("FetchJobConfExecutor")));
    }

    /**
     * Re-reads the local job configuration directory and the zip URI from the config;
     * each becomes absent when its key is not set.
     */
    private void fetchJobConfSettings() {
        this.jobConfDirPath = this.config.hasPath("gobblin.cluster.job.conf.path")
            ? Optional.of(this.config.getString("gobblin.cluster.job.conf.path"))
            : Optional.<String>absent();
        this.jobConfS3Uri = this.config.hasPath(GobblinAWSConfigurationKeys.JOB_CONF_S3_URI_KEY)
            ? Optional.of(this.config.getString(GobblinAWSConfigurationKeys.JOB_CONF_S3_URI_KEY))
            : Optional.<String>absent();
    }

    /**
     * Schedules {@link #fetchJobConf()} to run immediately and then at a fixed rate of
     * {@code refreshIntervalInSeconds}.
     *
     * @throws Exception declared for compatibility with the existing signature
     */
    protected void startUp() throws Exception {
        LOGGER.info("Starting the " + AWSJobConfigurationManager.class.getSimpleName());
        LOGGER.info("Scheduling the job configuration refresh task with an interval of {} second(s)",
            this.refreshIntervalInSeconds);

        this.fetchJobConfExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    fetchJobConf();
                } catch (IOException | ConfigurationException | RuntimeException e) {
                    // Deliberately not rethrown: an exception escaping a task scheduled with
                    // scheduleAtFixedRate suppresses all subsequent executions, so a single
                    // transient failure would permanently stop the refresh.
                    LOGGER.error("Failed to fetch job configurations", e);
                }
            }
        }, 0L, this.refreshIntervalInSeconds, TimeUnit.SECONDS);
    }

    /**
     * Downloads the job configuration zip, extracts it into the {@code files} sub-directory
     * of the job configuration directory, loads the job configurations found there, and
     * posts each configuration that is new or differs from the one posted previously.
     *
     * <p>Does nothing when either the zip URI or the job configuration directory is not
     * configured.
     *
     * @throws IOException if the download or the extraction fails
     * @throws ConfigurationException if loading the job configurations fails
     */
    public void fetchJobConf() throws IOException, ConfigurationException {
        fetchJobConfSettings();
        if (!this.jobConfS3Uri.isPresent() || !this.jobConfDirPath.isPresent()) {
            return;
        }

        final String s3Uri = this.jobConfS3Uri.get();
        final String confDirPrefix = GobblinAWSUtils.appendSlash(this.jobConfDirPath.get());

        // URIs are always '/'-separated, so the archive name must not be derived with the
        // platform-specific File.separator.
        final String zipPath = confDirPrefix + StringUtils.substringAfterLast(s3Uri, "/");
        LOGGER.debug("Downloading to zip: {} from uri: {}", zipPath, s3Uri);
        FileUtils.copyURLToFile(new URL(s3Uri), new File(zipPath));

        final String extractedPath = confDirPrefix + "files";
        LOGGER.debug("Extracting to directory: {} from zip: {}", extractedPath, zipPath);
        final File extractedDir = new File(extractedPath);
        unzipArchive(zipPath, extractedDir);

        // An archive without entries leaves the directory uncreated.
        if (!extractedDir.exists()) {
            LOGGER.warn("Job configuration directory {} not found", extractedDir);
            return;
        }

        LOGGER.info("Loading job configurations from {}", extractedDir);
        final Properties loaderProperties = new Properties();
        loaderProperties.setProperty("jobconf.fullyQualifiedPath", extractedDir.getAbsolutePath());
        final List<Properties> jobConfigs = SchedulerUtils.loadGenericJobConfigs(loaderProperties);
        LOGGER.info("Loaded {} job configuration(s)", jobConfigs.size());

        for (Properties jobConfig : jobConfigs) {
            LOGGER.debug("Config value: {}", jobConfig);
            final String jobConfigPath = jobConfig.getProperty("job.config.path");
            final boolean known = this.jobConfFiles.containsKey(jobConfigPath);

            if (known && jobConfig.equals(this.jobConfFiles.get(jobConfigPath))) {
                LOGGER.info("Config not changed for job: {}", jobConfigPath);
                continue;
            }

            this.jobConfFiles.put(jobConfigPath, jobConfig);
            postNewJobConfigArrival(jobConfig.getProperty("job.name"), jobConfig);
            if (known) {
                LOGGER.info("Config updated for job: {}", jobConfigPath);
            } else {
                LOGGER.info("New config arrived for job: {}", jobConfigPath);
            }
        }
    }

    /**
     * Extracts every entry of a zip archive below {@code outputDir}, creating directories
     * as needed and overwriting files that already exist.
     *
     * @param zipFilePath path of the zip archive to extract
     * @param outputDir directory below which all entries are written
     * @throws IOException if the archive cannot be read, a directory cannot be created, a
     *     file cannot be written, or an entry would be written outside {@code outputDir}
     */
    public void unzipArchive(String zipFilePath, File outputDir) throws IOException {
        try (ZipFile zipFile = new ZipFile(zipFilePath)) {
            final File canonicalOutputDir = outputDir.getCanonicalFile();
            final Enumeration<? extends ZipEntry> entries = zipFile.entries();

            while (entries.hasMoreElements()) {
                final ZipEntry entry = entries.nextElement();
                final File target = new File(outputDir, entry.getName());

                // Guard against "Zip Slip": an entry name such as "../../x" must not be
                // allowed to escape the extraction directory.
                if (!target.getCanonicalFile().toPath().startsWith(canonicalOutputDir.toPath())) {
                    throw new IOException("Zip entry: " + entry.getName()
                        + " resolves outside of target directory: " + outputDir
                        + " while un-archiving zip: " + zipFilePath);
                }

                if (entry.isDirectory()) {
                    if (!target.mkdirs() && !target.exists()) {
                        throw new IOException("Could not create directory: " + target
                            + " while un-archiving zip: " + zipFilePath);
                    }
                    continue;
                }

                final File parent = target.getParentFile();
                if (!parent.mkdirs() && !parent.exists()) {
                    throw new IOException("Could not create parent directory for: " + target
                        + " while un-archiving zip: " + zipFilePath);
                }

                try (InputStream in = zipFile.getInputStream(entry);
                     FileOutputStream out = new FileOutputStream(target)) {
                    IOUtils.copy(in, out);
                }
            }
        }
    }

    /**
     * Shuts down the refresh executor by delegating to
     * {@code GobblinAWSUtils.shutdownExecutorService}.
     *
     * @throws Exception declared for compatibility with the existing signature
     */
    protected void shutDown() throws Exception {
        GobblinAWSUtils.shutdownExecutorService(getClass(), this.fetchJobConfExecutor, LOGGER);
    }
}
