package org.apache.beam.runners.direct.portable.artifact;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
import org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ArtifactServiceStager;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.GrpcFnServer;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.InProcessServerFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.ServerFactory;
import org.apache.beam.vendor.grpc.v1.io.grpc.inprocess.InProcessChannelBuilder;
import org.apache.beam.vendor.grpc.v1.io.grpc.stub.StreamObserver;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/portable/artifact/LocalFileSystemArtifactRetrievalServiceTest.class */
public class LocalFileSystemArtifactRetrievalServiceTest {
    private File root;
    private GrpcFnServer<LocalFileSystemArtifactStagerService> stagerServer;
    private GrpcFnServer<LocalFileSystemArtifactRetrievalService> retrievalServer;
    private ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceStub retrievalStub;

    @Rule
    public TemporaryFolder tmp = new TemporaryFolder();
    private ServerFactory serverFactory = InProcessServerFactory.create();

    /* loaded from: input_file:org/apache/beam/runners/direct/portable/artifact/LocalFileSystemArtifactRetrievalServiceTest$MultimapChunkAppender.class */
    private static class MultimapChunkAppender implements StreamObserver<ArtifactApi.ArtifactChunk> {
        private final ByteArrayOutputStream target;
        private final CountDownLatch completed;

        private MultimapChunkAppender(ByteArrayOutputStream byteArrayOutputStream, CountDownLatch countDownLatch) {
            this.target = byteArrayOutputStream;
            this.completed = countDownLatch;
        }

        public void onNext(ArtifactApi.ArtifactChunk artifactChunk) {
            try {
                this.target.write(artifactChunk.getData().toByteArray());
            } catch (IOException e) {
                throw new AssertionError(e);
            }
        }

        public void onError(Throwable th) {
            this.completed.countDown();
        }

        public void onCompleted() {
            this.completed.countDown();
        }
    }

    @Before
    public void setup() throws Exception {
        this.root = this.tmp.newFolder();
        this.stagerServer = GrpcFnServer.allocatePortAndCreateFor(LocalFileSystemArtifactStagerService.forRootDirectory(this.root), this.serverFactory);
    }

    @After
    public void teardown() throws Exception {
        this.stagerServer.close();
        this.retrievalServer.close();
    }

    @Test
    public void retrieveManifest() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("foo", "bar, baz, quux".getBytes(StandardCharsets.UTF_8));
        hashMap.put("spam", new byte[]{Byte.MAX_VALUE, -22, 5});
        stageAndCreateRetrievalService(hashMap);
        final AtomicReference atomicReference = new AtomicReference();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        this.retrievalStub.getManifest(ArtifactApi.GetManifestRequest.getDefaultInstance(), new StreamObserver<ArtifactApi.GetManifestResponse>() { // from class: org.apache.beam.runners.direct.portable.artifact.LocalFileSystemArtifactRetrievalServiceTest.1
            public void onNext(ArtifactApi.GetManifestResponse getManifestResponse) {
                atomicReference.set(getManifestResponse.getManifest());
            }

            public void onError(Throwable th) {
                countDownLatch.countDown();
            }

            public void onCompleted() {
                countDownLatch.countDown();
            }
        });
        countDownLatch.await();
        Assert.assertThat((ArtifactApi.Manifest) atomicReference.get(), Matchers.not(Matchers.nullValue()));
        ArrayList arrayList = new ArrayList();
        Iterator it = ((ArtifactApi.Manifest) atomicReference.get()).getArtifactList().iterator();
        while (it.hasNext()) {
            arrayList.add(((ArtifactApi.ArtifactMetadata) it.next()).getName());
        }
        Assert.assertThat(arrayList, Matchers.containsInAnyOrder(new String[]{"foo", "spam"}));
    }

    @Test
    public void retrieveArtifact() throws Exception {
        HashMap hashMap = new HashMap();
        byte[] bytes = "bar, baz, quux".getBytes(StandardCharsets.UTF_8);
        hashMap.put("foo", bytes);
        byte[] bArr = {Byte.MAX_VALUE, -22, 5};
        hashMap.put("spam", bArr);
        stageAndCreateRetrievalService(hashMap);
        CountDownLatch countDownLatch = new CountDownLatch(2);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        this.retrievalStub.getArtifact(ArtifactApi.GetArtifactRequest.newBuilder().setName("foo").build(), new MultimapChunkAppender(byteArrayOutputStream, countDownLatch));
        ByteArrayOutputStream byteArrayOutputStream2 = new ByteArrayOutputStream();
        this.retrievalStub.getArtifact(ArtifactApi.GetArtifactRequest.newBuilder().setName("spam").build(), new MultimapChunkAppender(byteArrayOutputStream2, countDownLatch));
        countDownLatch.await();
        Assert.assertArrayEquals(bytes, byteArrayOutputStream.toByteArray());
        Assert.assertArrayEquals(bArr, byteArrayOutputStream2.toByteArray());
    }

    @Test
    public void retrieveArtifactNotPresent() throws Exception {
        stageAndCreateRetrievalService(Collections.singletonMap("foo", "bar, baz, quux".getBytes(StandardCharsets.UTF_8)));
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final AtomicReference atomicReference = new AtomicReference();
        this.retrievalStub.getArtifact(ArtifactApi.GetArtifactRequest.newBuilder().setName("spam").build(), new StreamObserver<ArtifactApi.ArtifactChunk>() { // from class: org.apache.beam.runners.direct.portable.artifact.LocalFileSystemArtifactRetrievalServiceTest.2
            public void onNext(ArtifactApi.ArtifactChunk artifactChunk) {
                Assert.fail("Should never receive an " + ArtifactApi.ArtifactChunk.class.getSimpleName() + " for a nonexistent artifact");
            }

            public void onError(Throwable th) {
                atomicReference.set(th);
                countDownLatch.countDown();
            }

            public void onCompleted() {
                countDownLatch.countDown();
            }
        });
        countDownLatch.await();
        Assert.assertThat((Throwable) atomicReference.get(), Matchers.not(Matchers.nullValue()));
        Assert.assertThat(((Throwable) atomicReference.get()).getMessage(), Matchers.containsString("No such artifact"));
        Assert.assertThat(((Throwable) atomicReference.get()).getMessage(), Matchers.containsString("spam"));
    }

    private void stageAndCreateRetrievalService(Map<String, byte[]> map) throws Exception {
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<String, byte[]> entry : map.entrySet()) {
            File newFile = this.tmp.newFile(entry.getKey());
            Files.write(newFile.toPath(), entry.getValue(), new OpenOption[0]);
            arrayList.add(ArtifactServiceStager.StagedFile.of(newFile, newFile.getName()));
        }
        ArtifactServiceStager.overChannel(InProcessChannelBuilder.forName(this.stagerServer.getApiServiceDescriptor().getUrl()).build()).stage("token", arrayList);
        this.retrievalServer = GrpcFnServer.allocatePortAndCreateFor(LocalFileSystemArtifactRetrievalService.forRootDirectory(this.root), this.serverFactory);
        this.retrievalStub = ArtifactRetrievalServiceGrpc.newStub(InProcessChannelBuilder.forName(this.retrievalServer.getApiServiceDescriptor().getUrl()).build());
    }
}
