package org.apache.beam.examples.subprocess;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration;
import org.apache.beam.examples.subprocess.kernel.SubProcessCommandLineArgs;
import org.apache.beam.examples.subprocess.kernel.SubProcessKernel;
import org.apache.beam.examples.subprocess.utils.CallingSubProcessUtils;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs a small pipeline whose two {@link ParDo} stages each hand the element's value to a shell
 * script through {@link SubProcessKernel} and asserts that the pipeline output equals its input.
 */
@RunWith(JUnit4.class)
public class ExampleEchoPipelineTest {

    /** Pipeline under test; abandoned-node enforcement is switched off. */
    @Rule
    public final TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);

    /**
     * Passes each element's value to the named binary via {@link SubProcessKernel} and emits one
     * output per result returned by the kernel, keyed by the input element's key.
     */
    private static class EchoInputDoFn extends DoFn<KV<String, String>, KV<String, String>> {
        private static final Logger LOG = LoggerFactory.getLogger(EchoInputDoFn.class);

        private final SubProcessConfiguration configuration;
        private final String binaryName;

        /**
         * @param subProcessConfiguration configuration handed to the sub-process utilities and kernel
         * @param str name of the binary (here: a shell script file name) to execute
         */
        public EchoInputDoFn(SubProcessConfiguration subProcessConfiguration, String str) {
            this.configuration = subProcessConfiguration;
            this.binaryName = str;
        }

        /** Delegates worker preparation for {@code binaryName} to {@link CallingSubProcessUtils}. */
        @DoFn.Setup
        public void setUp() throws Exception {
            CallingSubProcessUtils.setUp(configuration, binaryName);
        }

        /**
         * Executes the binary with the element's value as the single command argument (position 0)
         * and outputs every result under the element's key.
         *
         * @throws Exception any failure from the kernel; it is logged and then rethrown unchanged
         */
        @DoFn.ProcessElement
        public void processElement(ProcessContext processContext) throws Exception {
            try {
                KV<String, String> element = processContext.element();

                SubProcessCommandLineArgs commands = new SubProcessCommandLineArgs();
                commands.putCommand(
                        new SubProcessCommandLineArgs.Command(0, String.valueOf(element.getValue())));

                SubProcessKernel kernel = new SubProcessKernel(configuration, binaryName);
                for (String result : kernel.exec(commands)) {
                    processContext.output(KV.of(element.getKey(), result));
                }
            } catch (Exception e) {
                LOG.error("Error processing element ", e);
                throw e;
            }
        }
    }

    /**
     * Writes two temporary shell scripts, chains two {@link EchoInputDoFn} stages over 100
     * {@code KV<i, i>} elements and asserts that the final collection matches the input.
     */
    @Test
    public void testExampleEchoPipeline() throws Exception {
        // Two shell scripts stand in for the binaries invoked by the pipeline.
        Path firstScript = Files.createTempFile("test-Echo", ".sh");
        Path secondScript = Files.createTempFile("test-EchoAgain", ".sh");
        Path workerDir = Files.createTempDirectory("test-Echoo");

        // Files.write writes every byte and closes the file even if the write fails.
        Files.write(firstScript, getTestShellEcho().getBytes(StandardCharsets.UTF_8));
        Files.write(secondScript, getTestShellEchoAgain().getBytes(StandardCharsets.UTF_8));

        SubProcessPipelineOptions options = PipelineOptionsFactory.as(SubProcessPipelineOptions.class);
        options.setConcurrency(2);
        // Both scripts live in the same temp directory, so the parent of the first is the source path.
        options.setSourcePath(firstScript.getParent().toString());
        options.setWorkerPath(workerDir.toAbsolutePath().toString());

        p.getOptions().as(GcsOptions.class).setGcsUtil(buildMockGcsUtil());

        SubProcessConfiguration configuration = options.getSubProcessConfiguration();

        List<KV<String, String>> inputs = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            String value = String.valueOf(i);
            inputs.add(KV.of(value, value));
        }

        PAssert.that(
                        p.apply(Create.of(inputs))
                                .apply(
                                        "Echo inputs round 1",
                                        ParDo.of(new EchoInputDoFn(configuration, firstScript.getFileName().toString())))
                                .apply(
                                        "Echo inputs round 2",
                                        ParDo.of(new EchoInputDoFn(configuration, secondScript.getFileName().toString()))))
                .containsInAnyOrder(inputs);

        p.run();
    }

    /** Returns a shell script that appends its second argument to the file named by its first. */
    private static String getTestShellEcho() {
        return "#!/bin/sh\nfilename=$1;\necho $2 >> $filename;";
    }

    /** Returns the script used by the second stage; its text is identical to the first script. */
    private static String getTestShellEchoAgain() {
        return "#!/bin/sh\nfilename=$1;\necho $2 >> $filename;";
    }

    /**
     * Builds a {@link GcsUtil} mock whose {@code open} returns a channel over a fresh temporary
     * file and whose {@code expand} returns a single-element list holding the path it was given.
     */
    private GcsUtil buildMockGcsUtil() throws IOException {
        GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class);

        // Every open() request gets a channel over a new temp file that is deleted on close.
        Mockito.when(mockGcsUtil.open(Mockito.any(GcsPath.class)))
                .then(
                        new Answer<SeekableByteChannel>() {
                            @Override
                            public SeekableByteChannel answer(InvocationOnMock invocationOnMock) throws Throwable {
                                return FileChannel.open(
                                        Files.createTempFile("channel-", ".tmp"),
                                        StandardOpenOption.CREATE,
                                        StandardOpenOption.DELETE_ON_CLOSE);
                            }
                        });

        // Every expand() request echoes back the path it was asked about.
        Mockito.when(mockGcsUtil.expand(Mockito.any(GcsPath.class)))
                .then(
                        new Answer<List<GcsPath>>() {
                            @Override
                            public List<GcsPath> answer(InvocationOnMock invocationOnMock) throws Throwable {
                                return ImmutableList.of((GcsPath) invocationOnMock.getArguments()[0]);
                            }
                        });

        return mockGcsUtil;
    }
}
