package org.apache.beam.runners.twister2;

import edu.iu.dsc.tws.api.JobConfig;
import edu.iu.dsc.tws.api.Twister2Job;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.exceptions.Twister2RuntimeException;
import edu.iu.dsc.tws.api.scheduler.Twister2JobState;
import edu.iu.dsc.tws.api.tset.sets.TSet;
import edu.iu.dsc.tws.api.tset.sets.batch.BatchTSet;
import edu.iu.dsc.tws.local.LocalSubmitter;
import edu.iu.dsc.tws.rsched.core.ResourceAllocator;
import edu.iu.dsc.tws.rsched.job.Twister2Submitter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.LogManager;
import java.util.logging.Logger;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.util.construction.PTransformMatchers;
import org.apache.beam.sdk.util.construction.SplittableParDo;
import org.apache.beam.sdk.util.construction.SplittableParDoNaiveBounded;
import org.apache.beam.sdk.util.construction.resources.PipelineResources;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;

/* loaded from: input_file:org/apache/beam/runners/twister2/Twister2Runner.class */
public class Twister2Runner extends PipelineRunner<PipelineResult> {
    private static final Logger LOG = Logger.getLogger(Twister2Runner.class.getName());
    private static final String SIDEINPUTS = "sideInputs";
    private static final String LEAVES = "leaves";
    private static final String GRAPH = "graph";
    private final Twister2PipelineOptions options;

    protected Twister2Runner(Twister2PipelineOptions twister2PipelineOptions) {
        this.options = twister2PipelineOptions;
    }

    public static Twister2Runner fromOptions(PipelineOptions pipelineOptions) {
        return new Twister2Runner((Twister2PipelineOptions) PipelineOptionsValidator.validate(Twister2PipelineOptions.class, pipelineOptions));
    }

    public PipelineResult run(Pipeline pipeline) {
        Twister2PipelineExecutionEnvironment twister2PipelineExecutionEnvironment = new Twister2PipelineExecutionEnvironment(this.options);
        LOG.info("Translating pipeline to Twister2 program.");
        pipeline.replaceAll(getDefaultOverrides());
        if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "beam_fn_api")) {
            SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
        }
        twister2PipelineExecutionEnvironment.translate(pipeline);
        setupSystem(this.options);
        HashMap hashMap = new HashMap();
        JobConfig jobConfig = new JobConfig();
        if (isLocalMode(this.options)) {
            this.options.setParallelism(1);
            hashMap.put(SIDEINPUTS, extractNames(twister2PipelineExecutionEnvironment.getSideInputs()));
            hashMap.put(LEAVES, extractNames(twister2PipelineExecutionEnvironment.getLeaves()));
            hashMap.put(GRAPH, twister2PipelineExecutionEnvironment.getTSetGraph());
            hashMap.put("twister2.network.buffer.size", 32000);
            hashMap.put("twister2.network.sendBuffer.count", Integer.valueOf(this.options.getParallelism()));
            LOG.warning("Twister2 Local Mode currently only supports single worker");
        } else {
            jobConfig.put(SIDEINPUTS, extractNames(twister2PipelineExecutionEnvironment.getSideInputs()));
            jobConfig.put(LEAVES, extractNames(twister2PipelineExecutionEnvironment.getLeaves()));
            jobConfig.put(GRAPH, twister2PipelineExecutionEnvironment.getTSetGraph());
        }
        Config loadConfig = ResourceAllocator.loadConfig(hashMap);
        Twister2Job build = Twister2Job.newBuilder().setJobName(this.options.getJobName()).setWorkerClass(BeamBatchWorker.class).addComputeResource(this.options.getWorkerCPUs(), this.options.getRamMegaBytes(), this.options.getParallelism()).setConfig(jobConfig).build();
        return new Twister2PipelineResult(isLocalMode(this.options) ? LocalSubmitter.submitJob(build, loadConfig) : Twister2Submitter.submitJob(build, loadConfig));
    }

    private boolean isLocalMode(Twister2PipelineOptions twister2PipelineOptions) {
        return twister2PipelineOptions.getTwister2Home() == null || "".equals(twister2PipelineOptions.getTwister2Home());
    }

    public PipelineResult runTest(Pipeline pipeline) {
        Twister2PipelineExecutionEnvironment twister2PipelineExecutionEnvironment = new Twister2PipelineExecutionEnvironment(this.options);
        LOG.info("Translating pipeline to Twister2 program.");
        pipeline.replaceAll(getDefaultOverrides());
        if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "beam_fn_api")) {
            SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
        }
        twister2PipelineExecutionEnvironment.translate(pipeline);
        setupSystemTest(this.options);
        HashMap hashMap = new HashMap();
        hashMap.put(SIDEINPUTS, extractNames(twister2PipelineExecutionEnvironment.getSideInputs()));
        hashMap.put(LEAVES, extractNames(twister2PipelineExecutionEnvironment.getLeaves()));
        hashMap.put(GRAPH, twister2PipelineExecutionEnvironment.getTSetGraph());
        hashMap.put("twister2.network.buffer.size", 32000);
        hashMap.put("twister2.network.sendBuffer.count", Integer.valueOf(this.options.getParallelism()));
        Twister2JobState submitJob = LocalSubmitter.submitJob(Twister2Job.newBuilder().setJobName(this.options.getJobName()).setWorkerClass(BeamBatchWorker.class).addComputeResource(this.options.getWorkerCPUs(), this.options.getRamMegaBytes(), this.options.getParallelism()).setConfig(new JobConfig()).build(), ResourceAllocator.loadConfig(hashMap));
        Twister2PipelineResult twister2PipelineResult = new Twister2PipelineResult(submitJob);
        if (twister2PipelineResult.state == PipelineResult.State.FAILED) {
            throw new RuntimeException("Pipeline execution failed", submitJob.getCause());
        }
        return twister2PipelineResult;
    }

    private void setupSystem(Twister2PipelineOptions twister2PipelineOptions) {
        prepareFilesToStage(twister2PipelineOptions);
        zipFilesToStage(twister2PipelineOptions);
        System.setProperty("cluster_type", twister2PipelineOptions.getClusterType());
        System.setProperty("job_file", twister2PipelineOptions.getJobFileZip());
        System.setProperty("job_type", twister2PipelineOptions.getJobType());
        if (isLocalMode(twister2PipelineOptions)) {
            System.setProperty("twister2_home", System.getProperty("java.io.tmpdir"));
            System.setProperty("config_dir", System.getProperty("java.io.tmpdir") + "/conf/");
            return;
        }
        System.setProperty("twister2_home", twister2PipelineOptions.getTwister2Home());
        System.setProperty("config_dir", twister2PipelineOptions.getTwister2Home() + "/conf/");
        File file = new File(System.getProperty("config_dir"), twister2PipelineOptions.getClusterType());
        for (String str : new String[]{"core.yaml", "network.yaml", "data.yaml", "resource.yaml", "task.yaml"}) {
            if (!new File(file, str).exists()) {
                throw new Twister2RuntimeException("Couldn't find " + str + " in config directory specified.");
            }
        }
        FileInputStream fileInputStream = null;
        try {
            try {
                fileInputStream = new FileInputStream(new File(file, "logger.properties"));
                LogManager.getLogManager().readConfiguration(fileInputStream);
                fileInputStream.close();
                if (fileInputStream != null) {
                    try {
                        fileInputStream.close();
                    } catch (IOException e) {
                        LOG.info(e.getMessage());
                    }
                }
            } catch (Throwable th) {
                if (fileInputStream != null) {
                    try {
                        fileInputStream.close();
                    } catch (IOException e2) {
                        LOG.info(e2.getMessage());
                    }
                }
                throw th;
            }
        } catch (IOException e3) {
            LOG.warning("Couldn't load logging configuration");
            if (fileInputStream != null) {
                try {
                    fileInputStream.close();
                } catch (IOException e4) {
                    LOG.info(e4.getMessage());
                }
            }
        }
    }

    private void setupSystemTest(Twister2PipelineOptions twister2PipelineOptions) {
        prepareFilesToStage(twister2PipelineOptions);
        zipFilesToStage(twister2PipelineOptions);
        System.setProperty("cluster_type", twister2PipelineOptions.getClusterType());
        System.setProperty("twister2_home", System.getProperty("java.io.tmpdir"));
        System.setProperty("job_file", twister2PipelineOptions.getJobFileZip());
        System.setProperty("job_type", twister2PipelineOptions.getJobType());
    }

    private Set<String> extractNames(Set<TSet> set) {
        HashSet hashSet = new HashSet();
        Iterator<TSet> it = set.iterator();
        while (it.hasNext()) {
            hashSet.add(it.next().getId());
        }
        return hashSet;
    }

    private Map<String, String> extractNames(Map<String, BatchTSet<?>> map) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (Map.Entry<String, BatchTSet<?>> entry : map.entrySet()) {
            linkedHashMap.put(entry.getKey(), entry.getValue().getId());
        }
        return linkedHashMap;
    }

    private void prepareFilesToStage(Twister2PipelineOptions twister2PipelineOptions) {
        PipelineResources.prepareFilesForStaging(twister2PipelineOptions);
    }

    private void zipFilesToStage(Twister2PipelineOptions twister2PipelineOptions) {
        File file = null;
        HashSet hashSet = new HashSet();
        List<String> filesToStage = twister2PipelineOptions.getFilesToStage();
        ArrayList arrayList = new ArrayList();
        for (String str : filesToStage) {
            if (!str.contains("/org/twister2")) {
                arrayList.add(str);
            }
        }
        FileInputStream fileInputStream = null;
        try {
            try {
                file = File.createTempFile("twister2-", ".zip");
                FileOutputStream fileOutputStream = new FileOutputStream(file);
                ZipOutputStream zipOutputStream = new ZipOutputStream(fileOutputStream);
                zipOutputStream.putNextEntry(new ZipEntry("lib/"));
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    File file2 = new File((String) it.next());
                    if (!hashSet.contains(file2.getName())) {
                        hashSet.add(file2.getName());
                        fileInputStream = new FileInputStream(file2);
                        zipOutputStream.putNextEntry(new ZipEntry("lib/" + file2.getName()));
                        byte[] bArr = new byte[1024];
                        while (true) {
                            int read = fileInputStream.read(bArr);
                            if (read < 0) {
                                break;
                            } else {
                                zipOutputStream.write(bArr, 0, read);
                            }
                        }
                        fileInputStream.close();
                    }
                }
                zipOutputStream.close();
                fileOutputStream.close();
                file.deleteOnExit();
                if (fileInputStream != null) {
                    try {
                        fileInputStream.close();
                    } catch (IOException e) {
                        LOG.info(e.getMessage());
                    }
                }
            } catch (Throwable th) {
                if (fileInputStream != null) {
                    try {
                        fileInputStream.close();
                    } catch (IOException e2) {
                        LOG.info(e2.getMessage());
                    }
                }
                throw th;
            }
        } catch (FileNotFoundException e3) {
            LOG.info(e3.getMessage());
            if (fileInputStream != null) {
                try {
                    fileInputStream.close();
                } catch (IOException e4) {
                    LOG.info(e4.getMessage());
                }
            }
        } catch (IOException e5) {
            LOG.info(e5.getMessage());
            if (fileInputStream != null) {
                try {
                    fileInputStream.close();
                } catch (IOException e6) {
                    LOG.info(e6.getMessage());
                }
            }
        }
        if (file != null) {
            twister2PipelineOptions.setJobFileZip(file.getPath());
        }
    }

    private static List<PTransformOverride> getDefaultOverrides() {
        return ImmutableList.builder().add(PTransformOverride.of(PTransformMatchers.splittableParDo(), new SplittableParDo.OverrideFactory())).add(PTransformOverride.of(PTransformMatchers.urnEqualTo("beam:transform:sdf_process_keyed_elements:v1"), new SplittableParDoNaiveBounded.OverrideFactory())).build();
    }
}
