package org.apache.flink.statefun.flink.core.feedback;

import java.util.ArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Ignore;
import org.junit.Test;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.OptionsBuilder;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/feedback/FeedbackChannelTest.class */
public class FeedbackChannelTest {
    private static final SubtaskFeedbackKey<String> KEY = new FeedbackKey("foo", 1).withSubTaskIndex(0, 1);

    @Test
    public void exampleUsage() {
        FeedbackChannel feedbackChannel = new FeedbackChannel(KEY, new LockFreeBatchFeedbackQueue());
        feedbackChannel.put("hello");
        feedbackChannel.put("world");
        feedbackChannel.close();
        ArrayList arrayList = new ArrayList();
        arrayList.getClass();
        feedbackChannel.registerConsumer((v1) -> {
            r1.add(v1);
        }, (v0) -> {
            v0.run();
        });
        MatcherAssert.assertThat(arrayList, Matchers.contains(new String[]{"hello", "world"}));
    }

    @Test
    @Ignore("benchmarks are not run as part of a regular test suite.")
    public void launchBenchmark() throws Exception {
        new Runner(new OptionsBuilder().include(getClass().getName() + ".*").timeUnit(TimeUnit.MILLISECONDS).warmupIterations(4).measurementIterations(4).threads(2).forks(1).shouldFailOnError(true).shouldDoGC(true).build()).run();
    }

    @Benchmark
    @BenchmarkMode({Mode.AverageTime})
    public void lockFreeBatchHandOffQueue(Blackhole blackhole) {
        blackhole.consume(benchmark(new LockFreeBatchFeedbackQueue(), 1000000));
    }

    private static int benchmark(FeedbackQueue<String> feedbackQueue, int i) {
        FeedbackChannel feedbackChannel = new FeedbackChannel(KEY, feedbackQueue);
        int[] iArr = new int[1];
        new Object();
        feedbackChannel.registerConsumer(str -> {
            iArr[0] = iArr[0] + 1;
        }, Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            return thread;
        }));
        for (int i2 = 0; i2 < i; i2++) {
            feedbackChannel.put("hello");
        }
        feedbackChannel.close();
        return iArr[0];
    }
}
