package org.apache.beam.fn.harness.control;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.fn.harness.control.FinalizeBundleHandler;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.StringContains;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for {@link FinalizeBundleHandler}.
 *
 * <p>Each test registers zero or more finalization callbacks under a bundle id, sends a
 * finalize-bundle instruction for that id, and checks the response and the callbacks' side effects.
 */
@RunWith(JUnit4.class)
public class FinalizeBundleHandlerTest {
    /** Instruction id placed on the outer {@code InstructionRequest}. */
    private static final String INSTRUCTION_ID = "instructionId";

    /** Bundle id the callbacks are registered under and that the finalize request refers to. */
    private static final String BUNDLE_ID = "test";

    /** The response expected from a finalization that completes without error. */
    private static final BeamFnApi.InstructionResponse SUCCESSFUL_RESPONSE =
            BeamFnApi.InstructionResponse.newBuilder()
                    .setFinalizeBundle(BeamFnApi.FinalizeBundleResponse.getDefaultInstance())
                    .build();

    /**
     * Executor handed to the handler under test. JUnit 4 builds a fresh test instance per test
     * method, so every test gets its own pool.
     */
    private final ExecutorService executor = Executors.newCachedThreadPool();

    /**
     * Stops the per-test executor so its threads do not outlive the test.
     *
     * <p>{@code shutdownNow()} is used rather than {@code shutdown()} so that any task still running
     * on the pool is interrupted. NOTE(review): presumably the handler keeps background work on this
     * executor; confirm against {@code FinalizeBundleHandler}.
     */
    @After
    public void tearDown() {
        executor.shutdownNow();
    }

    /** Both registered callbacks run and a successful response is returned. */
    @Test
    public void testRegistrationAndCallback() throws Exception {
        AtomicBoolean firstCallbackRan = new AtomicBoolean();
        AtomicBoolean secondCallbackRan = new AtomicBoolean();
        List<FinalizeBundleHandler.CallbackRegistration> callbacks = new ArrayList<>();
        callbacks.add(
                FinalizeBundleHandler.CallbackRegistration.create(
                        oneHourFromNow(), () -> firstCallbackRan.set(true)));
        callbacks.add(
                FinalizeBundleHandler.CallbackRegistration.create(
                        oneHourFromNow(), () -> secondCallbackRan.set(true)));

        FinalizeBundleHandler handler = new FinalizeBundleHandler(executor);
        handler.registerCallbacks(BUNDLE_ID, callbacks);

        Assert.assertEquals(
                SUCCESSFUL_RESPONSE, handler.finalizeBundle(requestFor(BUNDLE_ID)).build());
        Assert.assertTrue(firstCallbackRan.get());
        Assert.assertTrue(secondCallbackRan.get());
    }

    /** Finalizing a bundle id with no registered callbacks still yields a successful response. */
    @Test
    public void testFinalizationIgnoresMissingBundleIds() throws Exception {
        FinalizeBundleHandler handler = new FinalizeBundleHandler(executor);

        Assert.assertEquals(
                SUCCESSFUL_RESPONSE, handler.finalizeBundle(requestFor(BUNDLE_ID)).build());
    }

    /**
     * A throwing callback does not prevent later callbacks from running; the failures are reported
     * as suppressed exceptions on the exception thrown by {@code finalizeBundle}.
     */
    @Test
    public void testFinalizationContinuesToNextCallbackEvenInFailure() throws Exception {
        AtomicBoolean firstCallbackRan = new AtomicBoolean();
        AtomicBoolean secondCallbackRan = new AtomicBoolean();
        List<FinalizeBundleHandler.CallbackRegistration> callbacks = new ArrayList<>();
        callbacks.add(
                FinalizeBundleHandler.CallbackRegistration.create(
                        oneHourFromNow(),
                        () -> {
                            firstCallbackRan.set(true);
                            throw new Exception("testException1");
                        }));
        callbacks.add(
                FinalizeBundleHandler.CallbackRegistration.create(
                        oneHourFromNow(),
                        () -> {
                            secondCallbackRan.set(true);
                            throw new Exception("testException2");
                        }));

        FinalizeBundleHandler handler = new FinalizeBundleHandler(executor);
        handler.registerCallbacks(BUNDLE_ID, callbacks);

        try {
            handler.finalizeBundle(requestFor(BUNDLE_ID));
            // fail() throws AssertionError, which is not an Exception, so the catch below cannot
            // swallow it.
            Assert.fail("Expected finalizeBundle to throw because both callbacks failed");
        } catch (Exception e) {
            MatcherAssert.assertThat(
                    e.getMessage(),
                    StringContains.containsString("Failed to handle bundle finalization for bundle"));
            Assert.assertEquals(2L, e.getSuppressed().length);
            Assert.assertTrue(firstCallbackRan.get());
            Assert.assertTrue(secondCallbackRan.get());
        }
    }

    /** Returns an expiry time one hour after the current instant, used for every registration. */
    private static Instant oneHourFromNow() {
        return Instant.now().plus(Duration.standardHours(1L));
    }

    /**
     * Builds a finalize-bundle instruction request.
     *
     * @param str the id set on the inner {@code FinalizeBundleRequest}, i.e. the bundle to finalize
     * @return a request whose own instruction id is {@link #INSTRUCTION_ID}
     */
    private static BeamFnApi.InstructionRequest requestFor(String str) {
        BeamFnApi.FinalizeBundleRequest finalizeRequest =
                BeamFnApi.FinalizeBundleRequest.newBuilder().setInstructionId(str).build();
        return BeamFnApi.InstructionRequest.newBuilder()
                .setInstructionId(INSTRUCTION_ID)
                .setFinalizeBundle(finalizeRequest)
                .build();
    }
}
