package org.apache.beam.sdk;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.UUID;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.FileChecksumMatcher;
import org.apache.beam.sdk.testing.SerializableMatchers;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.util.FilePatternMatchingShardedFile;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/RequiresStableInputIT.class */
public class RequiresStableInputIT {
    private static final String VALUE = "value";
    private static final String VALUE_CHECKSUM = "f32b67c7e26342af42efabc674d441dca0a281c5";

    /* loaded from: input_file:org/apache/beam/sdk/RequiresStableInputIT$MakeSideEffectAndThenFailFn.class */
    private static class MakeSideEffectAndThenFailFn extends DoFn<KV<String, String>, String> {
        private final String outputPrefix;

        private MakeSideEffectAndThenFailFn(String str) {
            this.outputPrefix = str;
        }

        @DoFn.ProcessElement
        @DoFn.RequiresStableInput
        public void processElement(DoFn<KV<String, String>, String>.ProcessContext processContext) throws Exception {
            boolean z = FileSystems.match(new StringBuilder().append(this.outputPrefix).append("*").toString()).metadata().size() == 0;
            KV<String, String> element = processContext.element();
            writeTextToFileSideEffect(element.getValue(), this.outputPrefix + element.getKey());
            if (z) {
                throw new Exception("Deliberate failure: should happen only once for each application of the DoFnwithin the transform graph.");
            }
        }

        private static void writeTextToFileSideEffect(String str, String str2) throws IOException {
            WritableByteChannel create = FileSystems.create(FileSystems.matchNewResource(str2, false), "text/plain");
            create.write(ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8)));
            create.close();
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/RequiresStableInputIT$PairWithRandomKeyFn.class */
    private static class PairWithRandomKeyFn extends SimpleFunction<String, KV<String, String>> {
        private PairWithRandomKeyFn() {
        }

        @Override // org.apache.beam.sdk.transforms.SimpleFunction, org.apache.beam.sdk.transforms.SerializableFunction
        public KV<String, String> apply(String str) {
            return KV.of(UUID.randomUUID().toString(), str);
        }
    }

    @BeforeClass
    public static void setup() {
        PipelineOptionsFactory.register(TestPipelineOptions.class);
    }

    @Test
    public void testParDoRequiresStableInput() {
        TestPipelineOptions testPipelineOptions = (TestPipelineOptions) TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
        ResourceId resolve = FileSystems.matchNewResource(testPipelineOptions.getTempRoot(), true).resolve(String.format("requires-stable-input-%tF-%<tH-%<tM-%<tS-%<tL", new Date()), ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY);
        String resourceId = resolve.resolve("pardo-single-output", ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY).resolve("key-", ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString();
        String resourceId2 = resolve.resolve("pardo-multi-output", ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY).resolve("key-", ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString();
        testPipelineOptions.setOnSuccessMatcher(SerializableMatchers.allOf(new FileChecksumMatcher(VALUE_CHECKSUM, new FilePatternMatchingShardedFile(resourceId + "*")), new FileChecksumMatcher(VALUE_CHECKSUM, new FilePatternMatchingShardedFile(resourceId2 + "*"))));
        Pipeline create = Pipeline.create(testPipelineOptions);
        PCollection pCollection = (PCollection) create.apply("CreatePCollectionOfOneValue", Create.of(VALUE, new String[0]));
        ((PCollection) pCollection.apply("Single-PairWithRandomKey", MapElements.via((SimpleFunction) new PairWithRandomKeyFn()))).apply("Single-MakeSideEffectAndThenFail", ParDo.of(new MakeSideEffectAndThenFailFn(resourceId)));
        ((PCollection) pCollection.apply("Multi-PairWithRandomKey", MapElements.via((SimpleFunction) new PairWithRandomKeyFn()))).apply("Multi-MakeSideEffectAndThenFail", ParDo.of(new MakeSideEffectAndThenFailFn(resourceId2)).withOutputTags(new TupleTag(), TupleTagList.empty()));
        create.run().waitUntilFinish();
    }
}
