/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.sink.filesystem;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.Bucket;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketState;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketStateSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.util.FileUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Tests that bucket-state snapshots written by earlier serializer versions can still be read
 * and restored by the current {@link BucketStateSerializer}.
 *
 * <p>The test is parameterized over the snapshot version. For every scenario a pre-generated
 * fixture is expected under
 * {@code src/test/resources/bucket-state-migration-test/<scenario>-v<version>}, containing a
 * {@code snapshot} file (the serialized state) and a {@code bucket} directory (the files the
 * state refers to).
 *
 * <p>The {@code prepareDeserialization*} methods are ignored by default; they regenerate the
 * fixtures for {@link #CURRENT_VERSION} and are meant to be enabled manually.
 */
@RunWith(Parameterized.class)
public class BucketStateSerializerTest {
    /** Version suffix used by the fixture generators ({@code prepareDeserialization*}). */
    private static final int CURRENT_VERSION = 2;

    /** Number of checkpoints taken by the "full" scenarios; one part file is expected per checkpoint. */
    private static final int NUM_PENDING_CHECKPOINTS = 5;

    /** Snapshot version under test, injected by the {@link Parameterized} runner. */
    @Parameterized.Parameter
    public Integer previousVersion;

    private static final String IN_PROGRESS_CONTENT = "writing";
    private static final String PENDING_CONTENT = "wrote";
    private static final String BUCKET_ID = "test-bucket";

    @ClassRule
    public static TemporaryFolder tempFolder = new TemporaryFolder();

    /** Returns the snapshot versions every {@code testSerialization*} method is run against. */
    @Parameterized.Parameters(name = "Previous Version = {0}")
    public static Collection<Integer> previousVersions() {
        return Arrays.asList(1, 2);
    }

    // ------------------------------------------------------------------------
    //  Fixture locations
    // ------------------------------------------------------------------------

    /** Resolves the root directory of the fixture for the given scenario and version. */
    private static java.nio.file.Path getResourcePath(String scenarioName, int version) {
        return Paths.get("src/test/resources/")
                .resolve("bucket-state-migration-test")
                .resolve(scenarioName + "-v" + version);
    }

    /** Resolves the file holding the serialized bucket state of a fixture. */
    private static java.nio.file.Path getSnapshotPath(String scenarioName, int version) {
        return getResourcePath(scenarioName, version).resolve("snapshot");
    }

    /** Resolves the directory below which the bucket of a fixture writes its part files. */
    private static java.nio.file.Path getOutputPath(String scenarioName, int version) {
        return getResourcePath(scenarioName, version).resolve("bucket");
    }

    // ------------------------------------------------------------------------
    //  Scenario: empty bucket
    // ------------------------------------------------------------------------

    /** Regenerates the "empty" fixture: a snapshot of a bucket that never received a record. */
    @Test
    @Ignore
    public void prepareDeserializationEmpty() throws IOException {
        final String scenarioName = "empty";

        final java.nio.file.Path scenarioPath = getResourcePath(scenarioName, CURRENT_VERSION);
        FileUtils.deleteDirectory(scenarioPath.toFile());
        Files.createDirectories(scenarioPath);

        final java.nio.file.Path outputPath = getOutputPath(scenarioName, CURRENT_VERSION);
        final Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString());

        final Bucket<String, String> bucket = createNewBucket(testBucketPath);
        final BucketState<String> bucketState = bucket.onReceptionOfCheckpoint(0);

        writeSnapshot(scenarioName, bucketState);
    }

    /** Restores the "empty" snapshot and checks that no in-progress or pending files come back. */
    @Test
    public void testSerializationEmpty() throws IOException {
        final String scenarioName = "empty";

        final java.nio.file.Path outputPath = getOutputPath(scenarioName, previousVersion);
        final Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString());

        final BucketState<String> recoveredState = readBucketState(scenarioName, previousVersion);
        final Bucket<String, String> bucket = restoreBucket(0, recoveredState);

        Assert.assertEquals(testBucketPath, bucket.getBucketPath());
        Assert.assertNull(bucket.getInProgressPart());
        Assert.assertTrue(bucket.getPendingFileRecoverablesPerCheckpoint().isEmpty());
    }

    // ------------------------------------------------------------------------
    //  Scenario: only an in-progress file
    // ------------------------------------------------------------------------

    /** Regenerates the "only-in-progress" fixture: one record written, then one checkpoint. */
    @Test
    @Ignore
    public void prepareDeserializationOnlyInProgress() throws IOException {
        final String scenarioName = "only-in-progress";

        final java.nio.file.Path scenarioPath = getResourcePath(scenarioName, CURRENT_VERSION);
        FileUtils.deleteDirectory(scenarioPath.toFile());
        Files.createDirectories(scenarioPath);

        final java.nio.file.Path outputPath = getOutputPath(scenarioName, CURRENT_VERSION);
        final Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString());

        final Bucket<String, String> bucket = createNewBucket(testBucketPath);
        bucket.write(IN_PROGRESS_CONTENT, System.currentTimeMillis());
        final BucketState<String> bucketState = bucket.onReceptionOfCheckpoint(0);

        writeSnapshot(scenarioName, bucketState);
    }

    /**
     * Restores the "only-in-progress" snapshot and checks that the in-progress part is resumed
     * and is the only file in the bucket directory.
     */
    @Test
    public void testSerializationOnlyInProgress() throws IOException {
        final String scenarioName = "only-in-progress";

        final java.nio.file.Path outputPath = getOutputPath(scenarioName, previousVersion);
        final Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString());

        final BucketState<String> recoveredState = readBucketState(scenarioName, previousVersion);
        final Bucket<String, String> bucket = restoreBucket(0, recoveredState);

        Assert.assertEquals(testBucketPath, bucket.getBucketPath());
        // NOTE(review): 8 is presumably the 7 characters of IN_PROGRESS_CONTENT plus a line
        // terminator added by the encoder - confirm against SimpleStringEncoder.
        Assert.assertEquals(8L, bucket.getInProgressPart().getSize());

        // The names are collected first and asserted afterwards: assertions placed inside a
        // stream stage are not guaranteed to run when the terminal operation is count().
        final Set<String> fileNames = listFileNames(Paths.get(testBucketPath.toString()));
        for (String fileName : fileNames) {
            Assert.assertThat(fileName, CoreMatchers.startsWith(".part-0-0.inprogress"));
        }
        // File names are unique within a directory, so the set size equals the file count.
        Assert.assertEquals(1, fileNames.size());
    }

    // ------------------------------------------------------------------------
    //  Scenario: pending files, with and without an in-progress file
    // ------------------------------------------------------------------------

    /** Regenerates the "full" fixture (pending files plus one in-progress file). */
    @Test
    @Ignore
    public void prepareDeserializationFull() throws IOException {
        prepareDeserializationFull(true, "full");
    }

    /** Restores the "full" snapshot; see {@link #testDeserializationFull(boolean, String)}. */
    @Test
    public void testSerializationFull() throws IOException {
        testDeserializationFull(true, "full");
    }

    /** Regenerates the "full-no-in-progress" fixture (pending files only). */
    @Test
    @Ignore
    public void prepareDeserializationNullInProgress() throws IOException {
        prepareDeserializationFull(false, "full-no-in-progress");
    }

    /** Restores the "full-no-in-progress" snapshot; see {@link #testDeserializationFull(boolean, String)}. */
    @Test
    public void testSerializationNullInProgress() throws IOException {
        testDeserializationFull(false, "full-no-in-progress");
    }

    /**
     * Generates a fixture with {@link #NUM_PENDING_CHECKPOINTS} checkpoints, each preceded by two
     * writes, and optionally one more write followed by a final checkpoint.
     *
     * <p>The result is stored in the {@code <scenario>-v<version>-template} directory rather than
     * in the scenario directory itself; the scenario directory is removed at the end.
     *
     * @param withInProgress whether to write one extra record and checkpoint after the loop
     * @param scenarioName name of the scenario, used to derive the fixture directory
     * @throws IOException if creating, writing, copying or deleting fixture files fails
     */
    private static void prepareDeserializationFull(boolean withInProgress, String scenarioName)
            throws IOException {
        final java.nio.file.Path scenarioPath = getResourcePath(scenarioName, CURRENT_VERSION);
        FileUtils.deleteDirectory(Paths.get(scenarioPath.toString() + "-template").toFile());
        Files.createDirectories(scenarioPath);

        final java.nio.file.Path outputPath = getOutputPath(scenarioName, CURRENT_VERSION);
        final Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString());

        final Bucket<String, String> bucket = createNewBucket(testBucketPath);

        BucketState<String> bucketState = null;
        for (int checkpointId = 0; checkpointId < NUM_PENDING_CHECKPOINTS; checkpointId++) {
            bucket.write(PENDING_CONTENT, System.currentTimeMillis());
            bucket.write(PENDING_CONTENT, System.currentTimeMillis());
            bucketState = bucket.onReceptionOfCheckpoint(checkpointId);
        }

        if (withInProgress) {
            bucket.write(IN_PROGRESS_CONTENT, System.currentTimeMillis());
            bucketState = bucket.onReceptionOfCheckpoint(NUM_PENDING_CHECKPOINTS);
        }

        writeSnapshot(scenarioName, bucketState);

        // The restore test modifies the files in the scenario directory, so the pristine
        // fixture is kept in a separate template directory and copied back before each run.
        moveToTemplateDirectory(scenarioPath);
    }

    /**
     * Restores a "full" snapshot from its template and verifies the files before and after.
     *
     * <p>Before the restore, every checkpoint's part file must still carry the
     * {@code .part-0-<i>.inprogress} prefix. After the restore, {@code part-0-<i>} must exist for
     * every checkpoint; the only other file allowed is the in-progress file, and only when
     * {@code withInProgress} is set. The working copy of the scenario is deleted afterwards,
     * whether or not the assertions passed.
     *
     * @param withInProgress whether the scenario is expected to contain an in-progress file
     * @param scenarioName name of the scenario, used to derive the fixture directory
     * @throws IOException if reading or deleting fixture files fails
     */
    private void testDeserializationFull(boolean withInProgress, String scenarioName)
            throws IOException {
        try {
            final java.nio.file.Path outputPath = getOutputPath(scenarioName, previousVersion);
            final java.nio.file.Path bucketDirectory = outputPath.resolve(BUCKET_ID);
            final Path testBucketPath = new Path(bucketDirectory.toString());

            // Also re-creates the scenario directory from the template.
            final BucketState<String> recoveredState =
                    readBucketStateFromTemplate(scenarioName, previousVersion);

            final Map<?, ?> pendingFileRecoverables =
                    recoveredState.getPendingFileRecoverablesPerCheckpoint();
            Assert.assertEquals(NUM_PENDING_CHECKPOINTS, pendingFileRecoverables.size());

            final Set<String> beforeRestorePaths = listFileNames(bucketDirectory);
            for (int i = 0; i < NUM_PENDING_CHECKPOINTS; i++) {
                final String part = ".part-0-" + i + ".inprogress";
                Assert.assertThat(
                        beforeRestorePaths, CoreMatchers.hasItem(CoreMatchers.startsWith(part)));
            }

            final Bucket<String, String> bucket =
                    restoreBucket(NUM_PENDING_CHECKPOINTS + 1, recoveredState);
            Assert.assertEquals(testBucketPath, bucket.getBucketPath());
            Assert.assertEquals(0, bucket.getPendingFileRecoverablesForCurrentCheckpoint().size());

            final Set<String> afterRestorePaths = listFileNames(bucketDirectory);
            for (int i = 0; i < NUM_PENDING_CHECKPOINTS; i++) {
                final String part = "part-0-" + i;
                Assert.assertThat(afterRestorePaths, CoreMatchers.hasItem(part));
                // Remove the verified files so that only unexpected leftovers remain in the set.
                afterRestorePaths.remove(part);
            }

            if (withInProgress) {
                Assert.assertThat(afterRestorePaths, Matchers.iterableWithSize(1));
                Assert.assertThat(
                        afterRestorePaths,
                        CoreMatchers.hasItem(
                                CoreMatchers.startsWith(
                                        ".part-0-" + NUM_PENDING_CHECKPOINTS + ".inprogress")));
            } else {
                Assert.assertThat(afterRestorePaths, Matchers.empty());
            }
        } finally {
            FileUtils.deleteDirectory(getResourcePath(scenarioName, previousVersion).toFile());
        }
    }

    // ------------------------------------------------------------------------
    //  Helpers
    // ------------------------------------------------------------------------

    /**
     * Lists the names of all entries directly inside {@code directory}.
     *
     * <p>The directory stream is closed before returning, and the result is a mutable
     * {@link HashSet} so that callers may remove entries they have already verified.
     */
    private static Set<String> listFileNames(java.nio.file.Path directory) throws IOException {
        try (Stream<java.nio.file.Path> entries = Files.list(directory)) {
            return entries
                    .map(entry -> entry.getFileName().toString())
                    .collect(Collectors.toCollection(HashSet::new));
        }
    }

    /** Serializes {@code bucketState} with its version and writes it to the scenario's snapshot file. */
    private static void writeSnapshot(String scenarioName, BucketState<String> bucketState)
            throws IOException {
        final byte[] bytes =
                SimpleVersionedSerialization.writeVersionAndSerialize(
                        bucketStateSerializer(), bucketState);
        Files.write(getSnapshotPath(scenarioName, CURRENT_VERSION), bytes);
    }

    /** Creates a fresh bucket for {@link #BUCKET_ID} at {@code bucketPath} with a max part size of 10. */
    private static Bucket<String, String> createNewBucket(Path bucketPath) throws IOException {
        return Bucket.getNew(
                0,
                BUCKET_ID,
                bucketPath,
                0L,
                createBucketWriter(),
                DefaultRollingPolicy.builder().withMaxPartSize(10L).build(),
                OutputFileConfig.builder().build());
    }

    /** Restores a bucket from {@code bucketState}, using the same writer and policy as {@link #createNewBucket(Path)}. */
    private static Bucket<String, String> restoreBucket(
            int initialPartCounter, BucketState<String> bucketState) throws IOException {
        return Bucket.restore(
                0,
                initialPartCounter,
                createBucketWriter(),
                DefaultRollingPolicy.builder().withMaxPartSize(10L).build(),
                bucketState,
                OutputFileConfig.builder().build());
    }

    /** Creates a row-wise writer on the local file system that encodes records as strings. */
    private static RowWiseBucketWriter<String, String> createBucketWriter() throws IOException {
        return new RowWiseBucketWriter<>(
                FileSystem.getLocalFileSystem().createRecoverableWriter(),
                new SimpleStringEncoder<>());
    }

    /** Builds the serializer under test from the serializers exposed by the bucket writer. */
    private static SimpleVersionedSerializer<BucketState<String>> bucketStateSerializer()
            throws IOException {
        final RowWiseBucketWriter<String, String> bucketWriter = createBucketWriter();
        return new BucketStateSerializer<>(
                bucketWriter.getProperties().getInProgressFileRecoverableSerializer(),
                bucketWriter.getProperties().getPendingFileRecoverableSerializer(),
                SimpleVersionedStringSerializer.INSTANCE);
    }

    /** Reads and deserializes the snapshot file of the given scenario and version. */
    private static BucketState<String> readBucketState(String scenarioName, int version)
            throws IOException {
        final byte[] bytes = Files.readAllBytes(getSnapshotPath(scenarioName, version));
        return SimpleVersionedSerialization.readVersionAndDeSerialize(
                bucketStateSerializer(), bytes);
    }

    /**
     * Replaces the scenario directory with a fresh copy of its {@code -template} sibling and
     * then reads the snapshot from that copy.
     */
    private static BucketState<String> readBucketStateFromTemplate(
            String scenarioName, int version) throws IOException {
        final java.nio.file.Path scenarioPath = getResourcePath(scenarioName, version);

        // Start from a clean copy: a previous run may have modified the bucket files.
        FileUtils.deleteDirectory(scenarioPath.toFile());
        FileUtils.copy(
                new Path(scenarioPath.toString() + "-template"),
                new Path(scenarioPath.toString()),
                false);

        return readBucketState(scenarioName, version);
    }

    /** Copies the scenario directory to its {@code -template} sibling and deletes the original. */
    private static void moveToTemplateDirectory(java.nio.file.Path scenarioPath)
            throws IOException {
        FileUtils.copy(
                new Path(scenarioPath.toString()),
                new Path(scenarioPath.toString() + "-template"),
                false);
        FileUtils.deleteDirectory(scenarioPath.toFile());
    }
}

