package org.apache.beam.runners.fnexecution.artifact;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
import org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.util.JsonFormat;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusRuntimeException;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hasher;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/fnexecution/artifact/AbstractArtifactRetrievalService.class */
/**
 * Base implementation of the artifact retrieval gRPC service.
 *
 * <p>This class serves {@code GetManifest} and {@code GetArtifact} requests from a JSON-encoded
 * {@link ArtifactApi.ProxyManifest}. How the manifest and the individual artifacts are actually
 * opened is left to subclasses via {@link #openManifest(String)} and
 * {@link #openUri(String, String)}. Parsed manifests are kept in a cache keyed by retrieval token.
 */
public abstract class AbstractArtifactRetrievalService extends ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceImplBase implements ArtifactRetrievalService {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractArtifactRetrievalService.class);

    /** Size of the read buffer, and therefore the maximum size of one streamed chunk (2 MiB). */
    private static final int ARTIFACT_CHUNK_SIZE_BYTES = 2 * 1024 * 1024;

    /** Parsed proxy manifests, keyed by retrieval token. */
    private final Cache<String, ArtifactApi.ProxyManifest> manifestCache;

    /**
     * Creates a service whose manifest cache holds at most 100 entries and evicts an entry one
     * hour after it was last accessed.
     */
    public AbstractArtifactRetrievalService() {
        this(CacheBuilder.newBuilder().expireAfterAccess(1L, TimeUnit.HOURS).maximumSize(100L).build());
    }

    /**
     * Creates a service that stores parsed manifests in the supplied cache.
     *
     * @param cache cache used to look up and store manifests by retrieval token
     */
    public AbstractArtifactRetrievalService(Cache<String, ArtifactApi.ProxyManifest> cache) {
        this.manifestCache = cache;
    }

    /**
     * Opens the JSON-encoded proxy manifest belonging to a retrieval token.
     *
     * @param retrievalToken token identifying the staged artifacts
     * @return a stream over the manifest contents; closed by the caller
     * @throws IOException if the manifest cannot be opened
     */
    public abstract InputStream openManifest(String retrievalToken) throws IOException;

    /**
     * Opens the contents of a single artifact.
     *
     * @param retrievalToken token identifying the staged artifacts
     * @param uri artifact URI as recorded in the proxy manifest's location list
     * @return a stream over the artifact bytes; closed by the caller
     * @throws IOException if the artifact cannot be opened
     */
    public abstract InputStream openUri(String retrievalToken, String uri) throws IOException;

    /**
     * Returns the proxy manifest for a retrieval token, loading and caching it on first use.
     *
     * @param retrievalToken token identifying the staged artifacts
     * @return the cached or freshly loaded proxy manifest
     * @throws IOException declared for callers; failures while loading surface through the cache
     * @throws ExecutionException if loading the manifest threw a checked exception
     */
    public ArtifactApi.ProxyManifest getManifestProxy(String retrievalToken) throws IOException, ExecutionException {
        return this.manifestCache.get(retrievalToken, () -> {
            // try-with-resources closes the stream on every path and keeps a close() failure
            // from masking the exception that caused the load to fail.
            try (InputStream manifestStream = openManifest(retrievalToken)) {
                return loadManifest(manifestStream, retrievalToken);
            }
        });
    }

    /**
     * Handles a {@code GetManifest} request.
     *
     * <p>The special {@link AbstractArtifactStagingService#NO_ARTIFACTS_STAGED_TOKEN} yields an
     * empty manifest without touching the cache. Any exception raised while resolving the
     * manifest is logged and reported through {@code streamObserver.onError}.
     *
     * @param getManifestRequest request carrying the retrieval token
     * @param streamObserver observer that receives the manifest response
     * @throws StatusRuntimeException with {@code INVALID_ARGUMENT} if the retrieval token is
     *     empty; this is thrown to the caller rather than sent to the observer
     */
    @Override
    public void getManifest(ArtifactApi.GetManifestRequest getManifestRequest, StreamObserver<ArtifactApi.GetManifestResponse> streamObserver) {
        final String retrievalToken = getManifestRequest.getRetrievalToken();
        if (Strings.isNullOrEmpty(retrievalToken)) {
            throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Empty artifact token"));
        }
        LOG.info("GetManifest for {}", retrievalToken);
        try {
            final ArtifactApi.Manifest manifest;
            if (AbstractArtifactStagingService.NO_ARTIFACTS_STAGED_TOKEN.equals(retrievalToken)) {
                manifest = ArtifactApi.Manifest.newBuilder().build();
            } else {
                ArtifactApi.ProxyManifest proxyManifest = getManifestProxy(retrievalToken);
                manifest = proxyManifest.getManifest();
                LOG.info("GetManifest for {} -> {} artifacts", retrievalToken, manifest.getArtifactCount());
            }
            streamObserver.onNext(ArtifactApi.GetManifestResponse.newBuilder().setManifest(manifest).build());
            streamObserver.onCompleted();
        } catch (Exception e) {
            LOG.warn("GetManifest for {} failed.", retrievalToken, e);
            streamObserver.onError(e);
        }
    }

    /**
     * Handles a {@code GetArtifact} request by streaming the artifact in chunks of at most
     * {@link #ARTIFACT_CHUNK_SIZE_BYTES} bytes.
     *
     * <p>The artifact is looked up by name in both the manifest's location list and its metadata
     * list. While streaming, a SHA-256 digest of the bytes is computed; if the metadata carries a
     * non-empty expected digest, it is compared once all chunks have been sent. {@link IOException}
     * and {@link ExecutionException} are logged and reported through
     * {@code streamObserver.onError}.
     *
     * @param getArtifactRequest request carrying the retrieval token and artifact name
     * @param streamObserver observer that receives the artifact chunks
     * @throws StatusRuntimeException with {@code NOT_FOUND} if the name is missing from the
     *     manifest, or {@code DATA_LOSS} if the digest does not match; both are thrown to the
     *     caller rather than sent to the observer
     */
    @Override
    public void getArtifact(ArtifactApi.GetArtifactRequest getArtifactRequest, StreamObserver<ArtifactApi.ArtifactChunk> streamObserver) {
        LOG.debug("GetArtifact {}", getArtifactRequest);
        final String name = getArtifactRequest.getName();
        final String retrievalToken = getArtifactRequest.getRetrievalToken();
        try {
            ArtifactApi.ProxyManifest proxyManifest = getManifestProxy(retrievalToken);
            ArtifactApi.ProxyManifest.Location location = proxyManifest.getLocationList().stream()
                    .filter(candidate -> candidate.getName().equals(name))
                    .findFirst()
                    .orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription(
                            String.format("Artifact location not found in manifest: %s", name))));
            ArtifactApi.ArtifactMetadata metadata = proxyManifest.getManifest().getArtifactList().stream()
                    .filter(candidate -> candidate.getName().equals(name))
                    .findFirst()
                    .orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription(
                            String.format("Artifact metadata not found in manifest: %s", name))));

            Hasher hasher = Hashing.sha256().newHasher();
            byte[] buffer = new byte[ARTIFACT_CHUNK_SIZE_BYTES];
            try (InputStream artifactStream = openUri(retrievalToken, location.getUri())) {
                int bytesRead;
                while ((bytesRead = artifactStream.read(buffer)) != -1) {
                    hasher.putBytes(buffer, 0, bytesRead);
                    // copyFrom takes a private copy, so the buffer can be reused for the next read.
                    streamObserver.onNext(ArtifactApi.ArtifactChunk.newBuilder()
                            .setData(ByteString.copyFrom(buffer, 0, bytesRead))
                            .build());
                }
            }

            // Verification happens only after every chunk has been handed to the observer, so a
            // mismatch is detected after the data was already sent.
            String expectedSha256 = metadata.getSha256();
            if (!Strings.isNullOrEmpty(expectedSha256)) {
                // HashCode#toString renders the digest as a lowercase hex string.
                String actualSha256 = hasher.hash().toString();
                if (!actualSha256.equals(expectedSha256)) {
                    throw new StatusRuntimeException(Status.DATA_LOSS.withDescription(String.format(
                            "Artifact %s is corrupt: expected sha256 %s, actual %s", name, expectedSha256, actualSha256)));
                }
            }
            streamObserver.onCompleted();
        } catch (IOException | ExecutionException e) {
            LOG.info("GetArtifact {} failed", getArtifactRequest, e);
            streamObserver.onError(e);
        }
    }

    /** Does nothing; this base class holds no resources that need releasing. */
    @Override
    public void close() throws Exception {
    }

    /**
     * Parses a JSON-encoded {@link ArtifactApi.ProxyManifest} from a stream and validates it.
     *
     * <p>The stream is read fully and decoded as UTF-8. It is not closed by this method.
     *
     * @param inputStream stream containing the JSON manifest
     * @param manifestName label identifying the manifest; used only in log and error messages
     * @return the parsed proxy manifest
     * @throws IOException if the stream cannot be read or the JSON cannot be parsed
     * @throws IllegalArgumentException if the proxy manifest has no manifest, or its number of
     *     locations differs from its number of artifacts
     */
    public static ArtifactApi.ProxyManifest loadManifest(InputStream inputStream, String manifestName) throws IOException {
        ArtifactApi.ProxyManifest.Builder builder = ArtifactApi.ProxyManifest.newBuilder();
        String json = new String(ByteStreams.toByteArray(inputStream), StandardCharsets.UTF_8);
        JsonFormat.parser().merge(json, builder);
        ArtifactApi.ProxyManifest proxyManifest = builder.build();

        // The template overloads format the message only when the check fails.
        Preconditions.checkArgument(
                proxyManifest.hasManifest(),
                "Invalid ProxyManifest at %s: doesn't have a Manifest",
                manifestName);
        int locationCount = proxyManifest.getLocationCount();
        int artifactCount = proxyManifest.getManifest().getArtifactCount();
        Preconditions.checkArgument(
                locationCount == artifactCount,
                "Invalid ProxyManifest at %s: %s locations but %s artifacts",
                manifestName,
                locationCount,
                artifactCount);
        LOG.info("Manifest at {} has {} artifact locations", manifestName, artifactCount);
        return proxyManifest;
    }
}
