package com.google.cloud.pubsublite.internal.wire;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.Constants;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.AlarmFactory;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CheckedApiPreconditions;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.wire.BatchPublisherImpl;
import com.google.cloud.pubsublite.internal.wire.SerialBatcher;
import com.google.cloud.pubsublite.internal.wire.StreamFactories;
import com.google.cloud.pubsublite.proto.InitialPublishRequest;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.cloud.pubsublite.proto.PublishRequest;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import repackaged.com.google.common.annotations.VisibleForTesting;
import repackaged.com.google.common.base.Preconditions;
import repackaged.com.google.common.flogger.GoogleLogger;
import repackaged.com.google.common.util.concurrent.Monitor;

/* loaded from: input_file:com/google/cloud/pubsublite/internal/wire/PublisherImpl.class */
public final class PublisherImpl extends ProxyService implements Publisher<Offset>, RetryingConnectionObserver<Offset> {
    private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
    // Creates the alarm scheduled in start(); that alarm invokes flushToStream().
    private final AlarmFactory alarmFactory;
    // PublishRequest wrapping the InitialPublishRequest; handed to the connection at construction
    // and again on every reinitialize.
    private final PublishRequest initialRequest;
    // Lock protecting the connection, shutdown flag, alarm handle, last offset and in-flight queue.
    private final CloseableMonitor monitor;
    // Satisfied when no batches are in flight or the publisher has shut down; flush() waits on it.
    private final Monitor.Guard noneInFlight;

    // Handle of the alarm created in start(); emptied (after cancel) on stop or permanent error.
    @GuardedBy("monitor.monitor")
    private Optional<Future<?>> alarmFuture;

    // Connection to which batches are published; also registered as a child service in the constructor.
    @GuardedBy("monitor.monitor")
    private final RetryingConnection<PublishRequest, BatchPublisher> connection;

    // Set to true by stop() and handlePermanentError(); once set, flushToStream() becomes a no-op.
    @GuardedBy("monitor.monitor")
    private boolean shutdown;

    // Offset computed for the final message of the most recently acknowledged batch; used by
    // onClientResponse() to reject responses whose offset does not increase.
    @GuardedBy("monitor.monitor")
    private Optional<Offset> lastSentOffset;
    // Separate lock for the batcher, taken by publish() without acquiring `monitor`.
    private final CloseableMonitor batcherMonitor;

    // Accumulates published messages until they are flushed to the stream.
    @GuardedBy("batcherMonitor.monitor")
    private final SerialBatcher batcher;

    // Batches sent to the stream and not yet acknowledged, in send order.
    @GuardedBy("monitor.monitor")
    private final Queue<InFlightBatch> batchesInFlight;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/pubsublite/internal/wire/PublisherImpl$InFlightBatch.class */
    /**
     * One batch that has been handed to the stream. {@code messages} and {@code messageFutures}
     * are built from the same collection in the same iteration order, so index {@code i} of one
     * corresponds to index {@code i} of the other.
     */
    public static class InFlightBatch {
        final List<PubSubMessage> messages;
        final List<SettableApiFuture<Offset>> messageFutures;

        /**
         * Splits the given unbatched messages into parallel lists of payloads and result futures.
         *
         * @param collection the messages making up this batch, in publish order
         */
        InFlightBatch(Collection<SerialBatcher.UnbatchedMessage> collection) {
            this.messages =
                    collection.stream()
                            .map(SerialBatcher.UnbatchedMessage::message)
                            .collect(Collectors.toList());
            this.messageFutures =
                    collection.stream()
                            .map(SerialBatcher.UnbatchedMessage::future)
                            .collect(Collectors.toList());
        }
    }

    /**
     * Test-visible constructor that takes every collaborator explicitly.
     *
     * @param publishStreamFactory factory passed through to the retrying connection
     * @param batchPublisherFactory factory passed through to the retrying connection
     * @param alarmFactory creates the alarm that drives periodic flushing once the service starts
     * @param initialPublishRequest initial request, wrapped into a {@link PublishRequest}
     * @param batchingSettings must have non-null request-byte and element-count thresholds
     * @throws NullPointerException if either batching threshold is null
     */
    @VisibleForTesting
    PublisherImpl(StreamFactories.PublishStreamFactory publishStreamFactory, BatchPublisherFactory batchPublisherFactory, AlarmFactory alarmFactory, InitialPublishRequest initialPublishRequest, BatchingSettings batchingSettings) throws ApiException {
        super(new ApiService[0]);
        this.monitor = new CloseableMonitor();
        // The guard reads its fields lazily, so it is fine that they are assigned further down.
        this.noneInFlight = new Monitor.Guard(this.monitor.monitor) {
            @Override
            public boolean isSatisfied() {
                return PublisherImpl.this.batchesInFlight.isEmpty() || PublisherImpl.this.shutdown;
            }
        };
        this.alarmFuture = Optional.empty();
        this.shutdown = false;
        this.lastSentOffset = Optional.empty();
        this.batcherMonitor = new CloseableMonitor();
        this.batchesInFlight = new ArrayDeque<>();
        this.alarmFactory = alarmFactory;

        // Both thresholds are required; reject missing values before building the connection.
        Long byteThreshold = Preconditions.checkNotNull(batchingSettings.getRequestByteThreshold());
        Long countThreshold = Preconditions.checkNotNull(batchingSettings.getElementCountThreshold());

        this.initialRequest = PublishRequest.newBuilder().setInitialRequest(initialPublishRequest).build();
        this.connection = new RetryingConnectionImpl<>(publishStreamFactory, batchPublisherFactory, this, this.initialRequest);
        this.batcher = new SerialBatcher(byteThreshold.longValue(), countThreshold.longValue());
        addServices(this.connection);
    }

    /**
     * Creates a publisher using the default batch publisher factory and an alarm whose period is
     * the batching delay threshold.
     *
     * @param publishStreamFactory factory passed through to the retrying connection
     * @param initialPublishRequest initial request sent on each (re)connection
     * @param batchingSettings must have non-null delay, request-byte and element-count thresholds
     * @throws NullPointerException if the delay threshold is null
     */
    public PublisherImpl(StreamFactories.PublishStreamFactory publishStreamFactory, InitialPublishRequest initialPublishRequest, BatchingSettings batchingSettings) throws ApiException {
        this(
                publishStreamFactory,
                new BatchPublisherImpl.Factory(),
                // Convert the settings' delay into a java.time.Duration via its nanosecond count.
                AlarmFactory.create(
                        Duration.ofNanos(
                                Objects.requireNonNull(batchingSettings.getDelayThreshold()).toNanos())),
                initialPublishRequest,
                batchingSettings);
    }

    /**
     * Re-packs every in-flight message into fresh batches after a reconnection.
     *
     * <p>All messages from {@code batchesInFlight} are flattened in order, the queue is cleared,
     * and the messages are regrouped so that each new batch stays within
     * {@link Constants#MAX_PUBLISH_BATCH_BYTES} serialized bytes and 1000 messages. A single
     * message that exceeds the byte limit on its own still gets a batch to itself.
     */
    @GuardedBy("monitor.monitor")
    private void rebatchForRestart() {
        // Flatten the existing batches, re-pairing each message with its future by index.
        Queue<SerialBatcher.UnbatchedMessage> pending = new ArrayDeque<>();
        for (InFlightBatch batch : this.batchesInFlight) {
            for (int i = 0; i < batch.messages.size(); i++) {
                pending.add(SerialBatcher.UnbatchedMessage.of(batch.messages.get(i), batch.messageFutures.get(i)));
            }
        }
        logger.atFiner().log("Re-publishing %s messages after reconnection for partition %s", pending.size(), this.initialRequest.getInitialRequest().getPartition());

        final int maxBatchCount = 1000;
        long batchBytes = 0;
        int batchCount = 0;
        Queue<SerialBatcher.UnbatchedMessage> currentBatch = new ArrayDeque<>();
        this.batchesInFlight.clear();
        for (SerialBatcher.UnbatchedMessage unbatched : pending) {
            long messageBytes = unbatched.message().getSerializedSize();
            boolean wouldOverflow =
                    batchBytes + messageBytes > Constants.MAX_PUBLISH_BATCH_BYTES
                            || batchCount + 1 > maxBatchCount;
            // Only cut a batch when it already holds something, so no empty batch is ever emitted.
            if (wouldOverflow && !currentBatch.isEmpty()) {
                this.batchesInFlight.add(new InFlightBatch(currentBatch));
                currentBatch = new ArrayDeque<>();
                batchCount = 0;
                batchBytes = 0;
            }
            currentBatch.add(unbatched);
            batchBytes += messageBytes;
            batchCount++;
        }
        if (!currentBatch.isEmpty()) {
            this.batchesInFlight.add(new InFlightBatch(currentBatch));
        }
    }

    /**
     * Re-establishes the stream and re-sends everything that was in flight.
     *
     * <p>While holding {@code monitor}: reinitializes the connection with the initial request,
     * rebatches the in-flight messages, and publishes each resulting batch if a connection is
     * present. The monitor is released on every exit path, including failure, before any
     * {@link CheckedApiException} is forwarded to {@code onPermanentError}.
     *
     * @param checkedApiException the stream error that triggered reinitialization (not used here)
     */
    @Override // com.google.cloud.pubsublite.internal.wire.RetryingConnectionObserver
    public void triggerReinitialize(CheckedApiException checkedApiException) {
        try (CloseableMonitor.Hold hold = this.monitor.enter()) {
            this.connection.reinitialize(this.initialRequest);
            rebatchForRestart();
            Queue<InFlightBatch> batches = this.batchesInFlight;
            this.connection.modifyConnection(connectionOr -> {
                if (!connectionOr.isPresent()) {
                    return;
                }
                batches.forEach(batch -> connectionOr.get().publish(batch.messages));
            });
        } catch (CheckedApiException e) {
            onPermanentError(e);
        }
    }

    /**
     * Reacts to a permanent failure: while holding {@code monitor}, marks the publisher as shut
     * down, cancels the flush alarm if one exists, and fails all outstanding publishes.
     *
     * @param checkedApiException the error set on every outstanding publish future
     */
    @Override // com.google.cloud.pubsublite.internal.ProxyService
    protected void handlePermanentError(CheckedApiException checkedApiException) {
        try (CloseableMonitor.Hold hold = this.monitor.enter()) {
            this.shutdown = true;
            if (this.alarmFuture.isPresent()) {
                // Do not interrupt a flush that is already running.
                this.alarmFuture.get().cancel(false);
            }
            this.alarmFuture = Optional.empty();
            terminateOutstandingPublishes(checkedApiException);
        }
    }

    /**
     * Schedules the alarm that calls {@code flushToStream} and records its handle under
     * {@code monitor} so that it can be cancelled later.
     */
    @Override // com.google.cloud.pubsublite.internal.ProxyService
    protected void start() {
        try (CloseableMonitor.Hold hold = this.monitor.enter()) {
            Future<?> alarm = this.alarmFactory.newAlarm(this::flushToStream);
            this.alarmFuture = Optional.of(alarm);
        }
    }

    /**
     * Drains and shuts down the publisher.
     *
     * <p>Flushes once, then (under {@code monitor}) sets the shutdown flag and cancels the alarm,
     * then flushes again. The monitor is held only around the state change, so a failure in the
     * final flush can no longer cause the already-released hold to be closed a second time.
     */
    @Override // com.google.cloud.pubsublite.internal.ProxyService
    protected void stop() {
        // Push out anything batched so far and wait for it to be acknowledged.
        flush();
        try (CloseableMonitor.Hold hold = this.monitor.enter()) {
            this.shutdown = true;
            this.alarmFuture.ifPresent(alarm -> alarm.cancel(false));
            this.alarmFuture = Optional.empty();
        }
        // Called again after the shutdown flag is set; with the flag set, flushToStream() returns
        // early and the noneInFlight guard is satisfied immediately.
        flush();
    }

    /**
     * Fails every publish that has not completed yet with the given error.
     *
     * <p>Covers both the batches already in flight and the messages still sitting in the batcher
     * (which is flushed under {@code batcherMonitor}); afterwards the in-flight queue is cleared.
     * {@code batcherMonitor} is held only while draining the batcher, and is released exactly once.
     *
     * @param checkedApiException the error set on each outstanding future
     */
    @GuardedBy("monitor.monitor")
    private void terminateOutstandingPublishes(CheckedApiException checkedApiException) {
        for (InFlightBatch batch : this.batchesInFlight) {
            for (SettableApiFuture<Offset> future : batch.messageFutures) {
                future.setException(checkedApiException);
            }
        }
        try (CloseableMonitor.Hold hold = this.batcherMonitor.enter()) {
            for (List<SerialBatcher.UnbatchedMessage> batch : this.batcher.flush()) {
                for (SerialBatcher.UnbatchedMessage unbatched : batch) {
                    unbatched.future().setException(checkedApiException);
                }
            }
        }
        this.batchesInFlight.clear();
    }

    /**
     * Adds a message to the batcher and returns the future produced for it.
     *
     * <p>Publishing is only accepted while the service state is {@code STARTING} or
     * {@code RUNNING}. In any other state a {@link CheckedApiException} with
     * {@code FAILED_PRECONDITION} is raised internally, reported through
     * {@code onPermanentError}, and returned to the caller as an immediately failed future.
     * {@code batcherMonitor} is released on every path before the error is reported.
     *
     * @param message the message to publish
     * @return the future returned by the batcher, or an already-failed future if publishing is
     *     not allowed in the current state
     */
    @Override // com.google.cloud.pubsublite.internal.Publisher
    public ApiFuture<Offset> publish(Message message) {
        PubSubMessage proto = message.toProto();
        try (CloseableMonitor.Hold hold = this.batcherMonitor.enter()) {
            ApiService.State state = state();
            switch (state) {
                case FAILED:
                    throw new CheckedApiException("Cannot publish when publisher has failed.", failureCause(), StatusCode.Code.FAILED_PRECONDITION);
                case STARTING:
                case RUNNING:
                    return this.batcher.add(proto);
                default:
                    throw new CheckedApiException("Cannot publish when Publisher state is " + state.name(), StatusCode.Code.FAILED_PRECONDITION);
            }
        } catch (CheckedApiException e) {
            onPermanentError(e);
            return ApiFutures.immediateFailedFuture(e);
        }
    }

    /**
     * Fails every outstanding publish with a {@code CANCELLED} error while holding
     * {@code monitor}.
     */
    @Override // com.google.cloud.pubsublite.internal.Publisher
    public void cancelOutstandingPublishes() {
        try (CloseableMonitor.Hold hold = this.monitor.enter()) {
            CheckedApiException cancelled = new CheckedApiException("Cancelled by client.", StatusCode.Code.CANCELLED);
            terminateOutstandingPublishes(cancelled);
        }
    }

    /**
     * Moves everything currently in the batcher onto the stream.
     *
     * <p>Does nothing once the publisher is shut down. Otherwise the batcher is drained under
     * {@code batcherMonitor} (nested inside {@code monitor}), and each resulting batch is passed
     * to {@code processBatch} while still holding {@code monitor}. Both monitors are released
     * before a {@link CheckedApiException} is forwarded to {@code onPermanentError}.
     */
    private void flushToStream() {
        try (CloseableMonitor.Hold hold = this.monitor.enter()) {
            if (this.shutdown) {
                return;
            }
            List<List<SerialBatcher.UnbatchedMessage>> batches;
            // Hold the batcher lock only for the drain itself, not while sending.
            try (CloseableMonitor.Hold batcherHold = this.batcherMonitor.enter()) {
                batches = this.batcher.flush();
            }
            for (List<SerialBatcher.UnbatchedMessage> batch : batches) {
                processBatch(batch);
            }
        } catch (CheckedApiException e) {
            onPermanentError(e);
        }
    }

    /**
     * Records a non-empty batch as in flight and publishes it on the current connection.
     *
     * @param collection the messages forming the batch; an empty collection is ignored
     * @throws CheckedApiException if no connection is present when the batch is sent, or if the
     *     connection call itself fails
     */
    @GuardedBy("monitor.monitor")
    private void processBatch(Collection<SerialBatcher.UnbatchedMessage> collection) throws CheckedApiException {
        if (!collection.isEmpty()) {
            InFlightBatch batch = new InFlightBatch(collection);
            // Queue first so the acknowledgement can be matched to this batch.
            this.batchesInFlight.add(batch);
            this.connection.modifyConnection(connectionOr -> {
                CheckedApiPreconditions.checkState(connectionOr.isPresent(), "Published after the stream shut down.");
                connectionOr.get().publish(batch.messages);
            });
        }
    }

    /**
     * Sends all batched messages to the stream, then blocks uninterruptibly until the
     * {@code noneInFlight} guard holds (no batches in flight, or the publisher is shut down).
     */
    @Override // java.io.Flushable
    public void flush() {
        flushToStream();
        try (CloseableMonitor.Hold hold = this.monitor.enterWhenUninterruptibly(this.noneInFlight)) {
            // Nothing to do under the lock: entering it is the wait, and it is released right away.
        }
    }

    /**
     * Completes the oldest in-flight batch using the offset reported by the server.
     *
     * <p>The message at index {@code i} of the batch is completed with {@code offset + i}, and
     * the offset computed for the batch's final message is remembered for the next consistency
     * check.
     *
     * @param offset the offset reported for the first message of the oldest in-flight batch
     * @throws CheckedApiException if no batch is in flight, or if {@code offset} is not greater
     *     than the last offset recorded from a previous response
     */
    @Override // com.google.cloud.pubsublite.internal.wire.RetryingConnectionObserver
    public void onClientResponse(Offset offset) throws CheckedApiException {
        try (CloseableMonitor.Hold hold = this.monitor.enter()) {
            CheckedApiPreconditions.checkState(!this.batchesInFlight.isEmpty(), "Received publish response with no batches in flight.");
            if (this.lastSentOffset.isPresent()) {
                Offset previous = this.lastSentOffset.get();
                if (previous.value() >= offset.value()) {
                    throw new CheckedApiException(String.format("Received publish response with offset %s that is inconsistent with previous responses max %s", offset, previous), StatusCode.Code.FAILED_PRECONDITION);
                }
            }
            InFlightBatch acknowledged = this.batchesInFlight.remove();
            this.lastSentOffset = Optional.of(Offset.of((offset.value() + acknowledged.messages.size()) - 1));
            int index = 0;
            for (SettableApiFuture<Offset> future : acknowledged.messageFutures) {
                future.set(Offset.of(offset.value() + index));
                index++;
            }
        }
    }
}
