/*
 * Decompiled with CFR 0.152.
 */
package com.pivotal.pxf.plugins.gemfirexd;

import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.pivotal.gemfirexd.hadoop.mapred.Key;
import com.pivotal.gemfirexd.hadoop.mapred.MapRedRowRecordReader;
import com.pivotal.gemfirexd.hadoop.mapred.Row;
import com.pivotal.gemfirexd.internal.engine.Misc;
import com.pivotal.gemfirexd.internal.engine.store.GemFireContainer;
import com.pivotal.pxf.api.OneRow;
import com.pivotal.pxf.api.ReadAccessor;
import com.pivotal.pxf.api.utilities.InputData;
import com.pivotal.pxf.api.utilities.Plugin;
import com.pivotal.pxf.plugins.gemfirexd.util.GemFireXDManager;
import java.io.IOException;
import java.util.HashMap;
import java.util.ListIterator;
import java.util.Map;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;

/**
 * PXF read accessor that returns the rows of one input split of a GemFireXD
 * table, using the {@link MapRedRowRecordReader} obtained from the input
 * format supplied by {@link GemFireXDManager}.
 *
 * <p>All accessors in the JVM share static bookkeeping, guarded by a single
 * lock: the comma-separated list of home directories currently in use, a
 * total count of open accessors, and a per-table entry that is either an
 * open-accessor count or the timestamp of the last failure. Before a reader
 * is opened for an unknown home directory (or after a DDL timestamp change)
 * the accessor waits for the open accessors to close and then asks the
 * manager to shut down.
 */
public class GemFireXDAccessor extends Plugin implements ReadAccessor {
    /** Monitor guarding every static field below; also used for wait/notify. */
    private static final Object LOCK = new Object();

    /**
     * Used both as the maximum time to wait for active readers before giving
     * up on a shutdown, and as the cool-down after a failure during which new
     * requests on the same table are rejected.
     */
    private static final int THRESHOLD_MILLIS = 30000;

    /** How long a single wait on {@link #LOCK} lasts while readers are active. */
    private static final long WAIT_INTERVAL_MILLIS = 5000L;

    /** Comma-separated home directories recorded so far; guarded by {@link #LOCK}. */
    private static String currentHomeDirs = null;

    /** Number of accessors currently open across all tables; guarded by {@link #LOCK}. */
    private static int lonerRefCount = 0;

    /**
     * Per-table state, guarded by {@link #LOCK}. A value is either an
     * {@code Integer} (number of open accessors on that table) or a
     * {@code Long} (time in milliseconds of the last failure on that table).
     */
    private static final Map<String, Object> referenceCounts = new HashMap<String, Object>();

    /** Table name of the accessor most recently constructed on the current thread. */
    public static ThreadLocal<String> tableName = new ThreadLocal<String>();

    private InputFormat<Key, Row> inputFormat = null;
    private GemFireXDManager gfxdManager;
    private InputSplit split;
    protected Configuration conf = null;
    protected MapRedRowRecordReader reader = null;
    protected ListIterator<InputSplit> iter = null;
    protected JobConf jobConf = null;
    protected Key key;
    protected Row data;

    /** {@code false} only while this accessor holds a reference count. */
    private boolean accessorClosed = true;
    private boolean isWriteOnly = false;
    private String[] pkColumns = null;
    /** Set when this accessor's home directory is not in {@link #currentHomeDirs}. */
    private boolean newHomeDirFound = false;

    /**
     * Creates an accessor with a new {@link GemFireXDManager} built from {@code input}.
     *
     * @param input the request meta data
     * @throws IOException if the delegated constructor fails
     */
    public GemFireXDAccessor(InputData input) throws IOException {
        this(input, new GemFireXDManager(input));
    }

    /**
     * Creates an accessor that uses the given manager. Reads the manager's
     * user data, publishes the table name in {@link #tableName} and takes the
     * split and input format from the manager.
     *
     * @param input the request meta data
     * @param mgr the manager to obtain table, split and input format from
     * @throws IOException if reading the user data fails
     */
    public GemFireXDAccessor(InputData input, GemFireXDManager mgr) throws IOException {
        super(input);
        this.gfxdManager = mgr;
        this.gfxdManager.readUserData();
        tableName.set(this.gfxdManager.getTable());
        this.split = this.gfxdManager.getSplit();
        this.inputFormat = this.gfxdManager.getInputFormat();
        this.conf = new Configuration();
        this.jobConf = new JobConf(this.conf, GemFireXDAccessor.class);
    }

    /**
     * Opens the record reader for this accessor's split and registers the
     * accessor in the shared reference counts.
     *
     * @return always {@code true}
     * @throws IllegalArgumentException if the table has no primary key and
     *         either checkpoint mode is off or the table is write-only
     * @throws Exception any failure while preparing the reader; the shared
     *         reference counts are reset for this table before it is rethrown
     */
    public boolean openForRead() throws Exception {
        synchronized (LOCK) {
            try {
                if (this.needToShutdownLoner()) {
                    this.shutdownLoner();
                }
                if (this.gfxdManager.getLogger().isDebugEnabled()) {
                    this.gfxdManager.getLogger().debug("Accessor getting reader for split " + this.split.toString());
                }
                this.gfxdManager.configureJob(this.jobConf, currentHomeDirs);
                this.reader = (MapRedRowRecordReader)this.inputFormat.getRecordReader(this.split, this.jobConf, null);
                this.updateTableInfo();
            }
            catch (Exception e) {
                this.resetLonerRefCount();
                throw e;
            }
            boolean checkpoint = this.jobConf.getBoolean("gfxd.input.checkpointmode", true);
            if (!this.tableHasPK() && (!checkpoint || this.isWriteOnlyTable())) {
                throw new IllegalArgumentException("Table " + this.gfxdManager.getTable() + " does not have primary key(s) defined in GemFireXD. Querying event data from tables without primary key(s) is not supported.");
            }
            // Mark the accessor open only after the count was really taken, so
            // that closeForRead()/finalize() never release a reference that
            // this accessor does not hold.
            this.incrementRefCount();
            this.accessorClosed = false;
        }
        this.key = this.reader.createKey();
        this.data = this.reader.createValue();
        return true;
    }

    /**
     * Decides whether a shutdown is needed before opening the reader. On the
     * first call (no home directory recorded yet) it records this accessor's
     * home directory and returns {@code false}.
     */
    private boolean needToShutdownLoner() throws IOException {
        if (currentHomeDirs == null || currentHomeDirs.isEmpty()) {
            currentHomeDirs = this.gfxdManager.getHomeDir();
            return false;
        }
        return !this.isHomeDirKnown() || this.gfxdManager.isDDLTimeStampChanged();
    }

    /**
     * Checks (case-insensitively) whether this accessor's home directory is
     * one of the recorded ones; sets {@link #newHomeDirFound} when it is not.
     */
    private boolean isHomeDirKnown() {
        String homeDir = this.gfxdManager.getHomeDir();
        StringTokenizer tokens = new StringTokenizer(currentHomeDirs, ",");
        while (tokens.hasMoreTokens()) {
            if (homeDir.equalsIgnoreCase(tokens.nextToken())) {
                return true;
            }
        }
        this.newHomeDirFound = true;
        return false;
    }

    /**
     * Waits until no accessor is open, then asks the manager to shut down and
     * records this accessor's home directory. Must be called while holding
     * {@link #LOCK}, because it waits on that monitor.
     *
     * @throws IOException if the thread is interrupted while waiting, or if
     *         accessors are still open once the threshold has elapsed
     */
    private void shutdownLoner() throws IOException {
        long start = System.currentTimeMillis();
        while (lonerRefCount > 0) {
            try {
                this.gfxdManager.getLogger().info("Waiting for existing read requests on data in hdfs-store(s) " + currentHomeDirs + " to complete. Active requests: " + lonerRefCount);
                LOCK.wait(WAIT_INTERVAL_MILLIS);
            }
            catch (InterruptedException ie) {
                // Restore the interrupt status so callers further up can see it.
                Thread.currentThread().interrupt();
                this.gfxdManager.getLogger().error("Returning early from accessor. " + ie);
                throw new IOException("Could not restart loner instance.", ie);
            }
            if (System.currentTimeMillis() - start > THRESHOLD_MILLIS) {
                this.gfxdManager.getLogger().info("Currently, " + lonerRefCount + " read requests are active on this node.");
                if (lonerRefCount > 0) {
                    throw new IOException("Read requests on tables in hdfs-store(s) " + currentHomeDirs + " are active currently. New read requests on tables in hdfs-store " + this.gfxdManager.getHomeDir() + " can be processed after these are completed.");
                }
            }
        }
        try {
            this.gfxdManager.shutdown();
            this.gfxdManager.getLogger().info("Shutdown of loner system with home-dir(s) " + currentHomeDirs + " done.");
            if (currentHomeDirs == null || currentHomeDirs.isEmpty()) {
                currentHomeDirs = this.gfxdManager.getHomeDir();
            } else if (this.newHomeDirFound) {
                currentHomeDirs = currentHomeDirs + "," + this.gfxdManager.getHomeDir();
            }
        }
        catch (Exception e) {
            // Best effort: a failed shutdown is logged and the open continues.
            this.gfxdManager.getLogger().warn("Shutdown of loner system with home-dir(s) " + currentHomeDirs + " failed. " + e);
        }
    }

    /**
     * Refreshes the DDL timestamp through the manager and, when the table's
     * region is a {@link LocalRegion}, caches whether it is write-only and its
     * primary key column names. Logs a warning otherwise.
     */
    private void updateTableInfo() throws IOException {
        this.gfxdManager.updateDDLTimeStampIfNeeded();
        Region region = Misc.getRegionForTable(this.gfxdManager.getTable(), false);
        if (region instanceof LocalRegion) {
            LocalRegion localRegion = (LocalRegion)region;
            this.isWriteOnly = !localRegion.isHDFSReadWriteRegion();
            GemFireContainer container = (GemFireContainer)localRegion.getUserAttribute();
            this.pkColumns = container.getExtraTableInfo().getPrimaryKeyColumnNames();
        } else {
            this.gfxdManager.getLogger().warn("Table " + this.gfxdManager.getTable() + " could not be identified.");
        }
    }

    /** Returns whether at least one primary key column name was cached. */
    private boolean tableHasPK() {
        return this.pkColumns != null && this.pkColumns.length > 0;
    }

    /** Returns the write-only flag cached by {@link #updateTableInfo()}. */
    private boolean isWriteOnlyTable() {
        return this.isWriteOnly;
    }

    /**
     * Closes the record reader, releases this accessor's reference count (if
     * it holds one) and wakes up threads waiting for readers to finish.
     *
     * @throws IOException if closing the reader fails; the reference count is
     *         released and waiters are notified even in that case
     */
    public void closeForRead() throws IOException {
        synchronized (LOCK) {
            try {
                if (this.reader != null) {
                    this.reader.close();
                }
            }
            finally {
                // Release only what this accessor actually acquired, so that a
                // failed open or a repeated close cannot drop another
                // accessor's reference.
                if (!this.accessorClosed) {
                    this.decrementRefCount();
                    this.accessorClosed = true;
                }
                LOCK.notifyAll();
            }
        }
    }

    /**
     * Safety net: releases the reference count if the accessor is collected
     * without having been closed.
     */
    public void finalize() {
        synchronized (LOCK) {
            // gfxdManager is null when the superclass constructor threw.
            if (!this.accessorClosed && this.gfxdManager != null) {
                this.decrementRefCount();
                this.accessorClosed = true;
                LOCK.notifyAll();
            }
        }
    }

    /**
     * Registers one more open accessor for this table and in the total count.
     * Must be called while holding {@link #LOCK}.
     *
     * @throws IOException if a failure was recorded for this table within the
     *         last {@link #THRESHOLD_MILLIS} milliseconds; the recorded time
     *         is refreshed in that case
     */
    private void incrementRefCount() throws IOException {
        String table = this.gfxdManager.getTable();
        Object value = referenceCounts.get(table);
        if (value instanceof Long) {
            // A Long marks the time of the last failure on this table.
            long now = System.currentTimeMillis();
            if (now - (Long)value <= THRESHOLD_MILLIS) {
                referenceCounts.put(table, Long.valueOf(now));
                throw new IOException("Encountered failure while processing request on " + table + ". Please wait for " + (THRESHOLD_MILLIS / 1000) + " seconds before firing any query on the same table.");
            }
            referenceCounts.put(table, Integer.valueOf(1));
        } else {
            int count = value == null ? 0 : (Integer)value;
            referenceCounts.put(table, Integer.valueOf(count + 1));
        }
        ++lonerRefCount;
    }

    /**
     * Releases one open accessor for this table and in the total count. Does
     * nothing when the table has no entry or is in the failure state. Must be
     * called while holding {@link #LOCK}.
     */
    private void decrementRefCount() {
        String table = this.gfxdManager.getTable();
        Object value = referenceCounts.get(table);
        if (value == null || value instanceof Long) {
            return;
        }
        referenceCounts.put(table, Integer.valueOf((Integer)value - 1));
        if (--lonerRefCount < 0) {
            lonerRefCount = 0;
        }
    }

    /**
     * Marks {@code table} as failed at the current time, resets negative
     * per-table counts to zero and recomputes the total from the remaining
     * counts. Must be called while holding {@link #LOCK}.
     */
    private static void resetRefCount(String table) {
        referenceCounts.put(table, Long.valueOf(System.currentTimeMillis()));
        int totalCount = 0;
        for (Map.Entry<String, Object> entry : referenceCounts.entrySet()) {
            Object value = entry.getValue();
            if (!(value instanceof Integer)) {
                // null or a failure timestamp: not an accessor count.
                continue;
            }
            int count = (Integer)value;
            if (count >= 0) {
                totalCount += count;
            } else {
                entry.setValue(Integer.valueOf(0));
            }
        }
        lonerRefCount = totalCount;
    }

    /**
     * Reads the next record from the reader.
     *
     * @return a {@link OneRow} wrapping the key and value objects created in
     *         {@link #openForRead()}, or {@code null} when the reader has no
     *         more records
     * @throws IOException if the reader fails
     */
    public OneRow readNextObject() throws IOException {
        if (this.reader.next(this.key, this.data)) {
            return new OneRow((Object)this.key, (Object)this.data);
        }
        return null;
    }

    /** Resets the shared reference counts, marking this accessor's table as failed. */
    public void resetLonerRefCount() {
        GemFireXDAccessor.resetLonerRefCount(this.gfxdManager.getTable());
    }

    /**
     * Resets the shared reference counts under the lock, marking {@code table}
     * as failed, and wakes up threads waiting for readers to finish.
     *
     * @param table name of the table on which the failure occurred
     */
    public static void resetLonerRefCount(String table) {
        synchronized (LOCK) {
            GemFireXDAccessor.resetRefCount(table);
            LOCK.notifyAll();
        }
    }
}

