package org.apache.flink.connector.file.sink.writer;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.connector.sink2.mocks.MockCommitRequest;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.committer.FileCommitter;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketStateGenerator;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketStatePathResolver;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.util.FileUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.class */
public class FileWriterBucketStateSerializerMigrationTest {
    private static final int CURRENT_VERSION = 2;

    @Parameterized.Parameter
    public Integer previousVersion;
    private static final String IN_PROGRESS_CONTENT = "writing";
    private static final String PENDING_CONTENT = "wrote";
    private static final String BUCKET_ID = "test-bucket";
    private static final Path BASE_PATH = Paths.get("src/test/resources/", new String[0]).resolve("bucket-state-migration-test");

    @ClassRule
    public static TemporaryFolder tempFolder = new TemporaryFolder();
    private final BucketStateGenerator generator = new BucketStateGenerator(BUCKET_ID, IN_PROGRESS_CONTENT, PENDING_CONTENT, BASE_PATH, CURRENT_VERSION);

    @Parameterized.Parameters(name = "Previous Version = {0}")
    public static Collection<Integer> previousVersions() {
        return Arrays.asList(1, Integer.valueOf(CURRENT_VERSION));
    }

    @Test
    @Ignore
    public void prepareDeserializationEmpty() throws IOException {
        this.generator.prepareDeserializationEmpty();
    }

    @Test
    public void testSerializationEmpty() throws IOException {
        org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(new BucketStatePathResolver(BASE_PATH, this.previousVersion.intValue()).getOutputPath("empty").resolve(BUCKET_ID).toString());
        FileWriterBucket<String> restoreBucket = restoreBucket(readBucketState("empty", this.previousVersion.intValue()));
        Assert.assertEquals(path, restoreBucket.getBucketPath());
        Assert.assertNull(restoreBucket.getInProgressPart());
        Assert.assertTrue(restoreBucket.getPendingFiles().isEmpty());
    }

    @Test
    @Ignore
    public void prepareDeserializationOnlyInProgress() throws IOException {
        this.generator.prepareDeserializationOnlyInProgress();
    }

    @Test
    public void testSerializationOnlyInProgress() throws IOException {
        org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(new BucketStatePathResolver(BASE_PATH, this.previousVersion.intValue()).getOutputPath("only-in-progress").resolve(BUCKET_ID).toString());
        FileWriterBucket<String> restoreBucket = restoreBucket(readBucketState("only-in-progress", this.previousVersion.intValue()));
        Assert.assertEquals(path, restoreBucket.getBucketPath());
        Assert.assertEquals(8L, restoreBucket.getInProgressPart().getSize());
        Assert.assertThat(Long.valueOf(Files.list(Paths.get(path.toString(), new String[0])).map(path2 -> {
            Assert.assertThat(path2.getFileName().toString(), CoreMatchers.startsWith(".part-0-0.inprogress"));
            return 1;
        }).count()), CoreMatchers.is(1L));
    }

    @Test
    @Ignore
    public void prepareDeserializationFull() throws IOException {
        this.generator.prepareDeserializationFull();
    }

    @Test
    public void testSerializationFull() throws IOException, InterruptedException {
        testDeserializationFull(true, "full");
    }

    @Test
    @Ignore
    public void prepareDeserializationNullInProgress() throws IOException {
        this.generator.prepareDeserializationNullInProgress();
    }

    @Test
    public void testSerializationNullInProgress() throws IOException, InterruptedException {
        testDeserializationFull(false, "full-no-in-progress");
    }

    private void testDeserializationFull(boolean z, String str) throws IOException, InterruptedException {
        BucketStatePathResolver bucketStatePathResolver = new BucketStatePathResolver(BASE_PATH, this.previousVersion.intValue());
        try {
            Path outputPath = bucketStatePathResolver.getOutputPath(str);
            org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(outputPath.resolve(BUCKET_ID).toString());
            FileWriterBucketState readBucketStateFromTemplate = readBucketStateFromTemplate(str, this.previousVersion.intValue());
            Assert.assertEquals(5L, readBucketStateFromTemplate.getPendingFileRecoverablesPerCheckpoint().size());
            Set set = (Set) Files.list(outputPath.resolve(BUCKET_ID)).map(path2 -> {
                return path2.getFileName().toString();
            }).collect(Collectors.toSet());
            for (int i = 0; i < 5; i++) {
                Assert.assertThat(set, CoreMatchers.hasItem(CoreMatchers.startsWith(".part-0-" + i + ".inprogress")));
            }
            FileWriterBucket<String> restoreBucket = restoreBucket(readBucketStateFromTemplate);
            Assert.assertEquals(path, restoreBucket.getBucketPath());
            Assert.assertEquals(5L, restoreBucket.getPendingFiles().size());
            restoreBucket.snapshotState();
            new FileCommitter(createBucketWriter()).commit((Collection) restoreBucket.prepareCommit(false).stream().map((v1) -> {
                return new MockCommitRequest(v1);
            }).collect(Collectors.toList()));
            Set set2 = (Set) Files.list(outputPath.resolve(BUCKET_ID)).map(path3 -> {
                return path3.getFileName().toString();
            }).collect(Collectors.toSet());
            for (int i2 = 0; i2 < 5; i2++) {
                String str2 = "part-0-" + i2;
                Assert.assertThat(set2, CoreMatchers.hasItem(str2));
                set2.remove(str2);
            }
            if (z) {
                Assert.assertThat(set2, Matchers.iterableWithSize(1));
                Assert.assertThat(set2, CoreMatchers.hasItem(CoreMatchers.startsWith(".part-0-5.inprogress")));
            } else {
                Assert.assertThat(set2, Matchers.empty());
            }
        } finally {
            FileUtils.deleteDirectory(bucketStatePathResolver.getResourcePath(str).toFile());
        }
    }

    private static FileWriterBucket<String> restoreBucket(FileWriterBucketState fileWriterBucketState) throws IOException {
        return FileWriterBucket.restore(createBucketWriter(), DefaultRollingPolicy.builder().withMaxPartSize(new MemorySize(10L)).build(), fileWriterBucketState, OutputFileConfig.builder().build());
    }

    private static RowWiseBucketWriter<String, String> createBucketWriter() throws IOException {
        return new RowWiseBucketWriter<>(FileSystem.getLocalFileSystem().createRecoverableWriter(), new SimpleStringEncoder());
    }

    private static SimpleVersionedSerializer<FileWriterBucketState> bucketStateSerializer() throws IOException {
        RowWiseBucketWriter<String, String> createBucketWriter = createBucketWriter();
        return new FileWriterBucketStateSerializer(createBucketWriter.getProperties().getInProgressFileRecoverableSerializer(), createBucketWriter.getProperties().getPendingFileRecoverableSerializer());
    }

    private static FileWriterBucketState readBucketState(String str, int i) throws IOException {
        return (FileWriterBucketState) SimpleVersionedSerialization.readVersionAndDeSerialize(bucketStateSerializer(), Files.readAllBytes(new BucketStatePathResolver(BASE_PATH, i).getSnapshotPath(str)));
    }

    private static FileWriterBucketState readBucketStateFromTemplate(String str, int i) throws IOException {
        Path resourcePath = new BucketStatePathResolver(BASE_PATH, i).getResourcePath(str);
        FileUtils.deleteDirectory(resourcePath.toFile());
        FileUtils.copy(new org.apache.flink.core.fs.Path(resourcePath.toString() + "-template"), new org.apache.flink.core.fs.Path(resourcePath.toString()), false);
        return readBucketState(str, i);
    }
}
