package io.apicurio.registry.kafka;

import io.apicurio.registry.common.proto.Cmmn;
import io.apicurio.registry.kafka.snapshot.StorageSnapshot;
import io.apicurio.registry.kafka.snapshot.StorageSnapshotSerde;
import io.apicurio.registry.storage.proto.Str;
import io.apicurio.registry.utils.kafka.ConsumerActions;
import io.apicurio.registry.utils.kafka.ConsumerContainer;
import io.apicurio.registry.utils.kafka.ConsumerSkipRecordsSerializationExceptionHandler;
import io.apicurio.registry.utils.kafka.Oneof2;
import io.apicurio.registry.utils.kafka.Seek;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;

/* loaded from: input_file:io/apicurio/registry/kafka/RegistryConsumerContainer.class */
/**
 * Consumer container for the registry topic that can bootstrap itself from the
 * most recent storage snapshot.
 *
 * <p>On {@link #start()} the latest record of the snapshot topic (if any) is
 * handed to the {@link KafkaRegistryStorageHandle}, and partition 0 of the
 * registry topic is then assigned at an offset derived from that snapshot.
 * Every consumed storage value is passed to
 * {@code KafkaRegistryStorageHandle.consumeStorageValue}.
 */
public class RegistryConsumerContainer extends ConsumerContainer<Cmmn.UUID, Str.StorageValue> implements ConsumerActions.DynamicAssignment<Cmmn.UUID, Str.StorageValue> {
    /** Maximum time, in milliseconds, the single poll for the latest snapshot record may take. */
    private static final long SNAPSHOTS_POLL_TIMEOUT = 15000;

    /** Storage handle that receives snapshots and consumed records and names the topics to use. */
    private final KafkaRegistryStorageHandle handle;

    /** Configuration of the short-lived consumer that reads the snapshot topic. */
    private final Properties snapshotProperties;

    /**
     * Creates the container.
     *
     * @param properties                 consumer configuration passed to the superclass
     * @param deserializer               key deserializer for the registry topic
     * @param deserializer2              value deserializer for the registry topic
     * @param kafkaRegistryStorageHandle storage handle; must not be {@code null}
     * @param properties2                configuration for the snapshot-topic consumer
     * @throws NullPointerException if {@code kafkaRegistryStorageHandle} is {@code null}
     */
    public RegistryConsumerContainer(Properties properties, Deserializer<Cmmn.UUID> deserializer, Deserializer<Str.StorageValue> deserializer2, KafkaRegistryStorageHandle kafkaRegistryStorageHandle, Properties properties2) {
        // Evaluating the bound method reference already throws NullPointerException
        // when the handle is null, so no separate null check is required.
        super(properties, deserializer, deserializer2, Oneof2.first(kafkaRegistryStorageHandle::consumeStorageValue), new ConsumerSkipRecordsSerializationExceptionHandler());
        this.handle = kafkaRegistryStorageHandle;
        this.snapshotProperties = properties2;
    }

    /**
     * Starts the container, loads the latest snapshot if one exists, assigns
     * partition 0 of the registry topic and finally starts the storage handle.
     *
     * <p>When a snapshot with a positive offset was loaded, consumption is
     * requested at the absolute offset following it; in every other case it is
     * requested from the beginning of the topic.
     */
    @Override
    public void start() {
        super.start();

        Seek.Offset startOffset = Seek.FROM_BEGINNING.offset(0L);
        StorageSnapshot snapshot = loadSnapshot();
        if (snapshot != null) {
            this.handle.loadSnapshot(snapshot);
            // NOTE(review): a snapshot whose offset is 0 still replays from the
            // beginning; presumably 0 means "no record covered" - confirm.
            if (snapshot.getOffset() > 0) {
                startOffset = Seek.TO_ABSOLUTE.offset(snapshot.getOffset() + 1);
            }
        }

        addTopicPartition(new TopicPartition(this.handle.registryTopic(), 0), startOffset);
        this.handle.start();
    }

    /**
     * Stops the storage handle, removes the registry topic partition and stops
     * the container. {@code close()} is invoked even if one of those steps throws.
     */
    @Override
    public void stop() {
        try {
            this.handle.stop();
            removeTopicParition(new TopicPartition(this.handle.registryTopic(), 0));
            super.stop();
        } finally {
            close();
        }
    }

    /**
     * Reads the most recent snapshot from partition 0 of the snapshot topic
     * using a dedicated consumer that is closed before this method returns.
     *
     * @return the value of the record with the highest offset returned by the
     *         poll, or {@code null} when the partition's end offset is 0
     * @throws IllegalStateException if the poll returns no record for the
     *         partition within {@link #SNAPSHOTS_POLL_TIMEOUT} milliseconds
     */
    private StorageSnapshot loadSnapshot() {
        TopicPartition snapshotPartition = new TopicPartition(this.handle.snapshotTopic(), 0);
        // try-with-resources closes the consumer on every path and records a
        // failing close() as a suppressed exception instead of masking the
        // primary failure.
        try (KafkaConsumer<Long, StorageSnapshot> snapshotConsumer = new KafkaConsumer<>(this.snapshotProperties, Serdes.Long().deserializer(), new StorageSnapshotSerde())) {
            snapshotConsumer.assign(Collections.singleton(snapshotPartition));
            long endOffset = snapshotConsumer.endOffsets(Collections.singleton(snapshotPartition)).get(snapshotPartition);
            if (endOffset == 0) {
                return null;
            }
            // Position on the last record: the end offset is one past it.
            snapshotConsumer.seek(snapshotPartition, endOffset - 1);
            return snapshotConsumer.poll(Duration.ofMillis(SNAPSHOTS_POLL_TIMEOUT))
                    .records(snapshotPartition)
                    .stream()
                    .reduce((left, right) -> left.offset() > right.offset() ? left : right)
                    .orElseThrow(() -> new IllegalStateException("Couldn't read last snapshot in " + SNAPSHOTS_POLL_TIMEOUT + " ms"))
                    .value();
        }
    }
}
