package org.apache.flink.streaming.api.functions.sink.filesystem;

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.class */
public class BulkWriterTest extends TestLogger {

    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

    /* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest$TestBulkWriter.class */
    private static class TestBulkWriter implements BulkWriter<Tuple2<String, Integer>> {
        private static final Charset CHARSET = StandardCharsets.UTF_8;
        private final FSDataOutputStream stream;

        TestBulkWriter(FSDataOutputStream fSDataOutputStream) {
            this.stream = (FSDataOutputStream) Preconditions.checkNotNull(fSDataOutputStream);
        }

        public void addElement(Tuple2<String, Integer> tuple2) throws IOException {
            this.stream.write((((String) tuple2.f0) + '@' + tuple2.f1 + '\n').getBytes(CHARSET));
        }

        public void flush() throws IOException {
            this.stream.flush();
        }

        public void finish() throws IOException {
            flush();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest$TestBulkWriterFactory.class */
    public static final class TestBulkWriterFactory implements BulkWriter.Factory<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1;

        public BulkWriter<Tuple2<String, Integer>> create(FSDataOutputStream fSDataOutputStream) {
            return new TestBulkWriter(fSDataOutputStream);
        }
    }

    @Test
    public void testCustomBulkWriter() throws Exception {
        File newFolder = TEMP_FOLDER.newFolder();
        OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> createTestSinkWithBulkEncoder = TestUtils.createTestSinkWithBulkEncoder(newFolder, 1, 0, 10L, new TestUtils.TupleToStringBucketer(), new TestBulkWriterFactory(), new DefaultBucketFactoryImpl());
        Throwable th = null;
        try {
            try {
                testPartFilesWithStringBucketer(createTestSinkWithBulkEncoder, newFolder, ".part-0-0.inprogress", ".part-0-1.inprogress");
                if (createTestSinkWithBulkEncoder != null) {
                    if (0 == 0) {
                        createTestSinkWithBulkEncoder.close();
                        return;
                    }
                    try {
                        createTestSinkWithBulkEncoder.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createTestSinkWithBulkEncoder != null) {
                if (th != null) {
                    try {
                        createTestSinkWithBulkEncoder.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createTestSinkWithBulkEncoder.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testCustomBulkWriterWithBucketAssigner() throws Exception {
        File newFolder = TEMP_FOLDER.newFolder();
        OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> createTestSinkWithCustomizedBulkEncoder = TestUtils.createTestSinkWithCustomizedBulkEncoder(newFolder, 1, 0, 10L, new TestUtils.TupleToIntegerBucketer(), new TestBulkWriterFactory(), new DefaultBucketFactoryImpl());
        Throwable th = null;
        try {
            try {
                testPartFilesWithIntegerBucketer(createTestSinkWithCustomizedBulkEncoder, newFolder, ".part-0-0.inprogress", ".part-0-1.inprogress", ".part-0-2.inprogress");
                if (createTestSinkWithCustomizedBulkEncoder != null) {
                    if (0 == 0) {
                        createTestSinkWithCustomizedBulkEncoder.close();
                        return;
                    }
                    try {
                        createTestSinkWithCustomizedBulkEncoder.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createTestSinkWithCustomizedBulkEncoder != null) {
                if (th != null) {
                    try {
                        createTestSinkWithCustomizedBulkEncoder.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createTestSinkWithCustomizedBulkEncoder.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testCustomBulkWriterWithPartConfig() throws Exception {
        File newFolder = TEMP_FOLDER.newFolder();
        OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> createTestSinkWithBulkEncoder = TestUtils.createTestSinkWithBulkEncoder(newFolder, 1, 0, 10L, new TestUtils.TupleToStringBucketer(), new TestBulkWriterFactory(), new DefaultBucketFactoryImpl(), OutputFileConfig.builder().withPartPrefix("prefix").withPartSuffix(".ext").build());
        Throwable th = null;
        try {
            try {
                testPartFilesWithStringBucketer(createTestSinkWithBulkEncoder, newFolder, ".prefix-0-0.ext.inprogress", ".prefix-0-1.ext.inprogress");
                if (createTestSinkWithBulkEncoder != null) {
                    if (0 == 0) {
                        createTestSinkWithBulkEncoder.close();
                        return;
                    }
                    try {
                        createTestSinkWithBulkEncoder.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createTestSinkWithBulkEncoder != null) {
                if (th != null) {
                    try {
                        createTestSinkWithBulkEncoder.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createTestSinkWithBulkEncoder.close();
                }
            }
            throw th4;
        }
    }

    private void testPartFilesWithStringBucketer(OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> oneInputStreamOperatorTestHarness, File file, String str, String str2) throws Exception {
        oneInputStreamOperatorTestHarness.setup();
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
        TestUtils.checkLocalFs(file, 1, 0);
        oneInputStreamOperatorTestHarness.snapshot(1L, 1L);
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 2), 2L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 3), 3L));
        oneInputStreamOperatorTestHarness.snapshot(2L, 2L);
        TestUtils.checkLocalFs(file, 2, 0);
        int i = 0;
        for (Map.Entry<File, String> entry : TestUtils.getFileContentByPath(file).entrySet()) {
            if (entry.getKey().getName().contains(str)) {
                i++;
                Assert.assertEquals("test1@1\n", entry.getValue());
            } else if (entry.getKey().getName().contains(str2)) {
                i++;
                Assert.assertEquals("test1@2\ntest1@3\n", entry.getValue());
            }
            Assert.assertEquals("test1", entry.getKey().getParentFile().getName());
        }
        Assert.assertEquals(2L, i);
        oneInputStreamOperatorTestHarness.notifyOfCompletedCheckpoint(2L);
        TestUtils.checkLocalFs(file, 0, 2);
    }

    private void testPartFilesWithIntegerBucketer(OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> oneInputStreamOperatorTestHarness, File file, String str, String str2, String str3) throws Exception {
        oneInputStreamOperatorTestHarness.setup();
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
        TestUtils.checkLocalFs(file, 1, 0);
        oneInputStreamOperatorTestHarness.snapshot(1L, 1L);
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 2), 2L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 3), 3L));
        oneInputStreamOperatorTestHarness.snapshot(2L, 2L);
        TestUtils.checkLocalFs(file, 3, 0);
        int i = 0;
        for (Map.Entry<File, String> entry : TestUtils.getFileContentByPath(file).entrySet()) {
            if (entry.getKey().getName().contains(str)) {
                i++;
                Assert.assertEquals("test1@1\n", entry.getValue());
                Assert.assertEquals("1", entry.getKey().getParentFile().getName());
            } else if (entry.getKey().getName().contains(str2)) {
                i++;
                Assert.assertEquals("test1@2\n", entry.getValue());
                Assert.assertEquals("2", entry.getKey().getParentFile().getName());
            } else if (entry.getKey().getName().contains(str3)) {
                i++;
                Assert.assertEquals("test1@3\n", entry.getValue());
                Assert.assertEquals("3", entry.getKey().getParentFile().getName());
            }
        }
        Assert.assertEquals(3L, i);
        oneInputStreamOperatorTestHarness.notifyOfCompletedCheckpoint(2L);
        TestUtils.checkLocalFs(file, 0, 3);
    }
}
