package org.apache.beam.runners.samza.runtime;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.samza.operators.Scheduler;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/samza/runtime/BundleManager.class */
public class BundleManager<OutT> {
    private static final Logger LOG = LoggerFactory.getLogger(BundleManager.class);
    private static final long MIN_BUNDLE_CHECK_TIME_MS = 10;
    private final long maxBundleSize;
    private final long maxBundleTimeMs;
    private final BundleProgressListener<OutT> bundleProgressListener;
    private final FutureCollector<OutT> futureCollector;
    private final Scheduler<KeyedTimerData<Void>> bundleTimerScheduler;
    private final String bundleCheckTimerId;
    private transient AtomicLong currentBundleElementCount;
    private transient AtomicLong pendingBundleCount;
    private transient AtomicLong bundleStartTime;
    private transient AtomicBoolean isBundleStarted;
    private transient Instant bundleWatermarkHold;
    private transient AtomicReference<CompletableFuture<Void>> currentActiveBundleDoneFutureReference;
    private transient CompletionStage<Void> watermarkFuture;

    /* loaded from: input_file:org/apache/beam/runners/samza/runtime/BundleManager$BundleProgressListener.class */
    public interface BundleProgressListener<OutT> {
        void onBundleStarted();

        void onBundleFinished(OpEmitter<OutT> opEmitter);

        void onWatermark(Instant instant, OpEmitter<OutT> opEmitter);
    }

    public BundleManager(BundleProgressListener<OutT> bundleProgressListener, FutureCollector<OutT> futureCollector, long j, long j2, Scheduler<KeyedTimerData<Void>> scheduler, String str) {
        this.maxBundleSize = j;
        this.maxBundleTimeMs = j2;
        this.bundleProgressListener = bundleProgressListener;
        this.bundleTimerScheduler = scheduler;
        this.bundleCheckTimerId = str;
        this.futureCollector = futureCollector;
        if (j > 1) {
            scheduleNextBundleCheck();
        }
        this.bundleStartTime = new AtomicLong(Long.MAX_VALUE);
        this.currentActiveBundleDoneFutureReference = new AtomicReference<>();
        this.currentBundleElementCount = new AtomicLong(0L);
        this.isBundleStarted = new AtomicBoolean(false);
        this.pendingBundleCount = new AtomicLong(0L);
        this.watermarkFuture = CompletableFuture.completedFuture(null);
    }

    private void scheduleNextBundleCheck() {
        Instant plus = Instant.now().plus(Duration.millis((this.maxBundleTimeMs / 2) + MIN_BUNDLE_CHECK_TIME_MS));
        this.bundleTimerScheduler.schedule(new KeyedTimerData(new byte[0], null, TimerInternals.TimerData.of(this.bundleCheckTimerId, StateNamespaces.global(), plus, plus, TimeDomain.PROCESSING_TIME)), plus.getMillis());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void tryStartBundle() {
        this.futureCollector.prepare();
        if (this.isBundleStarted.compareAndSet(false, true)) {
            LOG.debug("Starting a new bundle.");
            Preconditions.checkArgument(this.currentActiveBundleDoneFutureReference.get() == null, "Current active bundle done future should be null before starting a new bundle.");
            this.bundleStartTime.set(System.currentTimeMillis());
            this.pendingBundleCount.incrementAndGet();
            this.bundleProgressListener.onBundleStarted();
        }
        this.currentBundleElementCount.incrementAndGet();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void processWatermark(Instant instant, OpEmitter<OutT> opEmitter) {
        if (!isBundleStarted() && this.pendingBundleCount.get() == 0) {
            LOG.debug("Propagating watermark: {} directly since no bundle in progress.", instant);
            this.bundleProgressListener.onWatermark(instant, opEmitter);
            return;
        }
        this.bundleWatermarkHold = instant;
        if (BoundedWindow.TIMESTAMP_MAX_VALUE.equals(instant)) {
            if (isBundleStarted()) {
                LOG.info("Received max watermark. Triggering finish bundle before flushing the watermark downstream.");
                tryFinishBundle(opEmitter);
                this.watermarkFuture.toCompletableFuture().join();
            } else {
                LOG.info("Received max watermark. Waiting for previous bundles to complete before flushing the watermark downstream.");
                this.watermarkFuture.toCompletableFuture().join();
                this.bundleProgressListener.onWatermark(instant, opEmitter);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void processTimer(KeyedTimerData<Void> keyedTimerData, OpEmitter<OutT> opEmitter) {
        if (this.bundleCheckTimerId.equals(keyedTimerData.getTimerData().getTimerId())) {
            tryFinishBundle(opEmitter);
            scheduleNextBundleCheck();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void signalFailure(Throwable th) {
        LOG.error("Encountered error during processing the message. Discarding the output due to: ", th);
        this.futureCollector.discard();
        this.isBundleStarted.compareAndSet(true, false);
        if (this.bundleStartTime.get() != Long.MAX_VALUE) {
            this.currentBundleElementCount.set(0L);
            this.bundleStartTime.set(Long.MAX_VALUE);
            this.pendingBundleCount.decrementAndGet();
            this.currentActiveBundleDoneFutureReference.set(null);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Multi-variable type inference failed */
    public void tryFinishBundle(OpEmitter<OutT> opEmitter) {
        CompletionStage finish = this.futureCollector.finish();
        if (shouldFinishBundle() && this.isBundleStarted.compareAndSet(true, false)) {
            LOG.debug("Finishing the current bundle.");
            this.currentBundleElementCount.set(0L);
            this.bundleStartTime.set(Long.MAX_VALUE);
            Instant instant = this.bundleWatermarkHold;
            this.bundleWatermarkHold = null;
            CompletableFuture<Void> completableFuture = this.currentActiveBundleDoneFutureReference.get();
            finish = finish.thenCombine(completableFuture != null ? completableFuture : CompletableFuture.completedFuture(null), (collection, r6) -> {
                this.bundleProgressListener.onBundleFinished(opEmitter);
                return collection;
            });
            this.watermarkFuture = finish.thenAcceptBoth(this.watermarkFuture, instant == null ? (collection2, r5) -> {
                this.pendingBundleCount.decrementAndGet();
            } : (collection3, r8) -> {
                LOG.debug("Propagating watermark: {} to downstream.", instant);
                this.bundleProgressListener.onWatermark(instant, opEmitter);
                this.pendingBundleCount.decrementAndGet();
            });
            this.currentActiveBundleDoneFutureReference.set(null);
        } else if (this.isBundleStarted.get()) {
            CompletableFuture<Collection<WindowedValue<OutT>>> completableFuture2 = finish.toCompletableFuture();
            this.currentActiveBundleDoneFutureReference.updateAndGet(completableFuture3 -> {
                return CompletableFuture.allOf(completableFuture3 != null ? completableFuture3 : CompletableFuture.completedFuture(null), completableFuture2);
            });
        }
        opEmitter.emitFuture(finish);
    }

    @VisibleForTesting
    long getCurrentBundleElementCount() {
        return this.currentBundleElementCount.longValue();
    }

    @Nullable
    @VisibleForTesting
    CompletionStage<Void> getCurrentBundleDoneFuture() {
        return this.currentActiveBundleDoneFutureReference.get();
    }

    @VisibleForTesting
    void setCurrentBundleDoneFuture(CompletableFuture<Void> completableFuture) {
        this.currentActiveBundleDoneFutureReference.set(completableFuture);
    }

    @VisibleForTesting
    long getPendingBundleCount() {
        return this.pendingBundleCount.longValue();
    }

    @VisibleForTesting
    void setPendingBundleCount(long j) {
        this.pendingBundleCount.set(j);
    }

    @VisibleForTesting
    boolean isBundleStarted() {
        return this.isBundleStarted.get();
    }

    @VisibleForTesting
    void setBundleWatermarkHold(Instant instant) {
        this.bundleWatermarkHold = instant;
    }

    private boolean shouldFinishBundle() {
        return this.isBundleStarted.get() && (this.currentBundleElementCount.get() >= this.maxBundleSize || System.currentTimeMillis() - this.bundleStartTime.get() >= this.maxBundleTimeMs || BoundedWindow.TIMESTAMP_MAX_VALUE.equals(this.bundleWatermarkHold));
    }
}
