/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.zip.GZIPInputStream;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Charsets;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.collect.Lists;
import org.apache.beam.repackaged.beam_sdks_java_core.org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.beam.repackaged.beam_sdks_java_core.org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.DrunkWritableByteChannelFactory;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.LocalResources;
import org.apache.beam.sdk.io.SimpleSink;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class FileBasedSinkTest {
    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();
    private final String tempDirectoryName = "temp";

    private ResourceId getTemporaryFolder() {
        return LocalResources.fromFile((File)this.tmpFolder.getRoot(), (boolean)true);
    }

    private ResourceId getBaseOutputDirectory() {
        String baseOutputDirname = "output";
        return this.getTemporaryFolder().resolve(baseOutputDirname, (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY);
    }

    private ResourceId getBaseTempDirectory() {
        return this.getTemporaryFolder().resolve("temp", (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY);
    }

    @Test
    public void testWriter() throws Exception {
        String testUid = "testId";
        ResourceId expectedTempFile = this.getBaseTempDirectory().resolve(testUid, (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
        List<String> values = Arrays.asList("sympathetic vulture", "boresome hummingbird");
        ArrayList<String> expected = new ArrayList<String>();
        expected.add("header");
        expected.addAll(values);
        expected.add("footer");
        FileBasedSink.Writer writer = this.buildWriteOperationWithTempDir(this.getBaseTempDirectory()).createWriter();
        writer.open(testUid);
        for (String value : values) {
            writer.write(value);
        }
        writer.close();
        Assert.assertEquals((Object)expectedTempFile, (Object)writer.getOutputFile());
        this.assertFileContains(expected, expectedTempFile);
    }

    private void assertFileContains(List<String> expected, ResourceId file) throws Exception {
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(file.toString(), new String[0]), Charsets.UTF_8);){
            String line;
            ArrayList<String> actual = new ArrayList<String>();
            while ((line = reader.readLine()) != null) {
                actual.add(line);
            }
            Assert.assertEquals((String)("contents for " + file), expected, actual);
        }
    }

    private void writeFile(List<String> lines, File file) throws Exception {
        try (PrintWriter writer = new PrintWriter(new BufferedWriter(new OutputStreamWriter((OutputStream)new FileOutputStream(file), Charsets.UTF_8)));){
            for (String line : lines) {
                writer.println(line);
            }
        }
    }

    @Test
    public void testRemoveWithTempFilename() throws Exception {
        this.testRemoveTemporaryFiles(3, this.getBaseTempDirectory());
    }

    @Test
    public void testFinalize() throws Exception {
        List<File> files = this.generateTemporaryFilesForFinalize(3);
        this.runFinalize(this.buildWriteOperation(), files);
    }

    @Test
    public void testFinalizeMultipleCalls() throws Exception {
        List<File> files = this.generateTemporaryFilesForFinalize(3);
        SimpleSink.SimpleWriteOperation<Void> writeOp = this.buildWriteOperation();
        this.runFinalize(writeOp, files);
        this.runFinalize(writeOp, files);
    }

    @Test
    public void testFinalizeWithIntermediateState() throws Exception {
        SimpleSink.SimpleWriteOperation<Void> writeOp = this.buildWriteOperation();
        List<File> files = this.generateTemporaryFilesForFinalize(3);
        this.runFinalize(writeOp, files);
        this.tmpFolder.newFolder("temp");
        this.tmpFolder.newFile("temp/1");
        this.runFinalize(writeOp, files);
    }

    private List<File> generateTemporaryFilesForFinalize(int numFiles) throws Exception {
        ArrayList<File> temporaryFiles = new ArrayList<File>();
        for (int i = 0; i < numFiles; ++i) {
            ResourceId temporaryFile = FileBasedSink.WriteOperation.buildTemporaryFilename((ResourceId)this.getBaseTempDirectory(), (String)("" + i));
            File tmpFile = new File(this.tmpFolder.getRoot(), temporaryFile.toString());
            tmpFile.getParentFile().mkdirs();
            Assert.assertTrue((boolean)tmpFile.createNewFile());
            temporaryFiles.add(tmpFile);
        }
        return temporaryFiles;
    }

    private void runFinalize(SimpleSink.SimpleWriteOperation<Void> writeOp, List<File> temporaryFiles) throws Exception {
        int numFiles = temporaryFiles.size();
        ArrayList<FileBasedSink.FileResult> fileResults = new ArrayList<FileBasedSink.FileResult>();
        for (File temporaryFile : temporaryFiles) {
            fileResults.add(new FileBasedSink.FileResult(LocalResources.fromFile((File)temporaryFile, (boolean)false), -1, (BoundedWindow)GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING, null));
        }
        List resultsToFinalFilenames = writeOp.finalizeDestination(null, (BoundedWindow)GlobalWindow.INSTANCE, null, fileResults);
        writeOp.moveToOutputFiles(resultsToFinalFilenames);
        for (int i = 0; i < numFiles; ++i) {
            ResourceId outputFilename = writeOp.getSink().getDynamicDestinations().getFilenamePolicy(null).unwindowedFilename(i, numFiles, (FileBasedSink.OutputFileHints)FileBasedSink.CompressionType.UNCOMPRESSED);
            Assert.assertTrue((String)outputFilename.toString(), (boolean)new File(outputFilename.toString()).exists());
            Assert.assertFalse((boolean)temporaryFiles.get(i).exists());
        }
        Assert.assertFalse((boolean)new File(((ResourceId)writeOp.tempDirectory.get()).toString()).exists());
        Assert.assertEquals((Object)writeOp.tempDirectory.get(), (Object)writeOp.tempDirectory.get());
    }

    private void testRemoveTemporaryFiles(int numFiles, ResourceId tempDirectory) throws Exception {
        int i;
        String prefix = "file";
        SimpleSink<Void> sink = SimpleSink.makeSimpleSink(this.getBaseOutputDirectory(), prefix, "", "", Compression.UNCOMPRESSED);
        SimpleSink.SimpleWriteOperation<Void> writeOp = new SimpleSink.SimpleWriteOperation<Void>(sink, tempDirectory);
        ArrayList<File> temporaryFiles = new ArrayList<File>();
        ArrayList<File> outputFiles = new ArrayList<File>();
        for (i = 0; i < numFiles; ++i) {
            ResourceId tempResource = FileBasedSink.WriteOperation.buildTemporaryFilename((ResourceId)tempDirectory, (String)(prefix + i));
            File tmpFile = new File(tempResource.toString());
            tmpFile.getParentFile().mkdirs();
            Assert.assertTrue((String)"not able to create new temp file", (boolean)tmpFile.createNewFile());
            temporaryFiles.add(tmpFile);
            ResourceId outputFileId = this.getBaseOutputDirectory().resolve(prefix + i, (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
            File outputFile = new File(outputFileId.toString());
            outputFile.getParentFile().mkdirs();
            Assert.assertTrue((String)"not able to create new output file", (boolean)outputFile.createNewFile());
            outputFiles.add(outputFile);
        }
        writeOp.removeTemporaryFiles(Collections.emptySet(), true);
        for (i = 0; i < numFiles; ++i) {
            File temporaryFile = (File)temporaryFiles.get(i);
            Assert.assertThat((String)String.format("temp file %s exists", temporaryFile), (Object)temporaryFile.exists(), (Matcher)Matchers.is((Object)false));
            File outputFile = (File)outputFiles.get(i);
            Assert.assertThat((String)String.format("output file %s exists", outputFile), (Object)outputFile.exists(), (Matcher)Matchers.is((Object)true));
        }
    }

    @Test
    public void testCopyToOutputFiles() throws Exception {
        int i;
        SimpleSink.SimpleWriteOperation<Void> writeOp = this.buildWriteOperation();
        List<String> inputFilenames = Arrays.asList("input-1", "input-2", "input-3");
        List<String> inputContents = Arrays.asList("1", "2", "3");
        List<String> expectedOutputFilenames = Arrays.asList("file-00-of-03.test", "file-01-of-03.test", "file-02-of-03.test");
        ArrayList<KV> resultsToFinalFilenames = Lists.newArrayList();
        ArrayList<ResourceId> expectedOutputPaths = Lists.newArrayList();
        for (i = 0; i < inputFilenames.size(); ++i) {
            expectedOutputPaths.add(this.getBaseOutputDirectory().resolve(expectedOutputFilenames.get(i), (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE));
            File inputTmpFile = this.tmpFolder.newFile(inputFilenames.get(i));
            List<String> lines = Collections.singletonList(inputContents.get(i));
            this.writeFile(lines, inputTmpFile);
            ResourceId finalFilename = writeOp.getSink().getDynamicDestinations().getFilenamePolicy(null).unwindowedFilename(i, inputFilenames.size(), (FileBasedSink.OutputFileHints)FileBasedSink.CompressionType.UNCOMPRESSED);
            resultsToFinalFilenames.add(KV.of((Object)new FileBasedSink.FileResult(LocalResources.fromFile((File)inputTmpFile, (boolean)false), -1, (BoundedWindow)GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING, null), (Object)finalFilename));
        }
        writeOp.moveToOutputFiles(resultsToFinalFilenames);
        for (i = 0; i < expectedOutputPaths.size(); ++i) {
            this.assertFileContains(Collections.singletonList(inputContents.get(i)), (ResourceId)expectedOutputPaths.get(i));
        }
    }

    public List<ResourceId> generateDestinationFilenames(FileBasedSink.FilenamePolicy policy, int numFiles) {
        ArrayList<ResourceId> filenames = new ArrayList<ResourceId>();
        for (int i = 0; i < numFiles; ++i) {
            filenames.add(policy.unwindowedFilename(i, numFiles, (FileBasedSink.OutputFileHints)FileBasedSink.CompressionType.UNCOMPRESSED));
        }
        return filenames;
    }

    @Test
    public void testGenerateOutputFilenames() {
        ResourceId root = this.getBaseOutputDirectory();
        SimpleSink<Void> sink = SimpleSink.makeSimpleSink(root, "file", ".SSSSS.of.NNNNN", ".test", Compression.UNCOMPRESSED);
        FileBasedSink.FilenamePolicy policy = sink.getDynamicDestinations().getFilenamePolicy(null);
        List<ResourceId> expected = Arrays.asList(root.resolve("file.00000.of.00003.test", (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE), root.resolve("file.00001.of.00003.test", (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE), root.resolve("file.00002.of.00003.test", (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE));
        List<ResourceId> actual = this.generateDestinationFilenames(policy, 3);
        Assert.assertEquals(expected, actual);
        expected = Collections.singletonList(root.resolve("file.00000.of.00001.test", (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE));
        actual = this.generateDestinationFilenames(policy, 1);
        Assert.assertEquals(expected, actual);
        expected = new ArrayList<ResourceId>();
        actual = this.generateDestinationFilenames(policy, 0);
        Assert.assertEquals(expected, actual);
    }

    @Test
    public void testCollidingOutputFilenames() throws Exception {
        ResourceId root = this.getBaseOutputDirectory();
        SimpleSink<Void> sink = SimpleSink.makeSimpleSink(root, "file", "-NN", "test", Compression.UNCOMPRESSED);
        SimpleSink.SimpleWriteOperation<Void> writeOp = new SimpleSink.SimpleWriteOperation<Void>(sink);
        try {
            ArrayList<FileBasedSink.FileResult> results = Lists.newArrayList();
            for (int i = 0; i < 3; ++i) {
                results.add(new FileBasedSink.FileResult(root.resolve("temp" + i, (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE), 1, (BoundedWindow)GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING, null));
            }
            writeOp.finalizeDestination(null, (BoundedWindow)GlobalWindow.INSTANCE, 5, results);
            Assert.fail((String)"Should have failed.");
        }
        catch (IllegalArgumentException exn) {
            Assert.assertThat((Object)exn.getMessage(), (Matcher)Matchers.containsString((String)"generated the same name"));
            Assert.assertThat((Object)exn.getMessage(), (Matcher)Matchers.containsString((String)"temp0"));
            Assert.assertThat((Object)exn.getMessage(), (Matcher)Matchers.containsString((String)"temp1"));
        }
    }

    @Test
    public void testGenerateOutputFilenamesWithoutExtension() {
        ResourceId root = this.getBaseOutputDirectory();
        SimpleSink<Void> sink = SimpleSink.makeSimpleSink(root, "file", "-SSSSS-of-NNNNN", "", Compression.UNCOMPRESSED);
        FileBasedSink.FilenamePolicy policy = sink.getDynamicDestinations().getFilenamePolicy(null);
        List<ResourceId> expected = Arrays.asList(root.resolve("file-00000-of-00003", (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE), root.resolve("file-00001-of-00003", (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE), root.resolve("file-00002-of-00003", (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE));
        List<ResourceId> actual = this.generateDestinationFilenames(policy, 3);
        Assert.assertEquals(expected, actual);
        expected = Collections.singletonList(root.resolve("file-00000-of-00001", (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE));
        actual = this.generateDestinationFilenames(policy, 1);
        Assert.assertEquals(expected, actual);
        expected = new ArrayList<ResourceId>();
        actual = this.generateDestinationFilenames(policy, 0);
        Assert.assertEquals(expected, actual);
    }

    @Test
    public void testCompressionBZIP2() throws FileNotFoundException, IOException {
        File file = this.writeValuesWithCompression(Compression.BZIP2, "abc", "123");
        this.assertReadValues(new BufferedReader(new InputStreamReader((InputStream)new BZip2CompressorInputStream(new FileInputStream(file)), StandardCharsets.UTF_8)), "abc", "123");
    }

    @Test
    public void testCompressionGZIP() throws FileNotFoundException, IOException {
        File file = this.writeValuesWithCompression(Compression.GZIP, "abc", "123");
        this.assertReadValues(new BufferedReader(new InputStreamReader((InputStream)new GZIPInputStream(new FileInputStream(file)), StandardCharsets.UTF_8)), "abc", "123");
    }

    @Test
    public void testCompressionDEFLATE() throws FileNotFoundException, IOException {
        File file = this.writeValuesWithCompression(Compression.DEFLATE, "abc", "123");
        this.assertReadValues(new BufferedReader(new InputStreamReader((InputStream)new DeflateCompressorInputStream(new FileInputStream(file)), StandardCharsets.UTF_8)), "abc", "123");
    }

    @Test
    public void testCompressionUNCOMPRESSED() throws FileNotFoundException, IOException {
        File file = this.writeValuesWithCompression(Compression.UNCOMPRESSED, "abc", "123");
        this.assertReadValues(new BufferedReader(new InputStreamReader((InputStream)new FileInputStream(file), StandardCharsets.UTF_8)), "abc", "123");
    }

    private void assertReadValues(BufferedReader br, String ... values) throws IOException {
        try (BufferedReader lbr = br;){
            for (String value : values) {
                Assert.assertEquals((String)String.format("Line should read '%s'", value), (Object)value, (Object)lbr.readLine());
            }
        }
    }

    private File writeValuesWithCompression(Compression compression, String ... values) throws IOException {
        File file = this.tmpFolder.newFile("test.gz");
        WritableByteChannel channel = compression.writeCompressed(Channels.newChannel(new FileOutputStream(file)));
        for (String value : values) {
            channel.write(ByteBuffer.wrap((value + "\n").getBytes(StandardCharsets.UTF_8)));
        }
        channel.close();
        return file;
    }

    @Test
    public void testFileBasedWriterWithWritableByteChannelFactory() throws Exception {
        String testUid = "testId";
        ResourceId root = this.getBaseOutputDirectory();
        FileBasedSink.WriteOperation writeOp = SimpleSink.makeSimpleSink(root, "file", "-SS-of-NN", "txt", new DrunkWritableByteChannelFactory()).createWriteOperation();
        FileBasedSink.Writer writer = writeOp.createWriter();
        ResourceId expectedFile = ((ResourceId)writeOp.tempDirectory.get()).resolve("testId", (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
        ArrayList<String> expected = new ArrayList<String>();
        expected.add("header");
        expected.add("header");
        expected.add("a");
        expected.add("a");
        expected.add("b");
        expected.add("b");
        expected.add("footer");
        expected.add("footer");
        writer.open("testId");
        writer.write((Object)"a");
        writer.write((Object)"b");
        writer.close();
        Assert.assertEquals((Object)expectedFile, (Object)writer.getOutputFile());
        this.assertFileContains(expected, expectedFile);
    }

    private SimpleSink<Void> buildSink() {
        return SimpleSink.makeSimpleSink(this.getBaseOutputDirectory(), "file", "-SS-of-NN", ".test", Compression.UNCOMPRESSED);
    }

    private SimpleSink.SimpleWriteOperation<Void> buildWriteOperationWithTempDir(ResourceId tempDirectory) {
        SimpleSink<Void> sink = this.buildSink();
        return new SimpleSink.SimpleWriteOperation<Void>(sink, tempDirectory);
    }

    private SimpleSink.SimpleWriteOperation<Void> buildWriteOperation() {
        return this.buildSink().createWriteOperation();
    }
}

