package org.apache.falcon.workflow;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.falcon.FalconException;
import org.apache.falcon.catalog.CatalogPartition;
import org.apache.falcon.catalog.CatalogServiceFactory;
import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.resource.metadata.LineageMetadataResource;
import org.apache.falcon.workflow.util.OozieActionConfigurationHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/falcon-oozie-adaptor-0.9.jar:org/apache/falcon/workflow/LateDataHandler.class */
public class LateDataHandler extends Configured implements Tool {
    private static final Logger LOG = LoggerFactory.getLogger(LateDataHandler.class);

    public static void main(String[] strArr) throws Exception {
        ToolRunner.run(OozieActionConfigurationHelper.createActionConf(), new LateDataHandler(), strArr);
    }

    private static CommandLine getCommand(String[] strArr) throws ParseException {
        Options options = new Options();
        Option option = new Option(LineageMetadataResource.OUT, true, "Out file name");
        option.setRequired(true);
        options.addOption(option);
        Option option2 = new Option("paths", true, "Comma separated path list, further separated by #");
        option2.setRequired(true);
        options.addOption(option2);
        Option option3 = new Option(WorkflowExecutionArgs.INPUT_NAMES.getName(), true, "Input feed names, further separated by #");
        option3.setRequired(true);
        options.addOption(option3);
        Option option4 = new Option(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName(), true, "Feed storage types corresponding to Input feed names, separated by #");
        option4.setRequired(true);
        options.addOption(option4);
        return new GnuParser().parse(options, strArr);
    }

    public int run(String[] strArr) throws Exception {
        CommandLine command = getCommand(strArr);
        String optionValue = getOptionValue(command, "paths");
        if (optionValue == null) {
            return 0;
        }
        Map<String, Long> computeMetrics = computeMetrics(getOptionValue(command, WorkflowExecutionArgs.INPUT_NAMES.getName()).split("#"), optionValue.split("#"), getOptionValue(command, WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName()).split("#"));
        Path path = new Path(command.getOptionValue(LineageMetadataResource.OUT));
        LOG.info("Persisting late data metrics: {} to file: {}", computeMetrics, path);
        persistMetrics(computeMetrics, path);
        return 0;
    }

    private String getOptionValue(CommandLine commandLine, String str) {
        String optionValue = commandLine.getOptionValue(str);
        if (optionValue.equals("null")) {
            return null;
        }
        return optionValue;
    }

    private Map<String, Long> computeMetrics(String[] strArr, String[] strArr2, String[] strArr3) throws IOException, FalconException, URISyntaxException {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (int i = 0; i < strArr2.length; i++) {
            linkedHashMap.put(strArr[i], Long.valueOf(computeStorageMetric(strArr2[i], strArr3[i], getConf())));
        }
        return linkedHashMap;
    }

    private void persistMetrics(Map<String, Long> map, Path path) throws IOException, FalconException {
        OutputStream outputStream = null;
        try {
            outputStream = HadoopClientFactory.get().createProxiedFileSystem(path.toUri()).create(path);
            for (Map.Entry<String, Long> entry : map.entrySet()) {
                outputStream.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
            }
            if (outputStream != null) {
                try {
                    outputStream.close();
                } catch (IOException e) {
                }
            }
        } catch (Throwable th) {
            if (outputStream != null) {
                try {
                    outputStream.close();
                } catch (IOException e2) {
                }
            }
            throw th;
        }
    }

    public long computeStorageMetric(String str, String str2, Configuration configuration) throws IOException, FalconException, URISyntaxException {
        Storage.TYPE valueOf = Storage.TYPE.valueOf(str2);
        if (valueOf == Storage.TYPE.FILESYSTEM) {
            return getFileSystemUsageMetric(str, configuration);
        }
        if (valueOf == Storage.TYPE.TABLE) {
            return getTablePartitionCreateTimeMetric(str.replace("hcat", "thrift"));
        }
        throw new IllegalArgumentException("Unknown storage type: " + str2);
    }

    private long getFileSystemUsageMetric(String str, Configuration configuration) throws IOException, FalconException {
        long j = 0;
        for (String str2 : str.split(",")) {
            j += usage(new Path(str2), configuration);
        }
        return j;
    }

    private long usage(Path path, Configuration configuration) throws IOException, FalconException {
        FileSystem createProxiedFileSystem = HadoopClientFactory.get().createProxiedFileSystem(path.toUri(), configuration);
        FileStatus[] globStatus = createProxiedFileSystem.globStatus(path);
        if (globStatus == null || globStatus.length == 0) {
            return 0L;
        }
        long j = 0;
        for (FileStatus fileStatus : globStatus) {
            j += createProxiedFileSystem.getContentSummary(fileStatus.getPath()).getLength();
        }
        return j;
    }

    private long getTablePartitionCreateTimeMetric(String str) throws IOException, URISyntaxException, FalconException {
        CatalogStorage catalogStorage = (CatalogStorage) FeedHelper.createStorage(Storage.TYPE.TABLE.name(), str, getConf());
        CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition(getConf(), catalogStorage.getCatalogUrl(), catalogStorage.getDatabase(), catalogStorage.getTable(), new ArrayList(catalogStorage.getPartitions().values()));
        if (partition == null) {
            return 0L;
        }
        return partition.getCreateTime();
    }

    public String detectChanges(Path path, Map<String, Long> map, Configuration configuration) throws Exception {
        StringBuilder sb = new StringBuilder();
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(HadoopClientFactory.get().createProxiedFileSystem(path.toUri(), configuration).open(path)));
        try {
            LinkedHashMap linkedHashMap = new LinkedHashMap();
            while (true) {
                String readLine = bufferedReader.readLine();
                if (readLine == null) {
                    break;
                }
                if (!readLine.isEmpty()) {
                    int indexOf = readLine.indexOf(61);
                    linkedHashMap.put(readLine.substring(0, indexOf), Long.valueOf(Long.parseLong(readLine.substring(indexOf + 1))));
                }
            }
            for (Map.Entry<String, Long> entry : map.entrySet()) {
                if (linkedHashMap.get(entry.getKey()) == null) {
                    LOG.info("No matching key {}", entry.getKey());
                } else if (!((Long) linkedHashMap.get(entry.getKey())).equals(entry.getValue())) {
                    LOG.info("Recorded size: {} is different from new size {}", linkedHashMap.get(entry.getKey()), entry.getValue());
                    sb.append(entry.getKey()).append(',');
                }
            }
            if (sb.length() == 0) {
                return "";
            }
            String substring = sb.substring(0, sb.length() - 1);
            bufferedReader.close();
            return substring;
        } finally {
            bufferedReader.close();
        }
    }
}
