/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitModes;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public abstract class FlinkKafkaConsumerBase<T>
extends RichParallelSourceFunction<T>
implements CheckpointListener,
ResultTypeQueryable<T>,
CheckpointedFunction {
    private static final long serialVersionUID = -6272159445203409112L;
    protected static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
    public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
    public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
    public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis";
    private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
    private final KafkaTopicsDescriptor topicsDescriptor;
    protected final KeyedDeserializationSchema<T> deserializer;
    private Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets;
    private SerializedValue<AssignerWithPeriodicWatermarks<T>> periodicWatermarkAssigner;
    private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner;
    private boolean enableCommitOnCheckpoints = true;
    private OffsetCommitMode offsetCommitMode;
    private final long discoveryIntervalMillis;
    private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
    private Map<KafkaTopicPartition, Long> specificStartupOffsets;
    private Long startupOffsetsTimestamp;
    private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
    private volatile transient AbstractFetcher<T, ?> kafkaFetcher;
    private volatile transient AbstractPartitionDiscoverer partitionDiscoverer;
    private volatile transient TreeMap<KafkaTopicPartition, Long> restoredState;
    private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;
    private boolean restoredFromOldState;
    private volatile transient Thread discoveryLoopThread;
    private volatile boolean running = true;
    private final boolean useMetrics;
    private transient Counter successfulCommits;
    private transient Counter failedCommits;
    private transient KafkaCommitCallback offsetCommitCallback;

    public FlinkKafkaConsumerBase(List<String> topics, Pattern topicPattern, KeyedDeserializationSchema<T> deserializer, long discoveryIntervalMillis, boolean useMetrics) {
        this.topicsDescriptor = new KafkaTopicsDescriptor(topics, topicPattern);
        this.deserializer = (KeyedDeserializationSchema)Preconditions.checkNotNull(deserializer, (String)"valueDeserializer");
        Preconditions.checkArgument((discoveryIntervalMillis == Long.MIN_VALUE || discoveryIntervalMillis >= 0L ? 1 : 0) != 0, (Object)"Cannot define a negative value for the topic / partition discovery interval.");
        this.discoveryIntervalMillis = discoveryIntervalMillis;
        this.useMetrics = useMetrics;
    }

    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner) {
        Preconditions.checkNotNull(assigner);
        if (this.periodicWatermarkAssigner != null) {
            throw new IllegalStateException("A periodic watermark emitter has already been set.");
        }
        try {
            ClosureCleaner.clean(assigner, (boolean)true);
            this.punctuatedWatermarkAssigner = new SerializedValue(assigner);
            return this;
        }
        catch (Exception e) {
            throw new IllegalArgumentException("The given assigner is not serializable", e);
        }
    }

    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
        Preconditions.checkNotNull(assigner);
        if (this.punctuatedWatermarkAssigner != null) {
            throw new IllegalStateException("A punctuated watermark emitter has already been set.");
        }
        try {
            ClosureCleaner.clean(assigner, (boolean)true);
            this.periodicWatermarkAssigner = new SerializedValue(assigner);
            return this;
        }
        catch (Exception e) {
            throw new IllegalArgumentException("The given assigner is not serializable", e);
        }
    }

    public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints) {
        this.enableCommitOnCheckpoints = commitOnCheckpoints;
        return this;
    }

    public FlinkKafkaConsumerBase<T> setStartFromEarliest() {
        this.startupMode = StartupMode.EARLIEST;
        this.startupOffsetsTimestamp = null;
        this.specificStartupOffsets = null;
        return this;
    }

    public FlinkKafkaConsumerBase<T> setStartFromLatest() {
        this.startupMode = StartupMode.LATEST;
        this.startupOffsetsTimestamp = null;
        this.specificStartupOffsets = null;
        return this;
    }

    protected FlinkKafkaConsumerBase<T> setStartFromTimestamp(long startupOffsetsTimestamp) {
        Preconditions.checkArgument((startupOffsetsTimestamp >= 0L ? 1 : 0) != 0, (Object)"The provided value for the startup offsets timestamp is invalid.");
        long currentTimestamp = System.currentTimeMillis();
        Preconditions.checkArgument((startupOffsetsTimestamp <= currentTimestamp ? 1 : 0) != 0, (String)"Startup time[%s] must be before current time[%s].", (Object[])new Object[]{startupOffsetsTimestamp, currentTimestamp});
        this.startupMode = StartupMode.TIMESTAMP;
        this.startupOffsetsTimestamp = startupOffsetsTimestamp;
        this.specificStartupOffsets = null;
        return this;
    }

    public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets() {
        this.startupMode = StartupMode.GROUP_OFFSETS;
        this.startupOffsetsTimestamp = null;
        this.specificStartupOffsets = null;
        return this;
    }

    public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets) {
        this.startupMode = StartupMode.SPECIFIC_OFFSETS;
        this.startupOffsetsTimestamp = null;
        this.specificStartupOffsets = (Map)Preconditions.checkNotNull(specificStartupOffsets);
        return this;
    }

    public void open(Configuration configuration) throws Exception {
        block22: {
            block23: {
                List<KafkaTopicPartition> allPartitions;
                block21: {
                    this.offsetCommitMode = OffsetCommitModes.fromConfiguration(this.getIsAutoCommitEnabled(), this.enableCommitOnCheckpoints, ((StreamingRuntimeContext)this.getRuntimeContext()).isCheckpointingEnabled());
                    this.partitionDiscoverer = this.createPartitionDiscoverer(this.topicsDescriptor, this.getRuntimeContext().getIndexOfThisSubtask(), this.getRuntimeContext().getNumberOfParallelSubtasks());
                    this.partitionDiscoverer.open();
                    this.subscribedPartitionsToStartOffsets = new HashMap<KafkaTopicPartition, Long>();
                    allPartitions = this.partitionDiscoverer.discoverPartitions();
                    if (this.restoredState == null) break block21;
                    for (KafkaTopicPartition kafkaTopicPartition : allPartitions) {
                        if (this.restoredState.containsKey(kafkaTopicPartition)) continue;
                        this.restoredState.put(kafkaTopicPartition, -915623761775L);
                    }
                    for (Map.Entry entry : this.restoredState.entrySet()) {
                        if (!this.restoredFromOldState) {
                            if (KafkaTopicPartitionAssigner.assign((KafkaTopicPartition)entry.getKey(), this.getRuntimeContext().getNumberOfParallelSubtasks()) != this.getRuntimeContext().getIndexOfThisSubtask()) continue;
                            this.subscribedPartitionsToStartOffsets.put((KafkaTopicPartition)entry.getKey(), (Long)entry.getValue());
                            continue;
                        }
                        this.subscribedPartitionsToStartOffsets.put((KafkaTopicPartition)entry.getKey(), (Long)entry.getValue());
                    }
                    LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}", new Object[]{this.getRuntimeContext().getIndexOfThisSubtask(), this.subscribedPartitionsToStartOffsets.size(), this.subscribedPartitionsToStartOffsets});
                    break block22;
                }
                switch (this.startupMode) {
                    case SPECIFIC_OFFSETS: {
                        if (this.specificStartupOffsets == null) {
                            throw new IllegalStateException("Startup mode for the consumer set to " + (Object)((Object)StartupMode.SPECIFIC_OFFSETS) + ", but no specific offsets were specified.");
                        }
                        for (KafkaTopicPartition kafkaTopicPartition : allPartitions) {
                            Long specificOffset = this.specificStartupOffsets.get(kafkaTopicPartition);
                            if (specificOffset != null) {
                                this.subscribedPartitionsToStartOffsets.put(kafkaTopicPartition, specificOffset - 1L);
                                continue;
                            }
                            this.subscribedPartitionsToStartOffsets.put(kafkaTopicPartition, -915623761773L);
                        }
                        break;
                    }
                    case TIMESTAMP: {
                        if (this.startupOffsetsTimestamp == null) {
                            throw new IllegalStateException("Startup mode for the consumer set to " + (Object)((Object)StartupMode.TIMESTAMP) + ", but no startup timestamp was specified.");
                        }
                        for (Map.Entry entry : this.fetchOffsetsWithTimestamp(allPartitions, this.startupOffsetsTimestamp).entrySet()) {
                            this.subscribedPartitionsToStartOffsets.put((KafkaTopicPartition)entry.getKey(), entry.getValue() == null ? -915623761774L : (Long)entry.getValue() - 1L);
                        }
                        break;
                    }
                    default: {
                        for (KafkaTopicPartition kafkaTopicPartition : allPartitions) {
                            this.subscribedPartitionsToStartOffsets.put(kafkaTopicPartition, this.startupMode.getStateSentinel());
                        }
                    }
                }
                if (this.subscribedPartitionsToStartOffsets.isEmpty()) break block23;
                switch (this.startupMode) {
                    case EARLIEST: {
                        LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}", new Object[]{this.getRuntimeContext().getIndexOfThisSubtask(), this.subscribedPartitionsToStartOffsets.size(), this.subscribedPartitionsToStartOffsets.keySet()});
                        break;
                    }
                    case LATEST: {
                        LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}", new Object[]{this.getRuntimeContext().getIndexOfThisSubtask(), this.subscribedPartitionsToStartOffsets.size(), this.subscribedPartitionsToStartOffsets.keySet()});
                        break;
                    }
                    case TIMESTAMP: {
                        LOG.info("Consumer subtask {} will start reading the following {} partitions from timestamp {}: {}", new Object[]{this.getRuntimeContext().getIndexOfThisSubtask(), this.subscribedPartitionsToStartOffsets.size(), this.startupOffsetsTimestamp, this.subscribedPartitionsToStartOffsets.keySet()});
                        break;
                    }
                    case SPECIFIC_OFFSETS: {
                        LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}", new Object[]{this.getRuntimeContext().getIndexOfThisSubtask(), this.subscribedPartitionsToStartOffsets.size(), this.specificStartupOffsets, this.subscribedPartitionsToStartOffsets.keySet()});
                        ArrayList<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<KafkaTopicPartition>(this.subscribedPartitionsToStartOffsets.size());
                        for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : this.subscribedPartitionsToStartOffsets.entrySet()) {
                            if (subscribedPartition.getValue() != -915623761773L) continue;
                            partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
                        }
                        if (partitionsDefaultedToGroupOffsets.size() > 0) {
                            LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}; their startup offsets will be defaulted to their committed group offsets in Kafka.", new Object[]{this.getRuntimeContext().getIndexOfThisSubtask(), partitionsDefaultedToGroupOffsets.size(), partitionsDefaultedToGroupOffsets});
                            break;
                        }
                        break block22;
                    }
                    default: {
                        LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}", new Object[]{this.getRuntimeContext().getIndexOfThisSubtask(), this.subscribedPartitionsToStartOffsets.size(), this.subscribedPartitionsToStartOffsets.keySet()});
                        break;
                    }
                }
                break block22;
            }
            LOG.info("Consumer subtask {} initially has no partitions to read from.", (Object)this.getRuntimeContext().getIndexOfThisSubtask());
        }
    }

    public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
        if (this.subscribedPartitionsToStartOffsets == null) {
            throw new Exception("The partitions were not set for the consumer");
        }
        this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter("commitsSucceeded");
        this.failedCommits = this.getRuntimeContext().getMetricGroup().counter("commitsFailed");
        this.offsetCommitCallback = new KafkaCommitCallback(){

            @Override
            public void onSuccess() {
                FlinkKafkaConsumerBase.this.successfulCommits.inc();
            }

            @Override
            public void onException(Throwable cause) {
                LOG.warn("Async Kafka commit failed.", cause);
                FlinkKafkaConsumerBase.this.failedCommits.inc();
            }
        };
        if (this.subscribedPartitionsToStartOffsets.isEmpty()) {
            sourceContext.markAsTemporarilyIdle();
        }
        this.kafkaFetcher = this.createFetcher(sourceContext, this.subscribedPartitionsToStartOffsets, this.periodicWatermarkAssigner, this.punctuatedWatermarkAssigner, (StreamingRuntimeContext)this.getRuntimeContext(), this.offsetCommitMode, this.getRuntimeContext().getMetricGroup().addGroup("KafkaConsumer"), this.useMetrics);
        if (!this.running) {
            return;
        }
        if (this.discoveryIntervalMillis != Long.MIN_VALUE) {
            final AtomicReference discoveryLoopErrorRef = new AtomicReference();
            this.discoveryLoopThread = new Thread(new Runnable(){

                @Override
                public void run() {
                    try {
                        while (FlinkKafkaConsumerBase.this.running) {
                            List<KafkaTopicPartition> discoveredPartitions;
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Consumer subtask {} is trying to discover new partitions ...", (Object)FlinkKafkaConsumerBase.this.getRuntimeContext().getIndexOfThisSubtask());
                            }
                            try {
                                discoveredPartitions = FlinkKafkaConsumerBase.this.partitionDiscoverer.discoverPartitions();
                            }
                            catch (AbstractPartitionDiscoverer.ClosedException | AbstractPartitionDiscoverer.WakeupException e) {
                                break;
                            }
                            if (FlinkKafkaConsumerBase.this.running && !discoveredPartitions.isEmpty()) {
                                FlinkKafkaConsumerBase.this.kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
                            }
                            if (!FlinkKafkaConsumerBase.this.running || FlinkKafkaConsumerBase.this.discoveryIntervalMillis == 0L) continue;
                            try {
                                Thread.sleep(FlinkKafkaConsumerBase.this.discoveryIntervalMillis);
                            }
                            catch (InterruptedException iex) {
                                // empty catch block
                                break;
                            }
                        }
                    }
                    catch (Exception e) {
                        discoveryLoopErrorRef.set(e);
                    }
                    finally {
                        if (FlinkKafkaConsumerBase.this.running) {
                            FlinkKafkaConsumerBase.this.cancel();
                        }
                    }
                }
            }, "Kafka Partition Discovery for " + this.getRuntimeContext().getTaskNameWithSubtasks());
            this.discoveryLoopThread.start();
            this.kafkaFetcher.runFetchLoop();
            this.partitionDiscoverer.close();
            this.discoveryLoopThread.join();
            Exception discoveryLoopError = (Exception)discoveryLoopErrorRef.get();
            if (discoveryLoopError != null) {
                throw new RuntimeException(discoveryLoopError);
            }
        } else {
            this.partitionDiscoverer.close();
            this.kafkaFetcher.runFetchLoop();
        }
    }

    public void cancel() {
        this.running = false;
        if (this.discoveryLoopThread != null) {
            if (this.partitionDiscoverer != null) {
                this.partitionDiscoverer.wakeup();
            }
            this.discoveryLoopThread.interrupt();
        }
        if (this.kafkaFetcher != null) {
            this.kafkaFetcher.cancel();
        }
    }

    public void close() throws Exception {
        try {
            this.cancel();
        }
        finally {
            super.close();
        }
    }

    public final void initializeState(FunctionInitializationContext context) throws Exception {
        OperatorStateStore stateStore = context.getOperatorStateStore();
        ListState oldRoundRobinListState = stateStore.getSerializableListState("_default_");
        this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor(OFFSETS_STATE_NAME, TypeInformation.of((TypeHint)new TypeHint<Tuple2<KafkaTopicPartition, Long>>(){})));
        if (context.isRestored() && !this.restoredFromOldState) {
            this.restoredState = new TreeMap(new KafkaTopicPartition.Comparator());
            for (Tuple2 kafkaOffset : (Iterable)oldRoundRobinListState.get()) {
                this.restoredFromOldState = true;
                this.unionOffsetStates.add((Object)kafkaOffset);
            }
            oldRoundRobinListState.clear();
            if (this.restoredFromOldState && this.discoveryIntervalMillis != Long.MIN_VALUE) {
                throw new IllegalArgumentException("Topic / partition discovery cannot be enabled if the job is restored from a savepoint from Flink 1.2.x.");
            }
            for (Tuple2 kafkaOffset : (Iterable)this.unionOffsetStates.get()) {
                this.restoredState.put((KafkaTopicPartition)kafkaOffset.f0, (Long)kafkaOffset.f1);
            }
            LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", this.restoredState);
        } else {
            LOG.info("No restore state for FlinkKafkaConsumer.");
        }
    }

    public final void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (!this.running) {
            LOG.debug("snapshotState() called on closed source");
        } else {
            this.unionOffsetStates.clear();
            AbstractFetcher<T, ?> fetcher = this.kafkaFetcher;
            if (fetcher == null) {
                for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : this.subscribedPartitionsToStartOffsets.entrySet()) {
                    this.unionOffsetStates.add((Object)Tuple2.of((Object)subscribedPartition.getKey(), (Object)subscribedPartition.getValue()));
                }
                if (this.offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    this.pendingOffsetsToCommit.put((Object)context.getCheckpointId(), this.restoredState);
                }
            } else {
                HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
                if (this.offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    this.pendingOffsetsToCommit.put((Object)context.getCheckpointId(), currentOffsets);
                }
                for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
                    this.unionOffsetStates.add((Object)Tuple2.of((Object)kafkaTopicPartitionLongEntry.getKey(), (Object)kafkaTopicPartitionLongEntry.getValue()));
                }
            }
            if (this.offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                while (this.pendingOffsetsToCommit.size() > 100) {
                    this.pendingOffsetsToCommit.remove(0);
                }
            }
        }
    }

    public final void notifyCheckpointComplete(long checkpointId) throws Exception {
        block9: {
            if (!this.running) {
                LOG.debug("notifyCheckpointComplete() called on closed source");
                return;
            }
            AbstractFetcher<T, ?> fetcher = this.kafkaFetcher;
            if (fetcher == null) {
                LOG.debug("notifyCheckpointComplete() called on uninitialized source");
                return;
            }
            if (this.offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint " + checkpointId);
                }
                try {
                    int posInMap = this.pendingOffsetsToCommit.indexOf((Object)checkpointId);
                    if (posInMap == -1) {
                        LOG.warn("Received confirmation for unknown checkpoint id {}", (Object)checkpointId);
                        return;
                    }
                    Map offsets = (Map)this.pendingOffsetsToCommit.remove(posInMap);
                    for (int i = 0; i < posInMap; ++i) {
                        this.pendingOffsetsToCommit.remove(0);
                    }
                    if (offsets == null || offsets.size() == 0) {
                        LOG.debug("Checkpoint state was empty.");
                        return;
                    }
                    fetcher.commitInternalOffsetsToKafka(offsets, this.offsetCommitCallback);
                }
                catch (Exception e) {
                    if (!this.running) break block9;
                    throw e;
                }
            }
        }
    }

    protected abstract AbstractFetcher<T, ?> createFetcher(SourceFunction.SourceContext<T> var1, Map<KafkaTopicPartition, Long> var2, SerializedValue<AssignerWithPeriodicWatermarks<T>> var3, SerializedValue<AssignerWithPunctuatedWatermarks<T>> var4, StreamingRuntimeContext var5, OffsetCommitMode var6, MetricGroup var7, boolean var8) throws Exception;

    protected abstract AbstractPartitionDiscoverer createPartitionDiscoverer(KafkaTopicsDescriptor var1, int var2, int var3);

    protected abstract boolean getIsAutoCommitEnabled();

    protected abstract Map<KafkaTopicPartition, Long> fetchOffsetsWithTimestamp(Collection<KafkaTopicPartition> var1, long var2);

    public TypeInformation<T> getProducedType() {
        return this.deserializer.getProducedType();
    }

    @VisibleForTesting
    Map<KafkaTopicPartition, Long> getSubscribedPartitionsToStartOffsets() {
        return this.subscribedPartitionsToStartOffsets;
    }

    @VisibleForTesting
    TreeMap<KafkaTopicPartition, Long> getRestoredState() {
        return this.restoredState;
    }

    @VisibleForTesting
    OffsetCommitMode getOffsetCommitMode() {
        return this.offsetCommitMode;
    }

    @VisibleForTesting
    LinkedMap getPendingOffsetsToCommit() {
        return this.pendingOffsetsToCommit;
    }
}

