package org.apache.flink.runtime.metrics.groups;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;

@Internal
/* loaded from: input_file:org/apache/flink/runtime/metrics/groups/InternalSourceReaderMetricGroup.class */
public class InternalSourceReaderMetricGroup extends ProxyMetricGroup<MetricGroup> implements SourceReaderMetricGroup {
    public static final long UNDEFINED = -1;
    private static final long ACTIVE = Long.MAX_VALUE;
    private static final long MAX_WATERMARK_TIMESTAMP = Watermark.MAX_WATERMARK.getTimestamp();
    private final OperatorIOMetricGroup operatorIOMetricGroup;
    private final Counter numRecordsInErrors;
    private final Clock clock;
    private long lastWatermark;
    private long lastEventTime;
    private long idleStartTime;
    private boolean firstWatermark;
    private long currentMaxDesiredWatermark;
    private boolean firstDesiredWatermark;

    private InternalSourceReaderMetricGroup(MetricGroup metricGroup, OperatorIOMetricGroup operatorIOMetricGroup, Clock clock) {
        super(metricGroup);
        this.lastEventTime = Long.MIN_VALUE;
        this.idleStartTime = Long.MAX_VALUE;
        this.firstWatermark = true;
        this.firstDesiredWatermark = true;
        this.numRecordsInErrors = metricGroup.counter(MetricNames.NUM_RECORDS_IN_ERRORS);
        this.operatorIOMetricGroup = operatorIOMetricGroup;
        this.clock = clock;
        metricGroup.gauge(MetricNames.SOURCE_IDLE_TIME, this::getIdleTime);
        metricGroup.gauge(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG, this::getEmitTimeLag);
    }

    public static InternalSourceReaderMetricGroup wrap(OperatorMetricGroup operatorMetricGroup) {
        return new InternalSourceReaderMetricGroup(operatorMetricGroup, operatorMetricGroup.getIOMetricGroup(), SystemClock.getInstance());
    }

    @VisibleForTesting
    public static InternalSourceReaderMetricGroup mock(MetricGroup metricGroup) {
        return new InternalSourceReaderMetricGroup(metricGroup, UnregisteredMetricsGroup.createOperatorIOMetricGroup(), SystemClock.getInstance());
    }

    public Counter getNumRecordsInErrorsCounter() {
        return this.numRecordsInErrors;
    }

    public void setPendingBytesGauge(Gauge<Long> gauge) {
        gauge(MetricNames.PENDING_BYTES, gauge);
    }

    public void setPendingRecordsGauge(Gauge<Long> gauge) {
        gauge(MetricNames.PENDING_RECORDS, gauge);
    }

    public OperatorIOMetricGroup getIOMetricGroup() {
        return this.operatorIOMetricGroup;
    }

    public void recordEmitted(long j) {
        this.idleStartTime = Long.MAX_VALUE;
        this.lastEventTime = j;
    }

    public void idlingStarted() {
        if (isIdling()) {
            return;
        }
        this.idleStartTime = this.clock.absoluteTimeMillis();
    }

    public void watermarkEmitted(long j) {
        if (j == MAX_WATERMARK_TIMESTAMP) {
            return;
        }
        this.lastWatermark = j;
        if (this.firstWatermark) {
            this.parentMetricGroup.gauge(MetricNames.WATERMARK_LAG, this::getWatermarkLag);
            this.firstWatermark = false;
        }
    }

    public void updateMaxDesiredWatermark(long j) {
        this.currentMaxDesiredWatermark = j;
        if (this.firstDesiredWatermark) {
            this.parentMetricGroup.gauge(MetricNames.WATERMARK_ALIGNMENT_DRIFT, this::getAlignedWatermarkDrift);
            this.firstDesiredWatermark = false;
        }
    }

    boolean isIdling() {
        return this.idleStartTime != Long.MAX_VALUE;
    }

    long getIdleTime() {
        if (isIdling()) {
            return this.clock.absoluteTimeMillis() - this.idleStartTime;
        }
        return 0L;
    }

    private long getLastEmitTime() {
        return isIdling() ? this.idleStartTime : this.clock.absoluteTimeMillis();
    }

    long getEmitTimeLag() {
        if (this.lastEventTime != Long.MIN_VALUE) {
            return getLastEmitTime() - this.lastEventTime;
        }
        return -1L;
    }

    long getWatermarkLag() {
        return getLastEmitTime() - this.lastWatermark;
    }

    long getAlignedWatermarkDrift() {
        return this.lastWatermark - this.currentMaxDesiredWatermark;
    }
}
