package org.apache.pig.backend.hadoop.hbase;

import com.google.common.collect.Lists;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadPushDown;
import org.apache.pig.LoadStoreCaster;
import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat;
import org.apache.pig.builtin.Utf8StorageConverter;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.Utils;
import org.codehaus.jackson.util.MinimalPrettyPrinter;

/* loaded from: input_file:org/apache/pig/backend/hadoop/hbase/HBaseStorage.class */
public class HBaseStorage extends LoadFunc implements StoreFuncInterface, LoadPushDown {
    private static final String STRING_CASTER = "UTF8StorageConverter";
    private static final String BYTE_CASTER = "HBaseBinaryConverter";
    private static final String CASTER_PROPERTY = "pig.hbase.caster";
    private List<byte[]> columnList_;
    private HTable m_table;
    private HBaseConfiguration m_conf;
    private RecordReader reader;
    private RecordWriter writer;
    private Scan scan;
    private final CommandLine configuredOptions_;
    private boolean loadRowKey_;
    private final long limit_;
    private final int caching_;
    protected transient byte[] gt_;
    protected transient byte[] gte_;
    protected transient byte[] lt_;
    protected transient byte[] lte_;
    private LoadCaster caster_;
    private ResourceSchema schema_;
    private static final Log LOG = LogFactory.getLog(HBaseStorage.class);
    private static final Options validOptions_ = new Options();
    private static final CommandLineParser parser_ = new GnuParser();

    private static void populateValidOptions() {
        validOptions_.addOption("loadKey", false, "Load Key");
        validOptions_.addOption("gt", true, "Records must be greater than this value (binary, double-slash-escaped)");
        validOptions_.addOption("lt", true, "Records must be less than this value (binary, double-slash-escaped)");
        validOptions_.addOption("gte", true, "Records must be greater than or equal to this value");
        validOptions_.addOption("lte", true, "Records must be less than or equal to this value");
        validOptions_.addOption("caching", true, "Number of rows scanners should cache");
        validOptions_.addOption("limit", true, "Per-region limit");
        validOptions_.addOption("caster", true, "Caster to use for converting values. A class name, HBaseBinaryConverter, or Utf8StorageConverter. For storage, casters must implement LoadStoreCaster.");
    }

    public HBaseStorage(String str) throws ParseException, IOException {
        this(str, "");
    }

    public HBaseStorage(String str, String str2) throws ParseException, IOException {
        this.columnList_ = Lists.newArrayList();
        populateValidOptions();
        String[] split = str.split(MinimalPrettyPrinter.DEFAULT_ROOT_VALUE_SEPARATOR);
        try {
            this.configuredOptions_ = parser_.parse(validOptions_, str2.split(MinimalPrettyPrinter.DEFAULT_ROOT_VALUE_SEPARATOR));
            this.loadRowKey_ = this.configuredOptions_.hasOption("loadKey");
            for (String str3 : split) {
                this.columnList_.add(Bytes.toBytes(str3));
            }
            this.m_conf = new HBaseConfiguration();
            String optionValue = this.configuredOptions_.getOptionValue("caster", this.m_conf.get(CASTER_PROPERTY, STRING_CASTER));
            if (STRING_CASTER.equalsIgnoreCase(optionValue)) {
                this.caster_ = new Utf8StorageConverter();
            } else if (BYTE_CASTER.equalsIgnoreCase(optionValue)) {
                this.caster_ = new HBaseBinaryConverter();
            } else {
                try {
                    this.caster_ = (LoadCaster) Class.forName(optionValue).newInstance();
                } catch (ClassCastException e) {
                    LOG.error("Congifured caster does not implement LoadCaster interface.");
                    throw new IOException(e);
                } catch (ClassNotFoundException e2) {
                    LOG.error("Configured caster class not found.", e2);
                    throw new IOException(e2);
                } catch (IllegalAccessException e3) {
                    LOG.error("Illegal Access Exception for configured caster " + optionValue, e3);
                    throw new IOException(e3);
                } catch (InstantiationException e4) {
                    LOG.error("Unable to instantiate configured caster " + optionValue, e4);
                    throw new IOException(e4);
                }
            }
            this.caching_ = Integer.valueOf(this.configuredOptions_.getOptionValue("caching", "100")).intValue();
            this.limit_ = Long.valueOf(this.configuredOptions_.getOptionValue("limit", "-1")).longValue();
            initScan();
        } catch (ParseException e5) {
            new HelpFormatter().printHelp("[-loadKey] [-gt] [-gte] [-lt] [-lte] [-caching] [-caster]", validOptions_);
            throw e5;
        }
    }

    private void initScan() {
        this.scan = new Scan();
        if (this.configuredOptions_.hasOption("gt")) {
            this.gt_ = Bytes.toBytesBinary(Utils.slashisize(this.configuredOptions_.getOptionValue("gt")));
            addFilter(CompareFilter.CompareOp.GREATER, this.gt_);
        }
        if (this.configuredOptions_.hasOption("lt")) {
            this.lt_ = Bytes.toBytesBinary(Utils.slashisize(this.configuredOptions_.getOptionValue("lt")));
            addFilter(CompareFilter.CompareOp.LESS, this.lt_);
        }
        if (this.configuredOptions_.hasOption("gte")) {
            this.gte_ = Bytes.toBytesBinary(Utils.slashisize(this.configuredOptions_.getOptionValue("gte")));
            addFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL, this.gte_);
        }
        if (this.configuredOptions_.hasOption("lte")) {
            this.lte_ = Bytes.toBytesBinary(Utils.slashisize(this.configuredOptions_.getOptionValue("lte")));
            addFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, this.lte_);
        }
    }

    private void addFilter(CompareFilter.CompareOp compareOp, byte[] bArr) {
        LOG.info("Adding filter " + compareOp.toString() + " with value " + Bytes.toStringBinary(bArr));
        FilterList filter = this.scan.getFilter();
        if (filter == null) {
            filter = new FilterList();
        }
        filter.addFilter(new RowFilter(compareOp, new BinaryComparator(bArr)));
        this.scan.setFilter(filter);
    }

    @Override // org.apache.pig.LoadFunc
    public Tuple getNext() throws IOException {
        try {
            if (!this.reader.nextKeyValue()) {
                return null;
            }
            ImmutableBytesWritable immutableBytesWritable = (ImmutableBytesWritable) this.reader.getCurrentKey();
            Result result = (Result) this.reader.getCurrentValue();
            int size = this.columnList_.size();
            if (this.loadRowKey_) {
                size++;
            }
            Tuple newTuple = TupleFactory.getInstance().newTuple(size);
            int i = 0;
            if (this.loadRowKey_) {
                newTuple.set(0, new DataByteArray(immutableBytesWritable.get()));
                i = 0 + 1;
            }
            for (int i2 = 0; i2 < this.columnList_.size(); i2++) {
                byte[] value = result.getValue(this.columnList_.get(i2));
                if (value != null) {
                    newTuple.set(i2 + i, new DataByteArray(value));
                } else {
                    newTuple.set(i2 + i, null);
                }
            }
            return newTuple;
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    @Override // org.apache.pig.LoadFunc
    public InputFormat getInputFormat() {
        return new HBaseTableInputFormat.HBaseTableIFBuilder().withLimit(this.limit_).withGt(this.gt_).withGte(this.gte_).withLt(this.lt_).withLte(this.lte_).withConf(this.m_conf).build();
    }

    @Override // org.apache.pig.LoadFunc
    public void prepareToRead(RecordReader recordReader, PigSplit pigSplit) {
        this.reader = recordReader;
    }

    @Override // org.apache.pig.LoadFunc
    public void setLocation(String str, Job job) throws IOException {
        String str2 = str;
        if (str.startsWith("hbase://")) {
            str2 = str.substring(8);
        }
        if (this.m_table == null) {
            this.m_table = new HTable(this.m_conf, str2);
        }
        this.m_table.setScannerCaching(this.caching_);
        this.m_conf.set("hbase.mapreduce.inputtable", str2);
        this.scan.addColumns((byte[][]) this.columnList_.toArray((Object[]) new byte[0]));
        this.m_conf.set("hbase.mapreduce.scan", convertScanToString(this.scan));
    }

    @Override // org.apache.pig.LoadFunc
    public String relativeToAbsolutePath(String str, Path path) throws IOException {
        return str;
    }

    private static String convertScanToString(Scan scan) {
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            scan.write(new DataOutputStream(byteArrayOutputStream));
            return Base64.encodeBytes(byteArrayOutputStream.toByteArray());
        } catch (IOException e) {
            LOG.error(e);
            return "";
        }
    }

    @Override // org.apache.pig.LoadFunc
    public LoadCaster getLoadCaster() throws IOException {
        return this.caster_;
    }

    @Override // org.apache.pig.StoreFuncInterface
    public OutputFormat getOutputFormat() throws IOException {
        return new TableOutputFormat();
    }

    @Override // org.apache.pig.StoreFuncInterface
    public void checkSchema(ResourceSchema resourceSchema) throws IOException {
        if (this.caster_ instanceof LoadStoreCaster) {
            this.schema_ = resourceSchema;
        } else {
            LOG.error("Caster must implement LoadStoreCaster for writing to HBase.");
            throw new IOException("Bad Caster " + this.caster_.getClass());
        }
    }

    @Override // org.apache.pig.StoreFuncInterface
    public void prepareToWrite(RecordWriter recordWriter) throws IOException {
        this.writer = recordWriter;
    }

    @Override // org.apache.pig.StoreFuncInterface
    public void putNext(Tuple tuple) throws IOException {
        ResourceSchema.ResourceFieldSchema[] fields = this.schema_ == null ? null : this.schema_.getFields();
        Put put = new Put(objToBytes(tuple.get(0), fields == null ? DataType.findType(tuple.get(0)) : fields[0].getType()));
        long currentTimeMillis = System.currentTimeMillis();
        for (int i = 1; i < tuple.size(); i++) {
            put.add(this.columnList_.get(i - 1), currentTimeMillis, objToBytes(tuple.get(i), fields == null ? DataType.findType(tuple.get(i)) : fields[i].getType()));
        }
        try {
            this.writer.write(null, put);
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    private byte[] objToBytes(Object obj, byte b) throws IOException {
        LoadStoreCaster loadStoreCaster = (LoadStoreCaster) this.caster_;
        switch (b) {
            case -1:
                throw new IOException("Unable to determine type of " + obj.getClass());
            case 1:
                return null;
            case 10:
                return loadStoreCaster.toBytes((Integer) obj);
            case 15:
                return loadStoreCaster.toBytes((Long) obj);
            case 20:
                return loadStoreCaster.toBytes((Float) obj);
            case 25:
                return loadStoreCaster.toBytes((Double) obj);
            case 50:
                return ((DataByteArray) obj).get();
            case 55:
                return loadStoreCaster.toBytes((String) obj);
            case 100:
                return loadStoreCaster.toBytes((Map<String, Object>) obj);
            case 110:
                return loadStoreCaster.toBytes((Tuple) obj);
            case 120:
                return loadStoreCaster.toBytes((DataBag) obj);
            default:
                throw new IOException("Unable to find a converter for tuple field " + obj);
        }
    }

    @Override // org.apache.pig.StoreFuncInterface
    public String relToAbsPathForStoreLocation(String str, Path path) throws IOException {
        return str;
    }

    @Override // org.apache.pig.StoreFuncInterface
    public void setStoreFuncUDFContextSignature(String str) {
    }

    @Override // org.apache.pig.StoreFuncInterface
    public void setStoreLocation(String str, Job job) throws IOException {
        if (str.startsWith("hbase://")) {
            job.getConfiguration().set("hbase.mapred.outputtable", str.substring(8));
        } else {
            job.getConfiguration().set("hbase.mapred.outputtable", str);
        }
    }

    @Override // org.apache.pig.StoreFuncInterface
    public void cleanupOnFailure(String str, Job job) throws IOException {
    }

    @Override // org.apache.pig.LoadPushDown
    public List<LoadPushDown.OperatorSet> getFeatures() {
        return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
    }

    @Override // org.apache.pig.LoadPushDown
    public LoadPushDown.RequiredFieldResponse pushProjection(LoadPushDown.RequiredFieldList requiredFieldList) throws FrontendException {
        List<LoadPushDown.RequiredField> fields = requiredFieldList.getFields();
        ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(fields.size());
        int i = this.loadRowKey_ ? 1 : 0;
        if (this.loadRowKey_) {
            if (fields.size() < 1 || fields.get(0).getIndex() != 0) {
                this.loadRowKey_ = false;
            } else {
                fields.remove(0);
            }
        }
        Iterator<LoadPushDown.RequiredField> it = fields.iterator();
        while (it.hasNext()) {
            newArrayListWithExpectedSize.add(this.columnList_.get(it.next().getIndex() - i));
        }
        LOG.info("pushProjection After Projection: loadRowKey is " + this.loadRowKey_);
        Iterator it2 = newArrayListWithExpectedSize.iterator();
        while (it2.hasNext()) {
            LOG.info("pushProjection -- col: " + Bytes.toStringBinary((byte[]) it2.next()));
        }
        this.columnList_ = newArrayListWithExpectedSize;
        return new LoadPushDown.RequiredFieldResponse(true);
    }
}
