/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.testutils;

import java.util.BitSet;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.test.util.SuccessException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ValidatingExactlyOnceSink
extends RichSinkFunction<Integer>
implements ListCheckpointed<Tuple2<Integer, BitSet>> {
    private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);
    private static final long serialVersionUID = 1748426382527469932L;
    private final int numElementsTotal;
    private BitSet duplicateChecker = new BitSet();
    private int numElements;

    public ValidatingExactlyOnceSink(int numElementsTotal) {
        this.numElementsTotal = numElementsTotal;
    }

    public void invoke(Integer value) throws Exception {
        ++this.numElements;
        if (this.duplicateChecker.get(value)) {
            throw new Exception("Received a duplicate: " + value);
        }
        this.duplicateChecker.set(value);
        if (this.numElements == this.numElementsTotal) {
            if (this.duplicateChecker.cardinality() != this.numElementsTotal) {
                throw new Exception("Duplicate checker has wrong cardinality");
            }
            if (this.duplicateChecker.nextClearBit(0) != this.numElementsTotal) {
                throw new Exception("Received sparse sequence");
            }
            throw new SuccessException();
        }
    }

    public List<Tuple2<Integer, BitSet>> snapshotState(long checkpointId, long timestamp) throws Exception {
        LOG.info("Snapshot of counter " + this.numElements + " at checkpoint " + checkpointId);
        return Collections.singletonList(new Tuple2((Object)this.numElements, (Object)this.duplicateChecker));
    }

    public void restoreState(List<Tuple2<Integer, BitSet>> state) throws Exception {
        if (state.isEmpty() || state.size() > 1) {
            throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
        }
        Tuple2<Integer, BitSet> s = state.get(0);
        LOG.info("restoring num elements to {}", s.f0);
        this.numElements = (Integer)s.f0;
        this.duplicateChecker = (BitSet)s.f1;
    }
}

