package org.apache.flink.connector.file.table.stream.compact;

import java.io.File;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.connector.file.table.stream.PartitionCommitInfo;
import org.apache.flink.connector.file.table.stream.compact.CompactMessages;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.function.ThrowingConsumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/connector/file/table/stream/compact/CompactOperatorTest.class */
class CompactOperatorTest extends AbstractCompactTestBase {
    /** Package-private no-arg constructor; performs no initialization beyond the superclass constructor. */
    CompactOperatorTest() {
        super();
    }

    /**
     * Drives a single-subtask operator through four compaction units and an end-of-compaction
     * message, then restores a second operator from the captured snapshot.
     *
     * <p>The first run checks the emitted {@link PartitionCommitInfo}, the existence of the
     * {@code compacted-*} target files and the (sorted) content of {@code compacted-f0}. The second
     * run checks that, once checkpoint 2 is reported as completed, none of the
     * {@code .uncompacted-*} input files exist any more.
     */
    @Test
    void testCompactOperator() throws Exception {
        // Carries the operator state from the first harness run to the second one.
        AtomicReference<OperatorSubtaskState> state = new AtomicReference<>();
        Path f0 = newFile(".uncompacted-f0", 3);
        Path f1 = newFile(".uncompacted-f1", 2);
        Path f2 = newFile(".uncompacted-f2", 2);
        Path f3 = newFile(".uncompacted-f3", 5);
        Path f4 = newFile(".uncompacted-f4", 1);
        Path f5 = newFile(".uncompacted-f5", 5);
        Path f6 = newFile(".uncompacted-f6", 4);
        FileSystem fs = f0.getFileSystem();

        runCompact(harness -> {
            harness.setup();
            harness.open();

            harness.processElement(
                    new CompactMessages.CompactionUnit(0, "p0", Arrays.asList(f0, f1, f4)), 0L);
            harness.processElement(
                    new CompactMessages.CompactionUnit(1, "p0", Collections.singletonList(f3)), 0L);
            harness.processElement(
                    new CompactMessages.CompactionUnit(2, "p1", Arrays.asList(f2, f5)), 0L);
            harness.processElement(
                    new CompactMessages.CompactionUnit(3, "p0", Collections.singletonList(f6)), 0L);
            harness.processElement(new CompactMessages.EndCompaction(1L), 0L);

            state.set(harness.snapshot(2L, 0L));

            // Exactly one commit message is expected, covering both partitions.
            List<PartitionCommitInfo> outputs = harness.extractOutputValues();
            Assertions.assertThat(outputs).hasSize(1);
            Assertions.assertThat(outputs.get(0).getCheckpointId()).isEqualTo(1L);
            Assertions.assertThat(outputs.get(0).getPartitions())
                    .isEqualTo(new String[] {"p0", "p1"});

            // One compacted file per unit, named after the first input file of the unit.
            Assertions.assertThat(fs.exists(new Path(folder, "compacted-f0"))).isTrue();
            Assertions.assertThat(fs.exists(new Path(folder, "compacted-f2"))).isTrue();
            Assertions.assertThat(fs.exists(new Path(folder, "compacted-f3"))).isTrue();
            Assertions.assertThat(fs.exists(new Path(folder, "compacted-f6"))).isTrue();

            // Sorted before comparing, so the check does not depend on the order in which the
            // inputs f0, f1 and f4 were written into the compacted file.
            byte[] bytes =
                    FileUtils.readAllBytes(new File(folder.getPath(), "compacted-f0").toPath());
            Arrays.sort(bytes);
            Assertions.assertThat(bytes).isEqualTo(new byte[] {0, 0, 0, 1, 1, 2});
        });

        runCompact(harness -> {
            harness.setup();
            harness.initializeState(state.get());
            harness.open();

            harness.notifyOfCompletedCheckpoint(2L);

            // After the checkpoint completes, no uncompacted input file may remain.
            Assertions.assertThat(fs.exists(f0)).isFalse();
            Assertions.assertThat(fs.exists(f1)).isFalse();
            Assertions.assertThat(fs.exists(f2)).isFalse();
            Assertions.assertThat(fs.exists(f3)).isFalse();
            Assertions.assertThat(fs.exists(f4)).isFalse();
            Assertions.assertThat(fs.exists(f5)).isFalse();
            Assertions.assertThat(fs.exists(f6)).isFalse();
        });
    }

    /**
     * Feeds two compaction units to a single-subtask operator and signals end of input.
     *
     * <p>Asserts that, right after {@code endInput()}, the compacted files {@code compacted-f0}
     * and {@code compacted-f2} exist and all three {@code .uncompacted-*} inputs are gone.
     */
    @Test
    void testEndInput() throws Exception {
        Path f0 = newFile(".uncompacted-f0", 3);
        Path f1 = newFile(".uncompacted-f1", 4);
        Path f2 = newFile(".uncompacted-f2", 2);
        FileSystem fs = f0.getFileSystem();

        // Expected outputs: one per unit, named after the unit's first input file.
        Path compactedUnit0 = new Path(folder, "compacted-f0");
        Path compactedUnit1 = new Path(folder, "compacted-f2");

        runCompact(harness -> {
            harness.setup();
            harness.open();

            harness.processElement(
                    new CompactMessages.CompactionUnit(0, "p0", Arrays.asList(f0, f1)), 0L);
            harness.processElement(
                    new CompactMessages.CompactionUnit(1, "p0", Collections.singletonList(f2)), 0L);

            harness.endInput();

            // Compacted results are present ...
            Assertions.assertThat(fs.exists(compactedUnit0)).isTrue();
            Assertions.assertThat(fs.exists(compactedUnit1)).isTrue();

            // ... and the inputs have been removed without any checkpoint notification.
            Assertions.assertThat(fs.exists(f0)).isFalse();
            Assertions.assertThat(fs.exists(f1)).isFalse();
            Assertions.assertThat(fs.exists(f2)).isFalse();
        });
    }

    /**
     * Sends the same four compaction units to two subtasks of a parallelism-2 operator and checks
     * which compacted files each subtask produces.
     *
     * <p>After subtask 0 has processed the messages only {@code compacted-f0} and
     * {@code compacted-f2} (units 0 and 2) exist; {@code compacted-f3} and {@code compacted-f6}
     * (units 1 and 3) appear only after subtask 1 has processed them.
     *
     * <p>Both harnesses are managed by try-with-resources so that they are closed even when an
     * assertion fails part-way through the test.
     */
    @Test
    void testUnitSelection() throws Exception {
        try (OneInputStreamOperatorTestHarness<CompactMessages.CoordinatorOutput, PartitionCommitInfo>
                harness0 = create(2, 0)) {
            harness0.setup();
            harness0.open();

            try (OneInputStreamOperatorTestHarness<
                            CompactMessages.CoordinatorOutput, PartitionCommitInfo>
                    harness1 = create(2, 1)) {
                harness1.setup();
                harness1.open();

                Path f0 = newFile(".uncompacted-f0", 3);
                Path f1 = newFile(".uncompacted-f1", 2);
                Path f2 = newFile(".uncompacted-f2", 2);
                Path f3 = newFile(".uncompacted-f3", 5);
                Path f4 = newFile(".uncompacted-f4", 1);
                Path f5 = newFile(".uncompacted-f5", 5);
                Path f6 = newFile(".uncompacted-f6", 4);
                FileSystem fs = f0.getFileSystem();

                // Subtask 0 sees every unit but is expected to compact only units 0 and 2.
                harness0.processElement(
                        new CompactMessages.CompactionUnit(0, "p0", Arrays.asList(f0, f1, f4)), 0L);
                harness0.processElement(
                        new CompactMessages.CompactionUnit(1, "p0", Collections.singletonList(f3)),
                        0L);
                harness0.processElement(
                        new CompactMessages.CompactionUnit(2, "p0", Arrays.asList(f2, f5)), 0L);
                harness0.processElement(
                        new CompactMessages.CompactionUnit(3, "p0", Collections.singletonList(f6)),
                        0L);
                harness0.processElement(new CompactMessages.EndCompaction(1L), 0L);

                Assertions.assertThat(fs.exists(new Path(folder, "compacted-f0"))).isTrue();
                Assertions.assertThat(fs.exists(new Path(folder, "compacted-f2"))).isTrue();
                Assertions.assertThat(fs.exists(new Path(folder, "compacted-f3"))).isFalse();
                Assertions.assertThat(fs.exists(new Path(folder, "compacted-f6"))).isFalse();

                // Subtask 1 receives the same messages and is expected to produce units 1 and 3.
                harness1.processElement(
                        new CompactMessages.CompactionUnit(0, "p0", Arrays.asList(f0, f1, f4)), 0L);
                harness1.processElement(
                        new CompactMessages.CompactionUnit(1, "p0", Collections.singletonList(f3)),
                        0L);
                harness1.processElement(
                        new CompactMessages.CompactionUnit(2, "p0", Arrays.asList(f2, f5)), 0L);
                harness1.processElement(
                        new CompactMessages.CompactionUnit(3, "p0", Collections.singletonList(f6)),
                        0L);
                harness1.processElement(new CompactMessages.EndCompaction(1L), 0L);

                Assertions.assertThat(fs.exists(new Path(folder, "compacted-f3"))).isTrue();
                Assertions.assertThat(fs.exists(new Path(folder, "compacted-f6"))).isTrue();
            }
        }
    }

    /**
     * Creates a harness for a single-subtask operator (parallelism 1, subtask index 0), hands it
     * to the given action and closes it afterwards.
     *
     * <p>The harness is closed whether or not the action throws; an exception raised by
     * {@code close()} while another exception is already propagating is attached to that
     * exception as a suppressed exception.
     *
     * @param throwingConsumer test logic to run against the freshly created harness
     * @throws Exception anything thrown while creating the harness, by the action, or on close
     */
    private void runCompact(ThrowingConsumer<OneInputStreamOperatorTestHarness<CompactMessages.CoordinatorOutput, PartitionCommitInfo>, Exception> throwingConsumer) throws Exception {
        try (OneInputStreamOperatorTestHarness<CompactMessages.CoordinatorOutput, PartitionCommitInfo>
                harness = create(1, 0)) {
            throwingConsumer.accept(harness);
        }
    }

    /**
     * Builds a test harness around a new {@code CompactOperator} that reads with
     * {@code TestByteFormat} and writes compacted bytes through a temporary hidden file.
     *
     * <p>The writer streams every record into a sibling file whose name is the target name
     * prefixed with {@code "."}; on commit it closes that stream and renames the hidden file to
     * the target path.
     *
     * @param i passed to the harness as both its second and third constructor argument
     *     (presumably max parallelism and parallelism; confirm against the harness API)
     * @param i2 passed to the harness as its fourth constructor argument (presumably the subtask
     *     index; confirm against the harness API)
     * @return the harness; {@code setup()} and {@code open()} are left to the caller
     * @throws Exception if the harness cannot be constructed
     */
    private OneInputStreamOperatorTestHarness<CompactMessages.CoordinatorOutput, PartitionCommitInfo> create(int i, int i2) throws Exception {
        CompactOperator<Byte> operator =
                new CompactOperator<>(
                        () -> folder.getFileSystem(),
                        CompactBulkReader.factory(TestByteFormat.bulkFormat()),
                        context -> {
                            final Path target = context.getPath();
                            final Path hidden =
                                    new Path(target.getParent(), "." + target.getName());
                            final FSDataOutputStream out =
                                    context.getFileSystem()
                                            .create(hidden, FileSystem.WriteMode.OVERWRITE);
                            return new CompactWriter<Byte>() {
                                @Override
                                public void write(Byte b) throws IOException {
                                    out.write(b.byteValue());
                                }

                                @Override
                                public void commit() throws IOException {
                                    // Close first so all bytes are flushed before the rename
                                    // makes the file visible under its final name.
                                    out.close();
                                    context.getFileSystem().rename(hidden, target);
                                }
                            };
                        });
        return new OneInputStreamOperatorTestHarness<>(operator, i, i, i2);
    }

    /**
     * Recreates the serializable writer-factory lambda of {@code create} from its
     * {@link SerializedLambda} form.
     *
     * <p>The implementation method name must be {@code "lambda$create$805c20d7$1"}, and the
     * implementation kind, functional interface, method signatures and implementing class must all
     * match; otherwise an {@link IllegalArgumentException} is thrown.
     *
     * <p>NOTE(review): this method is marked synthetic and looks like decompiler output of a
     * compiler-generated method rather than hand-written test code. As written it is not valid
     * Java ({@code boolean z = -1}, {@code switch} on a {@code boolean}); it should most likely be
     * deleted from the source so the compiler can regenerate it. Confirm before removing.
     *
     * @param serializedLambda the serialized form describing the lambda to rebuild
     * @return a writer factory equivalent to the one defined in {@code create}
     * @throws IllegalArgumentException if the serialized form does not describe that lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // NOTE(review): decompiler artifact; a boolean cannot hold -1, so this was presumably an int selector.
        boolean z = -1;
        // First stage: map the implementation method name to a selector via hash code plus equals check.
        switch (implMethodName.hashCode()) {
            case -640914513:
                if (implMethodName.equals("lambda$create$805c20d7$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second stage: validate the remaining lambda metadata before rebuilding the lambda.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/connector/file/table/stream/compact/CompactWriter$Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/flink/connector/file/table/stream/compact/CompactContext;)Lorg/apache/flink/connector/file/table/stream/compact/CompactWriter;") && serializedLambda.getImplClass().equals("org/apache/flink/connector/file/table/stream/compact/CompactOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/connector/file/table/stream/compact/CompactContext;)Lorg/apache/flink/connector/file/table/stream/compact/CompactWriter;")) {
                    // The captured enclosing test instance is read here but not referenced again below.
                    CompactOperatorTest compactOperatorTest = (CompactOperatorTest) serializedLambda.getCapturedArg(0);
                    // Same body as the writer-factory lambda in create(): write to a hidden file, rename on commit.
                    return compactContext -> {
                        final Path path = compactContext.getPath();
                        final Path path2 = new Path(path.getParent(), "." + path.getName());
                        final FSDataOutputStream create = compactContext.getFileSystem().create(path2, FileSystem.WriteMode.OVERWRITE);
                        return new CompactWriter<Byte>() { // from class: org.apache.flink.connector.file.table.stream.compact.CompactOperatorTest.1
                            public void write(Byte b) throws IOException {
                                create.write(b.byteValue());
                            }

                            public void commit() throws IOException {
                                create.close();
                                compactContext.getFileSystem().rename(path2, path);
                            }
                        };
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
