package org.apache.flink.test.checkpointing.utils;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.SuccessException;
import org.junit.Assert;

/* loaded from: input_file:org/apache/flink/test/checkpointing/utils/ValidatingSink.class */
public class ValidatingSink<T> extends RichSinkFunction<T> implements ListCheckpointed<HashMap<Long, Integer>> {

    @Nonnull
    private final ResultChecker resultChecker;

    @Nonnull
    private final CountUpdater<T> countUpdater;

    @Nonnull
    private final HashMap<Long, Integer> windowCounts;
    private final boolean usingProcessingTime;

    /* loaded from: input_file:org/apache/flink/test/checkpointing/utils/ValidatingSink$CountUpdater.class */
    public interface CountUpdater<T> extends Serializable {
        void updateCount(T t, Map<Long, Integer> map);
    }

    @FunctionalInterface
    /* loaded from: input_file:org/apache/flink/test/checkpointing/utils/ValidatingSink$ResultChecker.class */
    public interface ResultChecker extends Serializable {
        boolean checkResult(Map<Long, Integer> map);
    }

    public ValidatingSink(@Nonnull CountUpdater<T> countUpdater, @Nonnull ResultChecker resultChecker) {
        this(countUpdater, resultChecker, TimeCharacteristic.EventTime);
    }

    public ValidatingSink(@Nonnull CountUpdater<T> countUpdater, @Nonnull ResultChecker resultChecker, @Nonnull TimeCharacteristic timeCharacteristic) {
        this.resultChecker = resultChecker;
        this.countUpdater = countUpdater;
        this.usingProcessingTime = TimeCharacteristic.ProcessingTime == timeCharacteristic;
        this.windowCounts = new HashMap<>();
    }

    public void open(Configuration configuration) throws Exception {
        Assert.assertEquals(1L, getRuntimeContext().getNumberOfParallelSubtasks());
        if (this.usingProcessingTime && this.resultChecker.checkResult(this.windowCounts)) {
            throw new SuccessException();
        }
    }

    public void close() {
        if (!this.resultChecker.checkResult(this.windowCounts)) {
            throw new AssertionError("Test failed check.");
        }
        if (this.usingProcessingTime) {
            throw new SuccessException();
        }
    }

    public void invoke(T t, SinkFunction.Context context) throws Exception {
        this.countUpdater.updateCount(t, this.windowCounts);
        if (this.usingProcessingTime && this.resultChecker.checkResult(this.windowCounts)) {
            throw new SuccessException();
        }
    }

    public List<HashMap<Long, Integer>> snapshotState(long j, long j2) throws Exception {
        return Collections.singletonList(this.windowCounts);
    }

    public void restoreState(List<HashMap<Long, Integer>> list) throws Exception {
        if (list.isEmpty() || list.size() > 1) {
            throw new RuntimeException("Test failed due to unexpected recovered state size " + list.size());
        }
        this.windowCounts.putAll(list.get(0));
    }
}
