/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.sink.filesystem;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketState;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketStateSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class BucketStateSerializerTest {
    private static final String IN_PROGRESS_CONTENT = "writing";
    private static final String PENDING_CONTENT = "wrote";
    @ClassRule
    public static TemporaryFolder tempFolder = new TemporaryFolder();

    @Test
    public void testSerializationEmpty() throws IOException {
        File testFolder = tempFolder.newFolder();
        FileSystem fs = FileSystem.get((URI)testFolder.toURI());
        RecoverableWriter writer = fs.createRecoverableWriter();
        Path testBucket = new Path(testFolder.getPath(), "test");
        BucketState bucketState = new BucketState((Object)"test", testBucket, Long.MAX_VALUE, null, new HashMap());
        BucketStateSerializer serializer = new BucketStateSerializer(writer.getResumeRecoverableSerializer(), writer.getCommitRecoverableSerializer(), (SimpleVersionedSerializer)SimpleVersionedStringSerializer.INSTANCE);
        byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize((SimpleVersionedSerializer)serializer, (Object)bucketState);
        BucketState recoveredState = (BucketState)SimpleVersionedSerialization.readVersionAndDeSerialize((SimpleVersionedSerializer)serializer, (byte[])bytes);
        Assert.assertEquals((Object)testBucket, (Object)recoveredState.getBucketPath());
        Assert.assertNull((Object)recoveredState.getInProgressResumableFile());
        Assert.assertTrue((boolean)recoveredState.getCommittableFilesPerCheckpoint().isEmpty());
    }

    @Test
    public void testSerializationOnlyInProgress() throws IOException {
        File testFolder = tempFolder.newFolder();
        FileSystem fs = FileSystem.get((URI)testFolder.toURI());
        Path testBucket = new Path(testFolder.getPath(), "test");
        RecoverableWriter writer = fs.createRecoverableWriter();
        RecoverableFsDataOutputStream stream = writer.open(testBucket);
        stream.write(IN_PROGRESS_CONTENT.getBytes(Charset.forName("UTF-8")));
        RecoverableWriter.ResumeRecoverable current = stream.persist();
        BucketState bucketState = new BucketState((Object)"test", testBucket, Long.MAX_VALUE, current, new HashMap());
        BucketStateSerializer serializer = new BucketStateSerializer(writer.getResumeRecoverableSerializer(), writer.getCommitRecoverableSerializer(), (SimpleVersionedSerializer)SimpleVersionedStringSerializer.INSTANCE);
        byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize((SimpleVersionedSerializer)serializer, (Object)bucketState);
        stream.close();
        BucketState recoveredState = (BucketState)SimpleVersionedSerialization.readVersionAndDeSerialize((SimpleVersionedSerializer)serializer, (byte[])bytes);
        Assert.assertEquals((Object)testBucket, (Object)recoveredState.getBucketPath());
        FileStatus[] statuses = fs.listStatus(testBucket.getParent());
        Assert.assertEquals((long)1L, (long)statuses.length);
        Assert.assertTrue((boolean)statuses[0].getPath().getPath().startsWith(new Path(testBucket.getParent(), ".test.inprogress").getPath()));
    }

    @Test
    public void testSerializationFull() throws IOException {
        int noOfTasks = 5;
        File testFolder = tempFolder.newFolder();
        FileSystem fs = FileSystem.get((URI)testFolder.toURI());
        RecoverableWriter writer = fs.createRecoverableWriter();
        Path bucketPath = new Path(testFolder.getPath());
        HashMap commitRecoverables = new HashMap();
        for (int i = 0; i < 5; ++i) {
            ArrayList<RecoverableWriter.CommitRecoverable> recoverables = new ArrayList<RecoverableWriter.CommitRecoverable>();
            for (int j = 0; j < 2 + i; ++j) {
                Path part = new Path(bucketPath, "part-" + i + '-' + j);
                RecoverableFsDataOutputStream stream = writer.open(part);
                stream.write(("wrote-" + j).getBytes(Charset.forName("UTF-8")));
                recoverables.add(stream.closeForCommit().getRecoverable());
            }
            commitRecoverables.put(Long.valueOf(i), recoverables);
        }
        Path testBucket = new Path(bucketPath, "test-2");
        RecoverableFsDataOutputStream stream = writer.open(testBucket);
        stream.write(IN_PROGRESS_CONTENT.getBytes(Charset.forName("UTF-8")));
        RecoverableWriter.ResumeRecoverable current = stream.persist();
        BucketState bucketState = new BucketState((Object)"test-2", bucketPath, Long.MAX_VALUE, current, commitRecoverables);
        BucketStateSerializer serializer = new BucketStateSerializer(writer.getResumeRecoverableSerializer(), writer.getCommitRecoverableSerializer(), (SimpleVersionedSerializer)SimpleVersionedStringSerializer.INSTANCE);
        stream.close();
        byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize((SimpleVersionedSerializer)serializer, (Object)bucketState);
        BucketState recoveredState = (BucketState)SimpleVersionedSerialization.readVersionAndDeSerialize((SimpleVersionedSerializer)serializer, (byte[])bytes);
        Assert.assertEquals((Object)bucketPath, (Object)recoveredState.getBucketPath());
        Map recoveredRecoverables = recoveredState.getCommittableFilesPerCheckpoint();
        Assert.assertEquals((long)5L, (long)recoveredRecoverables.size());
        for (Map.Entry entry : recoveredRecoverables.entrySet()) {
            for (RecoverableWriter.CommitRecoverable recoverable : (List)entry.getValue()) {
                writer.recoverForCommit(recoverable).commit();
            }
        }
        FileStatus[] filestatuses = fs.listStatus(bucketPath);
        HashSet<String> paths = new HashSet<String>(filestatuses.length);
        for (FileStatus filestatus : filestatuses) {
            paths.add(filestatus.getPath().getPath());
        }
        for (int i = 0; i < 5; ++i) {
            for (int j = 0; j < 2 + i; ++j) {
                String part = new Path(bucketPath, "part-" + i + '-' + j).getPath();
                Assert.assertTrue((boolean)paths.contains(part));
                paths.remove(part);
            }
        }
        Assert.assertEquals((long)1L, (long)paths.size());
        Assert.assertTrue((boolean)((String)paths.iterator().next()).startsWith(new Path(testBucket.getParent(), ".test-2.inprogress").getPath()));
    }

    @Test
    public void testSerializationNullInProgress() throws IOException {
        int noOfTasks = 5;
        File testFolder = tempFolder.newFolder();
        FileSystem fs = FileSystem.get((URI)testFolder.toURI());
        RecoverableWriter writer = fs.createRecoverableWriter();
        Path bucketPath = new Path(testFolder.getPath());
        HashMap commitRecoverables = new HashMap();
        for (int i = 0; i < 5; ++i) {
            ArrayList<RecoverableWriter.CommitRecoverable> recoverables = new ArrayList<RecoverableWriter.CommitRecoverable>();
            for (int j = 0; j < 2 + i; ++j) {
                Path part = new Path(bucketPath, "test-" + i + '-' + j);
                RecoverableFsDataOutputStream stream = writer.open(part);
                stream.write(("wrote-" + j).getBytes(Charset.forName("UTF-8")));
                recoverables.add(stream.closeForCommit().getRecoverable());
            }
            commitRecoverables.put(Long.valueOf(i), recoverables);
        }
        RecoverableWriter.ResumeRecoverable current = null;
        BucketState bucketState = new BucketState((Object)"", bucketPath, Long.MAX_VALUE, current, commitRecoverables);
        BucketStateSerializer serializer = new BucketStateSerializer(writer.getResumeRecoverableSerializer(), writer.getCommitRecoverableSerializer(), (SimpleVersionedSerializer)SimpleVersionedStringSerializer.INSTANCE);
        byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize((SimpleVersionedSerializer)serializer, (Object)bucketState);
        BucketState recoveredState = (BucketState)SimpleVersionedSerialization.readVersionAndDeSerialize((SimpleVersionedSerializer)serializer, (byte[])bytes);
        Assert.assertEquals((Object)bucketPath, (Object)recoveredState.getBucketPath());
        Assert.assertNull((Object)recoveredState.getInProgressResumableFile());
        Map recoveredRecoverables = recoveredState.getCommittableFilesPerCheckpoint();
        Assert.assertEquals((long)5L, (long)recoveredRecoverables.size());
        for (Map.Entry entry : recoveredRecoverables.entrySet()) {
            for (RecoverableWriter.CommitRecoverable recoverable : (List)entry.getValue()) {
                writer.recoverForCommit(recoverable).commit();
            }
        }
        FileStatus[] filestatuses = fs.listStatus(bucketPath);
        HashSet<String> paths = new HashSet<String>(filestatuses.length);
        for (FileStatus filestatus : filestatuses) {
            paths.add(filestatus.getPath().getPath());
        }
        for (int i = 0; i < 5; ++i) {
            for (int j = 0; j < 2 + i; ++j) {
                String part = new Path(bucketPath, "test-" + i + '-' + j).getPath();
                Assert.assertTrue((boolean)paths.contains(part));
                paths.remove(part);
            }
        }
        Assert.assertTrue((boolean)paths.isEmpty());
    }
}

