/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Charsets;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.MoreObjects;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Optional;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.collect.Lists;
import org.apache.beam.repackaged.beam_sdks_java_core.org.apache.commons.compress.utils.Sets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.DefaultFilenamePolicy;
import org.apache.beam.sdk.io.DynamicFileDestinations;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.LocalResources;
import org.apache.beam.sdk.io.SimpleSink;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactoryTest;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.ReadableInstant;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class WriteFilesTest {
    // Temporary directory whose root is the parent of the "output" directory used by these tests.
    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();
    // Shared test pipeline; used by the tests that apply WriteFiles to an unbounded PCollection.
    @Rule
    public final TestPipeline p = TestPipeline.create();
    // Lets a test declare the exception type and message it expects to be thrown.
    @Rule
    public ExpectedException thrown = ExpectedException.none();
    // Pass-through transform: maps every input string to itself.
    private static final PTransform<PCollection<String>, PCollection<String>> IDENTITY_MAP = MapElements.via((SimpleFunction)new SimpleFunction<String, String>(){

        public String apply(String input) {
            return input;
        }
    });
    // Placeholder sharding transform whose expand() returns null; testBuildWrite only stores it
    // on a WriteFiles and compares it by equality, it is never expanded there.
    private static final PTransform<PCollection<String>, PCollectionView<Integer>> SHARDING_TRANSFORM = new PTransform<PCollection<String>, PCollectionView<Integer>>(){

        public PCollectionView<Integer> expand(PCollection<String> input) {
            return null;
        }
    };

    /**
     * Returns the string form of the resource named "file" inside the base output directory.
     */
    private String getBaseOutputFilename() {
        ResourceId outputDirectory = this.getBaseOutputDirectory();
        ResourceId outputFile = outputDirectory.resolve("file", (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
        return outputFile.toString();
    }

    /**
     * Writes five strings through the identity transform with the simple sink; the
     * write and the verification of the produced files are done by {@code runWrite}.
     */
    @Test
    @Category(value={NeedsRunner.class})
    public void testWrite() throws IOException {
        List<String> birds = Arrays.asList("Critical canary", "Apprehensive eagle", "Intimidating pigeon", "Pedantic gull", "Frisky finch");
        String outputPrefix = this.getBaseOutputFilename();
        WriteFiles<String, ?, String> write = WriteFiles.to(this.makeSimpleSink());
        this.runWrite(birds, IDENTITY_MAP, outputPrefix, write);
    }

    /**
     * Writes an empty input and then additionally checks that exactly one shard was
     * produced, that it holds no records, and that no temp directory entry remains.
     */
    @Test
    @Category(value={NeedsRunner.class})
    public void testEmptyWrite() throws IOException {
        List<String> noInputs = Collections.emptyList();
        this.runWrite(noInputs, IDENTITY_MAP, this.getBaseOutputFilename(), WriteFiles.to(this.makeSimpleSink()));
        String outputPrefix = this.getBaseOutputFilename();
        Optional<Integer> singleShard = Optional.of(1);
        WriteFilesTest.checkFileContents(outputPrefix, noInputs, singleShard, true);
    }

    /**
     * Runs a write of six short strings through {@code runShardedWrite} with the simple sink
     * and no explicit shard count.
     */
    @Test
    @Category(value={NeedsRunner.class})
    public void testShardedWrite() throws IOException {
        List<String> words = Arrays.asList("one", "two", "three", "four", "five", "six");
        String outputPrefix = this.getBaseOutputFilename();
        WriteFiles<String, ?, String> write = WriteFiles.to(this.makeSimpleSink());
        this.runShardedWrite(words, IDENTITY_MAP, outputPrefix, write);
    }

    /**
     * Returns the "output" directory resource located under the temporary folder's root.
     */
    private ResourceId getBaseOutputDirectory() {
        File tmpRoot = this.tmpFolder.getRoot();
        ResourceId tmpRootResource = LocalResources.fromFile(tmpRoot, true);
        return tmpRootResource.resolve("output", (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY);
    }

    /**
     * Builds a {@code SimpleSink} on the base output directory whose filenames come from a
     * {@code PerWindowFiles} policy with base resource "file" and suffix "simple".
     */
    private SimpleSink<Void> makeSimpleSink() {
        ResourceId baseFile = this.getBaseOutputDirectory().resolve("file", (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
        ResourceId outputDirectory = this.getBaseOutputDirectory();
        return SimpleSink.makeSimpleSink(outputDirectory, new PerWindowFiles(baseFile, "simple"));
    }

    /**
     * Writes 1000 copies of the string "3" with sharding computed by {@code LargestInt}
     * and then expects exactly three output shards.
     */
    @Test
    @Category(value={NeedsRunner.class})
    public void testCustomShardedWrite() throws IOException {
        WriteOptions options = (WriteOptions)TestPipeline.testingPipelineOptions().as(WriteOptions.class);
        options.setTestFlag("test_value");
        Pipeline p = TestPipeline.create((PipelineOptions)options);

        // Element k (1-based) carries timestamp k.
        ArrayList<String> inputs = new ArrayList<String>();
        ArrayList<Long> timestamps = new ArrayList<Long>();
        for (long timestamp = 1L; timestamp <= 1000L; ++timestamp) {
            inputs.add(Integer.toString(3));
            timestamps.add(timestamp);
        }

        SimpleSink<Void> sink = this.makeSimpleSink();
        WriteFiles write = WriteFiles.to(sink).withSharding((PTransform)new LargestInt());

        PCollection created = (PCollection)p.apply((PTransform)Create.timestamped(inputs, timestamps).withCoder((Coder)StringUtf8Coder.of()));
        PCollection mapped = (PCollection)created.apply(IDENTITY_MAP);
        WriteFilesResult result = (WriteFilesResult)mapped.apply((PTransform)write);
        result.getPerDestinationOutputFilenames().apply(new VerifyFilesExist());
        p.run();

        WriteFilesTest.checkFileContents(this.getBaseOutputFilename(), inputs, Optional.of(3), true);
    }

    /**
     * Runs a write of six strings with a fixed shard count of 20, i.e. more shards than
     * input elements.
     */
    @Test
    @Category(value={NeedsRunner.class})
    public void testExpandShardedWrite() throws IOException {
        List<String> words = Arrays.asList("one", "two", "three", "four", "five", "six");
        String outputPrefix = this.getBaseOutputFilename();
        WriteFiles<String, ?, String> write = WriteFiles.to(this.makeSimpleSink()).withNumShards(20);
        this.runShardedWrite(words, IDENTITY_MAP, outputPrefix, write);
    }

    /**
     * Runs the standard write flow with an empty, mutable input list.
     */
    @Test
    @Category(value={NeedsRunner.class})
    public void testWriteWithEmptyPCollection() throws IOException {
        List<String> noInputs = new ArrayList<String>();
        String outputPrefix = this.getBaseOutputFilename();
        WriteFiles<String, ?, String> write = WriteFiles.to(this.makeSimpleSink());
        this.runWrite(noInputs, IDENTITY_MAP, outputPrefix, write);
    }

    /**
     * Writes five strings after assigning them to 2 ms fixed windows and passing them
     * through {@code WindowAndReshuffle}.
     */
    @Test
    @Category(value={NeedsRunner.class})
    public void testWriteWindowed() throws IOException {
        List<String> birds = Arrays.asList("Critical canary", "Apprehensive eagle", "Intimidating pigeon", "Pedantic gull", "Frisky finch");
        Duration windowSize = Duration.millis(2L);
        WindowAndReshuffle<String> windowing = new WindowAndReshuffle<String>(Window.into((WindowFn)FixedWindows.of(windowSize)));
        String outputPrefix = this.getBaseOutputFilename();
        this.runWrite(birds, windowing, outputPrefix, WriteFiles.to(this.makeSimpleSink()));
    }

    /**
     * Writes five strings after assigning them to session windows with a 1 ms gap and
     * passing them through {@code WindowAndReshuffle}.
     */
    @Test
    @Category(value={NeedsRunner.class})
    public void testWriteWithSessions() throws IOException {
        List<String> birds = Arrays.asList("Critical canary", "Apprehensive eagle", "Intimidating pigeon", "Pedantic gull", "Frisky finch");
        Duration sessionGap = Duration.millis(1L);
        WindowAndReshuffle<String> windowing = new WindowAndReshuffle<String>(Window.into((WindowFn)Sessions.withGapDuration(sessionGap)));
        String outputPrefix = this.getBaseOutputFilename();
        this.runWrite(birds, windowing, outputPrefix, WriteFiles.to(this.makeSimpleSink()));
    }

    /**
     * Writes 100 strings in 2 ms fixed windows using windowed writes, a single shard and at
     * most two writers per bundle.
     */
    @Test
    @Category(value={NeedsRunner.class})
    public void testWriteSpilling() throws IOException {
        ArrayList<String> inputs = Lists.newArrayList();
        int index = 0;
        while (index < 100) {
            inputs.add("mambo_number_" + index);
            ++index;
        }
        PTransform<PCollection<String>, PCollection<String>> windowing = (PTransform<PCollection<String>, PCollection<String>>)Window.into((WindowFn)FixedWindows.of(Duration.millis(2L)));
        String outputPrefix = this.getBaseOutputFilename();
        WriteFiles<String, ?, String> write = WriteFiles.to(this.makeSimpleSink())
            .withMaxNumWritersPerBundle(2)
            .withWindowedWrites()
            .withNumShards(1);
        this.runWrite(inputs, windowing, outputPrefix, write);
    }

    /**
     * Checks the builder-style configuration methods of {@code WriteFiles}: the sink is
     * retained, a fixed shard count is exposed through a static value provider, and deriving
     * new instances via {@code withSharding} / {@code withRunnerDeterminedSharding} leaves
     * the instance they were derived from unchanged.
     */
    @Test
    public void testBuildWrite() {
        SimpleSink<Void> sink = this.makeSimpleSink();

        // Fixed shard count: no sharding transform, static provider holding 3.
        WriteFiles fixedShards = WriteFiles.to(sink).withNumShards(3);
        Assert.assertThat((Object)((Object)((SimpleSink)fixedShards.getSink())), (org.hamcrest.Matcher)Matchers.is(sink));
        PTransform shardingBefore = fixedShards.getComputeNumShards();
        Assert.assertThat((Object)fixedShards.getComputeNumShards(), (org.hamcrest.Matcher)Matchers.is((org.hamcrest.Matcher)Matchers.nullValue()));
        Assert.assertThat((Object)fixedShards.getNumShardsProvider(), (org.hamcrest.Matcher)Matchers.instanceOf(ValueProvider.StaticValueProvider.class));
        Assert.assertThat((Object)((Integer)fixedShards.getNumShardsProvider().get()), (org.hamcrest.Matcher)Matchers.equalTo((Object)3));
        Assert.assertThat((Object)fixedShards.getComputeNumShards(), (org.hamcrest.Matcher)Matchers.equalTo((Object)shardingBefore));

        // Custom sharding transform on a derived instance.
        WriteFiles customSharding = fixedShards.withSharding(SHARDING_TRANSFORM);
        Assert.assertThat((Object)((Object)((SimpleSink)customSharding.getSink())), (org.hamcrest.Matcher)Matchers.is(sink));
        Assert.assertThat((Object)customSharding.getComputeNumShards(), (org.hamcrest.Matcher)Matchers.equalTo(SHARDING_TRANSFORM));

        // Runner-determined sharding clears the transform; the first instance is untouched.
        WriteFiles runnerDetermined = customSharding.withRunnerDeterminedSharding();
        Assert.assertThat((Object)runnerDetermined.getComputeNumShards(), (org.hamcrest.Matcher)Matchers.nullValue());
        Assert.assertThat((Object)fixedShards.getComputeNumShards(), (org.hamcrest.Matcher)Matchers.equalTo((Object)shardingBefore));
    }

    /**
     * Checks that the display data of a {@code WriteFiles} contains a "sink" item carrying
     * the sink's class and includes the sink's own display data under "sink".
     */
    @Test
    public void testDisplayData() {
        ResourceId baseFile = this.getBaseOutputDirectory().resolve("file", (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
        DefaultFilenamePolicy.Params policyParams = new DefaultFilenamePolicy.Params().withBaseFilename(baseFile).withShardTemplate("-SS-of-NN");
        FileBasedSink.DynamicDestinations dynamicDestinations = DynamicFileDestinations.constant((FileBasedSink.FilenamePolicy)DefaultFilenamePolicy.fromParams(policyParams));

        // Sink that contributes one display item of its own.
        SimpleSink<Void> sink = new SimpleSink<Void>(this.getBaseOutputDirectory(), dynamicDestinations, Compression.UNCOMPRESSED){
            public void populateDisplayData(DisplayData.Builder builder) {
                builder.add(DisplayData.item("foo", "bar"));
            }
        };

        WriteFiles write = WriteFiles.to((FileBasedSink)sink);
        DisplayData displayData = DisplayData.from((HasDisplayData)write);
        Assert.assertThat((Object)displayData, DisplayDataMatchers.hasDisplayItem("sink", ((Object)sink).getClass()));
        Assert.assertThat((Object)displayData, DisplayDataMatchers.includesDisplayDataFor("sink", (HasDisplayData)sink));
    }

    /**
     * Expects an {@code IllegalArgumentException} when a non-windowed {@code WriteFiles} is
     * applied to a PCollection that has been marked unbounded.
     */
    @Test
    @Category(value={NeedsRunner.class})
    public void testUnboundedNeedsWindowed() {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("Must use windowed writes when applying WriteFiles to an unbounded PCollection");
        SimpleSink<Void> sink = this.makeSimpleSink();
        PCollection created = (PCollection)this.p.apply((PTransform)Create.of((Object)"foo", (Object[])new String[0]));
        PCollection unbounded = created.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
        unbounded.apply((PTransform)WriteFiles.to(sink));
        this.p.run();
    }

    /**
     * Expects an {@code IllegalArgumentException} when a windowed {@code WriteFiles} without
     * an explicit shard count is applied to a PCollection that has been marked unbounded.
     */
    @Test
    @Category(value={NeedsRunner.class})
    public void testUnboundedWritesNeedSharding() {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("When applying WriteFiles to an unbounded PCollection, must specify number of output shards explicitly");
        SimpleSink<Void> sink = this.makeSimpleSink();
        PCollection created = (PCollection)this.p.apply((PTransform)Create.of((Object)"foo", (Object[])new String[0]));
        PCollection unbounded = created.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
        unbounded.apply((PTransform)WriteFiles.to(sink).withWindowedWrites());
        this.p.run();
    }

    /**
     * Runs the dynamic-destinations scenario on a bounded input with the small shard count.
     */
    @Test
    @Category(value={NeedsRunner.class})
    public void testDynamicDestinationsBounded() throws Exception {
        boolean bounded = true;
        boolean emptyShards = false;
        this.testDynamicDestinationsHelper(bounded, emptyShards);
    }

    /**
     * Runs the dynamic-destinations scenario on an input marked unbounded with the small
     * shard count.
     */
    @Test
    @Category(value={NeedsRunner.class})
    public void testDynamicDestinationsUnbounded() throws Exception {
        boolean bounded = false;
        boolean emptyShards = false;
        this.testDynamicDestinationsHelper(bounded, emptyShards);
    }

    /**
     * Runs the dynamic-destinations scenario on a bounded input with the large shard count.
     */
    @Test
    @Category(value={NeedsRunner.class})
    public void testDynamicDestinationsFillEmptyShards() throws Exception {
        boolean bounded = true;
        boolean emptyShards = true;
        this.testDynamicDestinationsHelper(bounded, emptyShards);
    }

    /**
     * Writes the strings "0".."99" through {@code TestDestinations} and verifies, for each of
     * the five "file_N" prefixes, the shard count and the formatted records it received.
     *
     * @param bounded when false the input is marked unbounded, placed in one-day fixed windows
     *     and written with windowed writes; this flag is also passed to
     *     {@code checkFileContents} as the "temp directory removed" expectation
     * @param emptyShards when true 40 shards are requested, otherwise 2
     */
    private void testDynamicDestinationsHelper(boolean bounded, boolean emptyShards) throws IOException {
        TestDestinations destinations = new TestDestinations(this.getBaseOutputDirectory());
        SimpleSink<Integer> sink = new SimpleSink<Integer>(this.getBaseOutputDirectory(), destinations, Compression.UNCOMPRESSED);
        WriteOptions options = (WriteOptions)TestPipeline.testingPipelineOptions().as(WriteOptions.class);
        options.setTestFlag("test_value");
        Pipeline p = TestPipeline.create((PipelineOptions)options);

        // Element i gets timestamp i + 1.
        int numInputs = 100;
        ArrayList<String> inputs = Lists.newArrayList();
        ArrayList<Long> timestamps = new ArrayList<Long>();
        for (int i = 0; i < numInputs; ++i) {
            inputs.add(Integer.toString(i));
            timestamps.add((long)i + 1L);
        }

        int numShards;
        if (emptyShards) {
            numShards = 40;
        } else {
            numShards = 2;
        }
        WriteFiles writeFiles = WriteFiles.to(sink).withNumShards(numShards);

        PCollection input = (PCollection)p.apply((PTransform)Create.timestamped(inputs, timestamps));
        WriteFilesResult res;
        if (bounded) {
            res = (WriteFilesResult)input.apply((PTransform)writeFiles);
        } else {
            input.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
            input = (PCollection)input.apply((PTransform)Window.into((WindowFn)FixedWindows.of(Duration.standardDays(1L))));
            res = (WriteFilesResult)input.apply((PTransform)writeFiles.withWindowedWrites());
        }
        res.getPerDestinationOutputFilenames().apply(new VerifyFilesExist());
        p.run();

        // TestDestinations routes element e to destination e % 5.
        int numDestinations = 5;
        for (int destination = 0; destination < numDestinations; ++destination) {
            ResourceId base = this.getBaseOutputDirectory().resolve("file_" + destination, (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
            ArrayList<String> expected = Lists.newArrayList();
            for (int element = destination; element < numInputs; element += numDestinations) {
                expected.add("record_" + element);
            }
            WriteFilesTest.checkFileContents(base.toString(), expected, Optional.of(numShards), bounded);
        }
    }

    /**
     * Checks that a {@code WriteFiles} configured with one shard reports the sink display
     * data and a "numShards" display item with value 1.
     */
    @Test
    public void testShardedDisplayData() {
        ResourceId baseFile = this.getBaseOutputDirectory().resolve("file", (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
        DefaultFilenamePolicy.Params policyParams = new DefaultFilenamePolicy.Params().withBaseFilename(baseFile).withShardTemplate("-SS-of-NN");
        FileBasedSink.DynamicDestinations dynamicDestinations = DynamicFileDestinations.constant((FileBasedSink.FilenamePolicy)DefaultFilenamePolicy.fromParams(policyParams));

        // Sink that contributes one display item of its own.
        SimpleSink<Void> sink = new SimpleSink<Void>(this.getBaseOutputDirectory(), dynamicDestinations, Compression.UNCOMPRESSED){
            public void populateDisplayData(DisplayData.Builder builder) {
                builder.add(DisplayData.item("foo", "bar"));
            }
        };

        WriteFiles write = WriteFiles.to((FileBasedSink)sink).withNumShards(1);
        DisplayData displayData = DisplayData.from((HasDisplayData)write);
        Assert.assertThat((Object)displayData, DisplayDataMatchers.hasDisplayItem("sink", ((Object)sink).getClass()));
        Assert.assertThat((Object)displayData, DisplayDataMatchers.includesDisplayDataFor("sink", (HasDisplayData)sink));
        Assert.assertThat((Object)displayData, DisplayDataMatchers.hasDisplayItem("numShards", 1L));
    }

    /**
     * Checks that a {@code WriteFiles} with a custom sharding transform reports the sink
     * display data and the "spam" item contributed by that sharding transform.
     */
    @Test
    public void testCustomShardStrategyDisplayData() {
        ResourceId baseFile = this.getBaseOutputDirectory().resolve("file", (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
        DefaultFilenamePolicy.Params policyParams = new DefaultFilenamePolicy.Params().withBaseFilename(baseFile).withShardTemplate("-SS-of-NN");
        FileBasedSink.DynamicDestinations dynamicDestinations = DynamicFileDestinations.constant((FileBasedSink.FilenamePolicy)DefaultFilenamePolicy.fromParams(policyParams));

        // Sink that contributes one display item of its own.
        SimpleSink<Void> sink = new SimpleSink<Void>(this.getBaseOutputDirectory(), dynamicDestinations, Compression.UNCOMPRESSED){
            public void populateDisplayData(DisplayData.Builder builder) {
                builder.add(DisplayData.item("foo", "bar"));
            }
        };

        // Sharding transform that is never expanded here; only its display data matters.
        PTransform<PCollection<String>, PCollectionView<Integer>> sharding = new PTransform<PCollection<String>, PCollectionView<Integer>>(){
            public PCollectionView<Integer> expand(PCollection<String> input) {
                return null;
            }

            public void populateDisplayData(DisplayData.Builder builder) {
                builder.add(DisplayData.item("spam", "ham"));
            }
        };

        WriteFiles write = WriteFiles.to((FileBasedSink)sink).withSharding((PTransform)sharding);
        DisplayData displayData = DisplayData.from((HasDisplayData)write);
        Assert.assertThat((Object)displayData, DisplayDataMatchers.hasDisplayItem("sink", ((Object)sink).getClass()));
        Assert.assertThat((Object)displayData, DisplayDataMatchers.includesDisplayDataFor("sink", (HasDisplayData)sink));
        Assert.assertThat((Object)displayData, DisplayDataMatchers.hasDisplayItem("spam", "ham"));
    }

    /**
     * Convenience entry point that forwards all of its arguments unchanged to
     * {@code runShardedWrite}.
     */
    private void runWrite(List<String> inputs, PTransform<PCollection<String>, PCollection<String>> transform, String baseName, WriteFiles<String, ?, String> write) throws IOException {
        this.runShardedWrite(inputs, transform, baseName, write);
    }

    /**
     * Builds and runs a pipeline that creates {@code inputs} (element k, 1-based, gets
     * timestamp k), applies {@code transform} and then {@code write}, and finally checks the
     * files found under {@code baseName}.
     *
     * <p>An exact shard count is only checked when {@code write} has a shard-count provider
     * and does not use windowed writes; removal of the temp directory is only expected for
     * non-windowed writes.
     */
    private void runShardedWrite(List<String> inputs, PTransform<PCollection<String>, PCollection<String>> transform, String baseName, WriteFiles<String, ?, String> write) throws IOException {
        WriteOptions options = (WriteOptions)TestPipeline.testingPipelineOptions().as(WriteOptions.class);
        options.setTestFlag("test_value");
        Pipeline p = TestPipeline.create((PipelineOptions)options);

        ArrayList<Long> timestamps = new ArrayList<Long>();
        for (long timestamp = 1L; timestamp <= (long)inputs.size(); ++timestamp) {
            timestamps.add(timestamp);
        }

        PCollection created = (PCollection)p.apply((PTransform)Create.timestamped(inputs, timestamps).withCoder((Coder)StringUtf8Coder.of()));
        PCollection transformed = (PCollection)created.apply(transform);
        WriteFilesResult result = (WriteFilesResult)transformed.apply(write);
        result.getPerDestinationOutputFilenames().apply(new VerifyFilesExist());
        p.run();

        Optional<Integer> numShards;
        if (write.getNumShardsProvider() == null || write.getWindowedWrites()) {
            numShards = Optional.absent();
        } else {
            numShards = Optional.of((Integer)write.getNumShardsProvider().get());
        }
        WriteFilesTest.checkFileContents(baseName, inputs, numShards, !write.getWindowedWrites());
    }

    /**
     * Verifies the files whose names start with {@code baseName}.
     *
     * <p>The check proceeds in these steps:
     * <ol>
     *   <li>All resources matching {@code baseName + "*"} are collected; at least one must exist.
     *   <li>If {@code numExpectedShards} is present, the number of files must equal it, every
     *       file name must contain a {@code NNNN-of-NNNN} token, the tokens must be distinct,
     *       and together they must be exactly {@code 0000-of-N} .. {@code (N-1)-of-N}
     *       (four-digit, zero-padded).
     *   <li>Every file is read as UTF-8; lines equal to "header" or "footer" are skipped and
     *       the remaining lines must equal {@code inputs} in any order.
     *   <li>If {@code expectRemovedTempDirectory} is set, no entry in the parent directory of
     *       {@code baseName} may contain ".temp-beam".
     * </ol>
     *
     * @param baseName filename prefix of the output files
     * @param inputs records expected across all output files
     * @param numExpectedShards exact number of shards to require, or absent to skip that check
     * @param expectRemovedTempDirectory whether to assert that no temp directory entry is left
     * @throws IOException if matching or reading the output files fails
     */
    static void checkFileContents(String baseName, List<String> inputs, Optional<Integer> numExpectedShards, boolean expectRemovedTempDirectory) throws IOException {
        ArrayList<File> outputFiles = Lists.newArrayList();
        String pattern = baseName + "*";
        List<MatchResult.Metadata> metadata = ((MatchResult)FileSystems.match(Collections.singletonList(pattern)).get(0)).metadata();
        for (MatchResult.Metadata meta : metadata) {
            outputFiles.add(new File(meta.resourceId().toString()));
        }
        Assert.assertFalse((String)"Should have produced at least 1 output file", (boolean)outputFiles.isEmpty());
        if (numExpectedShards.isPresent()) {
            int shardCount = numExpectedShards.get();
            Assert.assertEquals((long)shardCount, (long)outputFiles.size());
            Pattern shardPattern = Pattern.compile("\\d{4}-of-\\d{4}");
            HashSet<String> expectedShards = new HashSet<String>();
            DecimalFormat df = new DecimalFormat("0000");
            for (int i = 0; i < shardCount; ++i) {
                expectedShards.add(String.format("%s-of-%s", df.format(i), df.format(shardCount)));
            }
            HashSet<String> outputShards = new HashSet<String>();
            for (File file : outputFiles) {
                Matcher matcher = shardPattern.matcher(file.getName());
                Assert.assertTrue((boolean)matcher.find());
                // add() returning false would mean two files share the same shard token.
                Assert.assertTrue((boolean)outputShards.add(matcher.group()));
            }
            Assert.assertEquals(expectedShards, outputShards);
        }
        ArrayList<String> actual = Lists.newArrayList();
        for (File outputFile : outputFiles) {
            // try-with-resources closes the reader and lets any read failure propagate;
            // the previous hand-written finally block used `continue`, which discarded it.
            try (BufferedReader reader = Files.newBufferedReader(outputFile.toPath(), Charsets.UTF_8)) {
                String line;
                while ((line = reader.readLine()) != null) {
                    if (line.equals("header") || line.equals("footer")) {
                        continue;
                    }
                    actual.add(line);
                }
            }
        }
        Assert.assertThat(actual, (org.hamcrest.Matcher)Matchers.containsInAnyOrder((Object[])inputs.toArray()));
        if (expectRemovedTempDirectory) {
            Assert.assertThat(Lists.newArrayList(new File(baseName).getParentFile().list()), (org.hamcrest.Matcher)Matchers.everyItem((org.hamcrest.Matcher)Matchers.not((org.hamcrest.Matcher)Matchers.containsString((String)".temp-beam"))));
        }
    }

    /**
     * Sharding transform that parses every input string as an integer and exposes the result
     * of {@code Top.largest(1)}, flattened, as a singleton view.
     */
    private static class LargestInt
    extends PTransform<PCollection<String>, PCollectionView<Integer>> {
        private LargestInt() {
        }

        public PCollectionView<Integer> expand(PCollection<String> input) {
            DoFn<String, Integer> parseInteger = new DoFn<String, Integer>(){
                @DoFn.ProcessElement
                public void toInteger(DoFn.ProcessContext ctxt) {
                    ctxt.output((Object)Integer.valueOf((String)ctxt.element()));
                }
            };
            PCollection integers = (PCollection)input.apply((PTransform)ParDo.of((DoFn)parseInteger));
            PCollection largest = (PCollection)integers.apply((PTransform)Top.largest(1));
            PCollection flattened = (PCollection)largest.apply((PTransform)Flatten.iterables());
            return (PCollectionView)flattened.apply((PTransform)View.asSingleton());
        }
    }

    /**
     * Pipeline options used by these tests: the inherited test pipeline options plus a single
     * string-valued "test flag" property.
     */
    public interface WriteOptions extends PipelineOptionsFactoryTest.TestPipelineOptions {
        @Description("Test flag and value")
        String getTestFlag();

        void setTestFlag(String value);
    }

    /**
     * Filename policy that places files next to {@code baseFilename} and names them
     * {@code <prefix>-SSSS-of-NNNN<suggested suffix><suffix>}, where the prefix is the base
     * file name (empty when the base is a directory) and, for windowed writes, additionally
     * carries the window's start and end times.
     */
    private static class PerWindowFiles
    extends FileBasedSink.FilenamePolicy {
        private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinuteSecondMillis();
        private final ResourceId baseFilename;
        private final String suffix;

        public PerWindowFiles(ResourceId baseFilename, String suffix) {
            this.baseFilename = baseFilename;
            this.suffix = suffix;
        }

        /** Returns the base file name, or "" when the base is a directory or has no file name. */
        private String basePrefix() {
            if (this.baseFilename.isDirectory()) {
                return "";
            }
            return MoreObjects.firstNonNull(this.baseFilename.getFilename(), "");
        }

        /** Returns the base prefix followed by the window's start and end, joined by "-". */
        public String filenamePrefixForWindow(IntervalWindow window) {
            String prefix = this.basePrefix();
            String start = FORMATTER.print((ReadableInstant)window.start());
            String end = FORMATTER.print((ReadableInstant)window.end());
            return String.format("%s%s-%s", prefix, start, end);
        }

        public ResourceId windowedFilename(int shardNumber, int numShards, BoundedWindow window, PaneInfo paneInfo, FileBasedSink.OutputFileHints outputFileHints) {
            DecimalFormat fourDigits = new DecimalFormat("0000");
            String windowPrefix = this.filenamePrefixForWindow((IntervalWindow)window);
            String shard = fourDigits.format(shardNumber);
            String total = fourDigits.format(numShards);
            String filename = String.format("%s-%s-of-%s%s%s", windowPrefix, shard, total, outputFileHints.getSuggestedFilenameSuffix(), this.suffix);
            return this.baseFilename.getCurrentDirectory().resolve(filename, (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
        }

        public ResourceId unwindowedFilename(int shardNumber, int numShards, FileBasedSink.OutputFileHints outputFileHints) {
            DecimalFormat fourDigits = new DecimalFormat("0000");
            String prefix = this.basePrefix();
            String shard = fourDigits.format(shardNumber);
            String total = fourDigits.format(numShards);
            String filename = String.format("%s-%s-of-%s%s%s", prefix, shard, total, outputFileHints.getSuggestedFilenameSuffix(), this.suffix);
            return this.baseFilename.getCurrentDirectory().resolve(filename, (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
        }
    }

    /**
     * Dynamic destinations used by the tests: an element is parsed as an integer and routed
     * to destination {@code value % 5}, records are written as {@code "record_" + element},
     * and each destination gets a {@code PerWindowFiles} policy based on
     * {@code file_<destination>} in the base output directory.
     */
    static class TestDestinations
    extends FileBasedSink.DynamicDestinations<String, Integer, String> {
        private ResourceId baseOutputDirectory;

        TestDestinations(ResourceId baseOutputDirectory) {
            this.baseOutputDirectory = baseOutputDirectory;
        }

        public String formatRecord(String record) {
            return "record_" + record;
        }

        public Integer getDestination(String element) {
            int value = Integer.parseInt(element);
            return value % 5;
        }

        public Integer getDefaultDestination() {
            return 0;
        }

        public FileBasedSink.FilenamePolicy getFilenamePolicy(Integer destination) {
            ResourceId destinationFile = this.baseOutputDirectory.resolve("file_" + destination, (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
            return new PerWindowFiles(destinationFile, "simple");
        }
    }

    /**
     * Takes the values of the (destination, filename) pairs and matches each of them with
     * {@code FileIO.matchAll()} configured to disallow empty matches.
     */
    private static class VerifyFilesExist<DestinationT>
    extends PTransform<PCollection<KV<DestinationT, String>>, PDone> {
        private VerifyFilesExist() {
        }

        public PDone expand(PCollection<KV<DestinationT, String>> input) {
            PCollection filenames = (PCollection)input.apply((PTransform)Values.create());
            filenames.apply((PTransform)FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW));
            return PDone.in((Pipeline)input.getPipeline());
        }
    }

    /**
     * Applies the given windowing, pairs every element with a random integer key, groups by
     * that key and then emits the grouped values again without the key.
     */
    private static class WindowAndReshuffle<T>
    extends PTransform<PCollection<T>, PCollection<T>> {
        private final Window<T> window;

        public WindowAndReshuffle(Window<T> window) {
            this.window = window;
        }

        public PCollection<T> expand(PCollection<T> input) {
            PCollection windowed = (PCollection)input.apply(this.window);
            PCollection keyed = (PCollection)windowed.apply((PTransform)ParDo.of(new AddArbitraryKey()));
            PCollection grouped = (PCollection)keyed.apply((PTransform)GroupByKey.create());
            return (PCollection)grouped.apply((PTransform)ParDo.of(new RemoveArbitraryKey()));
        }

        /** Emits every value of a grouped (key, values) element, dropping the key. */
        private static class RemoveArbitraryKey<T>
        extends DoFn<KV<Integer, Iterable<T>>, T> {
            private RemoveArbitraryKey() {
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) {
                KV grouped = (KV)c.element();
                Iterable values = (Iterable)grouped.getValue();
                for (Object value : values) {
                    c.output(value);
                }
            }
        }

        /** Pairs each element with a key drawn from {@code ThreadLocalRandom}. */
        private static class AddArbitraryKey<T>
        extends DoFn<T, KV<Integer, T>> {
            private AddArbitraryKey() {
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) {
                Integer randomKey = ThreadLocalRandom.current().nextInt();
                c.output((Object)KV.of((Object)randomKey, (Object)c.element()));
            }
        }
    }
}

