package org.apache.beam.runners.samza.runtime;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.samza.runtime.BundleManager;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.samza.operators.Scheduler;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/beam/runners/samza/runtime/BundleManagerTest.class */
public final class BundleManagerTest {
    private static final long MAX_BUNDLE_SIZE = 3;
    private static final long MAX_BUNDLE_TIME_MS = 2000;
    private static final String BUNDLE_CHECK_TIMER_ID = "bundle-check-test-timer";
    private FutureCollector<String> mockFutureCollector;
    private BundleManager<String> bundleManager;
    private BundleManager.BundleProgressListener<String> bundleProgressListener;
    private Scheduler<KeyedTimerData<Void>> mockScheduler;

    @Before
    public void setUp() {
        this.mockFutureCollector = (FutureCollector) Mockito.mock(FutureCollector.class);
        this.bundleProgressListener = (BundleManager.BundleProgressListener) Mockito.mock(BundleManager.BundleProgressListener.class);
        this.mockScheduler = (Scheduler) Mockito.mock(Scheduler.class);
        this.bundleManager = new BundleManager<>(this.bundleProgressListener, this.mockFutureCollector, MAX_BUNDLE_SIZE, MAX_BUNDLE_TIME_MS, this.mockScheduler, BUNDLE_CHECK_TIMER_ID);
    }

    @Test
    public void testTryStartBundleStartsBundle() {
        this.bundleManager.tryStartBundle();
        ((BundleManager.BundleProgressListener) Mockito.verify(this.bundleProgressListener, Mockito.times(1))).onBundleStarted();
        Assert.assertEquals("Expected the number of element in the current bundle to be 1", 1L, this.bundleManager.getCurrentBundleElementCount());
        Assert.assertEquals("Expected the pending bundle count to be 1", 1L, this.bundleManager.getPendingBundleCount());
        Assert.assertTrue("tryStartBundle() did not start the bundle", this.bundleManager.isBundleStarted());
    }

    @Test
    public void testTryStartBundleThrowsExceptionAndSignalError() {
        this.bundleManager.setCurrentBundleDoneFuture(CompletableFuture.completedFuture(null));
        try {
            this.bundleManager.tryStartBundle();
        } catch (IllegalArgumentException e) {
            this.bundleManager.signalFailure(e);
        }
        ((FutureCollector) Mockito.verify(this.mockFutureCollector, Mockito.times(1))).prepare();
        ((FutureCollector) Mockito.verify(this.mockFutureCollector, Mockito.times(1))).discard();
        Assert.assertEquals("Expected the number of element in the current bundle to 0", 0L, this.bundleManager.getCurrentBundleElementCount());
        Assert.assertEquals("Expected pending bundle count to be 0", 0L, this.bundleManager.getPendingBundleCount());
        Assert.assertFalse("Error didn't reset the bundle as expected.", this.bundleManager.isBundleStarted());
    }

    @Test
    public void testTryStartBundleThrowsExceptionFromTheListener() {
        ((BundleManager.BundleProgressListener) Mockito.doThrow(new Throwable[]{new RuntimeException("User start bundle threw an exception")}).when(this.bundleProgressListener)).onBundleStarted();
        try {
            this.bundleManager.tryStartBundle();
        } catch (RuntimeException e) {
            this.bundleManager.signalFailure(e);
        }
        ((FutureCollector) Mockito.verify(this.mockFutureCollector, Mockito.times(1))).prepare();
        ((FutureCollector) Mockito.verify(this.mockFutureCollector, Mockito.times(1))).discard();
        Assert.assertEquals("Expected the number of element in the current bundle to 0", 0L, this.bundleManager.getCurrentBundleElementCount());
        Assert.assertEquals("Expected pending bundle count to be 0", 0L, this.bundleManager.getPendingBundleCount());
        Assert.assertFalse("Error didn't reset the bundle as expected.", this.bundleManager.isBundleStarted());
    }

    @Test
    public void testMultipleStartBundle() {
        this.bundleManager.tryStartBundle();
        this.bundleManager.tryStartBundle();
        ((BundleManager.BundleProgressListener) Mockito.verify(this.bundleProgressListener, Mockito.times(1))).onBundleStarted();
        Assert.assertEquals("Expected the number of element in the current bundle to be 2", 2L, this.bundleManager.getCurrentBundleElementCount());
        Assert.assertEquals("Expected the pending bundle count to be 1", 1L, this.bundleManager.getPendingBundleCount());
        Assert.assertTrue("tryStartBundle() did not start the bundle", this.bundleManager.isBundleStarted());
    }

    @Test
    public void testTryFinishBundleClosesBundle() {
        OpEmitter opEmitter = (OpEmitter) Mockito.mock(OpEmitter.class);
        Mockito.when(this.mockFutureCollector.finish()).thenReturn(CompletableFuture.completedFuture(Collections.singleton((WindowedValue) Mockito.mock(WindowedValue.class))));
        this.bundleManager.tryStartBundle();
        this.bundleManager.tryStartBundle();
        this.bundleManager.tryStartBundle();
        this.bundleManager.tryFinishBundle(opEmitter);
        ((OpEmitter) Mockito.verify(opEmitter, Mockito.times(1))).emitFuture((CompletionStage) Matchers.anyObject());
        ((BundleManager.BundleProgressListener) Mockito.verify(this.bundleProgressListener, Mockito.times(1))).onBundleFinished(opEmitter);
        Assert.assertEquals("Expected the number of element in the current bundle to be 0", 0L, this.bundleManager.getCurrentBundleElementCount());
        Assert.assertEquals("Expected the pending bundle count to be 0", 0L, this.bundleManager.getPendingBundleCount());
        Assert.assertFalse("tryFinishBundle() did not close the bundle", this.bundleManager.isBundleStarted());
    }

    @Test
    public void testTryFinishBundleClosesBundleOnMaxWatermark() {
        OpEmitter opEmitter = (OpEmitter) Mockito.mock(OpEmitter.class);
        Mockito.when(this.mockFutureCollector.finish()).thenReturn(CompletableFuture.completedFuture(Collections.singleton((WindowedValue) Mockito.mock(WindowedValue.class))));
        this.bundleManager.setBundleWatermarkHold(BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.bundleManager.tryStartBundle();
        this.bundleManager.tryStartBundle();
        this.bundleManager.tryFinishBundle(opEmitter);
        ((OpEmitter) Mockito.verify(opEmitter, Mockito.times(1))).emitFuture((CompletionStage) Matchers.anyObject());
        ((BundleManager.BundleProgressListener) Mockito.verify(this.bundleProgressListener, Mockito.times(1))).onBundleFinished(opEmitter);
        Assert.assertEquals("Expected the number of element in the current bundle to be 0", 0L, this.bundleManager.getCurrentBundleElementCount());
        Assert.assertEquals("Expected the pending bundle count to be 0", 0L, this.bundleManager.getPendingBundleCount());
        Assert.assertFalse("tryFinishBundle() did not close the bundle", this.bundleManager.isBundleStarted());
    }

    @Test
    public void testTryFinishBundleShouldNotCloseBundle() {
        OpEmitter opEmitter = (OpEmitter) Mockito.mock(OpEmitter.class);
        Mockito.when(this.mockFutureCollector.finish()).thenReturn(CompletableFuture.completedFuture(Collections.singleton((WindowedValue) Mockito.mock(WindowedValue.class))));
        this.bundleManager.tryStartBundle();
        this.bundleManager.tryFinishBundle(opEmitter);
        ((FutureCollector) Mockito.verify(this.mockFutureCollector, Mockito.times(1))).finish();
        ((OpEmitter) Mockito.verify(opEmitter, Mockito.times(1))).emitFuture((CompletionStage) Matchers.anyObject());
        ((BundleManager.BundleProgressListener) Mockito.verify(this.bundleProgressListener, Mockito.times(0))).onBundleFinished(opEmitter);
        Assert.assertEquals("Expected the number of element in the current bundle to be 1", 1L, this.bundleManager.getCurrentBundleElementCount());
        Assert.assertEquals("Expected the pending bundle count to be 1", 1L, this.bundleManager.getPendingBundleCount());
        Assert.assertTrue("tryFinishBundle() did not close the bundle", this.bundleManager.isBundleStarted());
    }

    @Test
    public void testTryFinishBundleWhenNoBundleInProgress() {
        OpEmitter opEmitter = (OpEmitter) Mockito.mock(OpEmitter.class);
        Mockito.when(this.mockFutureCollector.finish()).thenReturn(CompletableFuture.completedFuture(Collections.emptyList()));
        this.bundleManager.tryFinishBundle(opEmitter);
        ((OpEmitter) Mockito.verify(opEmitter, Mockito.times(1))).emitFuture((CompletionStage) Matchers.anyObject());
        Assert.assertNull("tryFinishBundle() should not set the future when no bundle in progress", this.bundleManager.getCurrentBundleDoneFuture());
    }

    @Test
    public void testProcessWatermarkWhenNoBundleInProgress() {
        Instant now = Instant.now();
        OpEmitter opEmitter = (OpEmitter) Mockito.mock(OpEmitter.class);
        this.bundleManager.processWatermark(now, opEmitter);
        ((BundleManager.BundleProgressListener) Mockito.verify(this.bundleProgressListener, Mockito.times(1))).onWatermark(now, opEmitter);
    }

    @Test
    public void testProcessWatermarkWithPendingBundles() {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Instant now = Instant.now();
        OpEmitter<String> opEmitter = (OpEmitter) Mockito.mock(OpEmitter.class);
        ArgumentCaptor<CompletionStage<Collection<WindowedValue<String>>>> forClass = ArgumentCaptor.forClass(CompletionStage.class);
        Mockito.when(this.mockFutureCollector.finish()).thenReturn(CompletableFuture.supplyAsync(() -> {
            try {
                countDownLatch.await();
                return Collections.singleton((WindowedValue) Mockito.mock(WindowedValue.class));
            } catch (InterruptedException e) {
                throw new AssertionError("Test interrupted when waiting for latch");
            }
        }));
        testWatermarkHoldWhenPendingBundleInProgress(opEmitter, forClass, now);
        testWatermarkHoldPropagatesAfterFutureResolution(opEmitter, forClass, countDownLatch, now);
    }

    @Test
    public void testMaxWatermarkPropagationForPendingBundle() {
        Instant instant = BoundedWindow.TIMESTAMP_MAX_VALUE;
        OpEmitter opEmitter = (OpEmitter) Mockito.mock(OpEmitter.class);
        this.bundleManager.setPendingBundleCount(1L);
        this.bundleManager.processWatermark(instant, opEmitter);
        ((BundleManager.BundleProgressListener) Mockito.verify(this.bundleProgressListener, Mockito.times(1))).onWatermark(instant, opEmitter);
    }

    @Test
    public void testMaxWatermarkWithBundleInProgress() {
        Instant instant = BoundedWindow.TIMESTAMP_MAX_VALUE;
        OpEmitter opEmitter = (OpEmitter) Mockito.mock(OpEmitter.class);
        Mockito.when(this.mockFutureCollector.finish()).thenReturn(CompletableFuture.completedFuture(Collections.singleton((WindowedValue) Mockito.mock(WindowedValue.class))));
        this.bundleManager.tryStartBundle();
        this.bundleManager.tryStartBundle();
        this.bundleManager.processWatermark(instant, opEmitter);
        ((BundleManager.BundleProgressListener) Mockito.verify(this.bundleProgressListener, Mockito.times(1))).onWatermark(instant, opEmitter);
    }

    @Test
    public void testProcessTimerWithBundleTimeElapsed() {
        BundleManager bundleManager = new BundleManager(this.bundleProgressListener, this.mockFutureCollector, MAX_BUNDLE_SIZE, 0L, this.mockScheduler, BUNDLE_CHECK_TIMER_ID);
        OpEmitter opEmitter = (OpEmitter) Mockito.mock(OpEmitter.class);
        KeyedTimerData keyedTimerData = (KeyedTimerData) Mockito.mock(KeyedTimerData.class);
        TimerInternals.TimerData timerData = (TimerInternals.TimerData) Mockito.mock(TimerInternals.TimerData.class);
        Mockito.when(this.mockFutureCollector.finish()).thenReturn(CompletableFuture.completedFuture(Collections.singleton((WindowedValue) Mockito.mock(WindowedValue.class))));
        Mockito.when(timerData.getTimerId()).thenReturn(BUNDLE_CHECK_TIMER_ID);
        Mockito.when(keyedTimerData.getTimerData()).thenReturn(timerData);
        bundleManager.tryStartBundle();
        bundleManager.processTimer(keyedTimerData, opEmitter);
        ((OpEmitter) Mockito.verify(opEmitter, Mockito.times(1))).emitFuture((CompletionStage) Matchers.anyObject());
        ((BundleManager.BundleProgressListener) Mockito.verify(this.bundleProgressListener, Mockito.times(1))).onBundleFinished(opEmitter);
        Assert.assertEquals("Expected the number of element in the current bundle to be 0", 0L, bundleManager.getCurrentBundleElementCount());
        Assert.assertEquals("Expected the pending bundle count to be 0", 0L, bundleManager.getPendingBundleCount());
        Assert.assertFalse("tryFinishBundle() did not close the bundle", bundleManager.isBundleStarted());
    }

    @Test
    public void testProcessTimerWithTimeLessThanMaxBundleTime() {
        OpEmitter opEmitter = (OpEmitter) Mockito.mock(OpEmitter.class);
        KeyedTimerData keyedTimerData = (KeyedTimerData) Mockito.mock(KeyedTimerData.class);
        TimerInternals.TimerData timerData = (TimerInternals.TimerData) Mockito.mock(TimerInternals.TimerData.class);
        Mockito.when(timerData.getTimerId()).thenReturn(BUNDLE_CHECK_TIMER_ID);
        Mockito.when(keyedTimerData.getTimerData()).thenReturn(timerData);
        Mockito.when(this.mockFutureCollector.finish()).thenReturn(CompletableFuture.completedFuture(Collections.emptyList()));
        this.bundleManager.tryStartBundle();
        this.bundleManager.processTimer(keyedTimerData, opEmitter);
        ((FutureCollector) Mockito.verify(this.mockFutureCollector, Mockito.times(1))).finish();
        ((OpEmitter) Mockito.verify(opEmitter, Mockito.times(1))).emitFuture((CompletionStage) Matchers.anyObject());
        ((BundleManager.BundleProgressListener) Mockito.verify(this.bundleProgressListener, Mockito.times(0))).onBundleFinished(opEmitter);
        Assert.assertEquals("Expected the number of element in the current bundle to be 1", 1L, this.bundleManager.getCurrentBundleElementCount());
        Assert.assertEquals("Expected the pending bundle count to be 1", 1L, this.bundleManager.getPendingBundleCount());
        Assert.assertTrue("tryFinishBundle() closed the bundle", this.bundleManager.isBundleStarted());
    }

    @Test
    public void testProcessTimerIgnoresNonBundleTimers() {
        OpEmitter opEmitter = (OpEmitter) Mockito.mock(OpEmitter.class);
        KeyedTimerData keyedTimerData = (KeyedTimerData) Mockito.mock(KeyedTimerData.class);
        TimerInternals.TimerData timerData = (TimerInternals.TimerData) Mockito.mock(TimerInternals.TimerData.class);
        Mockito.when(timerData.getTimerId()).thenReturn("NotBundleTimer");
        Mockito.when(keyedTimerData.getTimerData()).thenReturn(timerData);
        this.bundleManager.tryStartBundle();
        this.bundleManager.processTimer(keyedTimerData, opEmitter);
        ((FutureCollector) Mockito.verify(this.mockFutureCollector, Mockito.times(0))).finish();
        ((OpEmitter) Mockito.verify(opEmitter, Mockito.times(0))).emitFuture((CompletionStage) Matchers.anyObject());
        ((BundleManager.BundleProgressListener) Mockito.verify(this.bundleProgressListener, Mockito.times(0))).onBundleFinished(opEmitter);
        Assert.assertEquals("Expected the number of element in the current bundle to be 1", 1L, this.bundleManager.getCurrentBundleElementCount());
        Assert.assertEquals("Expected the pending bundle count to be 1", 1L, this.bundleManager.getPendingBundleCount());
        Assert.assertTrue("tryFinishBundle() closed the bundle", this.bundleManager.isBundleStarted());
    }

    @Test
    public void testSignalFailureResetsTheBundleAndCollector() {
        this.bundleManager.tryStartBundle();
        this.bundleManager.signalFailure((Throwable) Mockito.mock(Throwable.class));
        ((FutureCollector) Mockito.verify(this.mockFutureCollector, Mockito.times(1))).prepare();
        ((FutureCollector) Mockito.verify(this.mockFutureCollector, Mockito.times(1))).discard();
        Assert.assertEquals("Expected the number of element in the current bundle to 0", 0L, this.bundleManager.getCurrentBundleElementCount());
        Assert.assertEquals("Expected pending bundle count to be 0", 0L, this.bundleManager.getPendingBundleCount());
        Assert.assertFalse("Error didn't reset the bundle as expected.", this.bundleManager.isBundleStarted());
    }

    private void testWatermarkHoldPropagatesAfterFutureResolution(OpEmitter<String> opEmitter, ArgumentCaptor<CompletionStage<Collection<WindowedValue<String>>>> argumentCaptor, CountDownLatch countDownLatch, Instant instant) {
        Instant now = Instant.now();
        this.bundleManager.processWatermark(now, opEmitter);
        ((BundleManager.BundleProgressListener) Mockito.verify(this.bundleProgressListener, Mockito.times(0))).onWatermark(now, opEmitter);
        countDownLatch.countDown();
        ((CompletionStage) argumentCaptor.getValue()).thenAccept(collection -> {
            ((BundleManager.BundleProgressListener) Mockito.verify(this.bundleProgressListener, Mockito.times(1))).onBundleFinished(opEmitter);
            ((BundleManager.BundleProgressListener) Mockito.verify(this.bundleProgressListener, Mockito.times(1))).onWatermark(instant, opEmitter);
            Assert.assertEquals("Expected the pending bundle count to be 0", 0L, this.bundleManager.getPendingBundleCount());
        }).toCompletableFuture().join();
    }

    private void testWatermarkHoldWhenPendingBundleInProgress(OpEmitter<String> opEmitter, ArgumentCaptor<CompletionStage<Collection<WindowedValue<String>>>> argumentCaptor, Instant instant) {
        this.bundleManager.tryStartBundle();
        this.bundleManager.tryStartBundle();
        this.bundleManager.tryStartBundle();
        this.bundleManager.processWatermark(instant, opEmitter);
        ((BundleManager.BundleProgressListener) Mockito.verify(this.bundleProgressListener, Mockito.times(0))).onWatermark(instant, opEmitter);
        this.bundleManager.tryFinishBundle(opEmitter);
        ((FutureCollector) Mockito.verify(this.mockFutureCollector, Mockito.times(1))).finish();
        ((OpEmitter) Mockito.verify(opEmitter, Mockito.times(1))).emitFuture((CompletionStage) argumentCaptor.capture());
        Assert.assertFalse("tryFinishBundle() closed the bundle", this.bundleManager.isBundleStarted());
    }
}
