/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution.artifact;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
import org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc;
import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.InProcessServerFactory;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService;
import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Channel;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.inprocess.InProcessChannelBuilder;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * End-to-end tests for {@link BeamFileSystemArtifactStagingService} and
 * {@link BeamFileSystemArtifactRetrievalService}.
 *
 * <p>Each test stages files from a temporary "original" directory through an in-process gRPC
 * staging server, commits a manifest, and then verifies both the staged files and the content
 * returned by an in-process gRPC retrieval server.
 */
@RunWith(JUnit4.class)
public class BeamFileSystemArtifactServicesTest {
    /** One kibibyte; used as the upload chunk size and as the unit for test file sizes. */
    private static final int DATA_1KB = 1 << 10;
    /** Line repeated to fill generated test files. */
    private static final String FILLER_LINE = "abcdefghinklmop\n";
    /** Maximum time to wait for a single gRPC call to finish. */
    private static final long RPC_TIMEOUT_SECONDS = 10L;
    /** Maximum time to wait for all concurrently submitted uploads to finish. */
    private static final long UPLOAD_TIMEOUT_SECONDS = 30L;

    private GrpcFnServer<BeamFileSystemArtifactStagingService> stagingServer;
    private BeamFileSystemArtifactStagingService stagingService;
    private GrpcFnServer<BeamFileSystemArtifactRetrievalService> retrievalServer;
    private BeamFileSystemArtifactRetrievalService retrievalService;
    private ManagedChannel stagingChannel;
    private ManagedChannel retrievalChannel;
    private ArtifactStagingServiceGrpc.ArtifactStagingServiceStub stagingStub;
    private ArtifactStagingServiceGrpc.ArtifactStagingServiceBlockingStub stagingBlockingStub;
    private ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceStub retrievalStub;
    private ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceBlockingStub retrievalBlockingStub;
    private Path stagingDir;
    private Path originalDir;
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    /** A unit of work that may throw a checked exception; used for tasks run on an executor. */
    @FunctionalInterface
    private interface ThrowingTask {
        void run() throws Exception;
    }

    /** Starts in-process staging and retrieval servers and creates the source/staging directories. */
    @Before
    public void setUp() throws Exception {
        this.stagingService = new BeamFileSystemArtifactStagingService();
        this.stagingServer = GrpcFnServer.allocatePortAndCreateFor(this.stagingService, InProcessServerFactory.create());
        this.stagingChannel = InProcessChannelBuilder.forName(this.stagingServer.getApiServiceDescriptor().getUrl()).build();
        this.stagingStub = ArtifactStagingServiceGrpc.newStub(this.stagingChannel);
        this.stagingBlockingStub = ArtifactStagingServiceGrpc.newBlockingStub(this.stagingChannel);

        this.retrievalService = new BeamFileSystemArtifactRetrievalService();
        this.retrievalServer = GrpcFnServer.allocatePortAndCreateFor(this.retrievalService, InProcessServerFactory.create());
        this.retrievalChannel = InProcessChannelBuilder.forName(this.retrievalServer.getApiServiceDescriptor().getUrl()).build();
        this.retrievalStub = ArtifactRetrievalServiceGrpc.newStub(this.retrievalChannel);
        this.retrievalBlockingStub = ArtifactRetrievalServiceGrpc.newBlockingStub(this.retrievalChannel);

        this.originalDir = this.tempFolder.newFolder("original").toPath();
        this.stagingDir = this.tempFolder.newFolder("staging").toPath();
    }

    /** Shuts down the client channels, then closes the servers and services created in {@link #setUp()}. */
    @After
    public void tearDown() throws Exception {
        // Channels are client-side resources created in setUp; release them so they do not leak
        // across tests.
        if (this.stagingChannel != null) {
            this.stagingChannel.shutdownNow();
        }
        if (this.retrievalChannel != null) {
            this.retrievalChannel.shutdownNow();
        }
        if (this.stagingServer != null) {
            this.stagingServer.close();
        }
        if (this.stagingService != null) {
            this.stagingService.close();
        }
        if (this.retrievalServer != null) {
            this.retrievalServer.close();
        }
        if (this.retrievalService != null) {
            this.retrievalService.close();
        }
    }

    /** Returns the absolute path of {@code fileName} inside the "original" source directory. */
    private Path sourcePath(String fileName) {
        return Paths.get(this.originalDir.toString(), fileName).toAbsolutePath();
    }

    /**
     * Creates {@code fileName} under the source directory, filled with enough repetitions of
     * {@link #FILLER_LINE} to reach at least {@code size} bytes.
     *
     * @return the bytes written to the file
     * @throws IOException if the parent directories or the file cannot be written
     */
    private byte[] writeTestFile(String fileName, int size) throws IOException {
        Path filePath = sourcePath(fileName);
        Files.createDirectories(filePath.getParent());
        int repetitions = (int) Math.ceil(size * 1.0 / FILLER_LINE.length());
        byte[] contents = Strings.repeat(FILLER_LINE, repetitions).getBytes(StandardCharsets.UTF_8);
        Files.write(filePath, contents);
        return contents;
    }

    /**
     * Runs {@code task} on {@code executor} and returns a future that completes when the task
     * finishes, or completes exceptionally with whatever the task threw.
     */
    private static CompletableFuture<Void> runAsync(ExecutorService executor, ThrowingTask task) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        executor.execute(() -> {
            try {
                task.run();
                done.complete(null);
            } catch (Throwable t) {
                // Throwable on purpose: assertion errors raised on worker threads must reach the
                // test thread instead of silently killing the worker.
                done.completeExceptionally(t);
            }
        });
        return done;
    }

    /** Blocks until every future in {@code uploads} has completed, failing on error or timeout. */
    private static void awaitAll(List<CompletableFuture<Void>> uploads) throws Exception {
        CompletableFuture.allOf(uploads.toArray(new CompletableFuture<?>[0]))
            .get(UPLOAD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Streams the file at {@code filePath} to the staging service under the name {@code fileName}
     * and waits for the service to acknowledge completion.
     *
     * @throws Exception if the file cannot be read, the service reports an error, or the call does
     *     not complete within {@link #RPC_TIMEOUT_SECONDS}
     */
    private void putArtifact(String stagingSessionToken, String filePath, String fileName) throws Exception {
        final CompletableFuture<Void> complete = new CompletableFuture<>();
        StreamObserver<ArtifactApi.PutArtifactRequest> requests = this.stagingStub.putArtifact(new StreamObserver<ArtifactApi.PutArtifactResponse>() {
            @Override
            public void onNext(ArtifactApi.PutArtifactResponse putArtifactResponse) {
            }

            @Override
            public void onError(Throwable throwable) {
                // This runs on a gRPC callback thread; hand the failure to the waiting test thread.
                complete.completeExceptionally(throwable);
            }

            @Override
            public void onCompleted() {
                complete.complete(null);
            }
        });
        // The first message carries the artifact metadata and session token; data chunks follow.
        requests.onNext(ArtifactApi.PutArtifactRequest.newBuilder().setMetadata(ArtifactApi.PutArtifactMetadata.newBuilder().setMetadata(ArtifactApi.ArtifactMetadata.newBuilder().setName(fileName).build()).setStagingSessionToken(stagingSessionToken)).build());
        try (FileInputStream fileInputStream = new FileInputStream(filePath)) {
            byte[] buffer = new byte[DATA_1KB];
            int len;
            while ((len = fileInputStream.read(buffer)) != -1) {
                requests.onNext(ArtifactApi.PutArtifactRequest.newBuilder().setData(ArtifactApi.ArtifactChunk.newBuilder().setData(ByteString.copyFrom(buffer, 0, len)).build()).build());
            }
        } catch (IOException e) {
            // Terminate the request stream so the call does not stay open after a read failure.
            requests.onError(e);
            throw e;
        }
        requests.onCompleted();
        complete.get(RPC_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /** Commits a manifest listing {@code artifacts} for the session and returns the retrieval token. */
    private String commitManifest(String stagingSessionToken, List<ArtifactApi.ArtifactMetadata> artifacts) {
        return this.stagingBlockingStub.commitManifest(ArtifactApi.CommitManifestRequest.newBuilder().setStagingSessionToken(stagingSessionToken).setManifest(ArtifactApi.Manifest.newBuilder().addAllArtifact(artifacts).build()).build()).getRetrievalToken();
    }

    /** Verifies the JSON layout of a generated staging session token. */
    @Test
    public void generateStagingSessionTokenTest() throws Exception {
        String basePath = this.stagingDir.toAbsolutePath().toString();
        String stagingToken = BeamFileSystemArtifactStagingService.generateStagingSessionToken("abc123", basePath);
        Assert.assertEquals("{\"sessionId\":\"abc123\",\"basePath\":\"" + basePath + "\"}", stagingToken);
    }

    /**
     * Asserts that the session directory exists under the staging directory, removes the session's
     * artifacts through the staging service, and asserts that the directory is gone afterwards.
     */
    void checkCleanup(String stagingSessionToken, String stagingSession) throws Exception {
        Path sessionDir = Paths.get(this.stagingDir.toAbsolutePath().toString(), stagingSession);
        Assert.assertTrue(Files.exists(sessionDir));
        this.stagingService.removeArtifacts(stagingSessionToken);
        Assert.assertFalse(Files.exists(sessionDir));
    }

    /** Stages and retrieves a single small file. */
    @Test
    public void putArtifactsSingleSmallFileTest() throws Exception {
        String fileName = "file1";
        String stagingSession = "123";
        String stagingSessionToken = BeamFileSystemArtifactStagingService.generateStagingSessionToken(stagingSession, this.stagingDir.toUri().getPath());
        Path srcFilePath = sourcePath(fileName);
        Files.write(srcFilePath, "some_test".getBytes(StandardCharsets.UTF_8));
        this.putArtifact(stagingSessionToken, srcFilePath.toString(), fileName);
        String stagingToken = this.commitManifest(stagingSessionToken, Collections.singletonList(ArtifactApi.ArtifactMetadata.newBuilder().setName(fileName).build()));
        Assert.assertEquals(Paths.get(this.stagingDir.toAbsolutePath().toString(), stagingSession, "MANIFEST"), Paths.get(stagingToken));
        this.assertFiles(Collections.singleton(fileName), stagingToken);
        this.checkCleanup(stagingSessionToken, stagingSession);
    }

    /** Stages several files of different sizes sequentially, with SHA-256 hashes in the manifest. */
    @Test
    public void putArtifactsMultipleFilesTest() throws Exception {
        String stagingSession = "123";
        ImmutableMap<String, Integer> files = ImmutableMap.<String, Integer>builder()
            .put("file5cb", DATA_1KB / 2)
            .put("file1kb", DATA_1KB)
            .put("file15cb", DATA_1KB * 3 / 2)
            .put("nested/file1kb", DATA_1KB)
            .put("file10kb", 10 * DATA_1KB)
            .put("file100kb", 100 * DATA_1KB)
            .build();
        Map<String, String> hashes = new HashMap<>();
        for (Map.Entry<String, Integer> file : files.entrySet()) {
            byte[] contents = writeTestFile(file.getKey(), file.getValue());
            hashes.put(file.getKey(), Hashing.sha256().hashBytes(contents).toString());
        }
        String stagingSessionToken = BeamFileSystemArtifactStagingService.generateStagingSessionToken(stagingSession, this.stagingDir.toUri().getPath());
        List<ArtifactApi.ArtifactMetadata> metadata = new ArrayList<>();
        for (String fileName : files.keySet()) {
            this.putArtifact(stagingSessionToken, sourcePath(fileName).toString(), fileName);
            metadata.add(ArtifactApi.ArtifactMetadata.newBuilder().setName(fileName).setSha256(hashes.get(fileName)).build());
        }
        String retrievalToken = this.commitManifest(stagingSessionToken, metadata);
        Assert.assertEquals(Paths.get(this.stagingDir.toAbsolutePath().toString(), stagingSession, "MANIFEST").toString(), retrievalToken);
        this.assertFiles(files.keySet(), retrievalToken);
        this.checkCleanup(stagingSessionToken, stagingSession);
    }

    /** Stages several files concurrently within one staging session. */
    @Test
    public void putArtifactsMultipleFilesConcurrentlyTest() throws Exception {
        String stagingSession = "123";
        ImmutableMap<String, Integer> files = ImmutableMap.<String, Integer>builder()
            .put("file5cb", DATA_1KB / 2)
            .put("file1kb", DATA_1KB)
            .put("file15cb", DATA_1KB * 3 / 2)
            .put("nested/file1kb", DATA_1KB)
            .put("file10kb", 10 * DATA_1KB)
            .put("file100kb", 100 * DATA_1KB)
            .build();
        for (Map.Entry<String, Integer> file : files.entrySet()) {
            writeTestFile(file.getKey(), file.getValue());
        }
        String stagingSessionToken = BeamFileSystemArtifactStagingService.generateStagingSessionToken(stagingSession, this.stagingDir.toUri().getPath());
        List<ArtifactApi.ArtifactMetadata> metadata = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executorService = Executors.newFixedThreadPool(8);
        try {
            List<CompletableFuture<Void>> uploads = new ArrayList<>();
            for (String fileName : files.keySet()) {
                uploads.add(runAsync(executorService, () -> {
                    this.putArtifact(stagingSessionToken, sourcePath(fileName).toString(), fileName);
                    metadata.add(ArtifactApi.ArtifactMetadata.newBuilder().setName(fileName).build());
                }));
            }
            // Every upload must finish (or fail the test) before the manifest is committed.
            awaitAll(uploads);
        } finally {
            executorService.shutdownNow();
        }
        String retrievalToken = this.commitManifest(stagingSessionToken, metadata);
        Assert.assertEquals(Paths.get(this.stagingDir.toAbsolutePath().toString(), stagingSession, "MANIFEST").toString(), retrievalToken);
        this.assertFiles(files.keySet(), retrievalToken);
        this.checkCleanup(stagingSessionToken, stagingSession);
    }

    /** Stages files concurrently into two independent staging sessions. */
    @Test
    public void putArtifactsMultipleFilesConcurrentSessionsTest() throws Exception {
        String stagingSession1 = "123";
        String stagingSession2 = "abc";
        ImmutableMap<String, Integer> files1 = ImmutableMap.<String, Integer>builder()
            .put("file5cb", DATA_1KB / 2)
            .put("file1kb", DATA_1KB)
            .put("file15cb", DATA_1KB * 3 / 2)
            .build();
        ImmutableMap<String, Integer> files2 = ImmutableMap.<String, Integer>builder()
            .put("nested/file1kb", DATA_1KB)
            .put("file10kb", 10 * DATA_1KB)
            .put("file100kb", 100 * DATA_1KB)
            .build();
        ImmutableMap<String, Integer> allFiles = ImmutableMap.<String, Integer>builder().putAll(files1).putAll(files2).build();
        for (Map.Entry<String, Integer> file : allFiles.entrySet()) {
            writeTestFile(file.getKey(), file.getValue());
        }
        String stagingSessionToken1 = BeamFileSystemArtifactStagingService.generateStagingSessionToken(stagingSession1, this.stagingDir.toUri().getPath());
        String stagingSessionToken2 = BeamFileSystemArtifactStagingService.generateStagingSessionToken(stagingSession2, this.stagingDir.toUri().getPath());
        List<ArtifactApi.ArtifactMetadata> metadata1 = Collections.synchronizedList(new ArrayList<>());
        List<ArtifactApi.ArtifactMetadata> metadata2 = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executorService = Executors.newFixedThreadPool(8);
        try {
            List<CompletableFuture<Void>> uploads = new ArrayList<>();
            Iterator<String> iterator1 = files1.keySet().iterator();
            Iterator<String> iterator2 = files2.keySet().iterator();
            // Pair one file from each session per task so both sessions are written concurrently.
            while (iterator1.hasNext() && iterator2.hasNext()) {
                String fileName1 = iterator1.next();
                String fileName2 = iterator2.next();
                uploads.add(runAsync(executorService, () -> {
                    this.putArtifact(stagingSessionToken1, sourcePath(fileName1).toString(), fileName1);
                    this.putArtifact(stagingSessionToken2, sourcePath(fileName2).toString(), fileName2);
                    metadata1.add(ArtifactApi.ArtifactMetadata.newBuilder().setName(fileName1).build());
                    metadata2.add(ArtifactApi.ArtifactMetadata.newBuilder().setName(fileName2).build());
                }));
            }
            // Every upload must finish (or fail the test) before the manifests are committed.
            awaitAll(uploads);
        } finally {
            executorService.shutdownNow();
        }
        String retrievalToken1 = this.commitManifest(stagingSessionToken1, metadata1);
        String retrievalToken2 = this.commitManifest(stagingSessionToken2, metadata2);
        Assert.assertEquals(Paths.get(this.stagingDir.toAbsolutePath().toString(), stagingSession1, "MANIFEST").toString(), retrievalToken1);
        Assert.assertEquals(Paths.get(this.stagingDir.toAbsolutePath().toString(), stagingSession2, "MANIFEST").toString(), retrievalToken2);
        this.assertFiles(files1.keySet(), retrievalToken1);
        this.assertFiles(files2.keySet(), retrievalToken2);
        this.checkCleanup(stagingSessionToken1, stagingSession1);
        this.checkCleanup(stagingSessionToken2, stagingSession2);
    }

    /**
     * Asserts that the manifest behind {@code retrievalToken} lists exactly {@code files}, and that
     * for each listed location both the staged file and the content served by the retrieval service
     * equal the original source file.
     */
    private void assertFiles(Set<String> files, String retrievalToken) throws Exception {
        ArtifactApi.ProxyManifest proxyManifest = BeamFileSystemArtifactRetrievalService.loadManifest(retrievalToken);
        ArtifactApi.GetManifestResponse retrievedManifest = this.retrievalBlockingStub.getManifest(ArtifactApi.GetManifestRequest.newBuilder().setRetrievalToken(retrievalToken).build());
        Assert.assertEquals("Manifest in proxy manifest file doesn't match the retrieved manifest", proxyManifest.getManifest(), retrievedManifest.getManifest());
        Set<String> locationNames = proxyManifest.getLocationList().stream().map(ArtifactApi.ProxyManifest.Location::getName).collect(Collectors.toSet());
        Assert.assertEquals("Files in locations does not match actual file list.", files, locationNames);
        // The name set above collapses duplicates, so compare the raw count as well.
        Assert.assertEquals("Duplicate file entries in locations.", (long) files.size(), (long) proxyManifest.getLocationCount());
        for (ArtifactApi.ProxyManifest.Location location : proxyManifest.getLocationList()) {
            String expectedContent = readFile(sourcePath(location.getName()).toString());
            String actualStagedContent = readFile(location.getUri());
            Assert.assertEquals("Staged content doesn't match expected content for " + location.getName(), expectedContent, actualStagedContent);
            String retrievedContent = this.retrieveArtifact(location.getName(), retrievalToken);
            Assert.assertEquals("Retrieved content doesn't match expected content for " + location.getName(), expectedContent, retrievedContent);
        }
    }

    /** Reads the resource at {@code path} through Beam {@link FileSystems} and decodes it as UTF-8. */
    private static String readFile(String path) throws IOException {
        try (InputStream stream = Channels.newInputStream(FileSystems.open(FileSystems.matchNewResource(path, false)))) {
            return new String(ByteStreams.toByteArray(stream), StandardCharsets.UTF_8);
        }
    }

    /**
     * Fetches the artifact {@code name} from the retrieval service, concatenating all received
     * chunks, and returns the result decoded as UTF-8.
     *
     * @throws Exception if the service reports an error or the call does not complete within
     *     {@link #RPC_TIMEOUT_SECONDS}
     */
    private String retrieveArtifact(String name, String retrievalToken) throws Exception {
        final CompletableFuture<ByteString> result = new CompletableFuture<>();
        this.retrievalStub.getArtifact(ArtifactApi.GetArtifactRequest.newBuilder().setRetrievalToken(retrievalToken).setName(name).build(), new StreamObserver<ArtifactApi.ArtifactChunk>() {
            private ByteString data = ByteString.EMPTY;

            @Override
            public void onNext(ArtifactApi.ArtifactChunk artifactChunk) {
                this.data = this.data.concat(artifactChunk.getData());
            }

            @Override
            public void onError(Throwable throwable) {
                result.completeExceptionally(throwable);
            }

            @Override
            public void onCompleted() {
                result.complete(this.data);
            }
        });
        // Bounded wait so a stalled call fails the test instead of hanging it.
        return result.get(RPC_TIMEOUT_SECONDS, TimeUnit.SECONDS).toStringUtf8();
    }
}

