/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.filesystem.stream.compact;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.filesystem.stream.compact.AbstractCompactTestBase;
import org.apache.flink.table.filesystem.stream.compact.CompactCoordinator;
import org.apache.flink.table.filesystem.stream.compact.CompactMessages;
import org.apache.flink.util.function.ThrowingConsumer;
import org.junit.Assert;
import org.junit.Test;

public class CompactCoordinatorTest
extends AbstractCompactTestBase {
    @Test
    public void testCoordinatorCrossCheckpoints() throws Exception {
        AtomicReference state = new AtomicReference();
        this.runCoordinator((ThrowingConsumer<OneInputStreamOperatorTestHarness<CompactMessages.CoordinatorInput, CompactMessages.CoordinatorOutput>, Exception>)((ThrowingConsumer)harness -> {
            harness.setup();
            harness.open();
            harness.processElement((Object)new CompactMessages.InputFile("p0", this.newFile("f0", 3)), 0L);
            harness.processElement((Object)new CompactMessages.InputFile("p0", this.newFile("f1", 2)), 0L);
            harness.processElement((Object)new CompactMessages.InputFile("p1", this.newFile("f2", 2)), 0L);
            harness.processElement((Object)new CompactMessages.InputFile("p0", this.newFile("f3", 5)), 0L);
            harness.processElement((Object)new CompactMessages.InputFile("p0", this.newFile("f4", 1)), 0L);
            harness.processElement((Object)new CompactMessages.InputFile("p1", this.newFile("f5", 5)), 0L);
            harness.processElement((Object)new CompactMessages.InputFile("p1", this.newFile("f6", 4)), 0L);
            state.set(harness.snapshot(1L, 0L));
        }));
        this.runCoordinator((ThrowingConsumer<OneInputStreamOperatorTestHarness<CompactMessages.CoordinatorInput, CompactMessages.CoordinatorOutput>, Exception>)((ThrowingConsumer)harness -> {
            harness.setup();
            harness.initializeState((OperatorSubtaskState)state.get());
            harness.open();
            harness.processElement((Object)new CompactMessages.InputFile("p0", this.newFile("f7", 3)), 0L);
            harness.processElement((Object)new CompactMessages.InputFile("p0", this.newFile("f8", 2)), 0L);
            state.set(harness.snapshot(2L, 0L));
        }));
        this.runCoordinator((ThrowingConsumer<OneInputStreamOperatorTestHarness<CompactMessages.CoordinatorInput, CompactMessages.CoordinatorOutput>, Exception>)((ThrowingConsumer)harness -> {
            harness.setup();
            harness.initializeState((OperatorSubtaskState)state.get());
            harness.open();
            harness.processElement((Object)new CompactMessages.EndCheckpoint(2L, 0, 1), 0L);
            List outputs = harness.extractOutputValues();
            Assert.assertEquals((long)7L, (long)outputs.size());
            ArrayList<CompactMessages.CompactionUnit> cp1Units = new ArrayList<CompactMessages.CompactionUnit>();
            for (int i = 0; i < 4; ++i) {
                CompactMessages.CoordinatorOutput output = (CompactMessages.CoordinatorOutput)outputs.get(i);
                Assert.assertTrue((boolean)(output instanceof CompactMessages.CompactionUnit));
                cp1Units.add((CompactMessages.CompactionUnit)output);
            }
            cp1Units.sort(Comparator.comparing(CompactMessages.CompactionUnit::getPartition).thenComparingInt(CompactMessages.CompactionUnit::getUnitId));
            this.assertUnit((CompactMessages.CoordinatorOutput)cp1Units.get(0), 0, "p0", Arrays.asList("f0", "f1", "f4"));
            this.assertUnit((CompactMessages.CoordinatorOutput)cp1Units.get(1), 1, "p0", Collections.singletonList("f3"));
            this.assertUnit((CompactMessages.CoordinatorOutput)cp1Units.get(2), 2, "p1", Arrays.asList("f2", "f5"));
            this.assertUnit((CompactMessages.CoordinatorOutput)cp1Units.get(3), 3, "p1", Collections.singletonList("f6"));
            this.assertEndCompaction((CompactMessages.CoordinatorOutput)outputs.get(4), 1L);
            this.assertUnit((CompactMessages.CoordinatorOutput)outputs.get(5), 0, "p0", Arrays.asList("f7", "f8"));
            this.assertEndCompaction((CompactMessages.CoordinatorOutput)outputs.get(6), 2L);
        }));
    }

    private void runCoordinator(ThrowingConsumer<OneInputStreamOperatorTestHarness<CompactMessages.CoordinatorInput, CompactMessages.CoordinatorOutput>, Exception> consumer) throws Exception {
        CompactCoordinator coordinator = new CompactCoordinator(() -> this.folder.getFileSystem(), 9L);
        try (OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)coordinator);){
            consumer.accept((Object)harness);
        }
    }

    private void assertEndCompaction(CompactMessages.CoordinatorOutput output, long checkpointId) {
        Assert.assertTrue((boolean)(output instanceof CompactMessages.EndCompaction));
        CompactMessages.EndCompaction end = (CompactMessages.EndCompaction)output;
        Assert.assertEquals((long)checkpointId, (long)end.getCheckpointId());
    }

    private void assertUnit(CompactMessages.CoordinatorOutput output, int unitId, String partition, List<String> fileNames) {
        Assert.assertTrue((boolean)(output instanceof CompactMessages.CompactionUnit));
        CompactMessages.CompactionUnit unit = (CompactMessages.CompactionUnit)output;
        Assert.assertEquals((long)unitId, (long)unit.getUnitId());
        Assert.assertEquals((Object)partition, (Object)unit.getPartition());
        Assert.assertEquals(fileNames, unit.getPaths().stream().map(Path::getName).collect(Collectors.toList()));
    }
}

