package org.apache.flink.connector.file.sink.committer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.connector.sink2.mocks.MockCommitRequest;
import org.apache.flink.connector.file.sink.FileSinkCommittable;
import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils;
import org.apache.flink.connector.file.sink.utils.NoOpBucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/connector/file/sink/committer/FileCommitterTest.class */
/**
 * Tests for {@link FileCommitter}.
 *
 * <p>Each test hands the committer a set of {@link FileSinkCommittable}s wrapped in
 * {@link MockCommitRequest}s and then inspects a {@link StubBucketWriter} to see which
 * bucket-writer callbacks were invoked, and the requests to see whether any retry was recorded.
 */
public class FileCommitterTest {

    /**
     * A {@link BucketWriter.PendingFile} that only remembers whether it has been committed.
     */
    public static class RecordingPendingFile implements BucketWriter.PendingFile {
        private boolean committed;

        private RecordingPendingFile() {
        }

        /** Delegates to {@link #commitAfterRecovery()}, so both entry points set the same flag. */
        public void commit() throws IOException {
            commitAfterRecovery();
        }

        /** Marks this pending file as committed. */
        public void commitAfterRecovery() throws IOException {
            this.committed = true;
        }

        /** Returns {@code true} once either commit method has been called. */
        public boolean isCommitted() {
            return this.committed;
        }
    }

    /**
     * A {@link NoOpBucketWriter} that records every pending file it is asked to recover and counts
     * how many times in-progress file cleanup is requested.
     */
    private static class StubBucketWriter extends NoOpBucketWriter {
        private final List<RecordingPendingFile> recoveredPendingFiles;
        private int numCleanUp;

        private StubBucketWriter() {
            this.recoveredPendingFiles = new ArrayList<>();
        }

        /**
         * Creates a fresh {@link RecordingPendingFile}, remembers it, and returns it.
         * The given recoverable is not inspected.
         */
        @Override // org.apache.flink.connector.file.sink.utils.NoOpBucketWriter
        public BucketWriter.PendingFile recoverPendingFile(InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable) throws IOException {
            RecordingPendingFile recordingPendingFile = new RecordingPendingFile();
            this.recoveredPendingFiles.add(recordingPendingFile);
            return recordingPendingFile;
        }

        /**
         * Counts the cleanup request and always reports success.
         * The given recoverable is not inspected.
         */
        @Override // org.apache.flink.connector.file.sink.utils.NoOpBucketWriter
        public boolean cleanupInProgressFileRecoverable(InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable) throws IOException {
            this.numCleanUp++;
            return true;
        }

        /** Returns the live list of pending files handed out by {@link #recoverPendingFile}. */
        public List<RecordingPendingFile> getRecoveredPendingFiles() {
            return this.recoveredPendingFiles;
        }

        /** Returns how many times {@link #cleanupInProgressFileRecoverable} was called. */
        public int getNumCleanUp() {
            return this.numCleanUp;
        }
    }

    /**
     * A committable carrying a pending file must lead to exactly one recovered-and-committed
     * pending file, no cleanup call, and no retry on the request.
     */
    @Test
    public void testCommitPendingFile() throws Exception {
        StubBucketWriter stubBucketWriter = new StubBucketWriter();
        FileCommitter fileCommitter = new FileCommitter(stubBucketWriter);
        // Keep a handle on the request so its retry counter can be checked after the commit.
        MockCommitRequest<FileSinkCommittable> request =
                new MockCommitRequest<>(
                        new FileSinkCommittable(
                                "0", new FileSinkTestUtils.TestPendingFileRecoverable()));

        fileCommitter.commit(Collections.singletonList(request));

        Assert.assertEquals(1, stubBucketWriter.getRecoveredPendingFiles().size());
        Assert.assertEquals(0, stubBucketWriter.getNumCleanUp());
        Assert.assertTrue(stubBucketWriter.getRecoveredPendingFiles().get(0).isCommitted());
        Assert.assertEquals(0, request.getNumberOfRetries());
    }

    /**
     * A committable carrying an in-progress file to clean up must lead to exactly one cleanup
     * call, no recovered pending file, and no retry on the request.
     */
    @Test
    public void testCleanupInProgressFiles() throws Exception {
        StubBucketWriter stubBucketWriter = new StubBucketWriter();
        FileCommitter fileCommitter = new FileCommitter(stubBucketWriter);
        // Keep a handle on the request so its retry counter can be checked after the commit.
        MockCommitRequest<FileSinkCommittable> request =
                new MockCommitRequest<>(
                        new FileSinkCommittable(
                                "0", new FileSinkTestUtils.TestInProgressFileRecoverable()));

        fileCommitter.commit(Collections.singletonList(request));

        Assert.assertEquals(0, stubBucketWriter.getRecoveredPendingFiles().size());
        Assert.assertEquals(1, stubBucketWriter.getNumCleanUp());
        Assert.assertEquals(0, request.getNumberOfRetries());
    }

    /**
     * A mixed batch of three pending-file committables and two in-progress cleanups must yield
     * three committed pending files, two cleanup calls, and no retry on any request.
     */
    @Test
    public void testCommitMultiple() throws Exception {
        StubBucketWriter stubBucketWriter = new StubBucketWriter();
        FileCommitter fileCommitter = new FileCommitter(stubBucketWriter);
        List<MockCommitRequest<FileSinkCommittable>> requests =
                Stream.of(
                                new FileSinkCommittable(
                                        "0", new FileSinkTestUtils.TestPendingFileRecoverable()),
                                new FileSinkCommittable(
                                        "0", new FileSinkTestUtils.TestPendingFileRecoverable()),
                                new FileSinkCommittable(
                                        "0", new FileSinkTestUtils.TestInProgressFileRecoverable()),
                                new FileSinkCommittable(
                                        "0", new FileSinkTestUtils.TestPendingFileRecoverable()),
                                new FileSinkCommittable(
                                        "0", new FileSinkTestUtils.TestInProgressFileRecoverable()))
                        .map(MockCommitRequest::new)
                        .collect(Collectors.toList());

        // Pass a copy so the element type can be inferred from commit()'s parameter; the copy
        // holds the very same request objects, so the retry check below still observes them.
        fileCommitter.commit(new ArrayList<>(requests));

        Assert.assertEquals(3, stubBucketWriter.getRecoveredPendingFiles().size());
        Assert.assertEquals(2, stubBucketWriter.getNumCleanUp());
        stubBucketWriter
                .getRecoveredPendingFiles()
                .forEach(pendingFile -> Assert.assertTrue(pendingFile.isCommitted()));
        Assert.assertTrue(
                requests.stream().allMatch(request -> request.getNumberOfRetries() == 0));
    }
}
