package org.apache.beam.runners.direct.portable.artifact;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import javax.annotation.Nullable;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.FnService;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusException;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/direct/portable/artifact/LocalFileSystemArtifactStagerService.class */
public class LocalFileSystemArtifactStagerService extends ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase implements FnService {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) LocalFileSystemArtifactStagerService.class);
    private final LocalArtifactStagingLocation location;

    /* loaded from: input_file:org/apache/beam/runners/direct/portable/artifact/LocalFileSystemArtifactStagerService$CreateAndWriteFileObserver.class */
    private class CreateAndWriteFileObserver implements StreamObserver<ArtifactApi.PutArtifactRequest> {
        private final StreamObserver<ArtifactApi.PutArtifactResponse> responseObserver;
        private FileWritingObserver writer;

        private CreateAndWriteFileObserver(StreamObserver<ArtifactApi.PutArtifactResponse> streamObserver) {
            this.responseObserver = streamObserver;
        }

        @Override // org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver
        public void onNext(ArtifactApi.PutArtifactRequest putArtifactRequest) {
            try {
                if (this.writer != null) {
                    this.writer.onNext(putArtifactRequest);
                } else {
                    if (!putArtifactRequest.getContentCase().equals(ArtifactApi.PutArtifactRequest.ContentCase.METADATA)) {
                        throw Status.INVALID_ARGUMENT.withDescription(String.format("Expected the first %s to contain the Artifact Name, got %s", ArtifactApi.PutArtifactRequest.class.getSimpleName(), putArtifactRequest.getContentCase())).asRuntimeException();
                    }
                    this.writer = createFile(putArtifactRequest.getMetadata().getMetadata());
                }
            } catch (StatusRuntimeException e) {
                this.responseObserver.onError(e);
            } catch (Exception e2) {
                this.responseObserver.onError(Status.INTERNAL.withCause(e2).withDescription(Throwables.getStackTraceAsString(e2)).asRuntimeException());
            }
        }

        private FileWritingObserver createFile(ArtifactApi.ArtifactMetadata artifactMetadata) throws IOException {
            File artifactFile = LocalFileSystemArtifactStagerService.this.location.getArtifactFile(artifactMetadata.getName());
            if (artifactFile.createNewFile()) {
                return new FileWritingObserver(artifactFile, new FileOutputStream(artifactFile), this.responseObserver);
            }
            throw Status.ALREADY_EXISTS.withDescription(String.format("Artifact with name %s already exists", artifactMetadata)).asRuntimeException();
        }

        @Override // org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver
        public void onError(Throwable th) {
            if (this.writer != null) {
                this.writer.onError(th);
            } else {
                this.responseObserver.onCompleted();
            }
        }

        @Override // org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver
        public void onCompleted() {
            if (this.writer != null) {
                this.writer.onCompleted();
            } else {
                this.responseObserver.onCompleted();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/portable/artifact/LocalFileSystemArtifactStagerService$FileWritingObserver.class */
    public static class FileWritingObserver implements StreamObserver<ArtifactApi.PutArtifactRequest> {
        private final File destination;
        private final OutputStream target;
        private final StreamObserver<ArtifactApi.PutArtifactResponse> responseObserver;

        private FileWritingObserver(File file, OutputStream outputStream, StreamObserver<ArtifactApi.PutArtifactResponse> streamObserver) {
            this.destination = file;
            this.target = outputStream;
            this.responseObserver = streamObserver;
        }

        @Override // org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver
        public void onNext(ArtifactApi.PutArtifactRequest putArtifactRequest) {
            try {
                if (putArtifactRequest.getData() == null) {
                    throw Status.INVALID_ARGUMENT.withDescription(String.format("Expected all chunks in the current stream state to contain data, got %s", putArtifactRequest.getContentCase())).asRuntimeException();
                }
                putArtifactRequest.getData().getData().writeTo(this.target);
            } catch (Exception e) {
                cleanedUp(e);
            }
        }

        @Override // org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver
        public void onError(Throwable th) {
            if (cleanedUp(null)) {
                this.responseObserver.onCompleted();
            }
        }

        @Override // org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver
        public void onCompleted() {
            try {
                this.target.close();
                this.responseObserver.onNext(ArtifactApi.PutArtifactResponse.getDefaultInstance());
                this.responseObserver.onCompleted();
            } catch (IOException e) {
                LocalFileSystemArtifactStagerService.LOG.error("Failed to complete writing file {}", this.destination, e);
                cleanedUp(e);
            }
        }

        private boolean cleanedUp(@Nullable Throwable th) {
            Throwable th2 = th;
            try {
                this.target.close();
                if (!this.destination.delete()) {
                    LocalFileSystemArtifactStagerService.LOG.debug("Couldn't delete failed write at {}", this.destination);
                }
            } catch (IOException e) {
                if (th == null) {
                    th2 = e;
                } else {
                    th2.addSuppressed(e);
                }
                LocalFileSystemArtifactStagerService.LOG.error("Failed to clean up after writing file {}", this.destination, e);
            }
            if (th2 != null) {
                if ((th2 instanceof StatusException) || (th2 instanceof StatusRuntimeException)) {
                    this.responseObserver.onError(th2);
                } else {
                    this.responseObserver.onError(Status.INTERNAL.withCause(th2).withDescription(Throwables.getStackTraceAsString(th2)).asException());
                }
            }
            return th2 == null;
        }
    }

    public static LocalFileSystemArtifactStagerService forRootDirectory(File file) {
        return new LocalFileSystemArtifactStagerService(file);
    }

    private LocalFileSystemArtifactStagerService(File file) {
        this.location = LocalArtifactStagingLocation.createAt(file);
    }

    @Override // org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase
    public StreamObserver<ArtifactApi.PutArtifactRequest> putArtifact(StreamObserver<ArtifactApi.PutArtifactResponse> streamObserver) {
        return new CreateAndWriteFileObserver(streamObserver);
    }

    @Override // org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase
    public void commitManifest(ArtifactApi.CommitManifestRequest commitManifestRequest, StreamObserver<ArtifactApi.CommitManifestResponse> streamObserver) {
        try {
            commitManifestOrThrow(commitManifestRequest, streamObserver);
        } catch (StatusRuntimeException e) {
            streamObserver.onError(e);
            LOG.error("Failed to commit Manifest {}", commitManifestRequest.getManifest(), e);
        } catch (Exception e2) {
            streamObserver.onError(Status.INTERNAL.withCause(e2).withDescription(Throwables.getStackTraceAsString(e2)).asRuntimeException());
            LOG.error("Failed to commit Manifest {}", commitManifestRequest.getManifest(), e2);
        }
    }

    private void commitManifestOrThrow(ArtifactApi.CommitManifestRequest commitManifestRequest, StreamObserver<ArtifactApi.CommitManifestResponse> streamObserver) throws IOException {
        ArrayList arrayList = new ArrayList();
        for (ArtifactApi.ArtifactMetadata artifactMetadata : commitManifestRequest.getManifest().getArtifactList()) {
            if (!this.location.getArtifactFile(artifactMetadata.getName()).exists()) {
                arrayList.add(artifactMetadata);
            }
        }
        if (!arrayList.isEmpty()) {
            throw Status.INVALID_ARGUMENT.withDescription(String.format("Attempted to commit manifest with missing Artifacts: [%s]", arrayList)).asRuntimeException();
        }
        File manifestFile = this.location.getManifestFile();
        Preconditions.checkState(manifestFile.createNewFile(), "Could not create file to store manifest");
        FileOutputStream fileOutputStream = new FileOutputStream(manifestFile);
        Throwable th = null;
        try {
            try {
                commitManifestRequest.getManifest().writeTo(fileOutputStream);
                if (0 != 0) {
                    try {
                        fileOutputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    fileOutputStream.close();
                }
                streamObserver.onNext(ArtifactApi.CommitManifestResponse.newBuilder().setRetrievalToken(this.location.getRootPath()).build());
                streamObserver.onCompleted();
            } finally {
            }
        } catch (Throwable th3) {
            if (th != null) {
                try {
                    fileOutputStream.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                fileOutputStream.close();
            }
            throw th3;
        }
    }

    @Override // org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.FnService, java.lang.AutoCloseable
    public void close() throws Exception {
    }

    @VisibleForTesting
    LocalArtifactStagingLocation getLocation() {
        return this.location;
    }
}
