package com.google.cloud.pubsublite.internal.wire;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CheckedApiPreconditions;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.RoutingPolicy;
import com.google.cloud.pubsublite.internal.wire.PartitionCountWatcher;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.flogger.GoogleLogger;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.LongStream;

/* loaded from: input_file:com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.class */
public class PartitionCountWatchingPublisher extends ProxyService implements Publisher<MessageMetadata> {
    private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
    private final PartitionPublisherFactory publisherFactory;
    private final RoutingPolicy.Factory policyFactory;
    private final CloseableMonitor monitor;

    @GuardedBy("monitor.monitor")
    private boolean shutdown;

    @GuardedBy("monitor.monitor")
    private Optional<PartitionsWithRouting> partitionsWithRouting;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher$PartitionsWithRouting.class */
    public static class PartitionsWithRouting {
        public final ImmutableMap<Partition, Publisher<MessageMetadata>> publishers;
        private final RoutingPolicy routingPolicy;

        private PartitionsWithRouting(ImmutableMap<Partition, Publisher<MessageMetadata>> immutableMap, RoutingPolicy routingPolicy) {
            this.publishers = immutableMap;
            this.routingPolicy = routingPolicy;
        }

        public ApiFuture<MessageMetadata> publish(PubSubMessage pubSubMessage) throws CheckedApiException {
            try {
                Partition route = this.routingPolicy.route(pubSubMessage);
                CheckedApiPreconditions.checkState(this.publishers.containsKey(route), "Routed to partition %s for which there is no publisher available.", route);
                return this.publishers.get(route).publish(pubSubMessage);
            } catch (Throwable th) {
                throw ExtractStatus.toCanonical(th);
            }
        }

        public void cancelOutstandingPublishes() {
            UnmodifiableIterator<Publisher<MessageMetadata>> it = this.publishers.values().iterator();
            while (it.hasNext()) {
                it.next().cancelOutstandingPublishes();
            }
        }

        public void flush() throws IOException {
            UnmodifiableIterator<Publisher<MessageMetadata>> it = this.publishers.values().iterator();
            while (it.hasNext()) {
                it.next().flush();
            }
        }

        public void stop() {
            ApiServiceUtils.blockingShutdown(this.publishers.values());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PartitionCountWatchingPublisher(PartitionPublisherFactory partitionPublisherFactory, RoutingPolicy.Factory factory, PartitionCountWatcher.Factory factory2) {
        super(new ApiService[0]);
        this.monitor = new CloseableMonitor();
        this.shutdown = false;
        this.partitionsWithRouting = Optional.empty();
        this.publisherFactory = partitionPublisherFactory;
        this.policyFactory = factory;
        addServices(factory2.newWatcher((v1) -> {
            handleConfig(v1);
        }), ApiServiceUtils.autoCloseableAsApiService(partitionPublisherFactory));
    }

    @Override // com.google.cloud.pubsublite.internal.Publisher
    public ApiFuture<MessageMetadata> publish(PubSubMessage pubSubMessage) {
        CloseableMonitor.Hold enter = this.monitor.enter();
        try {
            Optional<PartitionsWithRouting> optional = this.partitionsWithRouting;
            if (enter != null) {
                enter.close();
            }
            if (!optional.isPresent()) {
                throw new IllegalStateException("Publish called before start or after shutdown");
            }
            try {
                return optional.get().publish(pubSubMessage);
            } catch (CheckedApiException e) {
                onPermanentError(e);
                return ApiFutures.immediateFailedFuture(e);
            }
        } catch (Throwable th) {
            if (enter != null) {
                try {
                    enter.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // com.google.cloud.pubsublite.internal.Publisher
    public void cancelOutstandingPublishes() {
        CloseableMonitor.Hold enter = this.monitor.enter();
        try {
            Optional<PartitionsWithRouting> optional = this.partitionsWithRouting;
            if (enter != null) {
                enter.close();
            }
            if (!optional.isPresent()) {
                throw new IllegalStateException("Cancel outstanding publishes called before start or after shutdown");
            }
            optional.get().cancelOutstandingPublishes();
        } catch (Throwable th) {
            if (enter != null) {
                try {
                    enter.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // java.io.Flushable
    public void flush() throws IOException {
        CloseableMonitor.Hold enter = this.monitor.enter();
        try {
            Optional<PartitionsWithRouting> optional = this.partitionsWithRouting;
            if (enter != null) {
                enter.close();
            }
            if (!optional.isPresent()) {
                throw new IllegalStateException("Publish called before start or after shutdown");
            }
            optional.get().flush();
        } catch (Throwable th) {
            if (enter != null) {
                try {
                    enter.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private ImmutableMap<Partition, Publisher<MessageMetadata>> getNewPartitionPublishers(LongStream longStream) {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        longStream.forEach(j -> {
            Publisher<MessageMetadata> newPublisher = this.publisherFactory.newPublisher(Partition.of(j));
            newPublisher.addListener(new ApiService.Listener() { // from class: com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisher.1
                @Override // com.google.api.core.ApiService.Listener
                public void failed(ApiService.State state, Throwable th) {
                    PartitionCountWatchingPublisher.this.onPermanentError(ExtractStatus.toCanonical(th));
                }
            }, SystemExecutors.getFuturesExecutor());
            builder.put(Partition.of(j), newPublisher);
            newPublisher.startAsync();
        });
        ImmutableMap<Partition, Publisher<MessageMetadata>> build = builder.build();
        build.values().forEach((v0) -> {
            v0.awaitRunning();
        });
        return build;
    }

    private void handleConfig(long j) {
        CloseableMonitor.Hold enter = this.monitor.enter();
        try {
            if (this.shutdown) {
                if (enter != null) {
                    enter.close();
                    return;
                }
                return;
            }
            Optional<PartitionsWithRouting> optional = this.partitionsWithRouting;
            long intValue = ((Integer) optional.map(partitionsWithRouting -> {
                return Integer.valueOf(partitionsWithRouting.publishers.size());
            }).orElse(0)).intValue();
            if (j == intValue) {
                if (enter != null) {
                    enter.close();
                    return;
                }
                return;
            }
            if (j < intValue) {
                log.atWarning().log("Received an unexpected decrease in partition count. Previous partition count %s, new count %s", intValue, j);
                if (enter != null) {
                    enter.close();
                    return;
                }
                return;
            }
            ImmutableMap.Builder builder = ImmutableMap.builder();
            optional.ifPresent(partitionsWithRouting2 -> {
                ImmutableMap<Partition, Publisher<MessageMetadata>> immutableMap = partitionsWithRouting2.publishers;
                Objects.requireNonNull(builder);
                immutableMap.forEach((v1, v2) -> {
                    r1.put(v1, v2);
                });
            });
            ImmutableMap<Partition, Publisher<MessageMetadata>> newPartitionPublishers = getNewPartitionPublishers(LongStream.range(intValue, j));
            Objects.requireNonNull(builder);
            newPartitionPublishers.forEach((v1, v2) -> {
                r1.put(v1, v2);
            });
            this.partitionsWithRouting = Optional.of(new PartitionsWithRouting(builder.build(), this.policyFactory.newPolicy(j)));
            if (enter != null) {
                enter.close();
            }
        } catch (Throwable th) {
            if (enter != null) {
                try {
                    enter.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // com.google.cloud.pubsublite.internal.ProxyService
    protected void stop() {
        CloseableMonitor.Hold enter = this.monitor.enter();
        try {
            this.shutdown = true;
            Optional<PartitionsWithRouting> optional = this.partitionsWithRouting;
            this.partitionsWithRouting = Optional.empty();
            if (enter != null) {
                enter.close();
            }
            optional.ifPresent((v0) -> {
                v0.stop();
            });
        } catch (Throwable th) {
            if (enter != null) {
                try {
                    enter.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // com.google.cloud.pubsublite.internal.ProxyService
    protected void handlePermanentError(CheckedApiException checkedApiException) {
        try {
            stop();
        } catch (Exception e) {
            ((GoogleLogger.Api) log.atWarning().withCause(e)).log("Encountered exception while trying to handle failure");
        }
    }
}
