package org.apache.hugegraph.computer.core.snapshot;

import io.minio.BucketExistsArgs;
import io.minio.DownloadObjectArgs;
import io.minio.ListObjectsArgs;
import io.minio.MakeBucketArgs;
import io.minio.MinioClient;
import io.minio.RemoveObjectsArgs;
import io.minio.Result;
import io.minio.UploadObjectArgs;
import io.minio.messages.DeleteError;
import io.minio.messages.DeleteObject;
import io.minio.messages.Item;
import java.io.File;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import org.apache.commons.lang.StringUtils;
import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.common.ContainerInfo;
import org.apache.hugegraph.computer.core.common.exception.ComputerException;
import org.apache.hugegraph.computer.core.config.ComputerOptions;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.graph.partition.Partitioner;
import org.apache.hugegraph.computer.core.manager.Manager;
import org.apache.hugegraph.computer.core.network.buffer.FileRegionBuffer;
import org.apache.hugegraph.computer.core.network.message.MessageType;
import org.apache.hugegraph.computer.core.receiver.MessageRecvManager;
import org.apache.hugegraph.computer.core.sender.MessageSendManager;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/hugegraph/computer/core/snapshot/SnapshotManager.class */
public class SnapshotManager implements Manager {
    private static final Logger LOG = Log.logger(SnapshotManager.class);
    public static final String NAME = "worker_snapshot";
    private final MessageSendManager sendManager;
    private final MessageRecvManager recvManager;
    private final ContainerInfo workerInfo;
    private final Partitioner partitioner;
    private final int partitionCount;
    private final boolean loadSnapshot;
    private final boolean writeSnapshot;
    private final String snapshotName;
    private MinioClient minioClient;
    private String bucketName;

    public SnapshotManager(ComputerContext computerContext, MessageSendManager messageSendManager, MessageRecvManager messageRecvManager, ContainerInfo containerInfo) {
        this.loadSnapshot = ((Boolean) computerContext.config().get(ComputerOptions.SNAPSHOT_LOAD)).booleanValue();
        this.writeSnapshot = ((Boolean) computerContext.config().get(ComputerOptions.SNAPSHOT_WRITE)).booleanValue();
        this.sendManager = messageSendManager;
        this.recvManager = messageRecvManager;
        this.recvManager.setSnapshotManager(this);
        this.workerInfo = containerInfo;
        this.partitioner = (Partitioner) computerContext.config().createObject(ComputerOptions.WORKER_PARTITIONER);
        this.partitionCount = ((Integer) computerContext.config().get(ComputerOptions.JOB_PARTITIONS_COUNT)).intValue();
        this.snapshotName = (String) computerContext.config().get(ComputerOptions.SNAPSHOT_NAME);
    }

    @Override // org.apache.hugegraph.computer.core.manager.Manager
    public String name() {
        return NAME;
    }

    @Override // org.apache.hugegraph.computer.core.manager.Manager
    public void init(Config config) {
        String str = (String) config.get(ComputerOptions.SNAPSHOT_MINIO_ENDPOINT);
        this.bucketName = (String) config.get(ComputerOptions.SNAPSHOT_MINIO_BUCKET_NAME);
        if (StringUtils.isNotEmpty(str) && StringUtils.isNotEmpty(this.bucketName)) {
            this.minioClient = MinioClient.builder().endpoint(str).credentials((String) config.get(ComputerOptions.SNAPSHOT_MINIO_ACCESS_KEY), (String) config.get(ComputerOptions.SNAPSHOT_MINIO_SECRET_KEY)).build();
            try {
                if (!this.minioClient.bucketExists(BucketExistsArgs.builder().bucket(this.bucketName).build())) {
                    this.minioClient.makeBucket(MakeBucketArgs.builder().bucket(this.bucketName).build());
                }
            } catch (Exception e) {
                throw new ComputerException("Failed to initialize bucket %s", new Object[]{this.bucketName, e});
            }
        }
    }

    @Override // org.apache.hugegraph.computer.core.manager.Manager
    public void close(Config config) {
    }

    public boolean loadSnapshot() {
        return this.loadSnapshot;
    }

    public boolean writeSnapshot() {
        return this.writeSnapshot;
    }

    public void upload(MessageType messageType, int i, List<String> list) {
        if (loadSnapshot()) {
            LOG.info("No later {} snapshots have to be uploaded", messageType.name().toLowerCase(Locale.ROOT));
        } else {
            uploadObjects(messageType, i, list);
        }
    }

    public void load() {
        int id = this.workerInfo.id();
        for (int i = 0; i < this.partitionCount; i++) {
            if (this.partitioner.workerId(i) == id) {
                this.sendManager.startSend(MessageType.VERTEX);
                downloadObjects(MessageType.VERTEX, i);
                this.sendManager.finishSend(MessageType.VERTEX);
                this.sendManager.startSend(MessageType.EDGE);
                downloadObjects(MessageType.EDGE, i);
                this.sendManager.finishSend(MessageType.EDGE);
            }
        }
    }

    private void uploadObjects(MessageType messageType, int i, List<String> list) {
        String generateObjectDirName = generateObjectDirName(messageType, i);
        try {
            clearObjectsIfExist(generateObjectDirName);
            LOG.info("Upload {} snapshots for partition {}", messageType.name().toLowerCase(Locale.ROOT), Integer.valueOf(i));
            for (String str : list) {
                uploadObject(str, generateObjectDirName + new File(str).getName());
            }
        } catch (Exception e) {
            throw new ComputerException("Failed to clear out-dated snapshots from %s", new Object[]{generateObjectDirName, e});
        }
    }

    private void downloadObjects(MessageType messageType, int i) {
        LOG.info("Load {} snapshots for partition {}", messageType.name().toLowerCase(Locale.ROOT), Integer.valueOf(i));
        String generateObjectDirName = generateObjectDirName(messageType, i);
        try {
            Iterable listObjects = this.minioClient.listObjects(ListObjectsArgs.builder().bucket(this.bucketName).prefix(generateObjectDirName).build());
            if (!listObjects.iterator().hasNext()) {
                throw new ComputerException("Empty snapshot directory %s", new Object[]{generateObjectDirName});
            }
            Iterator it = listObjects.iterator();
            while (it.hasNext()) {
                Item item = (Item) ((Result) it.next()).get();
                int size = (int) item.size();
                String objectName = item.objectName();
                String genOutputPath = this.recvManager.genOutputPath(messageType, i);
                downloadObject(objectName, genOutputPath);
                this.recvManager.handle(messageType, i, new FileRegionBuffer(size, genOutputPath));
            }
        } catch (Exception e) {
            throw new ComputerException("Failed to download snapshots from %s", new Object[]{generateObjectDirName, e});
        }
    }

    private void uploadObject(String str, String str2) {
        try {
            this.minioClient.uploadObject(UploadObjectArgs.builder().bucket(this.bucketName).object(str2).filename(str).build());
        } catch (Exception e) {
            throw new ComputerException("Failed to upload snapshot %s to %s", new Object[]{str, str2, e});
        }
    }

    private void downloadObject(String str, String str2) {
        try {
            this.minioClient.downloadObject(DownloadObjectArgs.builder().bucket(this.bucketName).object(str).filename(str2).build());
        } catch (Exception e) {
            throw new ComputerException("Failed to download snapshot from %s to %s", new Object[]{str, str2, e});
        }
    }

    private void clearObjectsIfExist(String str) throws Exception {
        LinkedList linkedList = new LinkedList();
        Iterable listObjects = this.minioClient.listObjects(ListObjectsArgs.builder().bucket(this.bucketName).prefix(str).build());
        if (listObjects.iterator().hasNext()) {
            LOG.info("Clear out-dated snapshots from {} first", str);
            Iterator it = listObjects.iterator();
            while (it.hasNext()) {
                linkedList.add(new DeleteObject(((Item) ((Result) it.next()).get()).objectName()));
            }
            Iterator it2 = this.minioClient.removeObjects(RemoveObjectsArgs.builder().bucket(this.bucketName).objects(linkedList).build()).iterator();
            if (it2.hasNext()) {
                DeleteError deleteError = (DeleteError) ((Result) it2.next()).get();
                throw new ComputerException("Failed to delete snapshot %s, error message: %s", new Object[]{deleteError.objectName(), deleteError.message()});
            }
        }
    }

    private String generateObjectDirName(MessageType messageType, int i) {
        return Paths.get(this.snapshotName, this.partitioner.getClass().getSimpleName(), String.valueOf(this.partitionCount), messageType.name(), String.valueOf(i)) + "/";
    }
}
