/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.internals;

import java.util.HashMap;
import org.apache.flink.streaming.connectors.kafka.internals.ExceptionProxy;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
import org.apache.flink.util.Preconditions;

public class PeriodicOffsetCommitter
extends Thread {
    private final ZookeeperOffsetHandler offsetHandler;
    private final KafkaTopicPartitionState<?>[] partitionStates;
    private final ExceptionProxy errorHandler;
    private final long commitInterval;
    private volatile boolean running = true;

    PeriodicOffsetCommitter(ZookeeperOffsetHandler offsetHandler, KafkaTopicPartitionState<?>[] partitionStates, ExceptionProxy errorHandler, long commitInterval) {
        this.offsetHandler = (ZookeeperOffsetHandler)Preconditions.checkNotNull((Object)offsetHandler);
        this.partitionStates = (KafkaTopicPartitionState[])Preconditions.checkNotNull(partitionStates);
        this.errorHandler = (ExceptionProxy)Preconditions.checkNotNull((Object)errorHandler);
        this.commitInterval = commitInterval;
        Preconditions.checkArgument((commitInterval > 0L ? 1 : 0) != 0);
    }

    @Override
    public void run() {
        block4: {
            try {
                while (this.running) {
                    Thread.sleep(this.commitInterval);
                    HashMap<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<KafkaTopicPartition, Long>(this.partitionStates.length);
                    for (KafkaTopicPartitionState<?> partitionState : this.partitionStates) {
                        offsetsToCommit.put(partitionState.getKafkaTopicPartition(), partitionState.getOffset());
                    }
                    this.offsetHandler.prepareAndCommitOffsets(offsetsToCommit);
                }
            }
            catch (Throwable t) {
                if (!this.running) break block4;
                this.errorHandler.reportError((Throwable)new Exception("The periodic offset committer encountered an error: " + t.getMessage(), t));
            }
        }
    }

    public void shutdown() {
        this.running = false;
        this.interrupt();
    }
}

