package org.apache.flink.streaming.connectors.kafka.internals;

import java.util.HashMap;
import java.util.List;
import kafka.common.TopicAndPartition;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;

/**
 * Background thread that, at a fixed interval, snapshots the offsets of a list of partition
 * states and commits them through a {@link ZookeeperOffsetHandler}.
 *
 * <p>The loop ends when {@link #shutdown()} is called or when a commit round throws. A failure
 * that happens while the committer is still marked as running is wrapped in an
 * {@link Exception} and handed to the {@link ExceptionProxy}; a failure after shutdown is
 * dropped.
 */
@Internal
public class PeriodicOffsetCommitter extends Thread {

    /** Handler that receives each offset snapshot. */
    private final ZookeeperOffsetHandler offsetHandler;

    /** Partition states whose offsets are read on every commit round. */
    private final List<KafkaTopicPartitionState<TopicAndPartition>> partitionStates;

    /** Receiver for errors raised while the committer is running. */
    private final ExceptionProxy errorHandler;

    /** Pause between two commit rounds, passed to {@link Thread#sleep(long)} (milliseconds). */
    private final long commitInterval;

    /** Cleared by {@link #shutdown()}; volatile so the committer thread observes the change. */
    private volatile boolean running = true;

    /**
     * Creates the committer. The thread is not started here.
     *
     * <p>NOTE(review): the decompiler reported that this constructor was originally
     * package-private; it is kept public to match the decompiled signature.
     *
     * @param zookeeperOffsetHandler handler used to commit offsets; must not be null
     * @param list partition states to read offsets from; must not be null
     * @param exceptionProxy receiver for errors hit while running; must not be null
     * @param j interval between commit rounds; must be greater than zero
     */
    public PeriodicOffsetCommitter(ZookeeperOffsetHandler zookeeperOffsetHandler, List<KafkaTopicPartitionState<TopicAndPartition>> list, ExceptionProxy exceptionProxy, long j) {
        this.offsetHandler = Preconditions.checkNotNull(zookeeperOffsetHandler);
        this.partitionStates = Preconditions.checkNotNull(list);
        this.errorHandler = Preconditions.checkNotNull(exceptionProxy);
        this.commitInterval = j;
        // Validated last so that null arguments are reported before a bad interval.
        Preconditions.checkArgument(j > 0);
    }

    /**
     * Sleeps for the commit interval, commits the current offsets, and repeats until the
     * running flag is cleared or any {@link Throwable} is raised.
     */
    @Override
    public void run() {
        try {
            while (running) {
                Thread.sleep(commitInterval);
                commitCurrentOffsets();
            }
        } catch (Throwable failure) {
            // Anything thrown after shutdown() (including the interrupt it sends) is ignored;
            // only failures during normal operation are reported.
            if (running) {
                errorHandler.reportError(
                        new Exception(
                                "The periodic offset committer encountered an error: " + failure.getMessage(),
                                failure));
            }
        }
    }

    /**
     * Copies the current offset of every partition state into a fresh map and passes that
     * map to the offset handler.
     */
    private void commitCurrentOffsets() throws Exception {
        // Raw map on purpose: the key is whatever getKafkaTopicPartition() returns, and that
        // type is not named in this file.
        HashMap offsets = new HashMap(partitionStates.size());
        for (KafkaTopicPartitionState<TopicAndPartition> state : partitionStates) {
            offsets.put(state.getKafkaTopicPartition(), Long.valueOf(state.getOffset()));
        }
        offsetHandler.prepareAndCommitOffsets(offsets);
    }

    /**
     * Stops the committer: clears the running flag, then interrupts the thread so a pending
     * sleep returns immediately.
     */
    public void shutdown() {
        // The flag must be cleared before interrupting, otherwise the resulting
        // InterruptedException would be reported as an error by run().
        running = false;
        interrupt();
    }
}
