package org.apache.flink.connector.file.table.stream.compact;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.connector.file.table.stream.compact.CompactMessages;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.function.ThrowingConsumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/connector/file/table/stream/compact/CompactCoordinatorTest.class */
/**
 * Tests {@link CompactCoordinator} across several snapshot/restore cycles.
 *
 * <p>Each phase of the test runs against a fresh operator test harness; state is carried from one
 * phase to the next only through the {@link OperatorSubtaskState} returned by {@code snapshot}.
 */
class CompactCoordinatorTest extends AbstractCompactTestBase {

    /**
     * Feeds input files in two separate checkpoints, restoring from the previous snapshot each
     * time, then sends an {@code EndCheckpoint} message after a third restore and verifies the
     * emitted compaction units and end-of-compaction markers.
     */
    @Test
    void testCoordinatorCrossCheckpoints() throws Exception {
        AtomicReference<OperatorSubtaskState> state = new AtomicReference<>();

        // Phase 1: fresh operator, seven files over partitions p0 and p1, snapshot as checkpoint 1.
        runCoordinator(
                harness -> {
                    harness.setup();
                    harness.open();

                    harness.processElement(
                            new CompactMessages.InputFile("p0", newFile("f0", 3)), 0L);
                    harness.processElement(
                            new CompactMessages.InputFile("p0", newFile("f1", 2)), 0L);
                    harness.processElement(
                            new CompactMessages.InputFile("p1", newFile("f2", 2)), 0L);
                    harness.processElement(
                            new CompactMessages.InputFile("p0", newFile("f3", 5)), 0L);
                    harness.processElement(
                            new CompactMessages.InputFile("p0", newFile("f4", 1)), 0L);
                    harness.processElement(
                            new CompactMessages.InputFile("p1", newFile("f5", 5)), 0L);
                    harness.processElement(
                            new CompactMessages.InputFile("p1", newFile("f6", 4)), 0L);

                    state.set(harness.snapshot(1L, 0L));
                });

        // Phase 2: restore from checkpoint 1, add two more p0 files, snapshot as checkpoint 2.
        runCoordinator(
                harness -> {
                    harness.setup();
                    harness.initializeState(state.get());
                    harness.open();

                    harness.processElement(
                            new CompactMessages.InputFile("p0", newFile("f7", 3)), 0L);
                    harness.processElement(
                            new CompactMessages.InputFile("p0", newFile("f8", 2)), 0L);

                    state.set(harness.snapshot(2L, 0L));
                });

        // Phase 3: restore from checkpoint 2, signal the end of checkpoint 2 and check the output.
        runCoordinator(
                harness -> {
                    harness.setup();
                    harness.initializeState(state.get());
                    harness.open();

                    // NOTE(review): arguments are presumably (checkpointId, taskId, numberOfTasks)
                    // - confirm against CompactMessages.EndCheckpoint.
                    harness.processElement(new CompactMessages.EndCheckpoint(2L, 0, 1), 0L);

                    List<CompactMessages.CoordinatorOutput> outputs =
                            harness.extractOutputValues();
                    Assertions.assertThat(outputs).hasSize(7);

                    // The first four outputs are the units of checkpoint 1. They are sorted by
                    // (partition, unitId) before being compared, so the assertions below do not
                    // depend on the order in which those four units were emitted.
                    List<CompactMessages.CompactionUnit> firstCheckpointUnits = new ArrayList<>();
                    for (int i = 0; i < 4; i++) {
                        CompactMessages.CoordinatorOutput output = outputs.get(i);
                        Assertions.assertThat(output)
                                .isInstanceOf(CompactMessages.CompactionUnit.class);
                        firstCheckpointUnits.add((CompactMessages.CompactionUnit) output);
                    }
                    firstCheckpointUnits.sort(
                            Comparator.comparing(CompactMessages.CompactionUnit::getPartition)
                                    .thenComparingInt(CompactMessages.CompactionUnit::getUnitId));

                    assertUnit(
                            firstCheckpointUnits.get(0), 0, "p0", Arrays.asList("f0", "f1", "f4"));
                    assertUnit(
                            firstCheckpointUnits.get(1), 1, "p0", Collections.singletonList("f3"));
                    assertUnit(firstCheckpointUnits.get(2), 2, "p1", Arrays.asList("f2", "f5"));
                    assertUnit(
                            firstCheckpointUnits.get(3), 3, "p1", Collections.singletonList("f6"));

                    assertEndCompaction(outputs.get(4), 1L);

                    // Checkpoint 2 contributes a single unit (id 0) followed by its end marker.
                    assertUnit(outputs.get(5), 0, "p0", Arrays.asList("f7", "f8"));
                    assertEndCompaction(outputs.get(6), 2L);
                });
    }

    /**
     * Creates a test harness around a new {@link CompactCoordinator}, hands it to {@code
     * throwingConsumer}, and closes the harness afterwards.
     *
     * <p>The harness is managed by try-with-resources: it is closed whether or not the consumer
     * throws, and a failure raised by {@code close()} is attached to the consumer's exception as a
     * suppressed exception rather than replacing it.
     *
     * @param throwingConsumer test body to run against the harness
     * @throws Exception whatever the consumer or the harness throws
     */
    private void runCoordinator(
            ThrowingConsumer<
                            OneInputStreamOperatorTestHarness<
                                    CompactMessages.CoordinatorInput,
                                    CompactMessages.CoordinatorOutput>,
                            Exception>
                    throwingConsumer)
            throws Exception {
        // NOTE(review): 9L is presumably the target compaction size; the groupings asserted in
        // testCoordinatorCrossCheckpoints are consistent with that - confirm against
        // CompactCoordinator's constructor.
        CompactCoordinator coordinator = new CompactCoordinator(() -> folder.getFileSystem(), 9L);
        try (OneInputStreamOperatorTestHarness<
                        CompactMessages.CoordinatorInput, CompactMessages.CoordinatorOutput>
                harness = new OneInputStreamOperatorTestHarness<>(coordinator)) {
            throwingConsumer.accept(harness);
        }
    }

    /**
     * Asserts that {@code coordinatorOutput} is an {@code EndCompaction} message for the given
     * checkpoint.
     *
     * @param coordinatorOutput message emitted by the coordinator
     * @param j expected checkpoint id
     */
    private void assertEndCompaction(CompactMessages.CoordinatorOutput coordinatorOutput, long j) {
        Assertions.assertThat(coordinatorOutput).isInstanceOf(CompactMessages.EndCompaction.class);
        CompactMessages.EndCompaction endCompaction =
                (CompactMessages.EndCompaction) coordinatorOutput;
        Assertions.assertThat(endCompaction.getCheckpointId()).isEqualTo(j);
    }

    /**
     * Asserts that {@code coordinatorOutput} is a {@code CompactionUnit} with the given unit id,
     * partition, and file names (in exactly the given order).
     *
     * @param coordinatorOutput message emitted by the coordinator
     * @param i expected unit id
     * @param str expected partition
     * @param list expected names of the unit's paths, in order
     */
    private void assertUnit(
            CompactMessages.CoordinatorOutput coordinatorOutput,
            int i,
            String str,
            List<String> list) {
        Assertions.assertThat(coordinatorOutput)
                .isInstanceOf(CompactMessages.CompactionUnit.class);
        CompactMessages.CompactionUnit unit = (CompactMessages.CompactionUnit) coordinatorOutput;
        Assertions.assertThat(unit.getUnitId()).isEqualTo(i);
        Assertions.assertThat(unit.getPartition()).isEqualTo(str);
        // A lambda is used rather than a method reference so that the path type does not need
        // to be imported into this file.
        Assertions.assertThat(unit.getPaths().stream().map(path -> path.getName()))
                .containsExactlyElementsOf(list);
    }
}
