/*
 * Decompiled with CFR 0.152.
 */
package org.rostore.v2.container.async;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Function;
import org.rostore.entity.Record;
import org.rostore.entity.RoStoreException;
import org.rostore.entity.StringKeyList;
import org.rostore.mapper.BinaryMapper;
import org.rostore.mapper.MapperProperties;
import org.rostore.v2.container.Container;
import org.rostore.v2.container.DataWithRecord;
import org.rostore.v2.container.async.AsyncContainerAccessException;
import org.rostore.v2.container.async.AsyncContainers;
import org.rostore.v2.container.async.AsyncListener;
import org.rostore.v2.container.async.AsyncStatus;
import org.rostore.v2.container.async.AsyncStream;
import org.rostore.v2.container.async.ContainerShardExecutor;
import org.rostore.v2.container.async.OperationExecutionException;
import org.rostore.v2.container.async.OperationExecutionRuntimeException;
import org.rostore.v2.container.async.OperationType;
import org.rostore.v2.keys.KeyList;
import org.rostore.v2.media.Closeable;
import org.rostore.v2.media.block.container.Status;

public class AsyncContainer
implements Closeable {
    private final Container container;
    private final List<ContainerShardExecutor> containerShardExecutors;
    private long lastExecutorWentIdleTimestamp;
    private boolean shutdown;
    private AsyncContainers asyncContainers;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void closeIfIdle(long idleMillis) {
        if (!this.shutdown) {
            boolean shutdownStarted = false;
            AsyncContainer asyncContainer = this;
            synchronized (asyncContainer) {
                long howLongIdle;
                if (this.isIdle() && (howLongIdle = this.lastExecutorWentIdleTimestamp - System.currentTimeMillis()) > idleMillis) {
                    this.shutdown();
                    shutdownStarted = true;
                }
            }
            if (shutdownStarted) {
                this.waitForShutdown();
                this.container.close();
                this.asyncContainers.evict(this.container.getName());
            }
        }
    }

    public Container getContainer() {
        return this.container;
    }

    private synchronized void shutdown() {
        this.shutdown = true;
        for (int i = 0; i < this.containerShardExecutors.size(); ++i) {
            if (this.containerShardExecutors.get(i) == null) continue;
            this.containerShardExecutors.get(i).shutdown();
        }
    }

    private void shutdownAndWait() {
        this.shutdown();
        this.waitForShutdown();
    }

    public void waitForShutdown() {
        if (!this.shutdown) {
            throw new AsyncContainerAccessException("Can't wait for an active async container.");
        }
        for (int i = 0; i < this.containerShardExecutors.size(); ++i) {
            if (this.containerShardExecutors.get(i) == null) continue;
            this.containerShardExecutors.get(i).shutdownAndWait();
        }
    }

    public synchronized boolean isIdle() {
        boolean atLeastOneIsStillRunning = false;
        for (int i = 0; i < this.containerShardExecutors.size(); ++i) {
            if (this.containerShardExecutors.get(i) == null) continue;
            atLeastOneIsStillRunning = atLeastOneIsStillRunning || !this.containerShardExecutors.get(i).isIdle();
        }
        return !atLeastOneIsStillRunning;
    }

    public void notifyIdle(ContainerShardExecutor containerShardExecutor) {
        this.lastExecutorWentIdleTimestamp = System.currentTimeMillis();
    }

    public AsyncContainers getAsyncContainers() {
        return this.asyncContainers;
    }

    public AsyncContainer(AsyncContainers asyncContainers, Container container) {
        this.container = container;
        this.asyncContainers = asyncContainers;
        this.containerShardExecutors = new ArrayList<ContainerShardExecutor>(container.getDescriptor().getContainerMeta().getShardNumber());
        for (int i = 0; i < container.getDescriptor().getContainerMeta().getShardNumber(); ++i) {
            this.containerShardExecutors.add(null);
        }
        this.shutdown = false;
        this.lastExecutorWentIdleTimestamp = System.currentTimeMillis();
    }

    private static <T> T resolveFuture(Future<T> future) {
        try {
            return future.get();
        }
        catch (InterruptedException e) {
            throw new RoStoreException("The execution has been interrupted", (Throwable)e);
        }
        catch (OperationExecutionException e) {
            throw new OperationExecutionRuntimeException(e);
        }
        catch (ExecutionException e) {
            throw new RoStoreException("Unknown execution failure", (Throwable)e);
        }
    }

    public <T extends InputStream> void put(int sessionId, byte[] key, AsyncStream<T> data, Record record) {
        this.putAsync(sessionId, key, data, record);
    }

    public <T> void put(int sessionId, byte[] key, T data, Record record) {
        this.put(sessionId, key, (OutputStream outputStream) -> BinaryMapper.serialize((MapperProperties)this.container.getContainerListOperations().getMedia().getMediaProperties().getMapperProperties(), (Object)data, (OutputStream)outputStream), record);
    }

    public <T> Record put(int sessionId, String key, T data) {
        Record record = new Record();
        this.put(sessionId, key, data, record);
        return record;
    }

    public <T> void put(int sessionId, String key, T data, Record record) {
        this.put(sessionId, key.getBytes(StandardCharsets.UTF_8), data, record);
    }

    public <T> Record put(int sessionId, String key, Consumer<OutputStream> serializer) {
        return this.put(sessionId, key.getBytes(StandardCharsets.UTF_8), serializer);
    }

    public Record put(int sessionId, byte[] key, Consumer<OutputStream> serializer) {
        Record record = new Record();
        this.put(sessionId, key, serializer, record);
        return record;
    }

    public void put(int sessionId, byte[] key, Consumer<OutputStream> serializer, Record record) {
        try (PipedOutputStream pipedOutputStream = new PipedOutputStream();){
            PipedInputStream pipedInputStream = new PipedInputStream(pipedOutputStream);
            AsyncStream<PipedInputStream> asyncStream = AsyncStream.wrap(pipedInputStream);
            this.putAsync(sessionId, key, asyncStream, record);
            serializer.accept(pipedOutputStream);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public <T extends InputStream> void putAsync(int sessionId, byte[] key, AsyncStream<T> asyncStream) {
        this.putAsync(sessionId, key, asyncStream, new Record());
    }

    public <T extends InputStream> void putAsync(int sessionId, byte[] key, AsyncStream<T> asyncStream, Record record) {
        ContainerShardExecutor shardExecutor = this.getShardExecutorByKey(key);
        shardExecutor.executeValue(sessionId, OperationType.WRITE, 0L, true, () -> {
            asyncStream.processFunction(inputStream -> {
                long id = shardExecutor.getShard().putValue(inputStream);
                record.id(id);
                shardExecutor.executeKey(sessionId, OperationType.WRITE, false, ops -> {
                    long prevId;
                    try {
                        prevId = ops.putKey(key, record);
                        asyncStream.notifyRecord(record);
                    }
                    catch (Exception e) {
                        if (record.getId() != -1L) {
                            shardExecutor.executeAutonomousValue(sessionId, OperationType.DELETE, record.getId(), false, () -> shardExecutor.getShard().removeValue(record.getId()));
                        }
                        throw e;
                    }
                    if (prevId != -1L) {
                        shardExecutor.executeAutonomousValue(sessionId, OperationType.DELETE, prevId, false, () -> shardExecutor.getShard().removeValue(prevId));
                    }
                    return record;
                }).get();
            });
            return true;
        });
    }

    public <T extends OutputStream> void getAsync(int sessionId, byte[] key, AsyncStream<T> asyncStream) {
        ContainerShardExecutor shardExecutor = this.getShardExecutorByKey(key);
        shardExecutor.executeKey(sessionId, OperationType.READ, true, ops -> {
            try {
                Record record = ops.getKey(key);
                if (record != null) {
                    asyncStream.notifyRecord(record);
                    if (record.getId() != -1L) {
                        shardExecutor.executeAutonomousValue(sessionId, OperationType.READ, record.getId(), false, () -> asyncStream.processFunction(outputStream -> shardExecutor.getShard().getValue(record, outputStream)));
                    } else {
                        asyncStream.empty();
                    }
                } else {
                    asyncStream.cancel(true);
                }
                return record;
            }
            catch (Exception e) {
                asyncStream.fail(e);
                throw e;
            }
        });
    }

    public <T> DataWithRecord<T> get(int sessionId, byte[] key, Class<T> clazz) {
        return this.get(sessionId, key, (InputStream inputStream) -> BinaryMapper.deserialize((MapperProperties)this.container.getContainerListOperations().getMedia().getMediaProperties().getMapperProperties(), (Type)clazz, (InputStream)inputStream));
    }

    public <T> DataWithRecord<T> get(int sessionId, String key, Class<T> clazz) {
        return this.get(sessionId, key.getBytes(StandardCharsets.UTF_8), clazz);
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public <T> DataWithRecord<T> get(int sessionId, byte[] key, Function<InputStream, T> deserializer) {
        try (PipedOutputStream pos = new PipedOutputStream();){
            PipedInputStream pipedInputStream = new PipedInputStream(pos);
            final Record[] storedRecord = new Record[1];
            AsyncStream<PipedOutputStream> pipedOutputStreamAsyncStream = AsyncStream.wrapBlocking(pos, new AsyncListener(){

                @Override
                public void record(Record record) {
                    storedRecord[0] = record;
                }

                @Override
                public void error(Exception e) {
                }

                @Override
                public void status(AsyncStatus asyncStatus) {
                }
            });
            this.getAsync(sessionId, key, pipedOutputStreamAsyncStream);
            pipedOutputStreamAsyncStream.get();
            if (storedRecord[0] == null) {
                DataWithRecord<T> dataWithRecord = null;
                return dataWithRecord;
            }
            InputStream data = deserializer.apply(pipedInputStream);
            DataWithRecord dataWithRecord = new DataWithRecord(storedRecord[0], (Object)data);
            return dataWithRecord;
        }
        catch (IOException ioException) {
            throw new RoStoreException("Can't create a piped outputStream");
        }
    }

    public KeyList list(int sessionId, byte[] startWithKey, byte[] continuationKey, int maxNumber, int maxSize) {
        return AsyncContainer.resolveFuture(this.listAsync(sessionId, startWithKey, continuationKey, maxNumber, maxSize));
    }

    public StringKeyList list(int sessionId, String startWithKey, String continuationKey, int maxNumber, int maxSize) {
        return new StringKeyList(AsyncContainer.resolveFuture(this.listAsync(sessionId, startWithKey != null ? startWithKey.getBytes(StandardCharsets.UTF_8) : null, continuationKey != null ? continuationKey.getBytes(StandardCharsets.UTF_8) : null, maxNumber, maxSize)));
    }

    public Future<KeyList> listAsync(int sessionId, byte[] startWithKey, byte[] continuationKey, int maxNumber, int maxSize) {
        return this.asyncContainers.getExecutorService().submit(() -> {
            KeyList result = new KeyList();
            int shardIndex = continuationKey == null ? 0 : this.getShardIndexByKey(continuationKey);
            byte[] iterationKey = continuationKey;
            int[] iterationParams = new int[]{maxNumber, maxSize};
            while (true) {
                byte[] finalIterationKey = iterationKey;
                ContainerShardExecutor shardExecutor = this.getShardExecutorByIndex(shardIndex);
                KeyList iteration = shardExecutor.executeKey(sessionId, OperationType.READ, true, ops -> ops.listKeys(startWithKey, finalIterationKey, iterationParams[0], iterationParams[1])).get();
                result.getKeys().addAll(iteration.getKeys());
                result.setSize(result.getSize() + iteration.getSize());
                if (iteration.isMore()) {
                    result.setMore(true);
                    return result;
                }
                iterationParams[0] = iterationParams[0] - iteration.getKeys().size();
                iterationParams[1] = (int)((long)iterationParams[1] - iteration.getSize());
                if (iterationParams[0] <= 0 || iterationParams[1] <= 0) {
                    result.setMore(true);
                    return result;
                }
                if (++shardIndex >= this.getContainer().getDescriptor().getContainerMeta().getShardNumber()) {
                    return result;
                }
                iterationKey = null;
            }
        });
    }

    public boolean remove(int sessionId, byte[] key, Record record) {
        return AsyncContainer.resolveFuture(this.removeAsync(sessionId, key, record));
    }

    public boolean remove(int sessionId, String key) {
        return this.remove(sessionId, key, new Record());
    }

    public boolean remove(int sessionId, String key, Record record) {
        return this.remove(sessionId, key.getBytes(StandardCharsets.UTF_8), record);
    }

    public Future<Boolean> removeAsync(int sessionId, byte[] key, Record record) {
        ContainerShardExecutor shardExecutor = this.getShardExecutorByKey(key);
        return shardExecutor.executeKey(sessionId, OperationType.DELETE, true, ops -> {
            boolean result = ops.removeKey(key, record);
            if (record.getId() == -1L) {
                return result;
            }
            shardExecutor.executeAutonomousValue(sessionId, OperationType.DELETE, record.getId(), false, () -> shardExecutor.getShard().removeValue(record.getId()));
            return result;
        });
    }

    private int getShardIndexByKey(byte[] key) {
        int hashcode = this.computeHashCode(key);
        int shardIndex = hashcode * this.container.getDescriptor().getContainerMeta().getShardNumber() >> 8;
        return shardIndex;
    }

    public ContainerShardExecutor getShardExecutorByKey(byte[] key) {
        int shardIndex = this.getShardIndexByKey(key);
        return this.getShardExecutorByIndex(shardIndex);
    }

    public synchronized ContainerShardExecutor getShardExecutorByIndex(int shardIndex) {
        if (this.shutdown) {
            throw new AsyncContainerAccessException("Container is in shutdown mode.");
        }
        ContainerShardExecutor containerShardExecutor = this.containerShardExecutors.get(shardIndex);
        if (containerShardExecutor == null) {
            containerShardExecutor = new ContainerShardExecutor(this, this.container.getShard(shardIndex));
            this.containerShardExecutors.set(shardIndex, containerShardExecutor);
        }
        return containerShardExecutor;
    }

    private int computeHashCode(byte[] key) {
        int every = key.length / 10;
        if (every < 1) {
            every = 1;
        }
        int sum = 0;
        for (int i = 0; i < key.length; i += every) {
            sum += key[i];
        }
        return sum & 0xFF;
    }

    public void close() {
        this.shutdownAndWait();
        this.container.close();
        this.asyncContainers.evict(this.container.getName());
    }

    public void remove() {
        this.shutdownAndWait();
        this.container.getContainerListOperations().remove(this.container.getName());
        this.asyncContainers.evict(this.container.getName());
    }

    public Status getStatus() {
        return this.container.getStatus();
    }
}

