package org.apache.flink.connector.file.sink;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

/* loaded from: input_file:org/apache/flink/connector/file/sink/FileSinkCommittableSerializerMigrationTest.class */
/**
 * Migration tests for {@link FileSinkCommittableSerializer}.
 *
 * <p>The two {@code prepare*} methods are {@link Disabled} fixture generators: when run manually
 * they serialize a {@link FileSinkCommittable} with the current serializer and store the bytes
 * below {@link #BASE_PATH}. The parameterized {@code test*} methods read those stored bytes back
 * for every version returned by {@link #previousVersions()} and check which kind of committable
 * was restored.
 */
class FileSinkCommittableSerializerMigrationTest {
    /** Version used for the fixture directory names and listed in {@link #previousVersions()}. */
    private static final int CURRENT_VERSION = 1;

    private static final String IN_PROGRESS_CONTENT = "writing";
    private static final String PENDING_CONTENT = "wrote";

    /** Directory holding one sub-directory per scenario and version. */
    private static final Path BASE_PATH =
            Paths.get("src/test/resources/").resolve("committable-serializer-migration");

    /** Name of the file inside a version directory that holds the serialized committable. */
    private static final String COMMITTABLE_FILE = "committable";

    /** Name of the file inside a version directory that the recoverable stream writes to. */
    private static final String CONTENT_FILE = "content";

    /** Supplies the serializer versions whose stored fixtures are read by the tests. */
    static Stream<Integer> previousVersions() {
        return Stream.of(CURRENT_VERSION);
    }

    /**
     * Fixture generator: writes a serialized committable wrapping an in-progress file recoverable
     * for {@link #CURRENT_VERSION}. Disabled so that it only runs when invoked on purpose.
     *
     * @throws IOException if the content or committable file cannot be written
     */
    @Disabled
    @Test
    void prepareDeserializationInProgressToCleanup() throws IOException {
        final Path versionDir = resolveVersionPath(CURRENT_VERSION, "in-progress");
        final FileSinkCommittableSerializer serializer = createSerializer();
        final RecoverableWriter recoverableWriter =
                FileSystem.getLocalFileSystem().createRecoverableWriter();

        final byte[] serialized;
        // The stream is closed only after persist() has produced the recoverable, so the
        // file handle is released without losing the state captured in the committable.
        try (RecoverableFsDataOutputStream stream =
                recoverableWriter.open(
                        new org.apache.flink.core.fs.Path(
                                versionDir.resolve(CONTENT_FILE).toString()))) {
            stream.write(IN_PROGRESS_CONTENT.getBytes(StandardCharsets.UTF_8));
            serialized =
                    serializer.serialize(
                            new FileSinkCommittable(
                                    "0",
                                    new OutputStreamBasedPartFileWriter
                                            .OutputStreamBasedInProgressFileRecoverable(
                                            stream.persist())));
        }
        Files.write(versionDir.resolve(COMMITTABLE_FILE), serialized);
    }

    /**
     * Reads the stored in-progress fixture of the given version and checks that it is restored as
     * an in-progress file to clean up and not as a pending file.
     *
     * @param previousVersion serializer version of the stored fixture
     * @throws IOException if the fixture cannot be read or deserialized
     */
    @MethodSource({"previousVersions"})
    @ParameterizedTest(name = "Previous Version = {0}")
    void testSerializationInProgressToCleanup(Integer previousVersion) throws IOException {
        final FileSinkCommittable restored = readCommittable(previousVersion, "in-progress");

        Assertions.assertThat(restored.hasInProgressFileToCleanup()).isTrue();
        Assertions.assertThat(restored.hasPendingFile()).isFalse();
    }

    /**
     * Fixture generator: writes a serialized committable wrapping a pending file recoverable for
     * {@link #CURRENT_VERSION}. Disabled so that it only runs when invoked on purpose.
     *
     * @throws IOException if the content or committable file cannot be written
     */
    @Disabled
    @Test
    void prepareDeserializationPending() throws IOException {
        final Path versionDir = resolveVersionPath(CURRENT_VERSION, "pending");
        final FileSinkCommittableSerializer serializer = createSerializer();
        final RecoverableWriter recoverableWriter =
                FileSystem.getLocalFileSystem().createRecoverableWriter();

        final RecoverableFsDataOutputStream stream =
                recoverableWriter.open(
                        new org.apache.flink.core.fs.Path(
                                versionDir.resolve(CONTENT_FILE).toString()));
        stream.write(PENDING_CONTENT.getBytes(StandardCharsets.UTF_8));

        // closeForCommit() finishes the stream itself; the committer is only used to obtain
        // the recoverable and is deliberately not committed.
        final FileSinkCommittable committable =
                new FileSinkCommittable(
                        "0",
                        new OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable(
                                stream.closeForCommit().getRecoverable()));
        Files.write(versionDir.resolve(COMMITTABLE_FILE), serializer.serialize(committable));
    }

    /**
     * Reads the stored pending fixture of the given version and checks that it is restored as a
     * pending file and not as an in-progress file to clean up.
     *
     * @param previousVersion serializer version of the stored fixture
     * @throws IOException if the fixture cannot be read or deserialized
     */
    @MethodSource({"previousVersions"})
    @ParameterizedTest(name = "Previous Version = {0}")
    void testSerializationPending(Integer previousVersion) throws IOException {
        final FileSinkCommittable restored = readCommittable(previousVersion, "pending");

        Assertions.assertThat(restored.hasPendingFile()).isTrue();
        Assertions.assertThat(restored.hasInProgressFileToCleanup()).isFalse();
    }

    /** Loads and deserializes the stored committable for the given version and scenario. */
    private static FileSinkCommittable readCommittable(int version, String scenario)
            throws IOException {
        final Path versionDir = resolveVersionPath(version, scenario);
        final byte[] stored = Files.readAllBytes(versionDir.resolve(COMMITTABLE_FILE));
        return createSerializer().deserialize(version, stored);
    }

    /** Returns the fixture directory {@code <scenario>-v<version>} below {@link #BASE_PATH}. */
    private static Path resolveVersionPath(long version, String scenario) {
        return BASE_PATH.resolve(scenario + "-v" + version);
    }

    /** Builds a committable serializer from the serializers exposed by a fresh bucket writer. */
    private static FileSinkCommittableSerializer createSerializer() throws IOException {
        final RowWiseBucketWriter<String, String> bucketWriter = createBucketWriter();
        return new FileSinkCommittableSerializer(
                bucketWriter.getProperties().getPendingFileRecoverableSerializer(),
                bucketWriter.getProperties().getInProgressFileRecoverableSerializer());
    }

    /** Creates a row-wise bucket writer on the local file system using a string encoder. */
    private static RowWiseBucketWriter<String, String> createBucketWriter() throws IOException {
        return new RowWiseBucketWriter<>(
                FileSystem.getLocalFileSystem().createRecoverableWriter(),
                new SimpleStringEncoder<>());
    }
}
