/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.internals;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nonnull;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateWithPeriodicWatermarks;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateWithPunctuatedWatermarks;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;

public abstract class AbstractFetcher<T, KPH> {
    /** Mode value selected by the constructor when neither a periodic nor a punctuated assigner is supplied. */
    protected static final int NO_TIMESTAMPS_WATERMARKS = 0;
    /** Mode value selected by the constructor when only a periodic watermark assigner is supplied. */
    protected static final int PERIODIC_WATERMARKS = 1;
    /** Mode value selected by the constructor when only a punctuated watermark assigner is supplied. */
    protected static final int PUNCTUATED_WATERMARKS = 2;
    /** Source context through which records and watermarks are emitted. */
    protected final SourceFunction.SourceContext<T> sourceContext;
    /** Lock obtained from the source context; record emission and offset updates synchronize on it. */
    protected final Object checkpointLock;
    /**
     * Per-partition state for every partition assigned to this fetcher. The runtime element type of the
     * array depends on the watermark mode (plain, periodic-watermark or punctuated-watermark state).
     */
    private final KafkaTopicPartitionState<KPH>[] subscribedPartitionStates;
    /** The watermark mode chosen in the constructor; holds one of the three mode values above. */
    protected final int timestampWatermarkMode;
    /** Flag supplied by the creator; not read inside this class (presumably consulted by subclasses). */
    protected final boolean useMetrics;
    /** Largest watermark emitted so far in punctuated mode; starts at {@code Long.MIN_VALUE}. */
    private volatile long maxWatermarkSoFar = Long.MIN_VALUE;

    /**
     * Sets up the fetcher: picks the timestamp/watermark mode, builds the per-partition state array and,
     * in periodic mode, starts the periodic watermark emitter.
     *
     * <p>Note that building the partition states calls {@link #createKafkaPartitionHandle} from within
     * this constructor, i.e. before any subclass constructor body has run.
     *
     * @param sourceContext context used to emit records and watermarks; must not be null
     * @param assignedPartitionsWithInitialOffsets partitions to read, each with its initial offset
     * @param watermarksPeriodic serialized periodic assigner, or null
     * @param watermarksPunctuated serialized punctuated assigner, or null
     * @param processingTimeProvider timer service used by the periodic watermark emitter
     * @param autoWatermarkInterval delay between periodic watermark emissions
     * @param userCodeClassLoader class loader used to deserialize the assigners
     * @param useMetrics stored in {@link #useMetrics}
     * @throws IllegalArgumentException if both assigners are supplied, or if any partition state
     *         reports an undefined offset after initialization
     * @throws Exception if deserializing an assigner fails
     */
    protected AbstractFetcher(
            SourceFunction.SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            ProcessingTimeService processingTimeProvider,
            long autoWatermarkInterval,
            ClassLoader userCodeClassLoader,
            boolean useMetrics) throws Exception {
        this.sourceContext = Preconditions.checkNotNull(sourceContext);
        this.checkpointLock = sourceContext.getCheckpointLock();
        this.useMetrics = useMetrics;

        // At most one kind of assigner may be configured.
        final boolean hasPeriodic = watermarksPeriodic != null;
        final boolean hasPunctuated = watermarksPunctuated != null;
        if (hasPeriodic && hasPunctuated) {
            throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks");
        }
        if (hasPeriodic) {
            this.timestampWatermarkMode = PERIODIC_WATERMARKS;
        } else if (hasPunctuated) {
            this.timestampWatermarkMode = PUNCTUATED_WATERMARKS;
        } else {
            this.timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
        }

        this.subscribedPartitionStates = initializeSubscribedPartitionStates(
                assignedPartitionsWithInitialOffsets,
                this.timestampWatermarkMode,
                watermarksPeriodic,
                watermarksPunctuated,
                userCodeClassLoader);

        // Every assigned partition must come with a concrete starting offset.
        for (KafkaTopicPartitionState<KPH> partitionState : this.subscribedPartitionStates) {
            if (!partitionState.isOffsetDefined()) {
                throw new IllegalArgumentException("The fetcher was assigned partitions with undefined initial offsets.");
            }
        }

        if (this.timestampWatermarkMode == PERIODIC_WATERMARKS) {
            // In periodic mode the array was created with the periodic-state element type, so the cast holds.
            KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] periodicStates =
                    (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) this.subscribedPartitionStates;
            new PeriodicWatermarkEmitter(periodicStates, sourceContext, processingTimeProvider, autoWatermarkInterval)
                    .start();
        }
    }

    /**
     * Gives subclasses access to the state of all partitions assigned to this fetcher.
     *
     * @return the internal state array itself (not a copy)
     */
    protected final KafkaTopicPartitionState<KPH>[] subscribedPartitionStates() {
        return subscribedPartitionStates;
    }

    /**
     * Runs the fetcher's main loop; the concrete behavior is supplied by the subclass.
     *
     * @throws Exception if the subclass's loop fails
     */
    public abstract void runFetchLoop() throws Exception;

    /**
     * Cancels the fetcher; the concrete behavior is supplied by the subclass
     * (presumably it makes {@link #runFetchLoop()} return - confirm against implementations).
     */
    public abstract void cancel();

    /**
     * Creates the subclass-specific handle object for one partition.
     *
     * <p>This is invoked once per assigned partition while the partition states are being built
     * inside the {@code AbstractFetcher} constructor, so implementations must not rely on
     * subclass fields that are initialized in the subclass constructor body.
     *
     * @param var1 the partition to create a handle for
     * @return the handle stored in that partition's state
     */
    public abstract KPH createKafkaPartitionHandle(KafkaTopicPartition var1);

    /**
     * Commits the given per-partition offsets; the concrete mechanism is supplied by the subclass.
     *
     * @param var1 offsets to commit, keyed by partition
     * @param var2 callback for the commit (presumably notified of the outcome - confirm against
     *        {@code KafkaCommitCallback}); must not be null
     * @throws Exception if the subclass's commit fails
     */
    public abstract void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> var1, @Nonnull KafkaCommitCallback var2) throws Exception;

    /**
     * Copies the current offset of every subscribed partition into a fresh map.
     *
     * <p>The caller is expected to hold {@link #checkpointLock}; this is verified only when
     * assertions are enabled.
     *
     * @return a new map from partition to the offset currently recorded in its state
     */
    public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
        assert Thread.holdsLock(checkpointLock);

        final KafkaTopicPartitionState<KPH>[] states = subscribedPartitionStates;
        final HashMap<KafkaTopicPartition, Long> offsetsByPartition = new HashMap<>(states.length);
        for (KafkaTopicPartitionState<KPH> state : states) {
            offsetsByPartition.put(state.getKafkaTopicPartition(), state.getOffset());
        }
        return offsetsByPartition;
    }

    /**
     * Emits a record that carries no externally supplied timestamp and records the new offset for
     * its partition.
     *
     * <p>A {@code null} record is not emitted, but the partition's offset is still updated. In the
     * two watermark modes the work is delegated to the mode-specific emit method with
     * {@code Long.MIN_VALUE} passed as the Kafka event timestamp.
     *
     * @param record the record to emit, or null to only advance the offset
     * @param partitionState state of the partition the record came from
     * @param offset offset to store in the partition state
     * @throws Exception declared for subclasses and callers; see the delegated emit methods
     */
    protected void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) throws Exception {
        if (record == null) {
            // Nothing to emit; only move the offset forward, atomically with respect to checkpoints.
            synchronized (checkpointLock) {
                partitionState.setOffset(offset);
            }
            return;
        }

        switch (timestampWatermarkMode) {
            case NO_TIMESTAMPS_WATERMARKS:
                // Emission and offset update happen under one lock so they are seen together.
                synchronized (checkpointLock) {
                    sourceContext.collect(record);
                    partitionState.setOffset(offset);
                }
                break;
            case PERIODIC_WATERMARKS:
                emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, Long.MIN_VALUE);
                break;
            default:
                // Any remaining mode value is handled as punctuated, as before.
                emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, Long.MIN_VALUE);
                break;
        }
    }

    /**
     * Emits a record together with a caller-supplied timestamp and records the new offset for its
     * partition.
     *
     * <p>Without an assigner the record is emitted with {@code timestamp} directly. In the two
     * watermark modes {@code timestamp} is forwarded to the mode-specific emit method, which asks
     * the partition's assigner for the timestamp to use. A {@code null} record is not emitted, but
     * the offset is still updated.
     *
     * @param record the record to emit, or null to only advance the offset
     * @param partitionState state of the partition the record came from
     * @param offset offset to store in the partition state
     * @param timestamp timestamp attached to the record by the caller
     * @throws Exception declared for subclasses and callers; see the delegated emit methods
     */
    protected void emitRecordWithTimestamp(T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long timestamp) throws Exception {
        if (record == null) {
            // No element to emit, but the position in the partition still advances.
            synchronized (checkpointLock) {
                partitionState.setOffset(offset);
            }
            return;
        }

        if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
            emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, timestamp);
            return;
        }
        if (timestampWatermarkMode != NO_TIMESTAMPS_WATERMARKS) {
            emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, timestamp);
            return;
        }

        // No assigner configured: emit with the supplied timestamp and update the offset atomically.
        synchronized (checkpointLock) {
            sourceContext.collectWithTimestamp(record, timestamp);
            partitionState.setOffset(offset);
        }
    }

    /**
     * Emits a record in periodic-watermark mode: obtains the record's timestamp from the
     * partition's state, then emits the record and stores the offset under the checkpoint lock.
     *
     * @param record the record to emit
     * @param partitionState state of the originating partition; must be a periodic-watermark state
     * @param offset offset to store in the partition state
     * @param kafkaEventTimestamp timestamp handed to the partition state's timestamp extraction
     */
    protected void emitRecordWithTimestampAndPeriodicWatermark(T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp) {
        @SuppressWarnings("unchecked")
        final KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> periodicState =
                (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>) partitionState;

        // The periodic emitter reads this state's watermark under the same monitor, so the
        // timestamp extraction is guarded by the state object itself.
        final long recordTimestamp;
        synchronized (periodicState) {
            recordTimestamp = periodicState.getTimestampForRecord(record, kafkaEventTimestamp);
        }

        // Emit and advance the offset as one step with respect to the checkpoint lock.
        synchronized (checkpointLock) {
            sourceContext.collectWithTimestamp(record, recordTimestamp);
            partitionState.setOffset(offset);
        }
    }

    /**
     * Emits a record in punctuated-watermark mode: obtains the record's timestamp and a possible
     * new partition watermark from the partition's state, emits the record and stores the offset
     * under the checkpoint lock, and finally re-evaluates the overall watermark if the partition
     * produced a new one.
     *
     * @param record the record to emit
     * @param partitionState state of the originating partition; must be a punctuated-watermark state
     * @param offset offset to store in the partition state
     * @param kafkaEventTimestamp timestamp handed to the partition state's timestamp extraction
     */
    protected void emitRecordWithTimestampAndPunctuatedWatermark(T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp) {
        @SuppressWarnings("unchecked")
        final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> punctuatedState =
                (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) partitionState;

        // Timestamp and watermark candidate are computed before taking the checkpoint lock.
        final long recordTimestamp = punctuatedState.getTimestampForRecord(record, kafkaEventTimestamp);
        final Watermark candidate = punctuatedState.checkAndGetNewWatermark(record, recordTimestamp);

        synchronized (checkpointLock) {
            sourceContext.collectWithTimestamp(record, recordTimestamp);
            partitionState.setOffset(offset);
        }

        if (candidate == null) {
            return;
        }
        updateMinPunctuatedWatermark(candidate);
    }

    /**
     * Re-evaluates the overall watermark after one partition produced a new punctuated watermark.
     *
     * <p>The overall watermark is the minimum of the current watermarks of all subscribed
     * partitions. It is emitted only if it is larger than the largest watermark emitted so far.
     *
     * @param nextWatermark the watermark just produced by one partition
     */
    private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
        // Cheap pre-check: a partition watermark that is not ahead of the last emission cannot raise the minimum.
        if (nextWatermark.getTimestamp() <= maxWatermarkSoFar) {
            return;
        }

        long minAcrossPartitions = Long.MAX_VALUE;
        for (KafkaTopicPartitionState<KPH> state : subscribedPartitionStates) {
            @SuppressWarnings("unchecked")
            final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> punctuatedState =
                    (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) state;
            minAcrossPartitions = Math.min(minAcrossPartitions, punctuatedState.getCurrentPartitionWatermark());
        }

        if (minAcrossPartitions <= maxWatermarkSoFar) {
            return;
        }

        synchronized (checkpointLock) {
            // Checked again under the lock because the field may have been advanced in the meantime.
            if (minAcrossPartitions > maxWatermarkSoFar) {
                maxWatermarkSoFar = minAcrossPartitions;
                sourceContext.emitWatermark(new Watermark(minAcrossPartitions));
            }
        }
    }

    /**
     * Builds the state array for all assigned partitions, using the state type that matches the
     * given watermark mode, and seeds each state with its initial offset.
     *
     * <p>For every partition a handle is requested from {@link #createKafkaPartitionHandle}. In the
     * two watermark modes a separate assigner instance is deserialized for every partition, so
     * partitions do not share assigner state.
     *
     * @param assignedPartitionsToInitialOffsets partitions to create state for, with initial offsets
     * @param timestampWatermarkMode one of {@link #NO_TIMESTAMPS_WATERMARKS},
     *        {@link #PERIODIC_WATERMARKS}, {@link #PUNCTUATED_WATERMARKS}
     * @param watermarksPeriodic serialized periodic assigner; only read in periodic mode
     * @param watermarksPunctuated serialized punctuated assigner; only read in punctuated mode
     * @param userCodeClassLoader class loader used to deserialize the assigners
     * @return an array whose runtime element type matches the mode
     * @throws IOException if deserializing an assigner fails
     * @throws ClassNotFoundException if an assigner class cannot be resolved
     * @throws RuntimeException if the mode is none of the three known values
     */
    private KafkaTopicPartitionState<KPH>[] initializeSubscribedPartitionStates(Map<KafkaTopicPartition, Long> assignedPartitionsToInitialOffsets, int timestampWatermarkMode, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
        final int partitionCount = assignedPartitionsToInitialOffsets.size();

        switch (timestampWatermarkMode) {
            case NO_TIMESTAMPS_WATERMARKS: {
                @SuppressWarnings("unchecked")
                KafkaTopicPartitionState<KPH>[] partitions =
                        (KafkaTopicPartitionState<KPH>[]) new KafkaTopicPartitionState<?>[partitionCount];

                int pos = 0;
                for (Map.Entry<KafkaTopicPartition, Long> partition : assignedPartitionsToInitialOffsets.entrySet()) {
                    KPH kafkaHandle = createKafkaPartitionHandle(partition.getKey());
                    partitions[pos] = new KafkaTopicPartitionState<>(partition.getKey(), kafkaHandle);
                    partitions[pos].setOffset(partition.getValue());
                    pos++;
                }
                return partitions;
            }

            case PERIODIC_WATERMARKS: {
                @SuppressWarnings("unchecked")
                KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
                        (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[])
                                new KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[partitionCount];

                int pos = 0;
                for (Map.Entry<KafkaTopicPartition, Long> partition : assignedPartitionsToInitialOffsets.entrySet()) {
                    KPH kafkaHandle = createKafkaPartitionHandle(partition.getKey());
                    // A fresh assigner per partition keeps watermark tracking independent.
                    AssignerWithPeriodicWatermarks<T> assignerInstance =
                            watermarksPeriodic.deserializeValue(userCodeClassLoader);
                    partitions[pos] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
                            partition.getKey(), kafkaHandle, assignerInstance);
                    partitions[pos].setOffset(partition.getValue());
                    pos++;
                }
                return partitions;
            }

            case PUNCTUATED_WATERMARKS: {
                @SuppressWarnings("unchecked")
                KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions =
                        (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[])
                                new KafkaTopicPartitionStateWithPunctuatedWatermarks<?, ?>[partitionCount];

                int pos = 0;
                for (Map.Entry<KafkaTopicPartition, Long> partition : assignedPartitionsToInitialOffsets.entrySet()) {
                    KPH kafkaHandle = createKafkaPartitionHandle(partition.getKey());
                    // A fresh assigner per partition keeps watermark tracking independent.
                    AssignerWithPunctuatedWatermarks<T> assignerInstance =
                            watermarksPunctuated.deserializeValue(userCodeClassLoader);
                    partitions[pos] = new KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
                            partition.getKey(), kafkaHandle, assignerInstance);
                    partitions[pos].setOffset(partition.getValue());
                    pos++;
                }
                return partitions;
            }

            default:
                // Not reachable with the modes the constructor selects; guards against future modes.
                throw new RuntimeException("Unknown timestamp/watermark mode: " + timestampWatermarkMode);
        }
    }

    /**
     * Registers two gauges per subscribed partition: one in the {@code "current-offsets"} subgroup
     * and one in the {@code "committed-offsets"} subgroup of the given metric group. Each gauge is
     * named {@code <topic>-<partition>}.
     *
     * @param metricGroup parent group under which the two subgroups are created
     */
    protected void addOffsetStateGauge(MetricGroup metricGroup) {
        final MetricGroup currentGroup = metricGroup.addGroup("current-offsets");
        final MetricGroup committedGroup = metricGroup.addGroup("committed-offsets");

        for (KafkaTopicPartitionState<KPH> state : subscribedPartitionStates()) {
            final String gaugeName = state.getTopic() + "-" + state.getPartition();
            currentGroup.gauge(gaugeName, new OffsetGauge(state, OffsetGaugeType.CURRENT_OFFSET));
            committedGroup.gauge(gaugeName, new OffsetGauge(state, OffsetGaugeType.COMMITTED_OFFSET));
        }
    }

    private static class PeriodicWatermarkEmitter
    implements ProcessingTimeCallback {
        private final KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions;
        private final SourceFunction.SourceContext<?> emitter;
        private final ProcessingTimeService timerService;
        private final long interval;
        private long lastWatermarkTimestamp;

        PeriodicWatermarkEmitter(KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions, SourceFunction.SourceContext<?> emitter, ProcessingTimeService timerService, long autoWatermarkInterval) {
            this.allPartitions = (KafkaTopicPartitionStateWithPeriodicWatermarks[])Preconditions.checkNotNull(allPartitions);
            this.emitter = (SourceFunction.SourceContext)Preconditions.checkNotNull(emitter);
            this.timerService = (ProcessingTimeService)Preconditions.checkNotNull((Object)timerService);
            this.interval = autoWatermarkInterval;
            this.lastWatermarkTimestamp = Long.MIN_VALUE;
        }

        public void start() {
            this.timerService.registerTimer(this.timerService.getCurrentProcessingTime() + this.interval, (ProcessingTimeCallback)this);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onProcessingTime(long timestamp) throws Exception {
            long minAcrossAll = Long.MAX_VALUE;
            KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] kafkaTopicPartitionStateWithPeriodicWatermarksArray = this.allPartitions;
            int n = kafkaTopicPartitionStateWithPeriodicWatermarksArray.length;
            for (int i = 0; i < n; ++i) {
                long curr;
                KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> state;
                KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> kafkaTopicPartitionStateWithPeriodicWatermarks = state = kafkaTopicPartitionStateWithPeriodicWatermarksArray[i];
                synchronized (kafkaTopicPartitionStateWithPeriodicWatermarks) {
                    curr = state.getCurrentWatermarkTimestamp();
                }
                minAcrossAll = Math.min(minAcrossAll, curr);
            }
            if (minAcrossAll > this.lastWatermarkTimestamp) {
                this.lastWatermarkTimestamp = minAcrossAll;
                this.emitter.emitWatermark(new Watermark(minAcrossAll));
            }
            this.timerService.registerTimer(this.timerService.getCurrentProcessingTime() + this.interval, (ProcessingTimeCallback)this);
        }
    }

    private static class OffsetGauge
    implements Gauge<Long> {
        private final KafkaTopicPartitionState<?> ktp;
        private final OffsetGaugeType gaugeType;

        public OffsetGauge(KafkaTopicPartitionState<?> ktp, OffsetGaugeType gaugeType) {
            this.ktp = ktp;
            this.gaugeType = gaugeType;
        }

        public Long getValue() {
            switch (this.gaugeType) {
                case COMMITTED_OFFSET: {
                    return this.ktp.getCommittedOffset();
                }
                case CURRENT_OFFSET: {
                    return this.ktp.getOffset();
                }
            }
            throw new RuntimeException("Unknown gauge type: " + (Object)((Object)this.gaugeType));
        }
    }

    /** Selects which of a partition state's two offsets an {@code OffsetGauge} reports. */
    private static enum OffsetGaugeType {
        /** Report the value of the partition state's {@code getOffset()}. */
        CURRENT_OFFSET,
        /** Report the value of the partition state's {@code getCommittedOffset()}. */
        COMMITTED_OFFSET;

    }
}

