package org.apache.flink.connector.testframe.utils;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.assertj.core.api.AbstractAssert;

/* loaded from: input_file:org/apache/flink/connector/testframe/utils/CollectIteratorAssert.class */
/**
 * Assertion over an iterator of collected records that validates them against the records
 * expected from several splits.
 *
 * <p>Each split keeps its own read offset. A collected record matches when it equals the record
 * at the current offset of any split that still has records left; the first such split (in
 * insertion order) is then advanced by one.
 *
 * @param <T> type of the records under test
 */
public class CollectIteratorAssert<T> extends AbstractAssert<CollectIteratorAssert<T>, Iterator<T>> {
    /** Iterator producing the records that are validated. */
    private final Iterator<T> collectorIterator;

    /** Expected records per split, each with its own validation offset. */
    private final List<RecordsFromSplit<T>> recordsFromSplits = new ArrayList<>();

    /** Sum of the sizes of all expected splits registered so far. */
    private int totalNumRecords;

    /** Optional cap on the number of records to validate; {@code null} means no cap. */
    private Integer limit;

    /**
     * Expected records of one split together with the offset of the next record to match.
     *
     * @param <T> type of the records
     */
    public static class RecordsFromSplit<T> {
        private int offset = 0;
        private final List<T> records;

        /**
         * @param list expected records of this split; the list is referenced, not copied
         */
        public RecordsFromSplit(List<T> list) {
            this.records = list;
        }

        /**
         * @return the record at the current offset, or {@code null} when the split is exhausted
         */
        public T current() {
            if (hasNext()) {
                return this.records.get(this.offset);
            }
            return null;
        }

        /** Advances the offset by one record. */
        public void forward() {
            this.offset++;
        }

        /**
         * @return {@code true} while the offset still points at an expected record
         */
        public boolean hasNext() {
            return this.offset < this.records.size();
        }
    }

    /**
     * Creates an assertion for the given iterator.
     *
     * @param it iterator over the collected records to validate
     */
    public CollectIteratorAssert(Iterator<T> it) {
        super(it, CollectIteratorAssert.class);
        this.collectorIterator = it;
    }

    /**
     * Caps the validation at the given number of records. With at-least-once semantic the cap
     * counts matched records; with exactly-once semantic it counts consumed records.
     *
     * @param i maximum number of records to validate
     * @return this assertion, for chaining
     */
    public CollectIteratorAssert<T> withNumRecordsLimit(int i) {
        this.limit = Integer.valueOf(i);
        return this;
    }

    /**
     * Validates the collected records against the expected records of each split.
     *
     * @param list expected records, one inner list per split
     * @param checkpointingMode semantic used for the comparison
     * @throws NullPointerException if {@code list} or {@code checkpointingMode} is {@code null}
     * @throws IllegalArgumentException if the configured limit exceeds the total number of
     *     expected records, or if the semantic is not recognized
     * @throws AssertionError if the collected records do not satisfy the chosen semantic
     */
    public void matchesRecordsFromSource(List<List<T>> list, CheckpointingMode checkpointingMode) {
        Objects.requireNonNull(list, "list");
        for (List<T> splitRecords : list) {
            this.recordsFromSplits.add(new RecordsFromSplit<>(splitRecords));
            this.totalNumRecords += splitRecords.size();
        }
        if (this.limit != null && this.limit > this.totalNumRecords) {
            throw new IllegalArgumentException(
                    "Limit validation size should be less than total number of records from source");
        }
        Objects.requireNonNull(checkpointingMode, "checkpointingMode");
        // Switch on the enum itself rather than on a decompiler-generated ordinal lookup table.
        switch (checkpointingMode) {
            case AT_LEAST_ONCE:
                compareWithAtLeastOnceSemantic(this.collectorIterator, this.recordsFromSplits);
                return;
            case EXACTLY_ONCE:
                compareWithExactlyOnceSemantic(this.collectorIterator, this.recordsFromSplits);
                return;
            default:
                throw new IllegalArgumentException(
                        String.format("Unrecognized semantic \"%s\"", checkpointingMode));
        }
    }

    /**
     * At-least-once comparison: records that do not match the current offset of any split are
     * set aside and afterwards checked to be present somewhere in the expected data.
     */
    private void compareWithAtLeastOnceSemantic(
            Iterator<T> resultIterator, List<RecordsFromSplit<T>> expectedSplits) {
        final List<T> unmatchedRecords = new LinkedList<>();
        int matchedCount = 0;
        while (resultIterator.hasNext()) {
            final T record = resultIterator.next();
            if (matchThenNext(record)) {
                matchedCount++;
            } else {
                unmatchedRecords.add(record);
            }
            if (this.limit != null && matchedCount >= this.limit) {
                break;
            }
        }
        if (this.limit != null || hasReachedEnd()) {
            confirmDuplicateRead(unmatchedRecords);
        } else {
            failWithRawMessage(
                    generateMismatchDescription(
                            String.format(
                                    "Expected to have at least %d records in result, but only received %d records",
                                    countRecords(expectedSplits), matchedCount),
                            resultIterator));
        }
    }

    /**
     * Exactly-once comparison: every consumed record must match the current offset of some
     * split; the first record that does not match fails the assertion.
     */
    private void compareWithExactlyOnceSemantic(
            Iterator<T> resultIterator, List<RecordsFromSplit<T>> expectedSplits) {
        int position = 0;
        while (resultIterator.hasNext()) {
            final T record = resultIterator.next();
            if (!matchThenNext(record)) {
                if (position >= this.totalNumRecords) {
                    failWithRawMessage(
                            generateMismatchDescription(
                                    String.format(
                                            "Expected to have exactly %d records in result, but received more records",
                                            countRecords(expectedSplits)),
                                    resultIterator));
                } else {
                    failWithRawMessage(
                            generateMismatchDescription(
                                    String.format(
                                            "Unexpected record '%s' at position %d", record, position),
                                    resultIterator));
                }
            }
            position++;
            if (this.limit != null && position >= this.limit) {
                break;
            }
        }
        if (this.limit != null || hasReachedEnd()) {
            return;
        }
        failWithRawMessage(
                generateMismatchDescription(
                        String.format(
                                "Expected to have exactly %d records in result, but only received %d records",
                                countRecords(expectedSplits), position),
                        resultIterator));
    }

    /**
     * Fails unless every given record is contained in the expected records of at least one
     * split.
     */
    private void confirmDuplicateRead(List<T> unmatchedRecords) {
        for (T record : unmatchedRecords) {
            boolean foundInAnySplit = false;
            for (RecordsFromSplit<T> split : this.recordsFromSplits) {
                if (split.records.contains(record)) {
                    foundInAnySplit = true;
                    break;
                }
            }
            if (!foundInAnySplit) {
                failWithRawMessage(String.format("Unexpected duplicate record '%s'", record));
            }
        }
    }

    /**
     * Advances the first split whose current record equals the given record.
     *
     * @return {@code true} if a split was advanced, {@code false} if no split matched
     */
    private boolean matchThenNext(T record) {
        for (RecordsFromSplit<T> split : this.recordsFromSplits) {
            // Objects.equals keeps a null record from throwing, consistent with the
            // null-tolerant List.contains check in confirmDuplicateRead.
            if (split.hasNext() && Objects.equals(record, split.current())) {
                split.forward();
                return true;
            }
        }
        return false;
    }

    /** @return {@code true} when every split has been read to its end */
    private boolean hasReachedEnd() {
        for (RecordsFromSplit<T> split : this.recordsFromSplits) {
            if (split.hasNext()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Builds a failure description listing the validation progress of every split and any
     * records still left in the iterator. Note that this drains the given iterator.
     */
    private String generateMismatchDescription(String str, Iterator<T> remaining) {
        final StringBuilder sb = new StringBuilder();
        sb.append(str).append("\n");
        sb.append("Current progress of multiple split test data validation:\n");
        int splitIndex = 0;
        for (RecordsFromSplit<T> split : this.recordsFromSplits) {
            sb.append(
                    String.format(
                            "Split %d (%d/%d): \n", splitIndex, split.offset, split.records.size()));
            splitIndex++;
            for (int recordIndex = 0; recordIndex < split.records.size(); recordIndex++) {
                sb.append(split.records.get(recordIndex));
                // Mark the record the split is currently waiting for.
                if (recordIndex == split.offset) {
                    sb.append("\t<----");
                }
                sb.append("\n");
            }
        }
        if (remaining.hasNext()) {
            sb.append("Remaining received elements after the unexpected one: \n");
            while (remaining.hasNext()) {
                sb.append(remaining.next()).append("\n");
            }
        }
        return sb.toString();
    }

    /** @return the total number of expected records across the given splits */
    private static <E> int countRecords(List<RecordsFromSplit<E>> splits) {
        int total = 0;
        for (RecordsFromSplit<E> split : splits) {
            total += split.records.size();
        }
        return total;
    }

    /**
     * Fails the assertion with an already-formatted message. The message is passed as an
     * argument to a fixed {@code "%s"} format so that '%' characters coming from record contents
     * are not interpreted as format specifiers.
     */
    private void failWithRawMessage(String message) {
        failWithMessage("%s", message);
    }
}
