package org.apache.beam.sdk.io;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.zip.GZIPInputStream;
import org.apache.beam.repackaged.core.org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.beam.repackaged.core.org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.SystemUtils;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.SimpleSink;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/FileBasedSinkTest.class */
public class FileBasedSinkTest {

    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();
    private final String tempDirectoryName = "temp";

    private ResourceId getTemporaryFolder() {
        return LocalResources.fromFile(this.tmpFolder.getRoot(), true);
    }

    private ResourceId getBaseOutputDirectory() {
        return getTemporaryFolder().resolve("output", ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY);
    }

    private ResourceId getBaseTempDirectory() {
        return getTemporaryFolder().resolve("temp", ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY);
    }

    @Test
    public void testWriter() throws Exception {
        ResourceId resolve = getBaseTempDirectory().resolve(FileBasedSink.Writer.spreadUid("testId"), ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
        List asList = Arrays.asList("sympathetic vulture", "boresome hummingbird");
        List<String> arrayList = new ArrayList<>();
        arrayList.add("header");
        arrayList.addAll(asList);
        arrayList.add("footer");
        SimpleSink.SimpleWriter<Void> m300createWriter = buildWriteOperationWithTempDir(getBaseTempDirectory()).m300createWriter();
        m300createWriter.open("testId");
        Iterator it = asList.iterator();
        while (it.hasNext()) {
            m300createWriter.write((String) it.next());
        }
        m300createWriter.close();
        Assert.assertEquals(resolve, m300createWriter.getOutputFile());
        assertFileContains(arrayList, resolve);
    }

    private void assertFileContains(List<String> list, ResourceId resourceId) throws Exception {
        BufferedReader newBufferedReader = Files.newBufferedReader(Paths.get(resourceId.toString(), new String[0]), Charsets.UTF_8);
        Throwable th = null;
        try {
            try {
                ArrayList arrayList = new ArrayList();
                while (true) {
                    String readLine = newBufferedReader.readLine();
                    if (readLine == null) {
                        break;
                    } else {
                        arrayList.add(readLine);
                    }
                }
                Assert.assertEquals("contents for " + resourceId, list, arrayList);
                if (newBufferedReader != null) {
                    $closeResource(null, newBufferedReader);
                }
            } catch (Throwable th2) {
                th = th2;
                throw th2;
            }
        } catch (Throwable th3) {
            if (newBufferedReader != null) {
                $closeResource(th, newBufferedReader);
            }
            throw th3;
        }
    }

    private void writeFile(List<String> list, File file) throws Exception {
        PrintWriter printWriter = new PrintWriter(new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), Charsets.UTF_8)));
        Throwable th = null;
        try {
            try {
                Iterator<String> it = list.iterator();
                while (it.hasNext()) {
                    printWriter.println(it.next());
                }
                $closeResource(null, printWriter);
            } finally {
            }
        } catch (Throwable th2) {
            $closeResource(th, printWriter);
            throw th2;
        }
    }

    @Test
    public void testTemporaryDirectoryUniqueness() {
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(1000);
        for (int i = 0; i < 1000; i++) {
            newArrayListWithCapacity.add(buildWriteOperation());
        }
        HashSet newHashSetWithExpectedSize = Sets.newHashSetWithExpectedSize(1000);
        Iterator it = newArrayListWithCapacity.iterator();
        while (it.hasNext()) {
            newHashSetWithExpectedSize.add(((SimpleSink.SimpleWriteOperation) it.next()).getTempDirectory().toString());
        }
        Assert.assertEquals(1000L, newHashSetWithExpectedSize.size());
    }

    @Test
    public void testRemoveWithTempFilename() throws Exception {
        testRemoveTemporaryFiles(3, getBaseTempDirectory());
    }

    @Test
    public void testFinalize() throws Exception {
        Assume.assumeFalse(SystemUtils.IS_OS_WINDOWS);
        runFinalize(buildWriteOperation(), generateTemporaryFilesForFinalize(3));
    }

    @Test
    public void testFinalizeMultipleCalls() throws Exception {
        Assume.assumeFalse(SystemUtils.IS_OS_WINDOWS);
        List<File> generateTemporaryFilesForFinalize = generateTemporaryFilesForFinalize(3);
        SimpleSink.SimpleWriteOperation<Void> buildWriteOperation = buildWriteOperation();
        runFinalize(buildWriteOperation, generateTemporaryFilesForFinalize);
        runFinalize(buildWriteOperation, generateTemporaryFilesForFinalize);
    }

    @Test
    public void testFinalizeWithIntermediateState() throws Exception {
        Assume.assumeFalse(SystemUtils.IS_OS_WINDOWS);
        SimpleSink.SimpleWriteOperation<Void> buildWriteOperation = buildWriteOperation();
        List<File> generateTemporaryFilesForFinalize = generateTemporaryFilesForFinalize(3);
        runFinalize(buildWriteOperation, generateTemporaryFilesForFinalize);
        this.tmpFolder.newFolder("temp");
        this.tmpFolder.newFile("temp/1");
        runFinalize(buildWriteOperation, generateTemporaryFilesForFinalize);
    }

    private List<File> generateTemporaryFilesForFinalize(int i) throws Exception {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            File file = new File(this.tmpFolder.getRoot(), FileBasedSink.WriteOperation.buildTemporaryFilename(getBaseTempDirectory(), "" + i2).toString());
            file.getParentFile().mkdirs();
            Assert.assertTrue(file.createNewFile());
            arrayList.add(file);
        }
        return arrayList;
    }

    private void runFinalize(SimpleSink.SimpleWriteOperation<Void> simpleWriteOperation, List<File> list) throws Exception {
        int size = list.size();
        ArrayList arrayList = new ArrayList();
        Iterator<File> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(new FileBasedSink.FileResult(LocalResources.fromFile(it.next(), false), -1, GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING, (Object) null));
        }
        simpleWriteOperation.moveToOutputFiles(simpleWriteOperation.finalizeDestination(null, GlobalWindow.INSTANCE, null, arrayList));
        for (int i = 0; i < size; i++) {
            ResourceId unwindowedFilename = simpleWriteOperation.getSink().getDynamicDestinations().getFilenamePolicy((Object) null).unwindowedFilename(i, size, FileBasedSink.CompressionType.UNCOMPRESSED);
            Assert.assertTrue(unwindowedFilename.toString(), new File(unwindowedFilename.toString()).exists());
            Assert.assertFalse(list.get(i).exists());
        }
        Assert.assertFalse(new File(simpleWriteOperation.getTempDirectory().toString()).exists());
        Assert.assertEquals(simpleWriteOperation.getTempDirectory(), simpleWriteOperation.getTempDirectory());
    }

    private void testRemoveTemporaryFiles(int i, ResourceId resourceId) throws Exception {
        SimpleSink.SimpleWriteOperation simpleWriteOperation = new SimpleSink.SimpleWriteOperation(SimpleSink.makeSimpleSink(getBaseOutputDirectory(), "file", "", "", Compression.UNCOMPRESSED), resourceId);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            File file = new File(FileBasedSink.WriteOperation.buildTemporaryFilename(resourceId, "file" + i2).toString());
            file.getParentFile().mkdirs();
            Assert.assertTrue("not able to create new temp file", file.createNewFile());
            arrayList.add(file);
            File file2 = new File(getBaseOutputDirectory().resolve("file" + i2, ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString());
            file2.getParentFile().mkdirs();
            Assert.assertTrue("not able to create new output file", file2.createNewFile());
            arrayList2.add(file2);
        }
        simpleWriteOperation.removeTemporaryFiles(Collections.emptySet(), true);
        for (int i3 = 0; i3 < i; i3++) {
            File file3 = (File) arrayList.get(i3);
            MatcherAssert.assertThat(String.format("temp file %s exists", file3), Boolean.valueOf(file3.exists()), Matchers.is(false));
            File file4 = (File) arrayList2.get(i3);
            MatcherAssert.assertThat(String.format("output file %s exists", file4), Boolean.valueOf(file4.exists()), Matchers.is(true));
        }
    }

    @Test
    public void testCopyToOutputFiles() throws Exception {
        SimpleSink.SimpleWriteOperation<Void> buildWriteOperation = buildWriteOperation();
        List asList = Arrays.asList("input-1", "input-2", "input-3");
        List asList2 = Arrays.asList("1", "2", "3");
        List asList3 = Arrays.asList("file-00-of-03.test", "file-01-of-03.test", "file-02-of-03.test");
        ArrayList newArrayList = Lists.newArrayList();
        ArrayList newArrayList2 = Lists.newArrayList();
        for (int i = 0; i < asList.size(); i++) {
            newArrayList2.add(getBaseOutputDirectory().resolve((String) asList3.get(i), ResolveOptions.StandardResolveOptions.RESOLVE_FILE));
            File newFile = this.tmpFolder.newFile((String) asList.get(i));
            writeFile(Collections.singletonList((String) asList2.get(i)), newFile);
            newArrayList.add(KV.of(new FileBasedSink.FileResult(LocalResources.fromFile(newFile, false), -1, GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING, (Object) null), buildWriteOperation.getSink().getDynamicDestinations().getFilenamePolicy((Object) null).unwindowedFilename(i, asList.size(), FileBasedSink.CompressionType.UNCOMPRESSED)));
        }
        buildWriteOperation.moveToOutputFiles(newArrayList);
        for (int i2 = 0; i2 < newArrayList2.size(); i2++) {
            assertFileContains(Collections.singletonList((String) asList2.get(i2)), (ResourceId) newArrayList2.get(i2));
        }
    }

    public List<ResourceId> generateDestinationFilenames(FileBasedSink.FilenamePolicy filenamePolicy, int i) {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(filenamePolicy.unwindowedFilename(i2, i, FileBasedSink.CompressionType.UNCOMPRESSED));
        }
        return arrayList;
    }

    @Test
    public void testGenerateOutputFilenames() {
        ResourceId baseOutputDirectory = getBaseOutputDirectory();
        FileBasedSink.FilenamePolicy filenamePolicy = SimpleSink.makeSimpleSink(baseOutputDirectory, "file", ".SSSSS.of.NNNNN", ".test", Compression.UNCOMPRESSED).getDynamicDestinations().getFilenamePolicy((Object) null);
        Assert.assertEquals(Arrays.asList(baseOutputDirectory.resolve("file.00000.of.00003.test", ResolveOptions.StandardResolveOptions.RESOLVE_FILE), baseOutputDirectory.resolve("file.00001.of.00003.test", ResolveOptions.StandardResolveOptions.RESOLVE_FILE), baseOutputDirectory.resolve("file.00002.of.00003.test", ResolveOptions.StandardResolveOptions.RESOLVE_FILE)), generateDestinationFilenames(filenamePolicy, 3));
        Assert.assertEquals(Collections.singletonList(baseOutputDirectory.resolve("file.00000.of.00001.test", ResolveOptions.StandardResolveOptions.RESOLVE_FILE)), generateDestinationFilenames(filenamePolicy, 1));
        Assert.assertEquals(new ArrayList(), generateDestinationFilenames(filenamePolicy, 0));
    }

    @Test
    public void testCollidingOutputFilenames() throws Exception {
        ResourceId baseOutputDirectory = getBaseOutputDirectory();
        SimpleSink.SimpleWriteOperation simpleWriteOperation = new SimpleSink.SimpleWriteOperation(SimpleSink.makeSimpleSink(baseOutputDirectory, "file", "-NN", "test", Compression.UNCOMPRESSED));
        try {
            ArrayList newArrayList = Lists.newArrayList();
            for (int i = 0; i < 3; i++) {
                newArrayList.add(new FileBasedSink.FileResult(baseOutputDirectory.resolve("temp" + i, ResolveOptions.StandardResolveOptions.RESOLVE_FILE), 1, GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING, (Object) null));
            }
            simpleWriteOperation.finalizeDestination(null, GlobalWindow.INSTANCE, 5, newArrayList);
            Assert.fail("Should have failed.");
        } catch (IllegalArgumentException e) {
            MatcherAssert.assertThat(e.getMessage(), Matchers.containsString("generated the same name"));
            MatcherAssert.assertThat(e.getMessage(), Matchers.containsString("temp0"));
            MatcherAssert.assertThat(e.getMessage(), Matchers.containsString("temp1"));
        }
    }

    @Test
    public void testGenerateOutputFilenamesWithoutExtension() {
        ResourceId baseOutputDirectory = getBaseOutputDirectory();
        FileBasedSink.FilenamePolicy filenamePolicy = SimpleSink.makeSimpleSink(baseOutputDirectory, "file", "-SSSSS-of-NNNNN", "", Compression.UNCOMPRESSED).getDynamicDestinations().getFilenamePolicy((Object) null);
        Assert.assertEquals(Arrays.asList(baseOutputDirectory.resolve("file-00000-of-00003", ResolveOptions.StandardResolveOptions.RESOLVE_FILE), baseOutputDirectory.resolve("file-00001-of-00003", ResolveOptions.StandardResolveOptions.RESOLVE_FILE), baseOutputDirectory.resolve("file-00002-of-00003", ResolveOptions.StandardResolveOptions.RESOLVE_FILE)), generateDestinationFilenames(filenamePolicy, 3));
        Assert.assertEquals(Collections.singletonList(baseOutputDirectory.resolve("file-00000-of-00001", ResolveOptions.StandardResolveOptions.RESOLVE_FILE)), generateDestinationFilenames(filenamePolicy, 1));
        Assert.assertEquals(new ArrayList(), generateDestinationFilenames(filenamePolicy, 0));
    }

    @Test
    public void testCompressionBZIP2() throws FileNotFoundException, IOException {
        assertReadValues(new BufferedReader(new InputStreamReader(new BZip2CompressorInputStream(new FileInputStream(writeValuesWithCompression(Compression.BZIP2, "abc", "123"))), StandardCharsets.UTF_8)), "abc", "123");
    }

    @Test
    public void testCompressionGZIP() throws FileNotFoundException, IOException {
        assertReadValues(new BufferedReader(new InputStreamReader(new GZIPInputStream(new FileInputStream(writeValuesWithCompression(Compression.GZIP, "abc", "123"))), StandardCharsets.UTF_8)), "abc", "123");
    }

    @Test
    public void testCompressionDEFLATE() throws FileNotFoundException, IOException {
        assertReadValues(new BufferedReader(new InputStreamReader(new DeflateCompressorInputStream(new FileInputStream(writeValuesWithCompression(Compression.DEFLATE, "abc", "123"))), StandardCharsets.UTF_8)), "abc", "123");
    }

    @Test
    public void testCompressionUNCOMPRESSED() throws FileNotFoundException, IOException {
        assertReadValues(new BufferedReader(new InputStreamReader(new FileInputStream(writeValuesWithCompression(Compression.UNCOMPRESSED, "abc", "123")), StandardCharsets.UTF_8)), "abc", "123");
    }

    private void assertReadValues(BufferedReader bufferedReader, String... strArr) throws IOException {
        try {
            for (String str : strArr) {
                Assert.assertEquals(String.format("Line should read '%s'", str), str, bufferedReader.readLine());
            }
        } finally {
            if (bufferedReader != null) {
                $closeResource(null, bufferedReader);
            }
        }
    }

    private File writeValuesWithCompression(Compression compression, String... strArr) throws IOException {
        File newFile = this.tmpFolder.newFile("test.gz");
        WritableByteChannel writeCompressed = compression.writeCompressed(Channels.newChannel(new FileOutputStream(newFile)));
        for (String str : strArr) {
            writeCompressed.write(ByteBuffer.wrap((str + StringUtils.LF).getBytes(StandardCharsets.UTF_8)));
        }
        writeCompressed.close();
        return newFile;
    }

    @Test
    public void testFileBasedWriterWithWritableByteChannelFactory() throws Exception {
        SimpleSink.SimpleWriteOperation<Void> m299createWriteOperation = SimpleSink.makeSimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "txt", new DrunkWritableByteChannelFactory()).m299createWriteOperation();
        FileBasedSink.Writer createWriter = m299createWriteOperation.createWriter();
        ResourceId resolve = m299createWriteOperation.getTempDirectory().resolve(FileBasedSink.Writer.spreadUid("testId"), ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
        ArrayList arrayList = new ArrayList();
        arrayList.add("header");
        arrayList.add("header");
        arrayList.add("a");
        arrayList.add("a");
        arrayList.add("b");
        arrayList.add("b");
        arrayList.add("footer");
        arrayList.add("footer");
        createWriter.open("testId");
        createWriter.write("a");
        createWriter.write("b");
        createWriter.close();
        Assert.assertEquals(resolve, createWriter.getOutputFile());
        assertFileContains(arrayList, resolve);
    }

    private SimpleSink<Void> buildSink() {
        return SimpleSink.makeSimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", ".test", Compression.UNCOMPRESSED);
    }

    private SimpleSink.SimpleWriteOperation<Void> buildWriteOperationWithTempDir(ResourceId resourceId) {
        return new SimpleSink.SimpleWriteOperation<>(buildSink(), resourceId);
    }

    private SimpleSink.SimpleWriteOperation<Void> buildWriteOperation() {
        return buildSink().m299createWriteOperation();
    }

    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            th.addSuppressed(th2);
        }
    }
}
