package org.apache.beam.artifact.local;

import com.google.protobuf.ByteString;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.ServerImpl;
import io.grpc.stub.StreamObserver;
import java.io.File;
import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.local.artifact.service.java.repackaged.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/artifact/local/LocalFileSystemArtifactStagerServiceTest.class */
public class LocalFileSystemArtifactStagerServiceTest {

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    private ArtifactStagingServiceGrpc.ArtifactStagingServiceStub stub;
    private LocalFileSystemArtifactStagerService stager;
    private ServerImpl server;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/artifact/local/LocalFileSystemArtifactStagerServiceTest$RecordingStreamObserver.class */
    public static class RecordingStreamObserver<T> implements StreamObserver<T> {
        private List<T> responses;

        @Nullable
        private Throwable error;
        private boolean completed;

        private RecordingStreamObserver() {
            this.responses = new ArrayList();
            this.error = null;
            this.completed = false;
        }

        public void onNext(T t) {
            failIfTerminal();
            this.responses.add(t);
        }

        public void onError(Throwable th) {
            failIfTerminal();
            this.error = th;
        }

        public void onCompleted() {
            failIfTerminal();
            this.completed = true;
        }

        private boolean isTerminal() {
            return this.error != null || this.completed;
        }

        private void failIfTerminal() {
            if (isTerminal()) {
                Assert.fail(String.format("Should have terminated after entering a terminal state: completed %s, error %s", Boolean.valueOf(this.completed), this.error));
            }
        }

        void awaitTerminalState() {
            while (!isTerminal()) {
                Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.MILLISECONDS);
            }
        }
    }

    @Before
    public void setup() throws Exception {
        this.stager = LocalFileSystemArtifactStagerService.withRootDirectory(this.temporaryFolder.newFolder());
        this.server = InProcessServerBuilder.forName("fs_stager").directExecutor().addService(this.stager).build().start();
        this.stub = ArtifactStagingServiceGrpc.newStub(InProcessChannelBuilder.forName("fs_stager").usePlaintext(true).build());
    }

    @After
    public void teardown() {
        this.server.shutdownNow();
    }

    @Test
    public void singleDataPutArtifactSucceeds() throws Exception {
        byte[] bytes = "foo-bar-baz".getBytes();
        RecordingStreamObserver recordingStreamObserver = new RecordingStreamObserver();
        StreamObserver putArtifact = this.stub.putArtifact(recordingStreamObserver);
        putArtifact.onNext(ArtifactApi.PutArtifactRequest.newBuilder().setMetadata(ArtifactApi.ArtifactMetadata.newBuilder().setName("my-artifact").build()).build());
        putArtifact.onNext(ArtifactApi.PutArtifactRequest.newBuilder().setData(ArtifactApi.ArtifactChunk.newBuilder().setData(ByteString.copyFrom(bytes)).build()).build());
        putArtifact.onCompleted();
        recordingStreamObserver.awaitTerminalState();
        File artifactFile = this.stager.getArtifactFile("my-artifact");
        Assert.assertThat(Boolean.valueOf(artifactFile.exists()), Matchers.is(true));
        ByteBuffer allocate = ByteBuffer.allocate(bytes.length);
        new FileInputStream(artifactFile).getChannel().read(allocate);
        Assert.assertArrayEquals(bytes, allocate.array());
    }

    @Test
    public void multiPartPutArtifactSucceeds() throws Exception {
        byte[] bytes = "foo-".getBytes();
        byte[] bytes2 = "bar-".getBytes();
        byte[] bytes3 = "baz".getBytes();
        RecordingStreamObserver recordingStreamObserver = new RecordingStreamObserver();
        StreamObserver putArtifact = this.stub.putArtifact(recordingStreamObserver);
        putArtifact.onNext(ArtifactApi.PutArtifactRequest.newBuilder().setMetadata(ArtifactApi.ArtifactMetadata.newBuilder().setName("my-artifact").build()).build());
        putArtifact.onNext(ArtifactApi.PutArtifactRequest.newBuilder().setData(ArtifactApi.ArtifactChunk.newBuilder().setData(ByteString.copyFrom(bytes)).build()).build());
        putArtifact.onNext(ArtifactApi.PutArtifactRequest.newBuilder().setData(ArtifactApi.ArtifactChunk.newBuilder().setData(ByteString.copyFrom(bytes2)).build()).build());
        putArtifact.onNext(ArtifactApi.PutArtifactRequest.newBuilder().setData(ArtifactApi.ArtifactChunk.newBuilder().setData(ByteString.copyFrom(bytes3)).build()).build());
        putArtifact.onCompleted();
        recordingStreamObserver.awaitTerminalState();
        File artifactFile = this.stager.getArtifactFile("my-artifact");
        Assert.assertThat(Boolean.valueOf(artifactFile.exists()), Matchers.is(true));
        ByteBuffer allocate = ByteBuffer.allocate("foo-bar-baz".length());
        new FileInputStream(artifactFile).getChannel().read(allocate);
        Assert.assertArrayEquals("foo-bar-baz".getBytes(), allocate.array());
    }

    @Test
    public void putArtifactBeforeNameFails() {
        byte[] bytes = "foo-".getBytes();
        RecordingStreamObserver recordingStreamObserver = new RecordingStreamObserver();
        this.stub.putArtifact(recordingStreamObserver).onNext(ArtifactApi.PutArtifactRequest.newBuilder().setData(ArtifactApi.ArtifactChunk.newBuilder().setData(ByteString.copyFrom(bytes)).build()).build());
        recordingStreamObserver.awaitTerminalState();
        Assert.assertThat(recordingStreamObserver.error, Matchers.not(Matchers.nullValue()));
    }

    @Test
    public void putArtifactWithNoContentFails() {
        RecordingStreamObserver recordingStreamObserver = new RecordingStreamObserver();
        this.stub.putArtifact(recordingStreamObserver).onNext(ArtifactApi.PutArtifactRequest.newBuilder().setData(ArtifactApi.ArtifactChunk.getDefaultInstance()).build());
        recordingStreamObserver.awaitTerminalState();
        Assert.assertThat(recordingStreamObserver.error, Matchers.not(Matchers.nullValue()));
    }

    @Test
    public void commitManifestWithAllArtifactsSucceeds() {
        ArtifactApi.ArtifactMetadata stageBytes = stageBytes("first-artifact", "foo, bar, baz, quux".getBytes());
        ArtifactApi.Manifest build = ArtifactApi.Manifest.newBuilder().addArtifact(stageBytes).addArtifact(stageBytes("second-artifact", "spam, ham, eggs".getBytes())).build();
        RecordingStreamObserver recordingStreamObserver = new RecordingStreamObserver();
        this.stub.commitManifest(ArtifactApi.CommitManifestRequest.newBuilder().setManifest(build).build(), recordingStreamObserver);
        recordingStreamObserver.awaitTerminalState();
        Assert.assertThat(Boolean.valueOf(recordingStreamObserver.completed), Matchers.is(true));
        Assert.assertThat(recordingStreamObserver.responses, Matchers.hasSize(1));
        Assert.assertThat(((ArtifactApi.CommitManifestResponse) recordingStreamObserver.responses.get(0)).getStagingToken(), Matchers.not(Matchers.nullValue()));
    }

    @Test
    public void commitManifestWithMissingArtifactFails() {
        ArtifactApi.ArtifactMetadata stageBytes = stageBytes("first-artifact", "foo, bar, baz, quux".getBytes());
        ArtifactApi.Manifest build = ArtifactApi.Manifest.newBuilder().addArtifact(stageBytes).addArtifact(ArtifactApi.ArtifactMetadata.newBuilder().setName("absent").build()).build();
        RecordingStreamObserver recordingStreamObserver = new RecordingStreamObserver();
        this.stub.commitManifest(ArtifactApi.CommitManifestRequest.newBuilder().setManifest(build).build(), recordingStreamObserver);
        recordingStreamObserver.awaitTerminalState();
        Assert.assertThat(recordingStreamObserver.error, Matchers.not(Matchers.nullValue()));
    }

    private ArtifactApi.ArtifactMetadata stageBytes(String str, byte[] bArr) {
        StreamObserver putArtifact = this.stub.putArtifact(new RecordingStreamObserver());
        putArtifact.onNext(ArtifactApi.PutArtifactRequest.newBuilder().setMetadata(ArtifactApi.ArtifactMetadata.newBuilder().setName(str).build()).build());
        putArtifact.onNext(ArtifactApi.PutArtifactRequest.newBuilder().setData(ArtifactApi.ArtifactChunk.newBuilder().setData(ByteString.copyFrom(bArr)).build()).build());
        putArtifact.onCompleted();
        return ArtifactApi.ArtifactMetadata.newBuilder().setName(str).build();
    }
}
