/*
 * Decompiled with CFR 0.152.
 */
package org.apache.fluo.mapreduce;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.api.data.RowColumnValue;
import org.apache.fluo.api.data.Span;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.TransactionImpl;
import org.apache.fluo.core.util.SpanUtil;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class FluoEntryInputFormat
extends InputFormat<RowColumn, Bytes> {
    private static String TIMESTAMP_CONF_KEY = FluoEntryInputFormat.class.getName() + ".timestamp";
    private static String PROPS_CONF_KEY = FluoEntryInputFormat.class.getName() + ".props";
    private static String FAMS_CONF_KEY = FluoEntryInputFormat.class.getName() + ".families";

    public RecordReader<RowColumn, Bytes> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return new RecordReader<RowColumn, Bytes>(){
            private RowColumnValue rowColVal;
            private Environment env = null;
            private TransactionImpl ti = null;
            private Iterator<RowColumnValue> cellIterator;

            public void close() throws IOException {
                if (this.ti != null) {
                    this.ti.close();
                }
                if (this.env != null) {
                    this.env.close();
                }
            }

            public RowColumn getCurrentKey() throws IOException, InterruptedException {
                return this.rowColVal.getRowColumn();
            }

            public Bytes getCurrentValue() throws IOException, InterruptedException {
                return this.rowColVal.getValue();
            }

            public float getProgress() throws IOException, InterruptedException {
                return 0.0f;
            }

            public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
                try {
                    ByteArrayInputStream bais = new ByteArrayInputStream(context.getConfiguration().get(PROPS_CONF_KEY).getBytes(StandardCharsets.UTF_8));
                    this.env = new Environment(new FluoConfiguration((InputStream)bais));
                    this.ti = new TransactionImpl(this.env, context.getConfiguration().getLong(TIMESTAMP_CONF_KEY, -1L));
                    RangeInputSplit ris = (RangeInputSplit)split;
                    Span span = SpanUtil.toSpan((Range)ris.getRange());
                    HashSet<Column> columns = new HashSet<Column>();
                    for (String fam : context.getConfiguration().getStrings(FAMS_CONF_KEY, new String[0])) {
                        columns.add(new Column((CharSequence)fam));
                    }
                    this.cellIterator = this.ti.scanner().over(span).fetch(columns).build().iterator();
                }
                catch (Exception e) {
                    throw new IOException(e);
                }
            }

            public boolean nextKeyValue() throws IOException, InterruptedException {
                if (this.cellIterator.hasNext()) {
                    this.rowColVal = this.cellIterator.next();
                    return true;
                }
                this.rowColVal = null;
                return false;
            }
        };
    }

    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
        return new AccumuloInputFormat().getSplits(context);
    }

    public static void configure(Job conf, SimpleConfiguration config) {
        try {
            FluoConfiguration fconfig = new FluoConfiguration(config);
            try (Environment env = new Environment(fconfig);){
                long ts = env.getSharedResources().getTimestampTracker().allocateTimestamp().getTxTimestamp();
                conf.getConfiguration().setLong(TIMESTAMP_CONF_KEY, ts);
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                config.save((OutputStream)baos);
                conf.getConfiguration().set(PROPS_CONF_KEY, new String(baos.toByteArray(), StandardCharsets.UTF_8));
                AccumuloInputFormat.setZooKeeperInstance((Job)conf, (String)fconfig.getAccumuloInstance(), (String)fconfig.getAccumuloZookeepers());
                AccumuloInputFormat.setConnectorInfo((Job)conf, (String)fconfig.getAccumuloUser(), (AuthenticationToken)new PasswordToken((CharSequence)fconfig.getAccumuloPassword()));
                AccumuloInputFormat.setInputTableName((Job)conf, (String)env.getTable());
                AccumuloInputFormat.setScanAuthorizations((Job)conf, (Authorizations)env.getAuthorizations());
            }
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void fetchFamilies(Job job, String ... fams) {
        job.getConfiguration().setStrings(FAMS_CONF_KEY, fams);
    }

    public static void fetchFamilies(Job job, Bytes ... fams) {
        String[] sfams = new String[fams.length];
        for (int i = 0; i < sfams.length; ++i) {
            sfams[i] = fams[i].toString();
        }
        FluoEntryInputFormat.fetchFamilies(job, sfams);
    }
}

