package org.apache.paimon.flink.source;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/source/FairAssignModeTest.class */
/**
 * Tests for {@link StaticFileStoreSplitEnumerator} when the base test is configured with
 * {@link FlinkConnectorOptions.SplitAssignMode#FAIR}.
 *
 * <p>The assertions in this class expect splits to be dealt to subtasks in turn, following the
 * order of the input list: with two readers, subtask 0 receives the 1st, 3rd, ... split and
 * subtask 1 receives the 2nd, 4th, ... split.
 */
public class FairAssignModeTest extends StaticFileStoreSplitEnumeratorTestBase {

    /** Hostname passed with every split request; the assertions do not depend on its value. */
    private static final String HOST = "test-host";

    /**
     * Four splits over two readers: each reader gets two splits, and splits handed back via
     * {@code addSplitsBack} are assigned to the same subtask again on its next request.
     */
    @Test
    public void testSplitAllocation() {
        TestingSplitEnumeratorContext<FileStoreSourceSplit> context = getSplitEnumeratorContext(2);
        List<FileStoreSourceSplit> splits = createSplits(4);
        StaticFileStoreSplitEnumerator enumerator = getSplitEnumerator(context, splits);

        // Initial assignment.
        enumerator.handleSplitRequest(0, HOST);
        enumerator.handleSplitRequest(1, HOST);
        Map<Integer, TestingSplitEnumeratorContext.SplitAssignmentState<FileStoreSourceSplit>>
                assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(0, 1);
        Assertions.assertThat(assignments.get(0).getAssignedSplits())
                .containsExactly(splits.get(0), splits.get(2));
        Assertions.assertThat(assignments.get(1).getAssignedSplits())
                .containsExactly(splits.get(1), splits.get(3));

        // Return subtask 0's splits, then wipe the recorded assignments so that the next
        // request is observed in isolation.
        enumerator.addSplitsBack(assignments.get(0).getAssignedSplits(), 0);
        context.getSplitAssignments().clear();
        Assertions.assertThat(context.getSplitAssignments()).isEmpty();

        // Re-read from the context instead of relying on the earlier local reference.
        enumerator.handleSplitRequest(0, HOST);
        Assertions.assertThat(context.getSplitAssignments().get(0).getAssignedSplits())
                .containsExactly(splits.get(0), splits.get(2));
    }

    /**
     * Twenty-eight splits over two readers: the assertions expect 10 splits per reader after the
     * first request, 14 after the second, and the no-more-splits signal after the third.
     * The per-request cap of 10 is presumably configured by the base class (not visible here).
     */
    @Test
    public void testSplitBatch() {
        TestingSplitEnumeratorContext<FileStoreSourceSplit> context = getSplitEnumeratorContext(2);
        List<FileStoreSourceSplit> splits = createSplits(28);
        StaticFileStoreSplitEnumerator enumerator = getSplitEnumerator(context, splits);

        // First batch.
        enumerator.handleSplitRequest(0, HOST);
        enumerator.handleSplitRequest(1, HOST);
        Map<Integer, TestingSplitEnumeratorContext.SplitAssignmentState<FileStoreSourceSplit>>
                assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(0, 1);
        Assertions.assertThat(assignments.get(0).getAssignedSplits()).hasSize(10);
        Assertions.assertThat(assignments.get(1).getAssignedSplits()).hasSize(10);

        // Second batch: the remaining four splits per reader.
        enumerator.handleSplitRequest(0, HOST);
        enumerator.handleSplitRequest(1, HOST);
        Assertions.assertThat(assignments).containsOnlyKeys(0, 1);
        Assertions.assertThat(assignments.get(0).getAssignedSplits()).hasSize(14);
        Assertions.assertThat(assignments.get(1).getAssignedSplits()).hasSize(14);

        // Third request: nothing left, so both readers must be told there are no more splits.
        enumerator.handleSplitRequest(0, HOST);
        enumerator.handleSplitRequest(1, HOST);
        Assertions.assertThat(assignments).containsOnlyKeys(0, 1);
        Assertions.assertThat(assignments.get(0).hasReceivedNoMoreSplitsSignal()).isTrue();
        Assertions.assertThat(assignments.get(1).hasReceivedNoMoreSplitsSignal()).isTrue();
    }

    /**
     * Three splits over two readers: the odd split out goes to subtask 0, matching the
     * distribution order asserted in {@link #testSplitAllocation()}.
     */
    @Test
    public void testSplitAllocationNotEvenly() {
        TestingSplitEnumeratorContext<FileStoreSourceSplit> context = getSplitEnumeratorContext(2);
        List<FileStoreSourceSplit> splits = createSplits(3);
        StaticFileStoreSplitEnumerator enumerator = getSplitEnumerator(context, splits);

        enumerator.handleSplitRequest(0, HOST);
        enumerator.handleSplitRequest(1, HOST);
        Map<Integer, TestingSplitEnumeratorContext.SplitAssignmentState<FileStoreSourceSplit>>
                assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(0, 1);
        // Subtask 0 takes the 1st and 3rd split, subtask 1 only the 2nd.
        Assertions.assertThat(assignments.get(0).getAssignedSplits())
                .containsExactly(splits.get(0), splits.get(2));
        Assertions.assertThat(assignments.get(1).getAssignedSplits())
                .containsExactly(splits.get(1));
    }

    /**
     * Two splits over three readers: the third reader still appears in the assignment map but
     * with an empty split list.
     */
    @Test
    public void testSplitAllocationSomeEmpty() {
        TestingSplitEnumeratorContext<FileStoreSourceSplit> context = getSplitEnumeratorContext(3);
        List<FileStoreSourceSplit> splits = createSplits(2);
        StaticFileStoreSplitEnumerator enumerator = getSplitEnumerator(context, splits);

        enumerator.handleSplitRequest(0, HOST);
        enumerator.handleSplitRequest(1, HOST);
        enumerator.handleSplitRequest(2, HOST);
        Map<Integer, TestingSplitEnumeratorContext.SplitAssignmentState<FileStoreSourceSplit>>
                assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(0, 1, 2);
        Assertions.assertThat(assignments.get(0).getAssignedSplits())
                .containsExactly(splits.get(0));
        Assertions.assertThat(assignments.get(1).getAssignedSplits())
                .containsExactly(splits.get(1));
        Assertions.assertThat(assignments.get(2).getAssignedSplits()).isEmpty();
    }

    /** Selects the assign mode under test for the shared base-class fixtures. */
    @Override // org.apache.paimon.flink.source.StaticFileStoreSplitEnumeratorTestBase
    protected FlinkConnectorOptions.SplitAssignMode splitAssignMode() {
        return FlinkConnectorOptions.SplitAssignMode.FAIR;
    }

    /**
     * Builds {@code count} snapshot splits by calling
     * {@code createSnapshotSplit(i, 0, Collections.emptyList())} for {@code i} in
     * {@code 1..count}, in that order.
     *
     * @param count number of splits to create
     * @return a mutable list holding the splits in creation order
     */
    private List<FileStoreSourceSplit> createSplits(int count) {
        List<FileStoreSourceSplit> splits = new ArrayList<>(count);
        for (int i = 1; i <= count; i++) {
            splits.add(createSnapshotSplit(i, 0, Collections.emptyList()));
        }
        return splits;
    }
}
