package org.apache.flink.test.streaming.runtime;

import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.metrics.testutils.MetricAssertions;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.class */
public class SinkV2MetricsITCase extends TestLogger {
    private static final String TEST_SINK_NAME = "MetricTestSink";
    private static final String DEFAULT_WRITER_NAME = "Writer";
    private static final String DEFAULT_COMMITTER_NAME = "Committer";
    private static final int DEFAULT_PARALLELISM = 4;

    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();
    private final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();

    @Rule
    public final MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM).setConfiguration(this.reporter.addToConfiguration(new Configuration())).build());

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase$MetricCommitter.class */
    private static class MetricCommitter extends TestSinkV2.DefaultCommitter {
        private int counter;
        private SharedReference<CountDownLatch> beforeLatch;
        private SharedReference<CountDownLatch> afterLatch;

        MetricCommitter(SharedReference<CountDownLatch> sharedReference, SharedReference<CountDownLatch> sharedReference2) {
            this.counter = 0;
            this.beforeLatch = sharedReference;
            this.afterLatch = sharedReference2;
            this.counter = 0;
        }

        public void commit(Collection<Committer.CommitRequest<String>> collection) {
            if (this.counter == 0) {
                System.err.println("Committables arrived " + Thread.currentThread().getName() + " " + collection.stream().map(commitRequest -> {
                    return (String) commitRequest.getCommittable();
                }).collect(Collectors.toList()));
                collection.forEach(commitRequest2 -> {
                    commitRequest2.retryLater();
                });
            } else {
                if (this.counter == 1) {
                    try {
                        collection.forEach(commitRequest3 -> {
                            ((CountDownLatch) this.beforeLatch.get()).countDown();
                        });
                        ((CountDownLatch) this.afterLatch.get()).await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                }
                collection.forEach(commitRequest4 -> {
                    switch (((String) commitRequest4.getCommittable()).charAt(1)) {
                        case '0':
                            commitRequest4.signalAlreadyCommitted();
                            return;
                        case '1':
                        case '2':
                            commitRequest4.signalFailedWithKnownReason(new RuntimeException());
                            return;
                        case '3':
                            if (this.counter == 1) {
                                commitRequest4.retryLater();
                                return;
                            }
                            return;
                        case '4':
                        case '5':
                            commitRequest4.updateAndRetryLater("Retry-" + ((String) commitRequest4.getCommittable()));
                            return;
                        default:
                            return;
                    }
                });
            }
            this.counter++;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase$MetricWriter.class */
    private static class MetricWriter extends TestSinkV2.DefaultSinkWriter<Long> {
        static final long BASE_SEND_TIME = 100;
        static final long RECORD_SIZE_IN_BYTES = 10;
        private SinkWriterMetricGroup metricGroup;
        private long sendTime;

        private MetricWriter() {
        }

        public void init(Sink.InitContext initContext) {
            this.metricGroup = initContext.metricGroup();
            this.metricGroup.setCurrentSendTimeGauge(() -> {
                return Long.valueOf(this.sendTime);
            });
        }

        public void write(Long l, SinkWriter.Context context) {
            super.write(l, context);
            this.sendTime = l.longValue() * BASE_SEND_TIME;
            this.metricGroup.getIOMetricGroup().getNumRecordsOutCounter().inc();
            if (l.longValue() % 2 == 0) {
                this.metricGroup.getNumRecordsOutErrorsCounter().inc();
            }
            this.metricGroup.getIOMetricGroup().getNumBytesOutCounter().inc(RECORD_SIZE_IN_BYTES);
        }
    }

    @Test
    public void testMetrics() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        int max = Math.max(1, executionEnvironment.getParallelism() - 2);
        int i = 10;
        SharedReference add = this.sharedObjects.add(new CyclicBarrier(max + 1));
        SharedReference add2 = this.sharedObjects.add(new CyclicBarrier(max + 1));
        int i2 = DEFAULT_PARALLELISM;
        int i3 = 10 - 1;
        executionEnvironment.fromSequence(0L, max - 1).flatMap((l, collector) -> {
            LongStream range = LongStream.range(0L, i);
            collector.getClass();
            range.forEach((v1) -> {
                r1.collect(v1);
            });
        }).returns(BasicTypeInfo.LONG_TYPE_INFO).map(l2 -> {
            if (l2.longValue() % i == i2 || l2.longValue() % i == i3) {
                ((CyclicBarrier) add.get()).await();
                ((CyclicBarrier) add2.get()).await();
            }
            return l2;
        }).sinkTo(TestSinkV2.newBuilder().setWriter(new MetricWriter()).build()).name(TEST_SINK_NAME);
        JobClient executeAsync = executionEnvironment.executeAsync();
        JobID jobID = executeAsync.getJobID();
        ((CyclicBarrier) add.get()).await();
        assertSinkMetrics(jobID, DEFAULT_PARALLELISM, max);
        ((CyclicBarrier) add2.get()).await();
        ((CyclicBarrier) add.get()).await();
        assertSinkMetrics(jobID, i3, max);
        ((CyclicBarrier) add2.get()).await();
        executeAsync.getJobExecutionResult().get();
    }

    @Test
    public void testCommitterMetrics() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        SharedReference add = this.sharedObjects.add(new CountDownLatch(7));
        SharedReference add2 = this.sharedObjects.add(new CountDownLatch(1));
        executionEnvironment.fromSequence(0L, 6L).returns(BasicTypeInfo.LONG_TYPE_INFO).sinkTo(TestSinkV2.newBuilder().setCommitter(new MetricCommitter(add, add2)).setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE).build()).name(TEST_SINK_NAME);
        JobClient executeAsync = executionEnvironment.executeAsync();
        JobID jobID = executeAsync.getJobID();
        ((CountDownLatch) add.get()).await();
        assertSinkCommitterMetrics(jobID, ImmutableMap.of("alreadyCommittedCommittables", 0L, "failedCommittables", 0L, "retriedCommittables", 7L, "successfulCommittables", 0L, "totalCommittables", 7L, "pendingCommittables", 7L));
        ((CountDownLatch) add2.get()).countDown();
        executeAsync.getJobExecutionResult().get();
        assertSinkCommitterMetrics(jobID, ImmutableMap.of("alreadyCommittedCommittables", 1L, "failedCommittables", 2L, "retriedCommittables", 10L, "successfulCommittables", 4L, "totalCommittables", 7L, "pendingCommittables", 0L));
    }

    private void assertSinkMetrics(JobID jobID, long j, int i) {
        int i2 = 0;
        for (OperatorMetricGroup operatorMetricGroup : this.reporter.findOperatorMetricGroups(jobID, "MetricTestSink: Writer")) {
            Map metricsByGroup = this.reporter.getMetricsByGroup(operatorMetricGroup);
            if (operatorMetricGroup.getIOMetricGroup() != null && operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter() != null && operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter().getCount() != 0) {
                i2++;
                MetricAssertions.assertThatCounter((Metric) metricsByGroup.get("numRecordsOut")).isEqualTo(Long.valueOf(j));
                MetricAssertions.assertThatCounter((Metric) metricsByGroup.get("numBytesOut")).isEqualTo(Long.valueOf(j * 10));
                MetricAssertions.assertThatCounter((Metric) metricsByGroup.get("numRecordsOutErrors")).isEqualTo(Long.valueOf((j + 1) / 2));
                MetricAssertions.assertThatCounter((Metric) metricsByGroup.get("numRecordsSend")).isEqualTo(Long.valueOf(j));
                MetricAssertions.assertThatCounter((Metric) metricsByGroup.get("numBytesSend")).isEqualTo(Long.valueOf(j * 10));
                MetricAssertions.assertThatCounter((Metric) metricsByGroup.get("numRecordsSendErrors")).isEqualTo(Long.valueOf((j + 1) / 2));
                MetricAssertions.assertThatGauge((Metric) metricsByGroup.get("currentSendTime")).isEqualTo(Long.valueOf((j - 1) * 100));
            }
        }
        MatcherAssert.assertThat(Integer.valueOf(i2), CoreMatchers.equalTo(Integer.valueOf(i)));
    }

    private void assertSinkCommitterMetrics(JobID jobID, Map<String, Long> map) {
        List findOperatorMetricGroups = this.reporter.findOperatorMetricGroups(jobID, "MetricTestSink: Committer");
        HashMap hashMap = new HashMap(6);
        Iterator it = findOperatorMetricGroups.iterator();
        while (it.hasNext()) {
            Map metricsByGroup = this.reporter.getMetricsByGroup((OperatorMetricGroup) it.next());
            for (String str : Arrays.asList("successfulCommittables", "alreadyCommittedCommittables", "retriedCommittables", "failedCommittables", "totalCommittables")) {
                Counter counter = (Counter) metricsByGroup.get(str);
                if (counter != null) {
                    hashMap.merge(str, Long.valueOf(counter.getCount()), (v0, v1) -> {
                        return Long.sum(v0, v1);
                    });
                }
            }
            Gauge gauge = (Gauge) metricsByGroup.get("pendingCommittables");
            if (gauge != null && gauge.getValue() != null) {
                hashMap.merge("pendingCommittables", Long.valueOf(((Integer) gauge.getValue()).longValue()), (v0, v1) -> {
                    return Long.sum(v0, v1);
                });
            }
        }
        map.entrySet().forEach(entry -> {
            MatcherAssert.assertThat(hashMap, Matchers.hasEntry(entry.getKey(), entry.getValue()));
        });
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -156498494:
                if (implMethodName.equals("lambda$testMetrics$391a8f34$1")) {
                    z = true;
                    break;
                }
                break;
            case 1604793520:
                if (implMethodName.equals("lambda$testMetrics$38c93b09$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase") && serializedLambda.getImplMethodSignature().equals("(IIILorg/apache/flink/testutils/junit/SharedReference;Lorg/apache/flink/testutils/junit/SharedReference;Ljava/lang/Long;)Ljava/lang/Long;")) {
                    int intValue = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    int intValue2 = ((Integer) serializedLambda.getCapturedArg(1)).intValue();
                    int intValue3 = ((Integer) serializedLambda.getCapturedArg(2)).intValue();
                    SharedReference sharedReference = (SharedReference) serializedLambda.getCapturedArg(3);
                    SharedReference sharedReference2 = (SharedReference) serializedLambda.getCapturedArg(DEFAULT_PARALLELISM);
                    return l2 -> {
                        if (l2.longValue() % intValue == intValue2 || l2.longValue() % intValue == intValue3) {
                            ((CyclicBarrier) sharedReference.get()).await();
                            ((CyclicBarrier) sharedReference2.get()).await();
                        }
                        return l2;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("flatMap") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Lorg/apache/flink/util/Collector;)V") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase") && serializedLambda.getImplMethodSignature().equals("(ILjava/lang/Long;Lorg/apache/flink/util/Collector;)V")) {
                    int intValue4 = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return (l, collector) -> {
                        LongStream range = LongStream.range(0L, intValue4);
                        collector.getClass();
                        range.forEach((v1) -> {
                            r1.collect(v1);
                        });
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
