package org.apache.iceberg.connect.channel;

import java.util.Collection;
import java.util.Comparator;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.connect.Committer;
import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.connect.data.SinkWriter;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iceberg/connect/channel/CommitterImpl.class */
/**
 * {@link Committer} implementation that starts a {@link Worker} for the task and, when the task
 * is elected leader, additionally starts a {@link Coordinator} on a {@link CoordinatorThread}.
 *
 * <p>Leadership is decided by {@link #isLeader(Collection, Collection)}: the task that owns the
 * lowest-ordered topic partition across all consumer group members is the leader.
 */
public class CommitterImpl implements Committer {
    private static final Logger LOG = LoggerFactory.getLogger(CommitterImpl.class);

    /** Set only on the task that was elected leader in {@link #start}; {@code null} otherwise. */
    private CoordinatorThread coordinatorThread;

    /** Set by {@link #start} and cleared by {@link #stop}. */
    private Worker worker;

    /** Orders topic partitions by topic name first, then by partition number. */
    public static class TopicPartitionComparator implements Comparator<TopicPartition> {
        TopicPartitionComparator() {
        }

        /**
         * Compares two topic partitions by topic name, using the partition number as tie-breaker.
         *
         * @param topicPartition the first topic partition
         * @param topicPartition2 the second topic partition
         * @return a negative, zero, or positive value as the first argument orders before, equal
         *     to, or after the second
         */
        @Override
        public int compare(TopicPartition topicPartition, TopicPartition topicPartition2) {
            int byTopic = topicPartition.topic().compareTo(topicPartition2.topic());
            if (byTopic != 0) {
                return byTopic;
            }
            return Integer.compare(topicPartition.partition(), topicPartition2.partition());
        }
    }

    /**
     * Starts the commit worker and, if this task is elected leader, the commit coordinator.
     *
     * <p>The coordinator is only considered when the consumer group reports the
     * {@link ConsumerGroupState#STABLE} state; in any other state no coordinator is started by
     * this call. The worker is started in every case.
     *
     * @param catalog catalog handed to the coordinator and to the worker's {@link SinkWriter}
     * @param icebergSinkConfig sink configuration; supplies the Kafka client properties and the
     *     connect group id
     * @param sinkTaskContext task context; its assignment is used for the leader check
     * @throws ConnectException if the group is stable but none of its members has any partition
     *     assigned
     */
    @Override
    public void start(Catalog catalog, IcebergSinkConfig icebergSinkConfig, SinkTaskContext sinkTaskContext) {
        KafkaClientFactory kafkaClientFactory = new KafkaClientFactory(icebergSinkConfig.kafkaProps());

        // The admin client is only needed for the group lookup, so its lifetime is limited to
        // this statement. Keeping the rest of the method outside the try block guarantees the
        // client is closed exactly once, even if a later step fails.
        ConsumerGroupDescription groupDescription;
        try (Admin admin = kafkaClientFactory.createAdmin()) {
            groupDescription = KafkaUtils.consumerGroupDescription(icebergSinkConfig.connectGroupId(), admin);
        }

        if (groupDescription.state() == ConsumerGroupState.STABLE) {
            Collection<MemberDescription> members = groupDescription.members();
            if (isLeader(members, sinkTaskContext.assignment())) {
                LOG.info("Task elected leader, starting commit coordinator");
                Coordinator coordinator =
                        new Coordinator(catalog, icebergSinkConfig, members, kafkaClientFactory, sinkTaskContext);
                this.coordinatorThread = new CoordinatorThread(coordinator);
                this.coordinatorThread.start();
            }
        }

        LOG.info("Starting commit worker");
        SinkWriter sinkWriter = new SinkWriter(catalog, icebergSinkConfig);
        this.worker = new Worker(icebergSinkConfig, kafkaClientFactory, sinkWriter, sinkTaskContext);
        this.worker.start();
    }

    /**
     * Hands the given records to the worker and then processes pending control events.
     *
     * <p>A {@code null} or empty collection skips the hand-off but control events are still
     * processed. Note that the worker is dereferenced without a null check, so passing records
     * before {@link #start} or after {@link #stop} results in a {@link NullPointerException}.
     *
     * @param collection records to save; may be {@code null} or empty
     * @throws NotRunningException if a coordinator was started and has since terminated
     */
    @Override
    public void save(Collection<SinkRecord> collection) {
        if (collection != null && !collection.isEmpty()) {
            this.worker.save(collection);
        }
        processControlEvents();
    }

    /**
     * Stops the worker and terminates the coordinator thread, if either is present.
     *
     * <p>Calling this method when nothing was started is a no-op.
     */
    @Override
    public void stop() {
        try {
            if (this.worker != null) {
                this.worker.stop();
                this.worker = null;
            }
        } finally {
            // Terminate the coordinator even when stopping the worker fails, so that a failed
            // worker shutdown does not leave the coordinator thread running.
            if (this.coordinatorThread != null) {
                this.coordinatorThread.terminate();
                this.coordinatorThread = null;
            }
        }
    }

    /**
     * Determines whether the task owning {@code collection2} is the leader.
     *
     * <p>The leader is the task whose assignment contains the first topic partition, in
     * {@link TopicPartitionComparator} order, among all partitions assigned to any group member.
     *
     * @param collection descriptions of all members of the consumer group
     * @param collection2 topic partitions assigned to this task
     * @return {@code true} if this task's assignment contains the first topic partition
     * @throws ConnectException if no member has any partition assigned
     */
    @VisibleForTesting
    boolean isLeader(Collection<MemberDescription> collection, Collection<TopicPartition> collection2) {
        TopicPartition firstTopicPartition = collection.stream()
                .flatMap(member -> member.assignment().topicPartitions().stream())
                .min(new TopicPartitionComparator())
                .orElseThrow(() -> new ConnectException("No partitions assigned, cannot determine leader"));
        return collection2.contains(firstTopicPartition);
    }

    /**
     * Fails fast if the coordinator has died, otherwise lets the worker process control events.
     *
     * @throws NotRunningException if a coordinator thread exists and reports itself terminated
     */
    private void processControlEvents() {
        if (this.coordinatorThread != null && this.coordinatorThread.isTerminated()) {
            throw new NotRunningException("Coordinator unexpectedly terminated");
        }
        if (this.worker != null) {
            this.worker.process();
        }
    }
}
