/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.services.rlclient;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.annotations.CallerSideMethod;
import org.nustaq.kontraktor.services.ServiceActor;
import org.nustaq.kontraktor.services.rlclient.DataCfg;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.reallive.api.RLPredicate;
import org.nustaq.reallive.api.RealLiveTable;
import org.nustaq.reallive.api.Record;
import org.nustaq.reallive.api.TableDescription;
import org.nustaq.reallive.impl.actors.ShardFunc;
import org.nustaq.reallive.impl.tablespace.ClusteredTableSpaceClient;
import org.nustaq.reallive.impl.tablespace.TableSpaceActor;
import org.nustaq.reallive.impl.tablespace.TableSpaceSharding;
import org.nustaq.serialization.FSTConfiguration;

/**
 * Client actor for a sharded table space.
 *
 * <p>After {@link #connect} has completed, the tables named in the configured
 * schema can be looked up by name via {@link #getTable(String)} /
 * {@link #tbl(String)}. The class also offers a file export of all schema
 * tables ({@link #export(String)}) and per-shard iteration
 * ({@link #processSharded}).
 */
public class DataClient<T extends DataClient>
extends ClusteredTableSpaceClient<T> {
    /** Marker bytes written in front of every exported record. */
    private static final byte[] RECORD_MARKER = new byte[]{31, 32, 33, 34};

    DataCfg config;
    ServiceActor hostingService;
    TableSpaceActor[] shards;
    // NOTE(review): a plain HashMap that is filled from promise callbacks in connect()
    // and read through the caller-side accessors below - confirm callers only read it
    // after connect() has completed.
    HashMap syncTableAccess;

    /**
     * Stores the given configuration, sets up sharding across {@code shards} and
     * creates or loads every table of {@code config.getSchema()}.
     *
     * <p>Each successfully obtained table is registered under its name so it can be
     * retrieved with {@link #getTable(String)}. This method blocks on the
     * initialisation of the table sharding before the tables are requested.
     *
     * @param config         data configuration providing the table schema
     * @param shards         the table space actors to distribute keys over
     * @param hostingService the service this client belongs to (stored only)
     * @return the promise produced by {@code all(..)} over the per-table promises
     */
    public IPromise connect(DataCfg config, TableSpaceActor[] shards, ServiceActor hostingService) {
        this.config = config;
        this.hostingService = hostingService;
        this.shards = shards;
        this.syncTableAccess = new HashMap();
        // Take the remainder first and the absolute value second: Math.abs(Integer.MIN_VALUE)
        // is still negative and would produce a negative shard index. For every other hash
        // code the resulting index is the same as Math.abs(hash) % length.
        this.tableSharding = new TableSpaceSharding(shards, (ShardFunc & Serializable)key -> Math.abs(key.hashCode() % shards.length));
        this.tableSharding.init().await();
        TableDescription[] schema = config.getSchema();
        return DataClient.all((int)schema.length, i -> {
            Promise p = new Promise();
            this.tableSharding.createOrLoadTable(schema[i]).then((Callback & Serializable)(r, e) -> {
                if (r != null) {
                    this.syncTableAccess.put(schema[i].getName(), r);
                }
                p.complete(r, e);
            });
            return p;
        });
    }

    /**
     * Looks up a table registered during {@link #connect}.
     *
     * @param name table name as given in the schema
     * @return the table, or {@code null} if no table is registered under {@code name}
     */
    @CallerSideMethod
    public RealLiveTable getTable(String name) {
        return (RealLiveTable)((DataClient)this.getActor()).syncTableAccess.get(name);
    }

    /**
     * Short alias of {@link #getTable(String)}.
     *
     * @param name table name as given in the schema
     * @return the table, or {@code null} if no table is registered under {@code name}
     */
    @CallerSideMethod
    public RealLiveTable tbl(String name) {
        return (RealLiveTable)((DataClient)this.getActor()).syncTableAccess.get(name);
    }

    /**
     * Exports every table of the configured schema into {@code directory}, one file
     * {@code <tableName>.oos} per table. The work is submitted to {@code Actors.exec}.
     *
     * <p>Failures of individual tables or shards are logged and do not fail the
     * export as a whole.
     *
     * @param directory target directory; it is created if it does not exist
     * @return a promise that is completed once all tables have been processed, or
     *         rejected if the directory is unusable or an unexpected runtime
     *         exception occurs
     */
    public IPromise export(String directory) {
        Promise res = new Promise();
        Actors.exec.execute(() -> {
            try {
                File d = new File(directory);
                if (d.exists() && !(d.isDirectory() && d.canWrite())) {
                    res.reject(new RuntimeException("cannot write to " + d + " or not a directory"));
                    return;
                }
                // mkdirs() returns false on failure; re-check isDirectory() in case the
                // directory was created concurrently.
                if (!d.exists() && !d.mkdirs() && !d.isDirectory()) {
                    res.reject(new RuntimeException("cannot create directory " + d));
                    return;
                }
                FSTConfiguration writeConf = FSTConfiguration.createDefaultConfiguration();
                for (TableDescription desc : this.config.getSchema()) {
                    this.exportTable(d, desc, writeConf);
                }
            }
            catch (RuntimeException e) {
                // Never leave the promise unresolved if something unexpected goes wrong.
                Log.Error(this, e);
                res.reject(e);
                return;
            }
            res.complete();
        });
        return res;
    }

    /**
     * Writes all records of one table, gathered from every shard, into
     * {@code <dir>/<tableName>.oos}. Waits up to five minutes for all shards to
     * signal the end of their iteration; errors are logged.
     */
    private void exportTable(File dir, TableDescription desc, FSTConfiguration writeConf) {
        File target = new File(dir, desc.getName() + ".oos");
        // try-with-resources closes the stream on every exit path, including interruption.
        try (DataOutputStream fout = new DataOutputStream(new FileOutputStream(target))) {
            CountDownLatch pl = new CountDownLatch(this.shards.length);
            for (int i = 0; i < this.shards.length; ++i) {
                TableSpaceActor shard = this.shards[i];
                Log.Info(this, "exporting shard " + i + " table " + desc.getName());
                try {
                    RealLiveTable table = (RealLiveTable)shard.getTableAsync(desc.getName()).await(60000L);
                    table.forEach((RLPredicate & Serializable)rec -> true, (Callback & Serializable)(rec, err) -> {
                        if (rec != null) {
                            try {
                                DataClient.writeRecord(fout, writeConf, rec);
                            }
                            catch (IOException e) {
                                Log.Error(this, e);
                            }
                            return;
                        }
                        // No record: this shard is done, either with an error or regularly.
                        if (err != null) {
                            Log.Warn(this, "error during export " + err);
                        }
                        pl.countDown();
                    });
                }
                catch (Exception e) {
                    Log.Error(this, "export failure " + desc.getName() + " shard " + i);
                    Log.Error(this, e);
                    // This shard will not report completion through the callback, so release
                    // its slot here instead of letting the latch run into its timeout.
                    pl.countDown();
                }
            }
            if (!pl.await(5L, TimeUnit.MINUTES)) {
                Log.Error(this, "export timed out on table " + desc.getName());
            }
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
            Log.Error(this, e);
        }
        catch (IOException e) {
            // Covers failure to open the target file as well as failure to close it.
            Log.Error(this, e);
        }
    }

    /**
     * Appends one record frame to {@code out}: the four marker bytes, the payload
     * length as an int, then the serialized payload. The record is serialized before
     * anything is written so a serialization failure cannot leave a partial frame.
     * Frames of concurrent writers are kept apart by synchronizing on {@code out}.
     */
    private static void writeRecord(DataOutputStream out, FSTConfiguration writeConf, Object rec) throws IOException {
        synchronized (out) {
            byte[] payload = writeConf.asByteArray(rec);
            out.write(RECORD_MARKER);
            out.writeInt(payload.length);
            out.write(payload);
        }
    }

    /**
     * @return a promise resolved with the number of shards this client is connected to
     */
    public IPromise<Integer> getNoShards() {
        return DataClient.resolve((Object)this.shards.length);
    }

    /**
     * Iterates the records of {@code tableName} on a single shard.
     *
     * @param tableName name of the table to iterate
     * @param predicate filter passed on to the table's {@code forEach}
     * @param shardNo   index into the shard array given to {@link #connect}
     * @param cb        callback passed on to the table's {@code forEach}
     */
    public void processSharded(String tableName, RLPredicate<Record> predicate, int shardNo, Callback<Record> cb) {
        TableSpaceActor shard = this.shards[shardNo];
        // NOTE(review): if the table lookup fails, cb is presumably never invoked - confirm
        // against the semantics of IPromise.then for this handler type.
        shard.getTableAsync(tableName).then(t -> {
            RealLiveTable table = t;
            table.forEach(predicate, cb);
        });
    }
}

