package org.apache.beam.sdk.io;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Charsets;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.MoreObjects;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Optional;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.collect.Lists;
import org.apache.beam.repackaged.beam_sdks_java_core.org.apache.commons.compress.utils.Sets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.DefaultFilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactoryTest;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.ParDoTest;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/WriteFilesTest.class */
public class WriteFilesTest {

    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();

    @Rule
    public final TestPipeline p = TestPipeline.create();

    @Rule
    public ExpectedException thrown = ExpectedException.none();
    private static final PTransform<PCollection<String>, PCollection<String>> IDENTITY_MAP = MapElements.via(new SimpleFunction<String, String>() { // from class: org.apache.beam.sdk.io.WriteFilesTest.1
        public String apply(String str) {
            return str;
        }
    });
    private static final PTransform<PCollection<String>, PCollectionView<Integer>> SHARDING_TRANSFORM = new PTransform<PCollection<String>, PCollectionView<Integer>>() { // from class: org.apache.beam.sdk.io.WriteFilesTest.2
        public PCollectionView<Integer> expand(PCollection<String> pCollection) {
            return null;
        }
    };

    /* loaded from: input_file:org/apache/beam/sdk/io/WriteFilesTest$LargestInt.class */
    private static class LargestInt extends PTransform<PCollection<String>, PCollectionView<Integer>> {
        private LargestInt() {
        }

        public PCollectionView<Integer> expand(PCollection<String> pCollection) {
            return pCollection.apply(ParDo.of(new DoFn<String, Integer>() { // from class: org.apache.beam.sdk.io.WriteFilesTest.LargestInt.1
                @DoFn.ProcessElement
                public void toInteger(DoFn<String, Integer>.ProcessContext processContext) {
                    processContext.output(Integer.valueOf((String) processContext.element()));
                }
            })).apply(Top.largest(1)).apply(Flatten.iterables()).apply(View.asSingleton());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/WriteFilesTest$PerWindowFiles.class */
    public static class PerWindowFiles extends FileBasedSink.FilenamePolicy {
        private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinuteSecondMillis();
        private final ResourceId baseFilename;
        private final String suffix;

        public PerWindowFiles(ResourceId resourceId, String str) {
            this.baseFilename = resourceId;
            this.suffix = str;
        }

        public String filenamePrefixForWindow(IntervalWindow intervalWindow) {
            return String.format("%s%s-%s", this.baseFilename.isDirectory() ? "" : (String) MoreObjects.firstNonNull(this.baseFilename.getFilename(), ""), FORMATTER.print(intervalWindow.start()), FORMATTER.print(intervalWindow.end()));
        }

        public ResourceId windowedFilename(int i, int i2, BoundedWindow boundedWindow, PaneInfo paneInfo, FileBasedSink.OutputFileHints outputFileHints) {
            DecimalFormat decimalFormat = new DecimalFormat("0000");
            return this.baseFilename.getCurrentDirectory().resolve(String.format("%s-%s-of-%s%s%s", filenamePrefixForWindow((IntervalWindow) boundedWindow), decimalFormat.format(i), decimalFormat.format(i2), outputFileHints.getSuggestedFilenameSuffix(), this.suffix), ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
        }

        public ResourceId unwindowedFilename(int i, int i2, FileBasedSink.OutputFileHints outputFileHints) {
            DecimalFormat decimalFormat = new DecimalFormat("0000");
            return this.baseFilename.getCurrentDirectory().resolve(String.format("%s-%s-of-%s%s%s", this.baseFilename.isDirectory() ? "" : (String) MoreObjects.firstNonNull(this.baseFilename.getFilename(), ""), decimalFormat.format(i), decimalFormat.format(i2), outputFileHints.getSuggestedFilenameSuffix(), this.suffix), ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/WriteFilesTest$TestDestinations.class */
    public static class TestDestinations extends FileBasedSink.DynamicDestinations<String, Integer, String> {
        private ResourceId baseOutputDirectory;

        TestDestinations(ResourceId resourceId) {
            this.baseOutputDirectory = resourceId;
        }

        public String formatRecord(String str) {
            return "record_" + str;
        }

        public Integer getDestination(String str) {
            return Integer.valueOf(Integer.valueOf(str).intValue() % 5);
        }

        /* renamed from: getDefaultDestination, reason: merged with bridge method [inline-methods] */
        public Integer m1069getDefaultDestination() {
            return 0;
        }

        public FileBasedSink.FilenamePolicy getFilenamePolicy(Integer num) {
            return new PerWindowFiles(this.baseOutputDirectory.resolve("file_" + num, ResolveOptions.StandardResolveOptions.RESOLVE_FILE), "simple");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/WriteFilesTest$VerifyFilesExist.class */
    public static class VerifyFilesExist<DestinationT> extends PTransform<PCollection<KV<DestinationT, String>>, PDone> {
        private VerifyFilesExist() {
        }

        public PDone expand(PCollection<KV<DestinationT, String>> pCollection) {
            pCollection.apply(Values.create()).apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW));
            return PDone.in(pCollection.getPipeline());
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/WriteFilesTest$WindowAndReshuffle.class */
    private static class WindowAndReshuffle<T> extends PTransform<PCollection<T>, PCollection<T>> {
        private final Window<T> window;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/WriteFilesTest$WindowAndReshuffle$AddArbitraryKey.class */
        public static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> {
            private AddArbitraryKey() {
            }

            @DoFn.ProcessElement
            public void processElement(DoFn<T, KV<Integer, T>>.ProcessContext processContext) {
                processContext.output(KV.of(Integer.valueOf(ThreadLocalRandom.current().nextInt()), processContext.element()));
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/WriteFilesTest$WindowAndReshuffle$RemoveArbitraryKey.class */
        public static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> {
            private RemoveArbitraryKey() {
            }

            @DoFn.ProcessElement
            public void processElement(DoFn<KV<Integer, Iterable<T>>, T>.ProcessContext processContext) {
                Iterator<T> it = ((Iterable) ((KV) processContext.element()).getValue()).iterator();
                while (it.hasNext()) {
                    processContext.output(it.next());
                }
            }
        }

        public WindowAndReshuffle(Window<T> window) {
            this.window = window;
        }

        public PCollection<T> expand(PCollection<T> pCollection) {
            return pCollection.apply(this.window).apply(ParDo.of(new AddArbitraryKey())).apply(GroupByKey.create()).apply(ParDo.of(new RemoveArbitraryKey()));
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/WriteFilesTest$WriteOptions.class */
    public interface WriteOptions extends PipelineOptionsFactoryTest.TestPipelineOptions {
        @Description("Test flag and value")
        String getTestFlag();

        void setTestFlag(String str);
    }

    private String getBaseOutputFilename() {
        return getBaseOutputDirectory().resolve("file", ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testWrite() throws IOException {
        runWrite(Arrays.asList("Critical canary", "Apprehensive eagle", "Intimidating pigeon", "Pedantic gull", "Frisky finch"), IDENTITY_MAP, getBaseOutputFilename(), WriteFiles.to(makeSimpleSink()));
    }

    @Test
    @Category({NeedsRunner.class})
    public void testEmptyWrite() throws IOException {
        runWrite(Collections.emptyList(), IDENTITY_MAP, getBaseOutputFilename(), WriteFiles.to(makeSimpleSink()));
        checkFileContents(getBaseOutputFilename(), Collections.emptyList(), Optional.of(1), true);
    }

    @Test
    @Category({NeedsRunner.class})
    public void testShardedWrite() throws IOException {
        runShardedWrite(Arrays.asList("one", "two", "three", "four", "five", "six"), IDENTITY_MAP, getBaseOutputFilename(), WriteFiles.to(makeSimpleSink()));
    }

    private ResourceId getBaseOutputDirectory() {
        return LocalResources.fromFile(this.tmpFolder.getRoot(), true).resolve("output", ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY);
    }

    private SimpleSink<Void> makeSimpleSink() {
        return SimpleSink.makeSimpleSink(getBaseOutputDirectory(), new PerWindowFiles(getBaseOutputDirectory().resolve("file", ResolveOptions.StandardResolveOptions.RESOLVE_FILE), "simple"));
    }

    @Test
    @Category({NeedsRunner.class})
    public void testCustomShardedWrite() throws IOException {
        WriteOptions writeOptions = (WriteOptions) TestPipeline.testingPipelineOptions().as(WriteOptions.class);
        writeOptions.setTestFlag("test_value");
        Pipeline create = TestPipeline.create(writeOptions);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= 1000) {
                create.apply(Create.timestamped(arrayList, arrayList2).withCoder(StringUtf8Coder.of())).apply(IDENTITY_MAP).apply(WriteFiles.to(makeSimpleSink()).withSharding(new LargestInt())).getPerDestinationOutputFilenames().apply(new VerifyFilesExist());
                create.run();
                checkFileContents(getBaseOutputFilename(), arrayList, Optional.of(3), true);
                return;
            }
            arrayList.add(Integer.toString(3));
            arrayList2.add(Long.valueOf(j2 + 1));
            j = j2 + 1;
        }
    }

    @Test
    @Category({NeedsRunner.class})
    public void testExpandShardedWrite() throws IOException {
        runShardedWrite(Arrays.asList("one", "two", "three", "four", "five", "six"), IDENTITY_MAP, getBaseOutputFilename(), WriteFiles.to(makeSimpleSink()).withNumShards(20));
    }

    @Test
    @Category({NeedsRunner.class})
    public void testWriteWithEmptyPCollection() throws IOException {
        runWrite(new ArrayList(), IDENTITY_MAP, getBaseOutputFilename(), WriteFiles.to(makeSimpleSink()));
    }

    @Test
    @Category({NeedsRunner.class})
    public void testWriteWindowed() throws IOException {
        runWrite(Arrays.asList("Critical canary", "Apprehensive eagle", "Intimidating pigeon", "Pedantic gull", "Frisky finch"), new WindowAndReshuffle(Window.into(FixedWindows.of(Duration.millis(2L)))), getBaseOutputFilename(), WriteFiles.to(makeSimpleSink()));
    }

    @Test
    @Category({NeedsRunner.class})
    public void testWriteWithSessions() throws IOException {
        runWrite(Arrays.asList("Critical canary", "Apprehensive eagle", "Intimidating pigeon", "Pedantic gull", "Frisky finch"), new WindowAndReshuffle(Window.into(Sessions.withGapDuration(Duration.millis(1L)))), getBaseOutputFilename(), WriteFiles.to(makeSimpleSink()));
    }

    @Test
    @Category({NeedsRunner.class})
    public void testWriteSpilling() throws IOException {
        ArrayList newArrayList = Lists.newArrayList();
        for (int i = 0; i < 100; i++) {
            newArrayList.add("mambo_number_" + i);
        }
        runWrite(newArrayList, Window.into(FixedWindows.of(Duration.millis(2L))), getBaseOutputFilename(), WriteFiles.to(makeSimpleSink()).withMaxNumWritersPerBundle(2).withWindowedWrites().withNumShards(1));
    }

    @Test
    public void testBuildWrite() {
        SimpleSink<Void> makeSimpleSink = makeSimpleSink();
        WriteFiles withNumShards = WriteFiles.to(makeSimpleSink).withNumShards(3);
        Assert.assertThat((SimpleSink) withNumShards.getSink(), Matchers.is(makeSimpleSink));
        PTransform computeNumShards = withNumShards.getComputeNumShards();
        Assert.assertThat(withNumShards.getComputeNumShards(), Matchers.is(Matchers.nullValue()));
        Assert.assertThat(withNumShards.getNumShardsProvider(), Matchers.instanceOf(ValueProvider.StaticValueProvider.class));
        Assert.assertThat((Integer) withNumShards.getNumShardsProvider().get(), Matchers.equalTo(3));
        Assert.assertThat(withNumShards.getComputeNumShards(), Matchers.equalTo(computeNumShards));
        WriteFiles withSharding = withNumShards.withSharding(SHARDING_TRANSFORM);
        Assert.assertThat((SimpleSink) withSharding.getSink(), Matchers.is(makeSimpleSink));
        Assert.assertThat(withSharding.getComputeNumShards(), Matchers.equalTo(SHARDING_TRANSFORM));
        Assert.assertThat(withSharding.withRunnerDeterminedSharding().getComputeNumShards(), Matchers.nullValue());
        Assert.assertThat(withNumShards.getComputeNumShards(), Matchers.equalTo(computeNumShards));
    }

    @Test
    public void testDisplayData() {
        SimpleSink<Void> simpleSink = new SimpleSink<Void>(getBaseOutputDirectory(), DynamicFileDestinations.constant(DefaultFilenamePolicy.fromParams(new DefaultFilenamePolicy.Params().withBaseFilename(getBaseOutputDirectory().resolve("file", ResolveOptions.StandardResolveOptions.RESOLVE_FILE)).withShardTemplate("-SS-of-NN"))), Compression.UNCOMPRESSED) { // from class: org.apache.beam.sdk.io.WriteFilesTest.3
            public void populateDisplayData(DisplayData.Builder builder) {
                builder.add(DisplayData.item(ParDoTest.TimerTests.AnonymousClass4.TIMER_ID, "bar"));
            }
        };
        DisplayData from = DisplayData.from(WriteFiles.to(simpleSink));
        Assert.assertThat(from, DisplayDataMatchers.hasDisplayItem("sink", simpleSink.getClass()));
        Assert.assertThat(from, DisplayDataMatchers.includesDisplayDataFor("sink", simpleSink));
    }

    @Test
    @Category({NeedsRunner.class})
    public void testUnboundedNeedsWindowed() {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("Must use windowed writes when applying WriteFiles to an unbounded PCollection");
        this.p.apply(Create.of(ParDoTest.TimerTests.AnonymousClass4.TIMER_ID, new String[0])).setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED).apply(WriteFiles.to(makeSimpleSink()));
        this.p.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testUnboundedWritesNeedSharding() {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("When applying WriteFiles to an unbounded PCollection, must specify number of output shards explicitly");
        this.p.apply(Create.of(ParDoTest.TimerTests.AnonymousClass4.TIMER_ID, new String[0])).setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED).apply(WriteFiles.to(makeSimpleSink()).withWindowedWrites());
        this.p.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testDynamicDestinationsBounded() throws Exception {
        testDynamicDestinationsHelper(true, false);
    }

    @Test
    @Category({NeedsRunner.class})
    public void testDynamicDestinationsUnbounded() throws Exception {
        testDynamicDestinationsHelper(false, false);
    }

    @Test
    @Category({NeedsRunner.class})
    public void testDynamicDestinationsFillEmptyShards() throws Exception {
        testDynamicDestinationsHelper(true, true);
    }

    private void testDynamicDestinationsHelper(boolean z, boolean z2) throws IOException {
        WriteFilesResult apply;
        SimpleSink simpleSink = new SimpleSink(getBaseOutputDirectory(), new TestDestinations(getBaseOutputDirectory()), Compression.UNCOMPRESSED);
        WriteOptions writeOptions = (WriteOptions) TestPipeline.testingPipelineOptions().as(WriteOptions.class);
        writeOptions.setTestFlag("test_value");
        Pipeline create = TestPipeline.create(writeOptions);
        ArrayList newArrayList = Lists.newArrayList();
        for (int i = 0; i < 100; i++) {
            newArrayList.add(Integer.toString(i));
        }
        ArrayList arrayList = new ArrayList();
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= newArrayList.size()) {
                break;
            }
            arrayList.add(Long.valueOf(j2 + 1));
            j = j2 + 1;
        }
        int i2 = z2 ? 40 : 2;
        WriteFiles withNumShards = WriteFiles.to(simpleSink).withNumShards(i2);
        PCollection apply2 = create.apply(Create.timestamped(newArrayList, arrayList));
        if (z) {
            apply = apply2.apply(withNumShards);
        } else {
            apply2.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
            apply = (WriteFilesResult) apply2.apply(Window.into(FixedWindows.of(Duration.standardDays(1L)))).apply(withNumShards.withWindowedWrites());
        }
        apply.getPerDestinationOutputFilenames().apply(new VerifyFilesExist());
        create.run();
        for (int i3 = 0; i3 < 5; i3++) {
            ResourceId resolve = getBaseOutputDirectory().resolve("file_" + i3, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
            ArrayList newArrayList2 = Lists.newArrayList();
            for (int i4 = i3; i4 < 100; i4 += 5) {
                newArrayList2.add("record_" + i4);
            }
            checkFileContents(resolve.toString(), newArrayList2, Optional.of(Integer.valueOf(i2)), z);
        }
    }

    @Test
    public void testShardedDisplayData() {
        SimpleSink<Void> simpleSink = new SimpleSink<Void>(getBaseOutputDirectory(), DynamicFileDestinations.constant(DefaultFilenamePolicy.fromParams(new DefaultFilenamePolicy.Params().withBaseFilename(getBaseOutputDirectory().resolve("file", ResolveOptions.StandardResolveOptions.RESOLVE_FILE)).withShardTemplate("-SS-of-NN"))), Compression.UNCOMPRESSED) { // from class: org.apache.beam.sdk.io.WriteFilesTest.4
            public void populateDisplayData(DisplayData.Builder builder) {
                builder.add(DisplayData.item(ParDoTest.TimerTests.AnonymousClass4.TIMER_ID, "bar"));
            }
        };
        DisplayData from = DisplayData.from(WriteFiles.to(simpleSink).withNumShards(1));
        Assert.assertThat(from, DisplayDataMatchers.hasDisplayItem("sink", simpleSink.getClass()));
        Assert.assertThat(from, DisplayDataMatchers.includesDisplayDataFor("sink", simpleSink));
        Assert.assertThat(from, DisplayDataMatchers.hasDisplayItem("numShards", 1L));
    }

    @Test
    public void testCustomShardStrategyDisplayData() {
        SimpleSink<Void> simpleSink = new SimpleSink<Void>(getBaseOutputDirectory(), DynamicFileDestinations.constant(DefaultFilenamePolicy.fromParams(new DefaultFilenamePolicy.Params().withBaseFilename(getBaseOutputDirectory().resolve("file", ResolveOptions.StandardResolveOptions.RESOLVE_FILE)).withShardTemplate("-SS-of-NN"))), Compression.UNCOMPRESSED) { // from class: org.apache.beam.sdk.io.WriteFilesTest.5
            public void populateDisplayData(DisplayData.Builder builder) {
                builder.add(DisplayData.item(ParDoTest.TimerTests.AnonymousClass4.TIMER_ID, "bar"));
            }
        };
        DisplayData from = DisplayData.from(WriteFiles.to(simpleSink).withSharding(new PTransform<PCollection<String>, PCollectionView<Integer>>() { // from class: org.apache.beam.sdk.io.WriteFilesTest.6
            public PCollectionView<Integer> expand(PCollection<String> pCollection) {
                return null;
            }

            public void populateDisplayData(DisplayData.Builder builder) {
                builder.add(DisplayData.item("spam", "ham"));
            }
        }));
        Assert.assertThat(from, DisplayDataMatchers.hasDisplayItem("sink", simpleSink.getClass()));
        Assert.assertThat(from, DisplayDataMatchers.includesDisplayDataFor("sink", simpleSink));
        Assert.assertThat(from, DisplayDataMatchers.hasDisplayItem("spam", "ham"));
    }

    private void runWrite(List<String> list, PTransform<PCollection<String>, PCollection<String>> pTransform, String str, WriteFiles<String, ?, String> writeFiles) throws IOException {
        runShardedWrite(list, pTransform, str, writeFiles);
    }

    private void runShardedWrite(List<String> list, PTransform<PCollection<String>, PCollection<String>> pTransform, String str, WriteFiles<String, ?, String> writeFiles) throws IOException {
        WriteOptions writeOptions = (WriteOptions) TestPipeline.testingPipelineOptions().as(WriteOptions.class);
        writeOptions.setTestFlag("test_value");
        Pipeline create = TestPipeline.create(writeOptions);
        ArrayList arrayList = new ArrayList();
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= list.size()) {
                break;
            }
            arrayList.add(Long.valueOf(j2 + 1));
            j = j2 + 1;
        }
        create.apply(Create.timestamped(list, arrayList).withCoder(StringUtf8Coder.of())).apply(pTransform).apply(writeFiles).getPerDestinationOutputFilenames().apply(new VerifyFilesExist());
        create.run();
        checkFileContents(str, list, (writeFiles.getNumShardsProvider() == null || writeFiles.getWindowedWrites()) ? Optional.absent() : Optional.of((Integer) writeFiles.getNumShardsProvider().get()), !writeFiles.getWindowedWrites());
    }

    static void checkFileContents(String str, List<String> list, Optional<Integer> optional, boolean z) throws IOException {
        ArrayList newArrayList = Lists.newArrayList();
        Iterator it = ((MatchResult) FileSystems.match(Collections.singletonList(str + "*")).get(0)).metadata().iterator();
        while (it.hasNext()) {
            newArrayList.add(new File(((MatchResult.Metadata) it.next()).resourceId().toString()));
        }
        Assert.assertFalse("Should have produced at least 1 output file", newArrayList.isEmpty());
        if (optional.isPresent()) {
            Assert.assertEquals(optional.get().intValue(), newArrayList.size());
            Pattern compile = Pattern.compile("\\d{4}-of-\\d{4}");
            HashSet newHashSet = Sets.newHashSet(new String[0]);
            DecimalFormat decimalFormat = new DecimalFormat("0000");
            for (int i = 0; i < optional.get().intValue(); i++) {
                newHashSet.add(String.format("%s-of-%s", decimalFormat.format(i), decimalFormat.format(optional.get())));
            }
            HashSet newHashSet2 = Sets.newHashSet(new String[0]);
            Iterator it2 = newArrayList.iterator();
            while (it2.hasNext()) {
                Matcher matcher = compile.matcher(((File) it2.next()).getName());
                Assert.assertTrue(matcher.find());
                Assert.assertTrue(newHashSet2.add(matcher.group()));
            }
            Assert.assertEquals(newHashSet, newHashSet2);
        }
        ArrayList newArrayList2 = Lists.newArrayList();
        Iterator it3 = newArrayList.iterator();
        while (it3.hasNext()) {
            BufferedReader newBufferedReader = Files.newBufferedReader(((File) it3.next()).toPath(), Charsets.UTF_8);
            Throwable th = null;
            while (true) {
                try {
                    try {
                        String readLine = newBufferedReader.readLine();
                        if (readLine == null) {
                            break;
                        } else if (!readLine.equals("header") && !readLine.equals("footer")) {
                            newArrayList2.add(readLine);
                        }
                    } catch (Throwable th2) {
                        th = th2;
                        throw th2;
                    }
                } catch (Throwable th3) {
                    if (newBufferedReader != null) {
                        if (th != null) {
                            try {
                                newBufferedReader.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            newBufferedReader.close();
                        }
                    }
                    throw th3;
                }
            }
            if (newBufferedReader != null) {
                if (0 != 0) {
                    try {
                        newBufferedReader.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    newBufferedReader.close();
                }
            }
        }
        Assert.assertThat(newArrayList2, Matchers.containsInAnyOrder(list.toArray()));
        if (z) {
            Assert.assertThat(Lists.newArrayList(new File(str).getParentFile().list()), Matchers.everyItem(Matchers.not(Matchers.containsString(".temp-beam"))));
        }
    }
}
