package com.github.jnthnclt.os.lab.s3;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.github.jnthnclt.os.lab.api.ValueIndex;
import com.github.jnthnclt.os.lab.base.BolBuffer;
import com.github.jnthnclt.os.lab.base.UIO;
import com.github.jnthnclt.os.lab.core.LABEnvironmentBuilder;
import com.github.jnthnclt.os.lab.core.LABHeapPressureBuilder;
import com.github.jnthnclt.os.lab.core.LABIndexProvider;
import com.github.jnthnclt.os.lab.core.LABStats;
import com.github.jnthnclt.os.lab.core.LABValueIndexConfigBuilder;
import com.github.jnthnclt.os.lab.core.api.ValueIndexConfig;
import com.github.jnthnclt.os.lab.core.guts.LABFiles;
import com.github.jnthnclt.os.lab.log.LABLogger;
import com.github.jnthnclt.os.lab.log.LABLoggerFactory;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.TreeRangeSet;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.FileUtils;

/* loaded from: input_file:com/github/jnthnclt/os/lab/s3/ContiniousBackupLABInitializer.class */
public class ContiniousBackupLABInitializer {
    private static final LABLogger LOG = LABLoggerFactory.getLogger();

    /* loaded from: input_file:com/github/jnthnclt/os/lab/s3/ContiniousBackupLABInitializer$AcquireIndex.class */
    public interface AcquireIndex<R> {
        long[] index(ValueIndex<byte[]> valueIndex) throws Exception;
    }

    /* loaded from: input_file:com/github/jnthnclt/os/lab/s3/ContiniousBackupLABInitializer$LABContinuousS3Backup.class */
    public static class LABContinuousS3Backup<I> {
        private static final LABLogger LOG = LABLoggerFactory.getLogger();
        private final BackUpper backUpper;
        private final LABFiles labFiles;
        private final LABIndexProvider<byte[]> indexProvider;
        private final BackupPoint<I> backupPoint;
        private final ExecutorService backupThread = Executors.newSingleThreadExecutor();
        private final AtomicBoolean running = new AtomicBoolean(false);
        private final AtomicBoolean stopped = new AtomicBoolean(false);
        private final Map<String, RootAndIndex> indexes = Maps.newConcurrentMap();
        private final Map<String, ConcurrentLinkedQueue<BatchId<I>>> wroteRanges = Maps.newConcurrentMap();
        private final Map<String, TreeRangeSet<Long>> flushedRanges = Maps.newConcurrentMap();

        /* loaded from: input_file:com/github/jnthnclt/os/lab/s3/ContiniousBackupLABInitializer$LABContinuousS3Backup$BackupPoint.class */
        public interface BackupPoint<I> {
            void backedUp(String str, I i);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:com/github/jnthnclt/os/lab/s3/ContiniousBackupLABInitializer$LABContinuousS3Backup$BatchId.class */
        public static class BatchId<I> {
            private final Range<Long> range;
            private final I batchId;
            public String indexName;
            private AtomicBoolean completed;

            private BatchId(Range<Long> range, String str, I i) {
                this.completed = new AtomicBoolean(false);
                this.range = range;
                this.indexName = str;
                this.batchId = i;
            }

            public void setCompleted() {
                this.completed.compareAndSet(false, true);
            }

            public boolean isComplete() {
                return this.completed.get();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:com/github/jnthnclt/os/lab/s3/ContiniousBackupLABInitializer$LABContinuousS3Backup$RootAndIndex.class */
        public static class RootAndIndex<I> {
            private final File root;
            private final ValueIndex<byte[]> index;

            private RootAndIndex(File file, ValueIndex<byte[]> valueIndex) {
                this.root = file;
                this.index = valueIndex;
            }

            /* JADX INFO: Access modifiers changed from: private */
            public String buildKey(File file) {
                return file.getAbsolutePath().substring(this.root.getAbsolutePath().length() + 1);
            }
        }

        public LABContinuousS3Backup(BackUpper backUpper, LABFiles lABFiles, LABIndexProvider<byte[]> lABIndexProvider, BackupPoint<I> backupPoint) {
            this.backUpper = backUpper;
            this.labFiles = lABFiles;
            this.indexProvider = lABIndexProvider;
            this.backupPoint = backupPoint;
        }

        public void open(File file, ValueIndexConfig valueIndexConfig) throws Exception {
            this.indexes.putIfAbsent(valueIndexConfig.primaryName, new RootAndIndex(file, this.indexProvider.buildIndex(file, valueIndexConfig)));
        }

        public void acquireIndex(String str, I i, AcquireIndex acquireIndex) throws Exception {
            RootAndIndex rootAndIndex = this.indexes.get(str);
            if (i == null) {
                acquireIndex.index(rootAndIndex.index);
                return;
            }
            long[] index = acquireIndex.index(rootAndIndex.index);
            if (index == null || index[0] >= index[1]) {
                return;
            }
            ConcurrentLinkedQueue<BatchId<I>> computeIfAbsent = this.wroteRanges.computeIfAbsent(str, str2 -> {
                return new ConcurrentLinkedQueue();
            });
            Range closed = Range.closed(Long.valueOf(index[0]), Long.valueOf(index[1]));
            LOG.info("Added " + closed + " for " + i);
            computeIfAbsent.add(new BatchId<>(closed, str, i));
        }

        public void start() throws Exception {
            if (this.running.compareAndSet(false, true)) {
                this.backupThread.submit(() -> {
                    try {
                        this.labFiles.take(appendedFile -> {
                            ConcurrentLinkedQueue<BatchId<I>> concurrentLinkedQueue;
                            if (appendedFile == null) {
                                Iterator<RootAndIndex> it = this.indexes.values().iterator();
                                while (it.hasNext()) {
                                    if (!it.next().index.closed()) {
                                        return true;
                                    }
                                }
                                return this.running.get();
                            }
                            String str = new String(appendedFile.labId, StandardCharsets.UTF_8);
                            String buildKey = this.indexes.get(str).buildKey(appendedFile.file);
                            if (appendedFile.delete) {
                                while (true) {
                                    try {
                                        this.backUpper.delete(buildKey, appendedFile.file);
                                        return true;
                                    } catch (Exception e) {
                                        LOG.error("Failed to remove " + buildKey + " from backup. Will retry in 5 sec", e);
                                        Thread.sleep(5000L);
                                    }
                                }
                            } else {
                                while (true) {
                                    try {
                                        boolean z = false;
                                        if (appendedFile.file.exists()) {
                                            try {
                                                LOG.info("Backing up index:" + str + " key:" + buildKey + " " + UIO.ram(FileUtils.sizeOf(appendedFile.file)) + "...");
                                            } catch (IllegalArgumentException e2) {
                                            }
                                            this.backUpper.backup(buildKey, appendedFile.file);
                                            z = true;
                                            LOG.info("Backed up index:" + str + " version:[" + appendedFile.fromAppendVersion + ".." + appendedFile.toAppendVersion + "] key:" + buildKey);
                                        }
                                        if (!z || appendedFile.fromAppendVersion == -1 || appendedFile.toAppendVersion == -1 || (concurrentLinkedQueue = this.wroteRanges.get(str)) == null) {
                                            return true;
                                        }
                                        TreeRangeSet<Long> computeIfAbsent = this.flushedRanges.computeIfAbsent(str, str2 -> {
                                            return TreeRangeSet.create();
                                        });
                                        computeIfAbsent.add(Range.closed(Long.valueOf(appendedFile.fromAppendVersion), Long.valueOf(appendedFile.toAppendVersion + 1)));
                                        Iterator<BatchId<I>> it2 = concurrentLinkedQueue.iterator();
                                        while (it2.hasNext()) {
                                            BatchId<I> next = it2.next();
                                            if (computeIfAbsent.encloses(((BatchId) next).range)) {
                                                next.setCompleted();
                                            }
                                        }
                                        ArrayList<BatchId> newArrayList = Lists.newArrayList();
                                        Iterator<BatchId<I>> it3 = concurrentLinkedQueue.iterator();
                                        while (it3.hasNext()) {
                                            BatchId<I> next2 = it3.next();
                                            if (!next2.isComplete()) {
                                                break;
                                            }
                                            newArrayList.add(next2);
                                            it3.remove();
                                        }
                                        LOG.info("Flushed RangeSet:" + computeIfAbsent);
                                        for (BatchId batchId : newArrayList) {
                                            try {
                                                this.backupPoint.backedUp(batchId.indexName, batchId.batchId);
                                            } catch (Exception e3) {
                                                LOG.error("Failure while call back point callback.", e3);
                                            }
                                        }
                                        return true;
                                    } catch (Exception e4) {
                                        LOG.error("Failed to backup index:" + str + " key:" + buildKey + ". Will retry in 5 sec", e4);
                                        Thread.sleep(5000L);
                                    }
                                }
                            }
                        });
                    } catch (Exception e) {
                        LOG.error("Unexpected shutdown :(", e);
                    }
                    synchronized (this.stopped) {
                        if (this.stopped.compareAndSet(false, true)) {
                            LOG.info("Stopped");
                            this.stopped.notifyAll();
                        }
                    }
                    return true;
                });
            }
        }

        public void stop() throws Exception {
            if (this.running.compareAndSet(true, false)) {
                synchronized (this.stopped) {
                    if (!this.stopped.get()) {
                        LOG.info("Waiting for backup service to stop...");
                        this.stopped.wait();
                    }
                }
                this.backupThread.shutdown();
                LOG.info("Backup service has stopped");
            }
        }
    }

    public static void main(String[] strArr) throws Exception {
        File file = new File("");
        File file2 = new File("");
        S3BackUpper s3BackUpper = new S3BackUpper((AmazonS3) AmazonS3ClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("", ""))).withRegion(Regions.US_EAST_1).build(), "", "");
        long j = 1000000;
        FileUtils.cleanDirectory(file);
        LABContinuousS3Backup.BackupPoint backupPoint = (str, l) -> {
            System.out.println("Hooray index:" + str + " is backed up to:" + l);
        };
        LABContinuousS3Backup initialize = initialize(s3BackUpper, backupPoint);
        LABValueIndexConfigBuilder lABValueIndexConfigBuilder = new LABValueIndexConfigBuilder("foo");
        lABValueIndexConfigBuilder.setSplitWhenValuesAndKeysTotalExceedsNBytes(10485760);
        initialize.open(file, lABValueIndexConfigBuilder.build());
        initialize.start();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
        ArrayList newArrayList = Lists.newArrayList();
        int i = 10000;
        long j2 = 0;
        while (true) {
            long j3 = j2;
            if (j3 >= 1000000) {
                break;
            }
            newArrayList.add(newFixedThreadPool.submit(() -> {
                initialize.acquireIndex("foo", Long.valueOf(j3), valueIndex -> {
                    long[] jArr = new long[2];
                    boolean z = true;
                    for (int i2 = 0; i2 < i; i2++) {
                        long j4 = j3 + i2;
                        jArr[z ? (char) 0 : (char) 1] = valueIndex.append(appendValueStream -> {
                            appendValueStream.stream(1, UIO.longBytes(j4), System.currentTimeMillis(), false, System.currentTimeMillis(), (j4 + " fluffy bunnies ate my carrots").getBytes());
                            return true;
                        }, true, new BolBuffer(), new BolBuffer());
                        z = false;
                    }
                    LOG.info("Commit " + (j3 + i) + "->" + j);
                    valueIndex.commit(true, false);
                    Thread.sleep(2000L);
                    return jArr;
                });
                return null;
            }));
            j2 = j3 + 10000;
        }
        System.out.println("Waiting on batches...");
        Iterator it = newArrayList.iterator();
        while (it.hasNext()) {
            ((Future) it.next()).get();
        }
        System.out.println("Indexing done");
        initialize.acquireIndex("foo", null, valueIndex -> {
            valueIndex.close(true, true);
            return null;
        });
        System.out.println("Flushing done");
        initialize.stop();
        s3BackUpper.restore(file2, "foo");
        System.out.println("Restore index done");
        AtomicLong atomicLong = new AtomicLong();
        LABContinuousS3Backup initialize2 = initialize(s3BackUpper, backupPoint);
        LABValueIndexConfigBuilder lABValueIndexConfigBuilder2 = new LABValueIndexConfigBuilder("foo");
        lABValueIndexConfigBuilder2.setSplitWhenValuesAndKeysTotalExceedsNBytes(10485760);
        initialize2.open(file2, lABValueIndexConfigBuilder2.build());
        initialize2.acquireIndex("foo", null, valueIndex2 -> {
            System.out.println(valueIndex2.count());
            valueIndex2.rowScan((i2, bolBuffer, j4, z, j5, bolBuffer2) -> {
                long j4 = bolBuffer.getLong(0);
                atomicLong.incrementAndGet();
                if (j4 % 10000 != 0) {
                    return true;
                }
                System.out.println(i2 + " " + j4 + " " + new String(bolBuffer2.copy()));
                return true;
            }, true);
            return null;
        });
        System.out.println(atomicLong.get());
        System.exit(0);
    }

    public static <I> LABContinuousS3Backup<I> initialize(BackUpper backUpper, LABContinuousS3Backup.BackupPoint<I> backupPoint) {
        LABFiles lABFiles = new LABFiles();
        AtomicLong atomicLong = new AtomicLong();
        LABStats lABStats = new LABStats(atomicLong);
        LABHeapPressureBuilder lABHeapPressureBuilder = new LABHeapPressureBuilder(atomicLong);
        lABHeapPressureBuilder.setMaxHeapPressureInBytes(33554432L);
        return new LABContinuousS3Backup<>(backUpper, lABFiles, new LABIndexProvider(lABStats, lABHeapPressureBuilder, new LABEnvironmentBuilder().setLABFiles(lABFiles)), backupPoint);
    }
}
