package org.apache.beam.runners.fnexecution.artifact;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
import org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc;
import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.InProcessServerFactory;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.inprocess.InProcessChannelBuilder;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.class */
public class BeamFileSystemArtifactServicesTest {
    private static final int DATA_1KB = 1024;
    private GrpcFnServer<BeamFileSystemArtifactStagingService> stagingServer;
    private BeamFileSystemArtifactStagingService stagingService;
    private GrpcFnServer<BeamFileSystemArtifactRetrievalService> retrievalServer;
    private BeamFileSystemArtifactRetrievalService retrievalService;
    private ArtifactStagingServiceGrpc.ArtifactStagingServiceStub stagingStub;
    private ArtifactStagingServiceGrpc.ArtifactStagingServiceBlockingStub stagingBlockingStub;
    private ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceStub retrievalStub;
    private ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceBlockingStub retrievalBlockingStub;
    private Path stagingDir;
    private Path originalDir;

    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    /**
     * Brings up in-process staging and retrieval servers, attaches an async and a blocking stub
     * to each of them, and creates the {@code original} and {@code staging} temp directories.
     */
    @Before
    public void setUp() throws Exception {
        // Staging side: service -> in-process server -> channel -> stubs.
        stagingService = new BeamFileSystemArtifactStagingService();
        stagingServer = GrpcFnServer.allocatePortAndCreateFor(stagingService, InProcessServerFactory.create());
        String stagingUrl = stagingServer.getApiServiceDescriptor().getUrl();
        ManagedChannel stagingChannel = InProcessChannelBuilder.forName(stagingUrl).build();
        stagingStub = ArtifactStagingServiceGrpc.newStub(stagingChannel);
        stagingBlockingStub = ArtifactStagingServiceGrpc.newBlockingStub(stagingChannel);

        // Retrieval side, wired the same way.
        retrievalService = new BeamFileSystemArtifactRetrievalService();
        retrievalServer = GrpcFnServer.allocatePortAndCreateFor(retrievalService, InProcessServerFactory.create());
        String retrievalUrl = retrievalServer.getApiServiceDescriptor().getUrl();
        ManagedChannel retrievalChannel = InProcessChannelBuilder.forName(retrievalUrl).build();
        retrievalStub = ArtifactRetrievalServiceGrpc.newStub(retrievalChannel);
        retrievalBlockingStub = ArtifactRetrievalServiceGrpc.newBlockingStub(retrievalChannel);

        // Source files are written under "original"; the staging service writes under "staging".
        originalDir = tempFolder.newFolder("original").toPath();
        stagingDir = tempFolder.newFolder("staging").toPath();
    }

    /**
     * Closes whichever servers and services {@link #setUp()} managed to create, in the order
     * staging server, staging service, retrieval server, retrieval service.
     */
    @After
    public void tearDown() throws Exception {
        // Each field may still be null if setUp() failed part-way through.
        if (stagingServer != null) {
            stagingServer.close();
        }
        if (stagingService != null) {
            stagingService.close();
        }

        if (retrievalServer != null) {
            retrievalServer.close();
        }
        if (retrievalService != null) {
            retrievalService.close();
        }
    }

    /**
     * Uploads a local file to the staging service as one artifact and waits for the service to
     * acknowledge the upload.
     *
     * <p>The first request carries the artifact metadata and the staging session token; the file
     * content then follows in chunks of at most {@link #DATA_1KB} bytes.
     *
     * @param stagingSessionToken token identifying the staging session to upload into
     * @param filePath path of the local file whose content is uploaded
     * @param fileName artifact name recorded in the metadata
     * @throws Exception if the file cannot be read, the service reports an error (surfaced as an
     *     {@link ExecutionException}), or no completion arrives within 10 seconds
     */
    private void putArtifact(String stagingSessionToken, String filePath, String fileName) throws Exception {
        final CompletableFuture<Boolean> uploadDone = new CompletableFuture<>();
        StreamObserver<ArtifactApi.PutArtifactRequest> requests =
                stagingStub.putArtifact(new StreamObserver<ArtifactApi.PutArtifactResponse>() {
                    @Override
                    public void onNext(ArtifactApi.PutArtifactResponse response) {
                        // The response carries nothing this test needs.
                    }

                    @Override
                    public void onError(Throwable error) {
                        // This runs on a gRPC thread, where an assertion failure would never reach
                        // the test thread. Fail the future so the waiting caller sees the cause.
                        uploadDone.completeExceptionally(error);
                    }

                    @Override
                    public void onCompleted() {
                        uploadDone.complete(Boolean.TRUE);
                    }
                });

        ArtifactApi.ArtifactMetadata metadata = ArtifactApi.ArtifactMetadata.newBuilder().setName(fileName).build();
        requests.onNext(ArtifactApi.PutArtifactRequest.newBuilder()
                .setMetadata(ArtifactApi.PutArtifactMetadata.newBuilder()
                        .setMetadata(metadata)
                        .setStagingSessionToken(stagingSessionToken))
                .build());

        try (FileInputStream input = new FileInputStream(filePath)) {
            byte[] buffer = new byte[DATA_1KB];
            int read;
            while ((read = input.read(buffer)) != -1) {
                ArtifactApi.ArtifactChunk chunk =
                        ArtifactApi.ArtifactChunk.newBuilder().setData(ByteString.copyFrom(buffer, 0, read)).build();
                requests.onNext(ArtifactApi.PutArtifactRequest.newBuilder().setData(chunk).build());
            }
            requests.onCompleted();
            uploadDone.get(10L, TimeUnit.SECONDS);
        }
    }

    /**
     * Commits a manifest listing the given artifacts for a staging session.
     *
     * @param str staging session token of the session to commit
     * @param list metadata of the artifacts to put in the manifest
     * @return the retrieval token contained in the service's response
     */
    private String commitManifest(String str, List<ArtifactApi.ArtifactMetadata> list) {
        ArtifactApi.CommitManifestRequest.Builder request =
                ArtifactApi.CommitManifestRequest.newBuilder().setStagingSessionToken(str);
        ArtifactApi.Manifest manifest = ArtifactApi.Manifest.newBuilder().addAllArtifact(list).build();
        request.setManifest(manifest);
        return stagingBlockingStub.commitManifest(request.build()).getRetrievalToken();
    }

    /** Checks the exact JSON text produced for a session id and base path. */
    @Test
    public void generateStagingSessionTokenTest() throws Exception {
        String basePath = stagingDir.toAbsolutePath().toString();
        String expectedToken = "{\"sessionId\":\"abc123\",\"basePath\":\"" + basePath + "\"}";

        String actualToken = BeamFileSystemArtifactStagingService.generateStagingSessionToken("abc123", basePath);

        Assert.assertEquals(expectedToken, actualToken);
    }

    /**
     * Asserts that a session directory exists under the staging directory, asks the staging
     * service to remove the session's artifacts, and asserts that the directory is gone.
     *
     * @param str staging session token handed to {@code removeArtifacts}
     * @param str2 name of the session directory beneath the staging directory
     */
    void checkCleanup(String str, String str2) throws Exception {
        Path sessionDir = Paths.get(stagingDir.toAbsolutePath().toString(), str2);
        Assert.assertTrue(Files.exists(sessionDir));
        stagingService.removeArtifacts(str);
        Assert.assertFalse(Files.exists(sessionDir));
    }

    /**
     * Commits an empty manifest and checks three things: the service answers with the
     * "no artifacts staged" token, no session directory is created, and that token resolves to
     * a manifest with zero artifacts on the retrieval side.
     */
    @Test
    public void noArtifactsTest() throws Exception {
        String stagingSession = "123";
        String stagingSessionToken = BeamFileSystemArtifactStagingService.generateStagingSessionToken(
                stagingSession, stagingDir.toUri().getPath());

        // Keep the token: it is both asserted on and fed back to the retrieval service below.
        String retrievalToken = commitManifest(stagingSessionToken, Collections.emptyList());

        Assert.assertEquals("__no_artifacts_staged__", retrievalToken);
        Assert.assertFalse(Files.exists(Paths.get(stagingDir.toAbsolutePath().toString(), stagingSession)));

        ArtifactApi.GetManifestRequest request =
                ArtifactApi.GetManifestRequest.newBuilder().setRetrievalToken(retrievalToken).build();
        Assert.assertEquals(
                "Manifest with 0 artifacts",
                0L,
                retrievalBlockingStub.getManifest(request).getManifest().getArtifactCount());
    }

    /**
     * Stages one small file, commits it, and verifies the manifest location, the staged and
     * retrieved content, and the cleanup of the session directory.
     */
    @Test
    public void putArtifactsSingleSmallFileTest() throws Exception {
        String fileName = "file1";
        String stagingSession = "123";
        String stagingSessionToken = BeamFileSystemArtifactStagingService.generateStagingSessionToken(
                stagingSession, stagingDir.toUri().getPath());

        // Write the source file that will be uploaded.
        Path srcFile = Paths.get(originalDir.toString(), fileName).toAbsolutePath();
        Files.write(srcFile, "some_test".getBytes(StandardCharsets.UTF_8));

        putArtifact(stagingSessionToken, srcFile.toString(), fileName);
        List<ArtifactApi.ArtifactMetadata> metadata =
                Collections.singletonList(ArtifactApi.ArtifactMetadata.newBuilder().setName(fileName).build());
        String retrievalToken = commitManifest(stagingSessionToken, metadata);

        Path expectedManifest = Paths.get(stagingDir.toAbsolutePath().toString(), stagingSession, "MANIFEST");
        Assert.assertEquals(expectedManifest, Paths.get(retrievalToken));
        assertFiles(Collections.singleton(fileName), retrievalToken);
        checkCleanup(stagingSessionToken, stagingSession);
    }

    /**
     * Stages several files of different sizes (one of them in a nested directory) sequentially,
     * commits them with their SHA-256 hashes, and verifies manifest location, content and cleanup.
     */
    @Test
    public void putArtifactsMultipleFilesTest() throws Exception {
        String stagingSession = "123";
        String text = "abcdefghinklmop\n";
        // Typed map: with a raw ImmutableMap the keys and values would only be visible as Object.
        ImmutableMap<String, Integer> files = ImmutableMap.<String, Integer>builder()
                .put("file5cb", DATA_1KB / 2)
                .put("file1kb", DATA_1KB)
                .put("file15cb", DATA_1KB * 3 / 2)
                .put("nested/file1kb", DATA_1KB)
                .put("file10kb", DATA_1KB * 10)
                .put("file100kb", DATA_1KB * 100)
                .build();

        // Create the source files. A plain loop lets an IOException fail the test instead of
        // being swallowed and leaving a fixture silently missing.
        HashMap<String, String> hashes = new HashMap<>();
        for (String fileName : files.keySet()) {
            Path filePath = Paths.get(originalDir.toString(), fileName).toAbsolutePath();
            Files.createDirectories(filePath.getParent());
            // Repeat the text enough times to reach at least the requested size.
            int repetitions = (int) Math.ceil(files.get(fileName) / (double) text.length());
            byte[] contents = Strings.repeat(text, repetitions).getBytes(StandardCharsets.UTF_8);
            Files.write(filePath, contents);
            hashes.put(fileName, Hashing.sha256().hashBytes(contents).toString());
        }

        String stagingSessionToken = BeamFileSystemArtifactStagingService.generateStagingSessionToken(
                stagingSession, stagingDir.toUri().getPath());

        List<ArtifactApi.ArtifactMetadata> metadata = new ArrayList<>();
        for (String fileName : files.keySet()) {
            putArtifact(
                    stagingSessionToken,
                    Paths.get(originalDir.toString(), fileName).toAbsolutePath().toString(),
                    fileName);
            metadata.add(ArtifactApi.ArtifactMetadata.newBuilder()
                    .setName(fileName)
                    .setSha256(hashes.get(fileName))
                    .build());
        }

        String retrievalToken = commitManifest(stagingSessionToken, metadata);
        Assert.assertEquals(
                Paths.get(stagingDir.toAbsolutePath().toString(), stagingSession, "MANIFEST").toString(),
                retrievalToken);
        assertFiles(files.keySet(), retrievalToken);
        checkCleanup(stagingSessionToken, stagingSession);
    }

    /**
     * Stages several files into one session from a pool of worker threads, then commits and
     * verifies the result.
     *
     * <p>All uploads are awaited before the manifest is committed, so the manifest always lists
     * every file. A failure inside a worker is rethrown on the test thread through its future.
     */
    @Test
    public void putArtifactsMultipleFilesConcurrentlyTest() throws Exception {
        String stagingSession = "123";
        String text = "abcdefghinklmop\n";
        ImmutableMap<String, Integer> files = ImmutableMap.<String, Integer>builder()
                .put("file5cb", DATA_1KB / 2)
                .put("file1kb", DATA_1KB)
                .put("file15cb", DATA_1KB * 3 / 2)
                .put("nested/file1kb", DATA_1KB)
                .put("file10kb", DATA_1KB * 10)
                .put("file100kb", DATA_1KB * 100)
                .build();

        // Create the source files; an IOException here fails the test rather than being ignored.
        for (String fileName : files.keySet()) {
            Path filePath = Paths.get(originalDir.toString(), fileName).toAbsolutePath();
            Files.createDirectories(filePath.getParent());
            int repetitions = (int) Math.ceil(files.get(fileName) / (double) text.length());
            Files.write(filePath, Strings.repeat(text, repetitions).getBytes(StandardCharsets.UTF_8));
        }

        String stagingSessionToken = BeamFileSystemArtifactStagingService.generateStagingSessionToken(
                stagingSession, stagingDir.toUri().getPath());
        List<ArtifactApi.ArtifactMetadata> metadata = Collections.synchronizedList(new ArrayList<>());

        ExecutorService pool = Executors.newFixedThreadPool(8);
        try {
            List<CompletableFuture<Void>> uploads = new ArrayList<>();
            for (String fileName : files.keySet()) {
                uploads.add(CompletableFuture.runAsync(() -> {
                    try {
                        putArtifact(
                                stagingSessionToken,
                                Paths.get(originalDir.toString(), fileName).toAbsolutePath().toString(),
                                fileName);
                    } catch (Exception e) {
                        // Carried by the future and rethrown by get() below.
                        throw new IllegalStateException("Failed to stage " + fileName, e);
                    }
                    metadata.add(ArtifactApi.ArtifactMetadata.newBuilder().setName(fileName).build());
                }, pool));
            }
            // Wait for every upload before committing; get() also surfaces worker failures.
            for (CompletableFuture<Void> upload : uploads) {
                upload.get(30L, TimeUnit.SECONDS);
            }
        } finally {
            pool.shutdown();
            pool.awaitTermination(2L, TimeUnit.SECONDS);
        }

        String retrievalToken = commitManifest(stagingSessionToken, metadata);
        Assert.assertEquals(
                Paths.get(stagingDir.toAbsolutePath().toString(), stagingSession, "MANIFEST").toString(),
                retrievalToken);
        assertFiles(files.keySet(), retrievalToken);
        checkCleanup(stagingSessionToken, stagingSession);
    }

    /**
     * Stages two disjoint sets of files into two staging sessions ("123" and "abc") from a pool
     * of worker threads, then commits and verifies each session separately.
     *
     * <p>All uploads are awaited before either manifest is committed. A failure inside a worker
     * is rethrown on the test thread through its future.
     */
    @Test
    public void putArtifactsMultipleFilesConcurrentSessionsTest() throws Exception {
        String stagingSession1 = "123";
        String stagingSession2 = "abc";
        String text = "abcdefghinklmop\n";
        ImmutableMap<String, Integer> files1 = ImmutableMap.<String, Integer>builder()
                .put("file5cb", DATA_1KB / 2)
                .put("file1kb", DATA_1KB)
                .put("file15cb", DATA_1KB * 3 / 2)
                .build();
        ImmutableMap<String, Integer> files2 = ImmutableMap.<String, Integer>builder()
                .put("nested/file1kb", DATA_1KB)
                .put("file10kb", DATA_1KB * 10)
                .put("file100kb", DATA_1KB * 100)
                .build();
        ImmutableMap<String, Integer> allFiles =
                ImmutableMap.<String, Integer>builder().putAll(files1).putAll(files2).build();

        // Create the source files; an IOException here fails the test rather than being ignored.
        for (String fileName : allFiles.keySet()) {
            Path filePath = Paths.get(originalDir.toString(), fileName).toAbsolutePath();
            Files.createDirectories(filePath.getParent());
            int repetitions = (int) Math.ceil(allFiles.get(fileName) / (double) text.length());
            Files.write(filePath, Strings.repeat(text, repetitions).getBytes(StandardCharsets.UTF_8));
        }

        String stagingSessionToken1 = BeamFileSystemArtifactStagingService.generateStagingSessionToken(
                stagingSession1, stagingDir.toUri().getPath());
        String stagingSessionToken2 = BeamFileSystemArtifactStagingService.generateStagingSessionToken(
                stagingSession2, stagingDir.toUri().getPath());
        List<ArtifactApi.ArtifactMetadata> metadata1 = Collections.synchronizedList(new ArrayList<>());
        List<ArtifactApi.ArtifactMetadata> metadata2 = Collections.synchronizedList(new ArrayList<>());

        ExecutorService pool = Executors.newFixedThreadPool(8);
        try {
            List<CompletableFuture<Void>> uploads = new ArrayList<>();
            Iterator<String> names1 = files1.keySet().iterator();
            Iterator<String> names2 = files2.keySet().iterator();
            // Each task uploads one file to each session.
            while (names1.hasNext() && names2.hasNext()) {
                String fileName1 = names1.next();
                String fileName2 = names2.next();
                uploads.add(CompletableFuture.runAsync(() -> {
                    try {
                        putArtifact(
                                stagingSessionToken1,
                                Paths.get(originalDir.toString(), fileName1).toAbsolutePath().toString(),
                                fileName1);
                        putArtifact(
                                stagingSessionToken2,
                                Paths.get(originalDir.toString(), fileName2).toAbsolutePath().toString(),
                                fileName2);
                    } catch (Exception e) {
                        // Carried by the future and rethrown by get() below.
                        throw new IllegalStateException(
                                "Failed to stage " + fileName1 + " / " + fileName2, e);
                    }
                    metadata1.add(ArtifactApi.ArtifactMetadata.newBuilder().setName(fileName1).build());
                    metadata2.add(ArtifactApi.ArtifactMetadata.newBuilder().setName(fileName2).build());
                }, pool));
            }
            // Wait for every upload before committing; get() also surfaces worker failures.
            for (CompletableFuture<Void> upload : uploads) {
                upload.get(30L, TimeUnit.SECONDS);
            }
        } finally {
            pool.shutdown();
            pool.awaitTermination(2L, TimeUnit.SECONDS);
        }

        String retrievalToken1 = commitManifest(stagingSessionToken1, metadata1);
        String retrievalToken2 = commitManifest(stagingSessionToken2, metadata2);
        Assert.assertEquals(
                Paths.get(stagingDir.toAbsolutePath().toString(), stagingSession1, "MANIFEST").toString(),
                retrievalToken1);
        Assert.assertEquals(
                Paths.get(stagingDir.toAbsolutePath().toString(), stagingSession2, "MANIFEST").toString(),
                retrievalToken2);
        assertFiles(files1.keySet(), retrievalToken1);
        assertFiles(files2.keySet(), retrievalToken2);
        checkCleanup(stagingSessionToken1, stagingSession1);
        checkCleanup(stagingSessionToken2, stagingSession2);
    }

    /**
     * Verifies a committed staging session against the source files.
     *
     * <p>Checks that the manifest loaded from the proxy manifest file equals the one served by
     * the retrieval service, that the location names match the expected set without duplicates,
     * and that both the staged file and the retrieved artifact equal the source file's content.
     *
     * @param set expected artifact names
     * @param str retrieval token; also passed to {@code loadManifest} to load the proxy manifest
     */
    private void assertFiles(Set<String> set, String str) throws Exception {
        ArtifactApi.ProxyManifest proxyManifest = BeamFileSystemArtifactRetrievalService.loadManifest(str);
        ArtifactApi.GetManifestRequest manifestRequest =
                ArtifactApi.GetManifestRequest.newBuilder().setRetrievalToken(str).build();
        Assert.assertEquals(
                "Manifest in proxy manifest file doesn't match the retrieved manifest",
                proxyManifest.getManifest(),
                retrievalBlockingStub.getManifest(manifestRequest).getManifest());

        Set<String> stagedNames = proxyManifest.getLocationList().stream()
                .map(ArtifactApi.ProxyManifest.Location::getName)
                .collect(Collectors.toSet());
        Assert.assertEquals("Files in locations does not match actual file list.", set, stagedNames);
        // Equal sets plus equal counts means no name appears twice in the location list.
        Assert.assertEquals("Duplicate file entries in locations.", set.size(), proxyManifest.getLocationCount());

        for (ArtifactApi.ProxyManifest.Location location : proxyManifest.getLocationList()) {
            String name = location.getName();
            String expected = readFile(Paths.get(originalDir.toString(), name).toAbsolutePath().toString());
            Assert.assertEquals(
                    "Staged content doesn't match expected content for " + name,
                    expected,
                    readFile(location.getUri()));
            Assert.assertEquals(
                    "Retrieved content doesn't match expected content for " + name,
                    expected,
                    retrieveArtifact(name, str));
        }
    }

    /**
     * Reads an entire resource through Beam's {@code FileSystems} API and decodes it as UTF-8.
     *
     * <p>The stream is closed by try-with-resources, so a failure while closing is attached to
     * the primary exception as suppressed instead of replacing it.
     *
     * @param path resource spec handed to {@code FileSystems.matchNewResource} (as a file, not a
     *     directory)
     * @return the resource content as a string
     * @throws IOException if the resource cannot be opened or read
     */
    private static String readFile(String path) throws IOException {
        try (InputStream input =
                Channels.newInputStream(FileSystems.open(FileSystems.matchNewResource(path, false)))) {
            return new String(ByteStreams.toByteArray(input), StandardCharsets.UTF_8);
        }
    }

    /**
     * Downloads one artifact through the async retrieval stub and returns its content as UTF-8
     * text. Waits without a timeout until the stream completes or fails.
     *
     * @param str artifact name
     * @param str2 retrieval token
     * @return all received chunks, concatenated and decoded as UTF-8
     * @throws ExecutionException if the retrieval stream reports an error
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private String retrieveArtifact(String str, String str2) throws ExecutionException, InterruptedException {
        final CompletableFuture<ByteString> artifact = new CompletableFuture<>();
        ArtifactApi.GetArtifactRequest request = ArtifactApi.GetArtifactRequest.newBuilder()
                .setRetrievalToken(str2)
                .setName(str)
                .build();
        StreamObserver<ArtifactApi.ArtifactChunk> collector = new StreamObserver<ArtifactApi.ArtifactChunk>() {
            // Accumulates the chunks in arrival order.
            private ByteString received = ByteString.EMPTY;

            @Override
            public void onNext(ArtifactApi.ArtifactChunk chunk) {
                received = received.concat(chunk.getData());
            }

            @Override
            public void onError(Throwable error) {
                artifact.completeExceptionally(error);
            }

            @Override
            public void onCompleted() {
                artifact.complete(received);
            }
        };
        retrievalStub.getArtifact(request, collector);
        return artifact.get().toStringUtf8();
    }

    /**
     * Closes a resource on behalf of an expanded try-with-resources statement.
     *
     * <p>When {@code th} is {@code null} the resource is closed and any failure from
     * {@code close()} propagates to the caller. Otherwise the resource is closed and any failure
     * is attached to {@code th} as a suppressed exception.
     *
     * <p>NOTE(review): the {@code synthetic} marker and {@code $} name suggest this is the helper
     * javac generates for try-with-resources, recovered by a decompiler. {@code close()} is
     * declared to throw {@code Exception}, which this signature does not declare. Prefer a real
     * try-with-resources statement at the call sites over calling this directly.
     *
     * @param th the exception already in flight, or {@code null} if there is none
     * @param autoCloseable the resource to close
     */
    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            // No primary exception: let a close failure surface as-is.
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            // Keep the primary exception; record the close failure alongside it.
            th.addSuppressed(th2);
        }
    }
}
