package org.apache.flink.streaming.connectors.kafka.testutils;

import java.util.BitSet;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.test.util.SuccessException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.class */
public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> implements ListCheckpointed<Tuple2<Integer, BitSet>> {
    private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);
    private static final long serialVersionUID = 1748426382527469932L;
    private final int numElementsTotal;
    private BitSet duplicateChecker = new BitSet();
    private int numElements;

    public ValidatingExactlyOnceSink(int i) {
        this.numElementsTotal = i;
    }

    public void invoke(Integer num) throws Exception {
        this.numElements++;
        if (this.duplicateChecker.get(num.intValue())) {
            throw new Exception("Received a duplicate: " + num);
        }
        this.duplicateChecker.set(num.intValue());
        if (this.numElements == this.numElementsTotal) {
            if (this.duplicateChecker.cardinality() != this.numElementsTotal) {
                throw new Exception("Duplicate checker has wrong cardinality");
            }
            if (this.duplicateChecker.nextClearBit(0) == this.numElementsTotal) {
                throw new SuccessException();
            }
            throw new Exception("Received sparse sequence");
        }
    }

    public List<Tuple2<Integer, BitSet>> snapshotState(long j, long j2) throws Exception {
        LOG.info("Snapshot of counter " + this.numElements + " at checkpoint " + j);
        return Collections.singletonList(new Tuple2(Integer.valueOf(this.numElements), this.duplicateChecker));
    }

    public void restoreState(List<Tuple2<Integer, BitSet>> list) throws Exception {
        if (list.isEmpty() || list.size() > 1) {
            throw new RuntimeException("Test failed due to unexpected recovered state size " + list.size());
        }
        Tuple2<Integer, BitSet> tuple2 = list.get(0);
        LOG.info("restoring num elements to {}", tuple2.f0);
        this.numElements = ((Integer) tuple2.f0).intValue();
        this.duplicateChecker = (BitSet) tuple2.f1;
    }
}
