/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.collect.Ranges;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.HBaseZeroCopyByteString;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.kylin.common.util.CompressionUtils;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.RangeUtil;
import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.invertedindex.index.TableRecord;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.measure.hllc.HLLCMeasureType;
import org.apache.kylin.metadata.filter.ConstantTupleFilter;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.tuple.ITuple;
import org.apache.kylin.metadata.tuple.ITupleIterator;
import org.apache.kylin.metadata.tuple.Tuple;
import org.apache.kylin.metadata.tuple.TupleInfo;
import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.cache.TsConditionExtractor;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator;
import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.ClearTextDictionary;
import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.EndpointAggregators;
import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.EndpointTupleConverter;
import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.generated.IIProtos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EndpointTupleIterator
implements ITupleIterator {
    private static final Logger logger = LoggerFactory.getLogger(EndpointTupleIterator.class);
    private final IISegment seg;
    private final String factTableName;
    private final List<TblColRef> columns;
    private final TupleInfo tupleInfo;
    private final TableRecordInfo tableRecordInfo;
    private final EndpointTupleConverter tupleConverter;
    private final CoprocessorRowType pushedDownRowType;
    private final CoprocessorFilter pushedDownFilter;
    private final CoprocessorProjector pushedDownProjector;
    private final EndpointAggregators pushedDownAggregators;
    private final Range<Long> tsRange;
    private Iterator<List<IIProtos.IIResponseInternal.IIRow>> regionResponsesIterator = null;
    private ITupleIterator tupleIterator = null;
    private HTableInterface table = null;
    private TblColRef partitionCol;
    private long lastDataTime = -1L;
    private int rowsInAllMetric = 0;

    public EndpointTupleIterator(IISegment segment, TupleFilter rootFilter, Collection<TblColRef> groupBy, List<FunctionDesc> measures, StorageContext context, HConnection conn, TupleInfo returnTupleInfo) throws Throwable {
        String tableName = segment.getStorageLocationIdentifier();
        this.table = conn.getTable(tableName);
        this.factTableName = segment.getIIDesc().getFactTableName();
        if (rootFilter == null) {
            rootFilter = ConstantTupleFilter.TRUE;
        }
        if (groupBy == null) {
            groupBy = Sets.newHashSet();
        }
        if (measures == null) {
            measures = Lists.newArrayList();
        }
        this.rewriteMeasureParameters(measures, segment.getColumns());
        this.seg = segment;
        this.columns = segment.getColumns();
        this.tupleInfo = returnTupleInfo;
        this.tupleConverter = new EndpointTupleConverter(this.columns, measures, returnTupleInfo);
        this.tableRecordInfo = new TableRecordInfo(this.seg);
        this.pushedDownRowType = CoprocessorRowType.fromTableRecordInfo(this.tableRecordInfo, this.columns);
        this.pushedDownFilter = CoprocessorFilter.fromFilter(new ClearTextDictionary(this.tableRecordInfo), rootFilter, FilterDecorator.FilterConstantsTreatment.AS_IT_IS);
        for (TblColRef column : this.pushedDownFilter.getInevaluableColumns()) {
            groupBy.add(column);
        }
        this.pushedDownProjector = CoprocessorProjector.makeForEndpoint(this.tableRecordInfo, groupBy);
        this.pushedDownAggregators = EndpointAggregators.fromFunctions(this.tableRecordInfo, measures);
        int tsCol = this.tableRecordInfo.getTimestampColumn();
        this.partitionCol = this.columns.get(tsCol);
        this.tsRange = TsConditionExtractor.extractTsCondition(this.partitionCol, rootFilter);
        if (this.tsRange == null) {
            logger.info("TsRange conflict for endpoint, return empty directly");
            this.tupleIterator = ITupleIterator.EMPTY_TUPLE_ITERATOR;
        } else {
            logger.info("The tsRange being pushed is " + RangeUtil.formatTsRange(this.tsRange));
        }
        IIProtos.IIRequest endpointRequest = this.prepareRequest();
        Collection<IIProtos.IIResponse> compressedShardResults = this.getResults(endpointRequest, this.table);
        ArrayList<IIProtos.IIResponseInternal> shardResults = new ArrayList<IIProtos.IIResponseInternal>();
        for (IIProtos.IIResponse input : compressedShardResults) {
            byte[] compressed = HBaseZeroCopyByteString.zeroCopyGetBytes((ByteString)input.getBlob());
            try {
                byte[] decompressed = CompressionUtils.decompress(compressed);
                shardResults.add(IIProtos.IIResponseInternal.parseFrom(decompressed));
            }
            catch (Exception e) {
                throw new RuntimeException("decompress endpoint response error");
            }
        }
        this.lastDataTime = (Long)Collections.min(Collections2.transform(shardResults, (Function)new Function<IIProtos.IIResponseInternal, Long>(){

            @Nullable
            public Long apply(IIProtos.IIResponseInternal input) {
                IIProtos.IIResponseInternal.Stats status = input.getStats();
                logger.info("Endpoints all returned, stats from shard {}: start moment:{}, finish moment: {}, elapsed ms: {}, scanned slices: {}, latest slice time is {}", new Object[]{String.valueOf(status.getMyShard()), DateFormat.formatToTimeStr(status.getServiceStartTime()), DateFormat.formatToTimeStr(status.getServiceEndTime()), String.valueOf(status.getServiceEndTime() - status.getServiceStartTime()), String.valueOf(status.getScannedSlices()), DateFormat.formatToTimeStr(status.getLatestDataTime())});
                return status.getLatestDataTime();
            }
        }));
        this.regionResponsesIterator = Collections2.transform(shardResults, (Function)new Function<IIProtos.IIResponseInternal, List<IIProtos.IIResponseInternal.IIRow>>(){

            @Nullable
            public List<IIProtos.IIResponseInternal.IIRow> apply(@Nullable IIProtos.IIResponseInternal input) {
                return input.getRowsList();
            }
        }).iterator();
        this.tupleIterator = this.regionResponsesIterator.hasNext() ? new SingleRegionTupleIterator(this.regionResponsesIterator.next()) : ITupleIterator.EMPTY_TUPLE_ITERATOR;
    }

    private void rewriteMeasureParameters(List<FunctionDesc> measures, List<TblColRef> columns) {
        for (FunctionDesc functionDesc : measures) {
            if (functionDesc.isCount()) {
                functionDesc.setReturnType("bigint");
                continue;
            }
            boolean updated = false;
            for (TblColRef column : columns) {
                if (!column.isSameAs(this.factTableName, functionDesc.getParameter().getValue())) continue;
                if (HLLCMeasureType.isCountDistinct(functionDesc)) {
                    String iiDefaultHLLC = "hllc10";
                    functionDesc.setReturnType(iiDefaultHLLC);
                } else {
                    functionDesc.setReturnType(column.getColumnDesc().getType().toString());
                }
                functionDesc.getParameter().setColRefs((List<TblColRef>)ImmutableList.of((Object)column));
                updated = true;
                break;
            }
            if (updated) continue;
            throw new RuntimeException("Func " + functionDesc + " is not related to any column in fact table " + this.factTableName);
        }
    }

    @Override
    public boolean hasNext() {
        while (!this.tupleIterator.hasNext()) {
            if (this.regionResponsesIterator.hasNext()) {
                this.tupleIterator = new SingleRegionTupleIterator(this.regionResponsesIterator.next());
                continue;
            }
            return false;
        }
        return true;
    }

    @Override
    public ITuple next() {
        ++this.rowsInAllMetric;
        if (!this.hasNext()) {
            throw new IllegalStateException("No more ITuple in EndpointTupleIterator");
        }
        ITuple tuple = (ITuple)this.tupleIterator.next();
        return tuple;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }

    @Override
    public void close() {
        IOUtils.closeQuietly((Closeable)this.table);
        logger.info("Closed after " + this.rowsInAllMetric + " rows are fetched");
    }

    public Range<Long> getCacheExcludedPeriod() {
        Preconditions.checkArgument((this.lastDataTime != -1L ? 1 : 0) != 0, (Object)"lastDataTime is not set yet");
        return Ranges.greaterThan((Comparable)Long.valueOf(this.lastDataTime));
    }

    private IIProtos.IIRequest prepareRequest() throws IOException {
        IIProtos.IIRequest.Builder builder = IIProtos.IIRequest.newBuilder();
        if (this.tsRange != null) {
            byte[] tsRangeBytes = SerializationUtils.serialize(this.tsRange);
            builder.setTsRange(HBaseZeroCopyByteString.wrap((byte[])tsRangeBytes));
        }
        builder.setType(HBaseZeroCopyByteString.wrap((byte[])CoprocessorRowType.serialize(this.pushedDownRowType))).setFilter(HBaseZeroCopyByteString.wrap((byte[])CoprocessorFilter.serialize(this.pushedDownFilter))).setProjector(HBaseZeroCopyByteString.wrap((byte[])CoprocessorProjector.serialize(this.pushedDownProjector))).setAggregator(HBaseZeroCopyByteString.wrap((byte[])EndpointAggregators.serialize(this.pushedDownAggregators)));
        IIProtos.IIRequest request = builder.build();
        return request;
    }

    private Collection<IIProtos.IIResponse> getResults(final IIProtos.IIRequest request, HTableInterface table) throws Throwable {
        Map results = table.coprocessorService(IIProtos.RowsService.class, null, null, (Batch.Call)new Batch.Call<IIProtos.RowsService, IIProtos.IIResponse>(){

            public IIProtos.IIResponse call(IIProtos.RowsService rowsService) throws IOException {
                ServerRpcController controller = new ServerRpcController();
                BlockingRpcCallback rpcCallback = new BlockingRpcCallback();
                rowsService.getRows((RpcController)controller, request, (RpcCallback<IIProtos.IIResponse>)rpcCallback);
                IIProtos.IIResponse response = (IIProtos.IIResponse)rpcCallback.get();
                if (controller.failedOnException()) {
                    throw controller.getFailedOn();
                }
                return response;
            }
        });
        return results.values();
    }

    class SingleRegionTupleIterator
    implements ITupleIterator {
        private List<IIProtos.IIResponseInternal.IIRow> rows;
        private int index = 0;
        private TableRecord tableRecord;
        private List<Object> measureValues;
        private Tuple tuple;

        public SingleRegionTupleIterator(List<IIProtos.IIResponseInternal.IIRow> rows) {
            this.rows = rows;
            this.index = 0;
            this.tableRecord = EndpointTupleIterator.this.tableRecordInfo.createTableRecord();
            this.tuple = new Tuple(EndpointTupleIterator.this.tupleInfo);
        }

        @Override
        public boolean hasNext() {
            return this.index < this.rows.size();
        }

        @Override
        public ITuple next() {
            if (!this.hasNext()) {
                throw new IllegalStateException("No more Tuple in the SingleRegionTupleIterator");
            }
            IIProtos.IIResponseInternal.IIRow currentRow = this.rows.get(this.index);
            byte[] columnsBytes = HBaseZeroCopyByteString.zeroCopyGetBytes((ByteString)currentRow.getColumns());
            this.tableRecord.setBytes(columnsBytes, 0, columnsBytes.length);
            if (currentRow.hasMeasures()) {
                ByteBuffer buffer = currentRow.getMeasures().asReadOnlyByteBuffer();
                this.measureValues = EndpointTupleIterator.this.pushedDownAggregators.deserializeMetricValues(buffer);
            }
            ++this.index;
            return EndpointTupleIterator.this.tupleConverter.makeTuple(this.tableRecord, this.measureValues, this.tuple);
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        @Override
        public void close() {
        }
    }
}

