package org.apache.flink.streaming.api.functions.sink.filesystem;

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.class */
public class BucketStateSerializerTest {
    private static final String IN_PROGRESS_CONTENT = "writing";
    private static final String PENDING_CONTENT = "wrote";

    @ClassRule
    public static TemporaryFolder tempFolder = new TemporaryFolder();

    @Test
    public void testSerializationEmpty() throws IOException {
        File newFolder = tempFolder.newFolder();
        RecoverableWriter createRecoverableWriter = FileSystem.get(newFolder.toURI()).createRecoverableWriter();
        Path path = new Path(newFolder.getPath(), "test");
        BucketState bucketState = new BucketState("test", path, Long.MAX_VALUE, (RecoverableWriter.ResumeRecoverable) null, new HashMap());
        BucketStateSerializer bucketStateSerializer = new BucketStateSerializer(createRecoverableWriter.getResumeRecoverableSerializer(), createRecoverableWriter.getCommitRecoverableSerializer(), SimpleVersionedStringSerializer.INSTANCE);
        BucketState bucketState2 = (BucketState) SimpleVersionedSerialization.readVersionAndDeSerialize(bucketStateSerializer, SimpleVersionedSerialization.writeVersionAndSerialize(bucketStateSerializer, bucketState));
        Assert.assertEquals(path, bucketState2.getBucketPath());
        Assert.assertNull(bucketState2.getInProgressResumableFile());
        Assert.assertTrue(bucketState2.getCommittableFilesPerCheckpoint().isEmpty());
    }

    @Test
    public void testSerializationOnlyInProgress() throws IOException {
        File newFolder = tempFolder.newFolder();
        FileSystem fileSystem = FileSystem.get(newFolder.toURI());
        Path path = new Path(newFolder.getPath(), "test");
        RecoverableWriter createRecoverableWriter = fileSystem.createRecoverableWriter();
        RecoverableFsDataOutputStream open = createRecoverableWriter.open(path);
        open.write(IN_PROGRESS_CONTENT.getBytes(Charset.forName("UTF-8")));
        BucketState bucketState = new BucketState("test", path, Long.MAX_VALUE, open.persist(), new HashMap());
        BucketStateSerializer bucketStateSerializer = new BucketStateSerializer(createRecoverableWriter.getResumeRecoverableSerializer(), createRecoverableWriter.getCommitRecoverableSerializer(), SimpleVersionedStringSerializer.INSTANCE);
        byte[] writeVersionAndSerialize = SimpleVersionedSerialization.writeVersionAndSerialize(bucketStateSerializer, bucketState);
        open.close();
        Assert.assertEquals(path, ((BucketState) SimpleVersionedSerialization.readVersionAndDeSerialize(bucketStateSerializer, writeVersionAndSerialize)).getBucketPath());
        FileStatus[] listStatus = fileSystem.listStatus(path.getParent());
        Assert.assertEquals(1L, listStatus.length);
        Assert.assertTrue(listStatus[0].getPath().getPath().startsWith(new Path(path.getParent(), ".test.inprogress").toString()));
    }

    @Test
    public void testSerializationFull() throws IOException {
        File newFolder = tempFolder.newFolder();
        FileSystem fileSystem = FileSystem.get(newFolder.toURI());
        RecoverableWriter createRecoverableWriter = fileSystem.createRecoverableWriter();
        Path path = new Path(newFolder.getPath());
        HashMap hashMap = new HashMap();
        for (int i = 0; i < 5; i++) {
            ArrayList arrayList = new ArrayList();
            for (int i2 = 0; i2 < 2 + i; i2++) {
                RecoverableFsDataOutputStream open = createRecoverableWriter.open(new Path(path, "part-" + i + '-' + i2));
                open.write(("wrote-" + i2).getBytes(Charset.forName("UTF-8")));
                arrayList.add(open.closeForCommit().getRecoverable());
            }
            hashMap.put(Long.valueOf(i), arrayList);
        }
        Path path2 = new Path(path, "test-2");
        RecoverableFsDataOutputStream open2 = createRecoverableWriter.open(path2);
        open2.write(IN_PROGRESS_CONTENT.getBytes(Charset.forName("UTF-8")));
        BucketState bucketState = new BucketState("test-2", path, Long.MAX_VALUE, open2.persist(), hashMap);
        BucketStateSerializer bucketStateSerializer = new BucketStateSerializer(createRecoverableWriter.getResumeRecoverableSerializer(), createRecoverableWriter.getCommitRecoverableSerializer(), SimpleVersionedStringSerializer.INSTANCE);
        open2.close();
        BucketState bucketState2 = (BucketState) SimpleVersionedSerialization.readVersionAndDeSerialize(bucketStateSerializer, SimpleVersionedSerialization.writeVersionAndSerialize(bucketStateSerializer, bucketState));
        Assert.assertEquals(path, bucketState2.getBucketPath());
        Map committableFilesPerCheckpoint = bucketState2.getCommittableFilesPerCheckpoint();
        Assert.assertEquals(5L, committableFilesPerCheckpoint.size());
        Iterator it = committableFilesPerCheckpoint.entrySet().iterator();
        while (it.hasNext()) {
            Iterator it2 = ((List) ((Map.Entry) it.next()).getValue()).iterator();
            while (it2.hasNext()) {
                createRecoverableWriter.recoverForCommit((RecoverableWriter.CommitRecoverable) it2.next()).commit();
            }
        }
        FileStatus[] listStatus = fileSystem.listStatus(path);
        HashSet hashSet = new HashSet(listStatus.length);
        for (FileStatus fileStatus : listStatus) {
            hashSet.add(fileStatus.getPath().getPath());
        }
        for (int i3 = 0; i3 < 5; i3++) {
            for (int i4 = 0; i4 < 2 + i3; i4++) {
                String path3 = new Path(path, "part-" + i3 + '-' + i4).toString();
                Assert.assertTrue(hashSet.contains(path3));
                hashSet.remove(path3);
            }
        }
        Assert.assertEquals(1L, hashSet.size());
        Assert.assertTrue(((String) hashSet.iterator().next()).startsWith(new Path(path2.getParent(), ".test-2.inprogress").toString()));
    }

    @Test
    public void testSerializationNullInProgress() throws IOException {
        File newFolder = tempFolder.newFolder();
        FileSystem fileSystem = FileSystem.get(newFolder.toURI());
        RecoverableWriter createRecoverableWriter = fileSystem.createRecoverableWriter();
        Path path = new Path(newFolder.getPath());
        HashMap hashMap = new HashMap();
        for (int i = 0; i < 5; i++) {
            ArrayList arrayList = new ArrayList();
            for (int i2 = 0; i2 < 2 + i; i2++) {
                RecoverableFsDataOutputStream open = createRecoverableWriter.open(new Path(path, "test-" + i + '-' + i2));
                open.write(("wrote-" + i2).getBytes(Charset.forName("UTF-8")));
                arrayList.add(open.closeForCommit().getRecoverable());
            }
            hashMap.put(Long.valueOf(i), arrayList);
        }
        BucketState bucketState = new BucketState("", path, Long.MAX_VALUE, (RecoverableWriter.ResumeRecoverable) null, hashMap);
        BucketStateSerializer bucketStateSerializer = new BucketStateSerializer(createRecoverableWriter.getResumeRecoverableSerializer(), createRecoverableWriter.getCommitRecoverableSerializer(), SimpleVersionedStringSerializer.INSTANCE);
        BucketState bucketState2 = (BucketState) SimpleVersionedSerialization.readVersionAndDeSerialize(bucketStateSerializer, SimpleVersionedSerialization.writeVersionAndSerialize(bucketStateSerializer, bucketState));
        Assert.assertEquals(path, bucketState2.getBucketPath());
        Assert.assertNull(bucketState2.getInProgressResumableFile());
        Map committableFilesPerCheckpoint = bucketState2.getCommittableFilesPerCheckpoint();
        Assert.assertEquals(5L, committableFilesPerCheckpoint.size());
        Iterator it = committableFilesPerCheckpoint.entrySet().iterator();
        while (it.hasNext()) {
            Iterator it2 = ((List) ((Map.Entry) it.next()).getValue()).iterator();
            while (it2.hasNext()) {
                createRecoverableWriter.recoverForCommit((RecoverableWriter.CommitRecoverable) it2.next()).commit();
            }
        }
        FileStatus[] listStatus = fileSystem.listStatus(path);
        HashSet hashSet = new HashSet(listStatus.length);
        for (FileStatus fileStatus : listStatus) {
            hashSet.add(fileStatus.getPath().getPath());
        }
        for (int i3 = 0; i3 < 5; i3++) {
            for (int i4 = 0; i4 < 2 + i3; i4++) {
                String path2 = new Path(path, "test-" + i3 + '-' + i4).toString();
                Assert.assertTrue(hashSet.contains(path2));
                hashSet.remove(path2);
            }
        }
        Assert.assertTrue(hashSet.isEmpty());
    }
}
