package org.apache.beam.runners.prism;

import com.google.common.truth.Truth;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessChannelBuilder;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessServerBuilder;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.testing.GrpcCleanupRule;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/prism/PrismArtifactStagerTest.class */
public class PrismArtifactStagerTest {

    @Rule
    public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
    final ArtifactStagingService stagingService = new ArtifactStagingService(new TestDestinationProvider());

    /* loaded from: input_file:org/apache/beam/runners/prism/PrismArtifactStagerTest$TestDestinationProvider.class */
    private static class TestDestinationProvider implements ArtifactStagingService.ArtifactDestinationProvider {
        private TestDestinationProvider() {
        }

        public ArtifactStagingService.ArtifactDestination getDestination(String str, String str2) throws IOException {
            return ArtifactStagingService.ArtifactDestination.create("beam:artifact:type:embedded:v1", ByteString.EMPTY, new ByteArrayOutputStream());
        }

        public void removeStagedArtifacts(String str) throws IOException {
        }
    }

    @Test
    public void givenValidArtifacts_stages() throws IOException, ExecutionException, InterruptedException {
        PrismArtifactStager prismArtifactStager = prismArtifactStager(validArtifacts());
        Truth.assertThat(Boolean.valueOf(prismArtifactStager.getManagedChannel().isShutdown())).isFalse();
        prismArtifactStager.stage();
        Truth.assertThat(this.stagingService.getStagedArtifacts(prismArtifactStager.getStagingSessionToken())).isNotEmpty();
        prismArtifactStager.close();
        Truth.assertThat(Boolean.valueOf(prismArtifactStager.getManagedChannel().isShutdown())).isTrue();
    }

    @Test
    public void givenErrors_performsGracefulCleanup() throws IOException {
        PrismArtifactStager prismArtifactStager = prismArtifactStager(invalidArtifacts());
        Truth.assertThat(Boolean.valueOf(prismArtifactStager.getManagedChannel().isShutdown())).isFalse();
        Objects.requireNonNull(prismArtifactStager);
        Truth.assertThat(((ExecutionException) Assert.assertThrows(ExecutionException.class, prismArtifactStager::stage)).getMessage()).contains("Unexpected artifact type: invalid-type-urn");
        Truth.assertThat(Boolean.valueOf(prismArtifactStager.getManagedChannel().isShutdown())).isFalse();
        prismArtifactStager.close();
        Truth.assertThat(Boolean.valueOf(prismArtifactStager.getManagedChannel().isShutdown())).isTrue();
    }

    private PrismArtifactStager prismArtifactStager(Map<String, List<RunnerApi.ArtifactInformation>> map) throws IOException {
        String generateName = InProcessServerBuilder.generateName();
        ArtifactRetrievalService artifactRetrievalService = new ArtifactRetrievalService();
        this.stagingService.registerJob("staging-token", map);
        this.grpcCleanup.register(InProcessServerBuilder.forName(generateName).directExecutor().addService(this.stagingService).addService(artifactRetrievalService).build().start());
        return PrismArtifactStager.builder().setStagingEndpoint("ignore").setStagingSessionToken("staging-token").setManagedChannel(this.grpcCleanup.register(InProcessChannelBuilder.forName(generateName).build())).build();
    }

    private Map<String, List<RunnerApi.ArtifactInformation>> validArtifacts() {
        return ImmutableMap.of("env1", Collections.singletonList(RunnerApi.ArtifactInformation.newBuilder().setTypeUrn("beam:artifact:type:embedded:v1").setTypePayload(RunnerApi.EmbeddedFilePayload.newBuilder().setData(ByteString.copyFromUtf8("type-payload")).build().toByteString()).setRoleUrn("role-urn").build()));
    }

    private Map<String, List<RunnerApi.ArtifactInformation>> invalidArtifacts() {
        return ImmutableMap.of("env1", Collections.singletonList(RunnerApi.ArtifactInformation.newBuilder().setTypeUrn("invalid-type-urn").setTypePayload(RunnerApi.EmbeddedFilePayload.newBuilder().setData(ByteString.copyFromUtf8("type-payload")).build().toByteString()).setRoleUrn("role-urn").build()));
    }
}
