/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.internals;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateWithPeriodicWatermarks;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateWithPunctuatedWatermarks;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;

public abstract class AbstractFetcher<T, KPH> {
    private static final int NO_TIMESTAMPS_WATERMARKS = 0;
    private static final int PERIODIC_WATERMARKS = 1;
    private static final int PUNCTUATED_WATERMARKS = 2;
    private final SourceFunction.SourceContext<T> sourceContext;
    private final Object checkpointLock;
    private final KafkaTopicPartitionState<KPH>[] allPartitions;
    private final int timestampWatermarkMode;
    protected final boolean useMetrics;
    private volatile long maxWatermarkSoFar = Long.MIN_VALUE;

    protected AbstractFetcher(SourceFunction.SourceContext<T> sourceContext, List<KafkaTopicPartition> assignedPartitions, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext, boolean useMetrics) throws Exception {
        this.sourceContext = (SourceFunction.SourceContext)Preconditions.checkNotNull(sourceContext);
        this.checkpointLock = sourceContext.getCheckpointLock();
        this.useMetrics = useMetrics;
        if (watermarksPeriodic == null) {
            this.timestampWatermarkMode = watermarksPunctuated == null ? 0 : 2;
        } else if (watermarksPunctuated == null) {
            this.timestampWatermarkMode = 1;
        } else {
            throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks");
        }
        this.allPartitions = this.initializePartitions(assignedPartitions, this.timestampWatermarkMode, watermarksPeriodic, watermarksPunctuated, runtimeContext.getUserCodeClassLoader());
        if (this.timestampWatermarkMode == 1) {
            KafkaTopicPartitionStateWithPeriodicWatermarks[] parts = (KafkaTopicPartitionStateWithPeriodicWatermarks[])this.allPartitions;
            PeriodicWatermarkEmitter periodicEmitter = new PeriodicWatermarkEmitter(parts, sourceContext, runtimeContext);
            periodicEmitter.start();
        }
    }

    protected final KafkaTopicPartitionState<KPH>[] subscribedPartitions() {
        return this.allPartitions;
    }

    public abstract void runFetchLoop() throws Exception;

    public abstract void cancel();

    public abstract KPH createKafkaPartitionHandle(KafkaTopicPartition var1);

    public abstract void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> var1) throws Exception;

    public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
        assert (Thread.holdsLock(this.checkpointLock));
        HashMap<KafkaTopicPartition, Long> state = new HashMap<KafkaTopicPartition, Long>(this.allPartitions.length);
        for (KafkaTopicPartitionState<KPH> partition : this.subscribedPartitions()) {
            if (!partition.isOffsetDefined()) continue;
            state.put(partition.getKafkaTopicPartition(), partition.getOffset());
        }
        return state;
    }

    public void restoreOffsets(HashMap<KafkaTopicPartition, Long> snapshotState) {
        for (KafkaTopicPartitionState<KPH> partition : this.allPartitions) {
            Long offset = snapshotState.get(partition.getKafkaTopicPartition());
            if (offset == null) continue;
            partition.setOffset(offset);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected final void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) {
        if (this.timestampWatermarkMode == 0) {
            Object object = this.checkpointLock;
            synchronized (object) {
                this.sourceContext.collect(record);
                partitionState.setOffset(offset);
            }
        } else if (this.timestampWatermarkMode == 1) {
            this.emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset);
        } else {
            this.emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void emitRecordWithTimestampAndPeriodicWatermark(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) {
        long timestamp;
        KafkaTopicPartitionStateWithPeriodicWatermarks withWatermarksState = (KafkaTopicPartitionStateWithPeriodicWatermarks)partitionState;
        Object object = withWatermarksState;
        synchronized (object) {
            timestamp = withWatermarksState.getTimestampForRecord(record);
        }
        object = this.checkpointLock;
        synchronized (object) {
            this.sourceContext.collectWithTimestamp(record, timestamp);
            partitionState.setOffset(offset);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void emitRecordWithTimestampAndPunctuatedWatermark(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) {
        KafkaTopicPartitionStateWithPunctuatedWatermarks withWatermarksState = (KafkaTopicPartitionStateWithPunctuatedWatermarks)partitionState;
        long timestamp = withWatermarksState.getTimestampForRecord(record);
        Watermark newWatermark = withWatermarksState.checkAndGetNewWatermark(record, timestamp);
        Object object = this.checkpointLock;
        synchronized (object) {
            this.sourceContext.collectWithTimestamp(record, timestamp);
            partitionState.setOffset(offset);
        }
        if (newWatermark != null) {
            this.updateMinPunctuatedWatermark(newWatermark);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
        if (nextWatermark.getTimestamp() > this.maxWatermarkSoFar) {
            long newMin = Long.MAX_VALUE;
            for (KafkaTopicPartitionState<KPH> state : this.allPartitions) {
                KafkaTopicPartitionStateWithPunctuatedWatermarks withWatermarksState = (KafkaTopicPartitionStateWithPunctuatedWatermarks)state;
                newMin = Math.min(newMin, withWatermarksState.getCurrentPartitionWatermark());
            }
            if (newMin > this.maxWatermarkSoFar) {
                Object object = this.checkpointLock;
                synchronized (object) {
                    if (newMin > this.maxWatermarkSoFar) {
                        this.maxWatermarkSoFar = newMin;
                        this.sourceContext.emitWatermark(new Watermark(newMin));
                    }
                }
            }
        }
    }

    private KafkaTopicPartitionState<KPH>[] initializePartitions(List<KafkaTopicPartition> assignedPartitions, int timestampWatermarkMode, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
        switch (timestampWatermarkMode) {
            case 0: {
                KafkaTopicPartitionState[] partitions = new KafkaTopicPartitionState[assignedPartitions.size()];
                int pos = 0;
                for (KafkaTopicPartition partition : assignedPartitions) {
                    KPH kafkaHandle = this.createKafkaPartitionHandle(partition);
                    partitions[pos++] = new KafkaTopicPartitionState<KPH>(partition, kafkaHandle);
                }
                return partitions;
            }
            case 1: {
                KafkaTopicPartitionStateWithPeriodicWatermarks[] partitions = new KafkaTopicPartitionStateWithPeriodicWatermarks[assignedPartitions.size()];
                int pos = 0;
                for (KafkaTopicPartition partition : assignedPartitions) {
                    KPH kafkaHandle = this.createKafkaPartitionHandle(partition);
                    AssignerWithPeriodicWatermarks assignerInstance = (AssignerWithPeriodicWatermarks)watermarksPeriodic.deserializeValue(userCodeClassLoader);
                    partitions[pos++] = new KafkaTopicPartitionStateWithPeriodicWatermarks(partition, kafkaHandle, assignerInstance);
                }
                return partitions;
            }
            case 2: {
                KafkaTopicPartitionStateWithPunctuatedWatermarks[] partitions = new KafkaTopicPartitionStateWithPunctuatedWatermarks[assignedPartitions.size()];
                int pos = 0;
                for (KafkaTopicPartition partition : assignedPartitions) {
                    KPH kafkaHandle = this.createKafkaPartitionHandle(partition);
                    AssignerWithPunctuatedWatermarks assignerInstance = (AssignerWithPunctuatedWatermarks)watermarksPunctuated.deserializeValue(userCodeClassLoader);
                    partitions[pos++] = new KafkaTopicPartitionStateWithPunctuatedWatermarks(partition, kafkaHandle, assignerInstance);
                }
                return partitions;
            }
        }
        throw new RuntimeException();
    }

    protected void addOffsetStateGauge(MetricGroup metricGroup) {
        MetricGroup currentOffsets = metricGroup.addGroup("current-offsets");
        MetricGroup committedOffsets = metricGroup.addGroup("committed-offsets");
        for (KafkaTopicPartitionState<KPH> ktp : this.subscribedPartitions()) {
            currentOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), (Gauge)new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET));
            committedOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), (Gauge)new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET));
        }
    }

    private static class PeriodicWatermarkEmitter
    implements Triggerable {
        private final KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions;
        private final SourceFunction.SourceContext<?> emitter;
        private final StreamingRuntimeContext triggerContext;
        private final long interval;
        private long lastWatermarkTimestamp;

        PeriodicWatermarkEmitter(KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions, SourceFunction.SourceContext<?> emitter, StreamingRuntimeContext runtimeContext) {
            this.allPartitions = (KafkaTopicPartitionStateWithPeriodicWatermarks[])Preconditions.checkNotNull(allPartitions);
            this.emitter = (SourceFunction.SourceContext)Preconditions.checkNotNull(emitter);
            this.triggerContext = (StreamingRuntimeContext)Preconditions.checkNotNull((Object)runtimeContext);
            this.interval = runtimeContext.getExecutionConfig().getAutoWatermarkInterval();
            this.lastWatermarkTimestamp = Long.MIN_VALUE;
        }

        public void start() {
            this.triggerContext.registerTimer(this.triggerContext.getCurrentProcessingTime() + this.interval, (Triggerable)this);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void trigger(long timestamp) throws Exception {
            assert (Thread.holdsLock(this.emitter.getCheckpointLock()));
            long minAcrossAll = Long.MAX_VALUE;
            KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] kafkaTopicPartitionStateWithPeriodicWatermarksArray = this.allPartitions;
            int n = kafkaTopicPartitionStateWithPeriodicWatermarksArray.length;
            for (int i = 0; i < n; ++i) {
                long curr;
                KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> state;
                KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> kafkaTopicPartitionStateWithPeriodicWatermarks = state = kafkaTopicPartitionStateWithPeriodicWatermarksArray[i];
                synchronized (kafkaTopicPartitionStateWithPeriodicWatermarks) {
                    curr = state.getCurrentWatermarkTimestamp();
                }
                minAcrossAll = Math.min(minAcrossAll, curr);
            }
            if (minAcrossAll > this.lastWatermarkTimestamp) {
                this.lastWatermarkTimestamp = minAcrossAll;
                this.emitter.emitWatermark(new Watermark(minAcrossAll));
            }
            this.triggerContext.registerTimer(this.triggerContext.getCurrentProcessingTime() + this.interval, (Triggerable)this);
        }
    }

    private static class OffsetGauge
    implements Gauge<Long> {
        private final KafkaTopicPartitionState ktp;
        private final OffsetGaugeType gaugeType;

        public OffsetGauge(KafkaTopicPartitionState ktp, OffsetGaugeType gaugeType) {
            this.ktp = ktp;
            this.gaugeType = gaugeType;
        }

        public Long getValue() {
            switch (this.gaugeType) {
                case COMMITTED_OFFSET: {
                    return this.ktp.getCommittedOffset();
                }
                case CURRENT_OFFSET: {
                    return this.ktp.getOffset();
                }
            }
            throw new RuntimeException("Unknown gauge type: " + (Object)((Object)this.gaugeType));
        }
    }

    private static enum OffsetGaugeType {
        CURRENT_OFFSET,
        COMMITTED_OFFSET;

    }
}

