package org.apache.beam.examples.subprocess;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration;
import org.apache.beam.examples.subprocess.kernel.SubProcessCommandLineArgs;
import org.apache.beam.examples.subprocess.kernel.SubProcessKernel;
import org.apache.beam.examples.subprocess.utils.CallingSubProcessUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

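/**
 * Example pipeline that creates 10,000 key/value string pairs and passes them through two
 * {@link EchoInputDoFn} stages, the first constructed with the binary name "Echo" and the
 * second with "EchoAgain".
 *
 * <p>Decompiled from: input_file:org/apache/beam/examples/subprocess/ExampleEchoPipeline.class
 */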
public class ExampleEchoPipeline {
    static final Logger LOG = LoggerFactory.getLogger(ExampleEchoPipeline.class);

    /* loaded from: input_file:org/apache/beam/examples/subprocess/ExampleEchoPipeline$EchoInputDoFn.class */
    /**
     * {@link DoFn} that hands each element's value to a {@link SubProcessKernel} and emits one
     * output per string returned by {@link SubProcessKernel#exec}, keyed by the input element's key.
     */
    public static class EchoInputDoFn extends DoFn<KV<String, String>, KV<String, String>> {
        static final Logger LOG = LoggerFactory.getLogger(EchoInputDoFn.class);

        // Assigned once in the constructor and never mutated afterwards.
        private final SubProcessConfiguration configuration;
        private final String binaryName;

        /**
         * @param subProcessConfiguration configuration passed to {@link CallingSubProcessUtils#setUp}
         *     and to every {@link SubProcessKernel} this function creates
         * @param str binary name passed alongside the configuration to the same two collaborators
         */
        public EchoInputDoFn(SubProcessConfiguration subProcessConfiguration, String str) {
            this.configuration = subProcessConfiguration;
            this.binaryName = str;
        }

        /**
         * Delegates to {@link CallingSubProcessUtils#setUp} with this function's configuration and
         * binary name.
         *
         * @throws Exception whatever the delegate throws
         */
        @DoFn.Setup
        public void setUp() throws Exception {
            CallingSubProcessUtils.setUp(configuration, binaryName);
        }

        /**
         * Executes the binary for the current element and outputs {@code KV(key, result)} for each
         * returned string.
         *
         * @param processContext supplies the input element and receives the outputs
         * @throws Exception any failure is logged and then rethrown unchanged
         */
        @DoFn.ProcessElement
        public void processElement(ProcessContext processContext) throws Exception {
            try {
                // Typed access: no raw KV casts are needed because the DoFn's input type is known.
                KV<String, String> element = processContext.element();

                // A single command, built with index 0, carrying the element's value as a string.
                SubProcessCommandLineArgs commands = new SubProcessCommandLineArgs();
                commands.putCommand(
                        new SubProcessCommandLineArgs.Command(0, String.valueOf(element.getValue())));

                // A fresh kernel is created for every element, as in the original code.
                SubProcessKernel kernel = new SubProcessKernel(configuration, binaryName);
                for (String result : kernel.exec(commands)) {
                    processContext.output(KV.of(element.getKey(), result));
                }
            } catch (Exception e) {
                LOG.error("Error processing element ", e);
                throw e;
            }
        }
    }

    /**
     * Builds and runs the example pipeline.
     *
     * <p>Parses {@code strArr} into {@link SubProcessPipelineOptions}, creates 10,000 pairs whose
     * key and value are both the decimal string of the loop index, and applies two
     * {@link EchoInputDoFn} stages ("Echo" then "EchoAgain") before calling {@link Pipeline#run()}.
     * The value returned by {@code run()} is not inspected.
     *
     * @param strArr command-line arguments forwarded to {@link PipelineOptionsFactory#fromArgs}
     * @throws Exception any failure raised while parsing options or building/running the pipeline
     */
    public static void main(String[] strArr) throws Exception {
        // as(Class<T>) already returns T, so no cast is required.
        SubProcessPipelineOptions options =
                PipelineOptionsFactory.fromArgs(strArr).withValidation().as(SubProcessPipelineOptions.class);
        Pipeline pipeline = Pipeline.create(options);

        // The same configuration instance is shared by both transforms.
        SubProcessConfiguration configuration = options.getSubProcessConfiguration();

        // Typed (non-raw) list so Create.of(...) yields a typed collection for the ParDo chain.
        final int sampleCount = 10000;
        List<KV<String, String>> sampleData = new ArrayList<>(sampleCount);
        for (int i = 0; i < sampleCount; i++) {
            String value = String.valueOf(i);
            sampleData.add(KV.of(value, value));
        }

        pipeline
                .apply(Create.of(sampleData))
                .apply("Echo inputs round 1", ParDo.of(new EchoInputDoFn(configuration, "Echo")))
                .apply("Echo inputs round 2", ParDo.of(new EchoInputDoFn(configuration, "EchoAgain")));
        pipeline.run();
    }

    /**
     * Returns the text of a {@code /bin/sh} script that appends its second positional argument
     * ({@code $2}) to the file named by its first positional argument ({@code $1}).
     *
     * @return the three-line script, lines separated by {@code \n} with no trailing newline
     */
    private static String getTestShellEcho() {
        // One literal per script line; the concatenation is a compile-time constant.
        return "#!/bin/sh\n"
                + "filename=$1;\n"
                + "echo $2 >> $filename;";
    }

    /**
     * Returns the text of a {@code /bin/sh} script that echoes a fixed message with output
     * redirected (append mode) first to {@code $2} and then to the file named by {@code $1}.
     *
     * <p>NOTE(review): {@code $2} appears only as a redirection target ({@code >> $2}), not as an
     * argument to {@code echo}, so the word itself is never written. Confirm whether that is
     * intended.
     *
     * @return the three-line script, lines separated by {@code \n} with no trailing newline
     */
    private static String getTestShellEchoAgain() {
        // One literal per script line; the concatenation is a compile-time constant.
        return "#!/bin/sh\n"
                + "filename=$1;\n"
                + "echo \"You again? Well ok, here is your word again.\" >> $2 >> $filename;";
    }
}
