package org.apache.beam.sdk.io.gcp.pubsublite;

import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.util.Preconditions;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.class */
/**
 * A {@link DoFn} that takes a {@link SubscriptionPartition} as its element and emits
 * {@link SequencedMessage}s, using an {@link OffsetRange} as its restriction.
 *
 * <p>All collaborators (processor, initial-offset reader, restriction tracker, committer) are
 * obtained from the factories supplied to the constructor.
 */
class PerSubscriptionPartitionSdf extends DoFn<SubscriptionPartition, SequencedMessage> {
    private final Duration maxSleepTime;
    private final SubscriptionPartitionProcessorFactory processorFactory;
    private final SerializableFunction<SubscriptionPartition, InitialOffsetReader> offsetReaderFactory;
    private final SerializableBiFunction<SubscriptionPartition, OffsetRange, RestrictionTracker<OffsetRange, OffsetByteProgress>> trackerFactory;
    private final SerializableFunction<SubscriptionPartition, Committer> committerFactory;

    /**
     * Creates the DoFn from its collaborators.
     *
     * @param maxSleepTime duration handed to the processor's {@code waitForCompletion} on each
     *     {@link #processElement} call
     * @param offsetReaderFactory creates the reader used to determine the start of the initial
     *     restriction for a partition
     * @param trackerFactory creates the restriction tracker for a partition and offset range
     * @param processorFactory creates the processor that handles one {@link #processElement} call
     * @param committerFactory creates the committer used to commit the offset after processing
     */
    public PerSubscriptionPartitionSdf(Duration maxSleepTime, SerializableFunction<SubscriptionPartition, InitialOffsetReader> offsetReaderFactory, SerializableBiFunction<SubscriptionPartition, OffsetRange, RestrictionTracker<OffsetRange, OffsetByteProgress>> trackerFactory, SubscriptionPartitionProcessorFactory processorFactory, SerializableFunction<SubscriptionPartition, Committer> committerFactory) {
        this.maxSleepTime = maxSleepTime;
        this.processorFactory = processorFactory;
        this.offsetReaderFactory = offsetReaderFactory;
        this.trackerFactory = trackerFactory;
        this.committerFactory = committerFactory;
    }

    /**
     * Returns the initial watermark estimator state.
     *
     * @return {@link Instant#EPOCH}
     */
    @DoFn.GetInitialWatermarkEstimatorState
    public Instant getInitialWatermarkState() {
        return Instant.EPOCH;
    }

    /**
     * Creates a monotonically increasing watermark estimator seeded with the given state.
     *
     * @param state the watermark estimator state to start from
     * @return a new estimator initialized with {@code state}
     */
    @DoFn.NewWatermarkEstimator
    public WatermarkEstimators.MonotonicallyIncreasing newWatermarkEstimator(@DoFn.WatermarkEstimatorState Instant state) {
        return new WatermarkEstimators.MonotonicallyIncreasing(state);
    }

    /**
     * Runs a processor for the given partition, then commits the offset following the last one
     * the processor reports as claimed (if any).
     *
     * @param tracker restriction tracker passed through to the processor
     * @param subscriptionPartition the element being processed
     * @param receiver output receiver passed through to the processor
     * @return the continuation returned by the processor's {@code waitForCompletion}
     * @throws Exception if starting, running or closing the processor fails, or if the commit
     *     fails (commit failures are rethrown in their canonical form via {@link ExtractStatus})
     */
    @DoFn.ProcessElement
    public DoFn.ProcessContinuation processElement(RestrictionTracker<OffsetRange, OffsetByteProgress> tracker, @DoFn.Element SubscriptionPartition subscriptionPartition, DoFn.OutputReceiver<SequencedMessage> receiver) throws Exception {
        // try-with-resources closes the processor exactly once and records a close failure as a
        // suppressed exception instead of closing twice or masking the primary failure.
        try (SubscriptionPartitionProcessor processor = this.processorFactory.newProcessor(subscriptionPartition, tracker, receiver)) {
            processor.start();
            DoFn.ProcessContinuation continuation = processor.waitForCompletion(this.maxSleepTime);
            processor.lastClaimed().ifPresent(lastClaimed -> {
                Committer committer = this.committerFactory.apply(subscriptionPartition);
                committer.startAsync().awaitRunning();
                try {
                    // Commit the offset immediately after the last claimed one.
                    committer.commitOffset(Offset.of(lastClaimed.value() + 1)).get();
                    committer.stopAsync().awaitTerminated();
                } catch (ExecutionException e) {
                    throw ExtractStatus.toCanonical(Preconditions.checkArgumentNotNull(e.getCause())).underlying;
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so callers further up the stack can observe it.
                    Thread.currentThread().interrupt();
                    throw ExtractStatus.toCanonical(e).underlying;
                } catch (Exception e) {
                    throw ExtractStatus.toCanonical(e).underlying;
                } finally {
                    // Make sure shutdown is at least requested when the commit failed, so the
                    // started committer is not leaked. Expected to be a no-op if the committer
                    // was already stopped above (see the ApiService#stopAsync contract).
                    committer.stopAsync();
                }
            });
            return continuation;
        }
    }

    /**
     * Computes the initial restriction for a partition: from the offset returned by the initial
     * offset reader up to {@link Long#MAX_VALUE}.
     *
     * @param subscriptionPartition the element whose restriction is requested
     * @return the range {@code [initial offset, Long.MAX_VALUE)}
     */
    @DoFn.GetInitialRestriction
    public OffsetRange getInitialRestriction(@DoFn.Element SubscriptionPartition subscriptionPartition) {
        // The reader is closed exactly once, whether or not read() succeeds.
        try (InitialOffsetReader reader = this.offsetReaderFactory.apply(subscriptionPartition)) {
            return new OffsetRange(reader.read().value(), Long.MAX_VALUE);
        }
    }

    /**
     * Creates the restriction tracker for the given partition and restriction by delegating to
     * the tracker factory.
     *
     * @param subscriptionPartition the element being processed
     * @param range the restriction to track
     * @return the tracker produced by the factory
     */
    @DoFn.NewTracker
    public RestrictionTracker<OffsetRange, OffsetByteProgress> newTracker(@DoFn.Element SubscriptionPartition subscriptionPartition, @DoFn.Restriction OffsetRange range) {
        return this.trackerFactory.apply(subscriptionPartition, range);
    }
}
