package org.apache.beam.sdk.io;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactoryTest;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.repackaged.com.google.common.base.Optional;
import org.apache.beam.sdk.repackaged.com.google.common.collect.Lists;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/WriteFilesTest.class */
public class WriteFilesTest {

    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();

    @Rule
    public final TestPipeline p = TestPipeline.create();

    @Rule
    public ExpectedException thrown = ExpectedException.none();
    private static final PTransform<PCollection<String>, PCollection<String>> IDENTITY_MAP = MapElements.via(new SimpleFunction<String, String>() { // from class: org.apache.beam.sdk.io.WriteFilesTest.1
        public String apply(String str) {
            return str;
        }
    });
    private static final PTransform<PCollection<String>, PCollectionView<Integer>> SHARDING_TRANSFORM = new PTransform<PCollection<String>, PCollectionView<Integer>>() { // from class: org.apache.beam.sdk.io.WriteFilesTest.2
        public PCollectionView<Integer> expand(PCollection<String> pCollection) {
            return null;
        }
    };

    /* loaded from: input_file:org/apache/beam/sdk/io/WriteFilesTest$LargestInt.class */
    private static class LargestInt extends PTransform<PCollection<String>, PCollectionView<Integer>> {
        private LargestInt() {
        }

        public PCollectionView<Integer> expand(PCollection<String> pCollection) {
            return pCollection.apply(ParDo.of(new DoFn<String, Integer>() { // from class: org.apache.beam.sdk.io.WriteFilesTest.LargestInt.1
                @DoFn.ProcessElement
                public void toInteger(DoFn<String, Integer>.ProcessContext processContext) {
                    processContext.output(Integer.valueOf((String) processContext.element()));
                }
            })).apply(Top.largest(1)).apply(Flatten.iterables()).apply(View.asSingleton());
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/WriteFilesTest$WindowAndReshuffle.class */
    private static class WindowAndReshuffle<T> extends PTransform<PCollection<T>, PCollection<T>> {
        private final Window<T> window;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/WriteFilesTest$WindowAndReshuffle$AddArbitraryKey.class */
        public static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> {
            private AddArbitraryKey() {
            }

            @DoFn.ProcessElement
            public void processElement(DoFn<T, KV<Integer, T>>.ProcessContext processContext) {
                processContext.output(KV.of(Integer.valueOf(ThreadLocalRandom.current().nextInt()), processContext.element()));
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/WriteFilesTest$WindowAndReshuffle$RemoveArbitraryKey.class */
        public static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> {
            private RemoveArbitraryKey() {
            }

            @DoFn.ProcessElement
            public void processElement(DoFn<KV<Integer, Iterable<T>>, T>.ProcessContext processContext) {
                Iterator<T> it = ((Iterable) ((KV) processContext.element()).getValue()).iterator();
                while (it.hasNext()) {
                    processContext.output(it.next());
                }
            }
        }

        public WindowAndReshuffle(Window<T> window) {
            this.window = window;
        }

        public PCollection<T> expand(PCollection<T> pCollection) {
            return pCollection.apply(this.window).apply(ParDo.of(new AddArbitraryKey())).apply(GroupByKey.create()).apply(ParDo.of(new RemoveArbitraryKey()));
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/WriteFilesTest$WriteOptions.class */
    public interface WriteOptions extends PipelineOptionsFactoryTest.TestPipelineOptions {
        @Description("Test flag and value")
        String getTestFlag();

        void setTestFlag(String str);
    }

    private String appendToTempFolder(String str) {
        return Paths.get(this.tmpFolder.getRoot().getPath(), str).toString();
    }

    private String getBaseOutputFilename() {
        return getBaseOutputDirectory().resolve("file", ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testWrite() throws IOException {
        runWrite(Arrays.asList("Critical canary", "Apprehensive eagle", "Intimidating pigeon", "Pedantic gull", "Frisky finch"), IDENTITY_MAP, getBaseOutputFilename());
    }

    @Test
    @Category({NeedsRunner.class})
    public void testEmptyWrite() throws IOException {
        runWrite(Collections.emptyList(), IDENTITY_MAP, getBaseOutputFilename());
        checkFileContents(getBaseOutputFilename(), Collections.emptyList(), Optional.of(1));
    }

    @Test
    @Category({NeedsRunner.class})
    public void testShardedWrite() throws IOException {
        runShardedWrite(Arrays.asList("one", "two", "three", "four", "five", "six"), IDENTITY_MAP, getBaseOutputFilename(), Optional.of(1));
    }

    private ResourceId getBaseOutputDirectory() {
        return LocalResources.fromFile(this.tmpFolder.getRoot(), true).resolve("output", ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY);
    }

    private SimpleSink makeSimpleSink() {
        return new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "simple");
    }

    @Test
    @Category({NeedsRunner.class})
    public void testCustomShardedWrite() throws IOException {
        WriteOptions writeOptions = (WriteOptions) TestPipeline.testingPipelineOptions().as(WriteOptions.class);
        writeOptions.setTestFlag("test_value");
        Pipeline create = TestPipeline.create(writeOptions);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= 1000) {
                create.apply(Create.timestamped(arrayList, arrayList2).withCoder(StringUtf8Coder.of())).apply(IDENTITY_MAP).apply(WriteFiles.to(makeSimpleSink()).withSharding(new LargestInt()));
                create.run();
                checkFileContents(getBaseOutputFilename(), arrayList, Optional.of(3));
                return;
            }
            arrayList.add(Integer.toString(3));
            arrayList2.add(Long.valueOf(j2 + 1));
            j = j2 + 1;
        }
    }

    @Test
    @Category({NeedsRunner.class})
    public void testExpandShardedWrite() throws IOException {
        runShardedWrite(Arrays.asList("one", "two", "three", "four", "five", "six"), IDENTITY_MAP, getBaseOutputFilename(), Optional.of(20));
    }

    @Test
    @Category({NeedsRunner.class})
    public void testWriteWithEmptyPCollection() throws IOException {
        runWrite(new ArrayList(), IDENTITY_MAP, getBaseOutputFilename());
    }

    @Test
    @Category({NeedsRunner.class})
    public void testWriteWindowed() throws IOException {
        runWrite(Arrays.asList("Critical canary", "Apprehensive eagle", "Intimidating pigeon", "Pedantic gull", "Frisky finch"), new WindowAndReshuffle(Window.into(FixedWindows.of(Duration.millis(2L)))), getBaseOutputFilename());
    }

    @Test
    @Category({NeedsRunner.class})
    public void testWriteWithSessions() throws IOException {
        runWrite(Arrays.asList("Critical canary", "Apprehensive eagle", "Intimidating pigeon", "Pedantic gull", "Frisky finch"), new WindowAndReshuffle(Window.into(Sessions.withGapDuration(Duration.millis(1L)))), getBaseOutputFilename());
    }

    @Test
    public void testBuildWrite() {
        SimpleSink makeSimpleSink = makeSimpleSink();
        WriteFiles withNumShards = WriteFiles.to(makeSimpleSink).withNumShards(3);
        Assert.assertThat((SimpleSink) withNumShards.getSink(), Matchers.is(makeSimpleSink));
        PTransform sharding = withNumShards.getSharding();
        Assert.assertThat(withNumShards.getSharding(), Matchers.is(Matchers.nullValue()));
        Assert.assertThat(withNumShards.getNumShards(), Matchers.instanceOf(ValueProvider.StaticValueProvider.class));
        Assert.assertThat(withNumShards.getNumShards().get(), Matchers.equalTo(3));
        Assert.assertThat(withNumShards.getSharding(), Matchers.equalTo(sharding));
        WriteFiles withSharding = withNumShards.withSharding(SHARDING_TRANSFORM);
        Assert.assertThat((SimpleSink) withSharding.getSink(), Matchers.is(makeSimpleSink));
        Assert.assertThat(withSharding.getSharding(), Matchers.equalTo(SHARDING_TRANSFORM));
        Assert.assertThat(withSharding.withRunnerDeterminedSharding().getSharding(), Matchers.nullValue());
        Assert.assertThat(withNumShards.getSharding(), Matchers.equalTo(sharding));
    }

    @Test
    public void testDisplayData() {
        SimpleSink simpleSink = new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "") { // from class: org.apache.beam.sdk.io.WriteFilesTest.3
            public void populateDisplayData(DisplayData.Builder builder) {
                builder.add(DisplayData.item("foo", "bar"));
            }
        };
        DisplayData from = DisplayData.from(WriteFiles.to(simpleSink));
        Assert.assertThat(from, DisplayDataMatchers.hasDisplayItem("sink", simpleSink.getClass()));
        Assert.assertThat(from, DisplayDataMatchers.includesDisplayDataFor("sink", simpleSink));
    }

    @Test
    public void testShardedDisplayData() {
        SimpleSink simpleSink = new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "") { // from class: org.apache.beam.sdk.io.WriteFilesTest.4
            public void populateDisplayData(DisplayData.Builder builder) {
                builder.add(DisplayData.item("foo", "bar"));
            }
        };
        DisplayData from = DisplayData.from(WriteFiles.to(simpleSink).withNumShards(1));
        Assert.assertThat(from, DisplayDataMatchers.hasDisplayItem("sink", simpleSink.getClass()));
        Assert.assertThat(from, DisplayDataMatchers.includesDisplayDataFor("sink", simpleSink));
        Assert.assertThat(from, DisplayDataMatchers.hasDisplayItem("numShards", "1"));
    }

    @Test
    public void testCustomShardStrategyDisplayData() {
        SimpleSink simpleSink = new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "") { // from class: org.apache.beam.sdk.io.WriteFilesTest.5
            public void populateDisplayData(DisplayData.Builder builder) {
                builder.add(DisplayData.item("foo", "bar"));
            }
        };
        DisplayData from = DisplayData.from(WriteFiles.to(simpleSink).withSharding(new PTransform<PCollection<String>, PCollectionView<Integer>>() { // from class: org.apache.beam.sdk.io.WriteFilesTest.6
            public PCollectionView<Integer> expand(PCollection<String> pCollection) {
                return null;
            }

            public void populateDisplayData(DisplayData.Builder builder) {
                builder.add(DisplayData.item("spam", "ham"));
            }
        }));
        Assert.assertThat(from, DisplayDataMatchers.hasDisplayItem("sink", simpleSink.getClass()));
        Assert.assertThat(from, DisplayDataMatchers.includesDisplayDataFor("sink", simpleSink));
        Assert.assertThat(from, DisplayDataMatchers.hasDisplayItem("spam", "ham"));
    }

    private void runWrite(List<String> list, PTransform<PCollection<String>, PCollection<String>> pTransform, String str) throws IOException {
        runShardedWrite(list, pTransform, str, Optional.absent());
    }

    private void runShardedWrite(List<String> list, PTransform<PCollection<String>, PCollection<String>> pTransform, String str, Optional<Integer> optional) throws IOException {
        WriteOptions writeOptions = (WriteOptions) TestPipeline.testingPipelineOptions().as(WriteOptions.class);
        writeOptions.setTestFlag("test_value");
        Pipeline create = TestPipeline.create(writeOptions);
        ArrayList arrayList = new ArrayList();
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= list.size()) {
                break;
            }
            arrayList.add(Long.valueOf(j2 + 1));
            j = j2 + 1;
        }
        WriteFiles writeFiles = WriteFiles.to(makeSimpleSink());
        if (optional.isPresent()) {
            writeFiles = writeFiles.withNumShards(((Integer) optional.get()).intValue());
        }
        create.apply(Create.timestamped(list, arrayList).withCoder(StringUtf8Coder.of())).apply(pTransform).apply(writeFiles);
        create.run();
        checkFileContents(str, list, optional);
    }

    static void checkFileContents(String str, List<String> list, Optional<Integer> optional) throws IOException {
        ArrayList newArrayList = Lists.newArrayList();
        Iterator it = ((MatchResult) FileSystems.match(Collections.singletonList(str + "*")).get(0)).metadata().iterator();
        while (it.hasNext()) {
            newArrayList.add(new File(((MatchResult.Metadata) it.next()).resourceId().toString()));
        }
        if (optional.isPresent()) {
            Assert.assertEquals(((Integer) optional.get()).intValue(), newArrayList.size());
        }
        ArrayList newArrayList2 = Lists.newArrayList();
        Iterator it2 = newArrayList.iterator();
        while (it2.hasNext()) {
            BufferedReader bufferedReader = new BufferedReader(new FileReader((File) it2.next()));
            Throwable th = null;
            while (true) {
                try {
                    try {
                        String readLine = bufferedReader.readLine();
                        if (readLine == null) {
                            break;
                        } else if (!readLine.equals("header") && !readLine.equals("footer")) {
                            newArrayList2.add(readLine);
                        }
                    } finally {
                    }
                } catch (Throwable th2) {
                    if (bufferedReader != null) {
                        if (th != null) {
                            try {
                                bufferedReader.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            bufferedReader.close();
                        }
                    }
                    throw th2;
                }
            }
            if (bufferedReader != null) {
                if (0 != 0) {
                    try {
                        bufferedReader.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    bufferedReader.close();
                }
            }
        }
        Assert.assertThat(newArrayList2, Matchers.containsInAnyOrder(list.toArray()));
    }
}
