package com.google.cloud.pubsublite.kafka;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.wire.ApiServiceUtils;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.MoreExecutors;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.class */
public class SingleSubscriptionConsumerImpl implements SingleSubscriptionConsumer {
    private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
    private final TopicPath topic;
    private final boolean autocommit;
    private final PullSubscriberFactory subscriberFactory;
    private final CommitterFactory committerFactory;
    private final CloseableMonitor monitor = new CloseableMonitor();
    private final Map<Partition, SinglePartitionSubscriber> partitions = new HashMap();
    private SettableApiFuture<Void> assignmentChanged = SettableApiFuture.create();
    private final SettableApiFuture<Void> wakeupTriggered = SettableApiFuture.create();

    /* JADX INFO: Access modifiers changed from: package-private */
    public SingleSubscriptionConsumerImpl(TopicPath topicPath, boolean z, PullSubscriberFactory pullSubscriberFactory, CommitterFactory committerFactory) {
        this.topic = topicPath;
        this.autocommit = z;
        this.subscriberFactory = pullSubscriberFactory;
        this.committerFactory = committerFactory;
    }

    @Override // com.google.cloud.pubsublite.kafka.SingleSubscriptionConsumer
    public void setAssignment(Set<Partition> set) {
        try {
            CloseableMonitor.Hold enter = this.monitor.enter();
            Throwable th = null;
            try {
                Stream filter = ImmutableSet.copyOf((Collection) this.partitions.keySet()).stream().filter(partition -> {
                    return !set.contains(partition);
                });
                Map<Partition, SinglePartitionSubscriber> map = this.partitions;
                map.getClass();
                ApiServiceUtils.blockingShutdown((List) filter.map((v1) -> {
                    return r1.remove(v1);
                }).collect(Collectors.toList()));
                set.stream().filter(partition2 -> {
                    return !this.partitions.containsKey(partition2);
                }).forEach(ExtractStatus.rethrowAsRuntime(partition3 -> {
                    SinglePartitionSubscriber singlePartitionSubscriber = new SinglePartitionSubscriber(this.subscriberFactory, partition3, SeekRequest.newBuilder().setNamedTarget(SeekRequest.NamedTarget.COMMITTED_CURSOR).build(), this.committerFactory.newCommitter(partition3), this.autocommit);
                    singlePartitionSubscriber.startAsync().awaitRunning();
                    this.partitions.put(partition3, singlePartitionSubscriber);
                }));
                this.assignmentChanged.set(null);
                this.assignmentChanged = SettableApiFuture.create();
                if (enter != null) {
                    if (0 != 0) {
                        try {
                            enter.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        enter.close();
                    }
                }
            } finally {
            }
        } catch (Throwable th3) {
            throw ExtractStatus.toCanonical(th3).underlying;
        }
    }

    @Override // com.google.cloud.pubsublite.kafka.SingleSubscriptionConsumer
    public Set<Partition> assignment() {
        CloseableMonitor.Hold enter = this.monitor.enter();
        Throwable th = null;
        try {
            Set<Partition> keySet = this.partitions.keySet();
            if (enter != null) {
                if (0 != 0) {
                    try {
                        enter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    enter.close();
                }
            }
            return keySet;
        } catch (Throwable th3) {
            if (enter != null) {
                if (0 != 0) {
                    try {
                        enter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    enter.close();
                }
            }
            throw th3;
        }
    }

    private Map<Partition, Queue<SequencedMessage>> doPoll(Duration duration) {
        try {
            ImmutableList.Builder builder = ImmutableList.builder();
            CloseableMonitor.Hold enter = this.monitor.enter();
            Throwable th = null;
            try {
                builder.add((ImmutableList.Builder) this.wakeupTriggered);
                builder.add((ImmutableList.Builder) this.assignmentChanged);
                this.partitions.values().forEach(singlePartitionSubscriber -> {
                    builder.add((ImmutableList.Builder) singlePartitionSubscriber.onData());
                });
                if (enter != null) {
                    if (0 != 0) {
                        try {
                            enter.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        enter.close();
                    }
                }
                try {
                    ApiFuturesExtensions.whenFirstDone(builder.build()).get(duration.toMillis(), TimeUnit.MILLISECONDS);
                    try {
                        CloseableMonitor.Hold enter2 = this.monitor.enter();
                        Throwable th3 = null;
                        if (this.wakeupTriggered.isDone()) {
                            throw new WakeupException();
                        }
                        HashMap hashMap = new HashMap();
                        this.partitions.forEach(ExtractStatus.rethrowAsRuntime((partition, singlePartitionSubscriber2) -> {
                        }));
                        if (enter2 != null) {
                            if (0 != 0) {
                                try {
                                    enter2.close();
                                } catch (Throwable th4) {
                                    th3.addSuppressed(th4);
                                }
                            } else {
                                enter2.close();
                            }
                        }
                        return hashMap;
                    } finally {
                    }
                } catch (TimeoutException e) {
                    return ImmutableMap.of();
                }
            } finally {
            }
        } catch (Throwable th5) {
            throw KafkaExceptionUtils.toKafka(th5);
        }
    }

    @Override // com.google.cloud.pubsublite.kafka.SingleSubscriptionConsumer
    public ConsumerRecords<byte[], byte[]> poll(Duration duration) {
        if (this.autocommit) {
            ApiFutures.addCallback(commitAll(), new ApiFutureCallback<Object>() { // from class: com.google.cloud.pubsublite.kafka.SingleSubscriptionConsumerImpl.1
                @Override // com.google.api.core.ApiFutureCallback
                public void onFailure(Throwable th) {
                    ((GoogleLogger.Api) SingleSubscriptionConsumerImpl.logger.atWarning().withCause(th)).log("Failed to commit offsets.");
                }

                @Override // com.google.api.core.ApiFutureCallback
                public void onSuccess(Object obj) {
                }
            }, MoreExecutors.directExecutor());
        }
        Map<Partition, Queue<SequencedMessage>> doPoll = doPoll(duration);
        HashMap hashMap = new HashMap();
        doPoll.forEach((partition, queue) -> {
            if (queue.isEmpty()) {
                return;
            }
            hashMap.put(new TopicPartition(this.topic.toString(), (int) partition.value()), (List) queue.stream().map(sequencedMessage -> {
                return RecordTransforms.fromMessage(sequencedMessage, this.topic, partition);
            }).collect(Collectors.toList()));
        });
        return new ConsumerRecords<>(hashMap);
    }

    @Override // com.google.cloud.pubsublite.kafka.SingleSubscriptionConsumer
    public ApiFuture<Map<Partition, Offset>> commitAll() {
        CloseableMonitor.Hold enter = this.monitor.enter();
        Throwable th = null;
        try {
            ArrayList arrayList = new ArrayList();
            this.partitions.forEach((partition, singlePartitionSubscriber) -> {
                Optional<ApiFuture<Offset>> autoCommit = singlePartitionSubscriber.autoCommit();
                if (autoCommit.isPresent()) {
                    arrayList.add(ApiFutures.transform(autoCommit.get(), offset -> {
                        return new AbstractMap.SimpleEntry(partition, offset);
                    }, MoreExecutors.directExecutor()));
                }
            });
            ApiFuture<Map<Partition, Offset>> transform = ApiFutures.transform(ApiFutures.allAsList(arrayList), list -> {
                return ImmutableMap.copyOf((Map) list.stream().collect(Collectors.toMap(entry -> {
                    return (Partition) entry.getKey();
                }, entry2 -> {
                    return (Offset) entry2.getValue();
                })));
            }, MoreExecutors.directExecutor());
            if (enter != null) {
                if (0 != 0) {
                    try {
                        enter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    enter.close();
                }
            }
            return transform;
        } catch (Throwable th3) {
            if (enter != null) {
                if (0 != 0) {
                    try {
                        enter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    enter.close();
                }
            }
            throw th3;
        }
    }

    @Override // com.google.cloud.pubsublite.kafka.SingleSubscriptionConsumer
    public ApiFuture<Void> commit(Map<Partition, Offset> map) {
        CloseableMonitor.Hold enter = this.monitor.enter();
        Throwable th = null;
        try {
            try {
                ImmutableList.Builder builder = ImmutableList.builder();
                map.forEach((partition, offset) -> {
                    if (!this.partitions.containsKey(partition)) {
                        throw new CommitFailedException("Tried to commit to partition " + partition.value() + " which is not assigned to this consumer.");
                    }
                    builder.add((ImmutableList.Builder) this.partitions.get(partition).commitOffset(offset));
                });
                ApiFuture<Void> transform = ApiFutures.transform(ApiFutures.allAsList(builder.build()), list -> {
                    return null;
                }, MoreExecutors.directExecutor());
                if (enter != null) {
                    if (0 != 0) {
                        try {
                            enter.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        enter.close();
                    }
                }
                return transform;
            } finally {
            }
        } catch (Throwable th3) {
            if (enter != null) {
                if (th != null) {
                    try {
                        enter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    enter.close();
                }
            }
            throw th3;
        }
    }

    /* JADX WARN: Failed to calculate best type for var: r10v0 ??
    java.lang.NullPointerException
     */
    /* JADX WARN: Failed to calculate best type for var: r9v2 ??
    java.lang.NullPointerException
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException
     */
    /* JADX WARN: Not initialized variable reg: 10, insn: 0x007e: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r10 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:27:0x007e */
    /* JADX WARN: Not initialized variable reg: 9, insn: 0x007a: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r9 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:25:0x007a */
    /* JADX WARN: Type inference failed for: r10v0, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r9v2, types: [com.google.cloud.pubsublite.internal.CloseableMonitor$Hold] */
    @Override // com.google.cloud.pubsublite.kafka.SingleSubscriptionConsumer
    public void doSeek(Partition partition, SeekRequest seekRequest) throws KafkaException {
        try {
            try {
                CloseableMonitor.Hold enter = this.monitor.enter();
                Throwable th = null;
                if (!this.partitions.containsKey(partition)) {
                    throw new IllegalStateException("Received seek for partition " + partition.value() + " which is not assigned to this consumer.");
                }
                this.partitions.get(partition).clientSeek(seekRequest);
                if (enter != null) {
                    if (0 != 0) {
                        try {
                            enter.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        enter.close();
                    }
                }
            } finally {
            }
        } catch (IllegalStateException e) {
            throw e;
        } catch (Throwable th3) {
            throw KafkaExceptionUtils.toKafka(th3);
        }
    }

    @Override // com.google.cloud.pubsublite.kafka.SingleSubscriptionConsumer
    public Optional<Long> position(Partition partition) {
        CloseableMonitor.Hold enter = this.monitor.enter();
        Throwable th = null;
        try {
            if (this.partitions.containsKey(partition)) {
                Optional<Long> position = this.partitions.get(partition).position();
                if (enter != null) {
                    if (0 != 0) {
                        try {
                            enter.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        enter.close();
                    }
                }
                return position;
            }
            Optional<Long> empty = Optional.empty();
            if (enter != null) {
                if (0 != 0) {
                    try {
                        enter.close();
                    } catch (Throwable th3) {
                        th.addSuppressed(th3);
                    }
                } else {
                    enter.close();
                }
            }
            return empty;
        } catch (Throwable th4) {
            if (enter != null) {
                if (0 != 0) {
                    try {
                        enter.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    enter.close();
                }
            }
            throw th4;
        }
    }

    @Override // com.google.cloud.pubsublite.kafka.SingleSubscriptionConsumer
    public void close(Duration duration) {
        try {
            CloseableMonitor.Hold enter = this.monitor.enter();
            Throwable th = null;
            try {
                try {
                    ApiServiceUtils.blockingShutdown(this.partitions.values());
                    if (enter != null) {
                        if (0 != 0) {
                            try {
                                enter.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            enter.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } catch (Throwable th3) {
            throw KafkaExceptionUtils.toKafka(th3);
        }
    }

    @Override // com.google.cloud.pubsublite.kafka.SingleSubscriptionConsumer
    public void wakeup() {
        this.wakeupTriggered.set(null);
    }
}
