/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.sql.calcite.schema;

import com.amazonaws.annotation.GuardedBy;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.calcite.schema.Function;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.druid.client.ServerView;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.metadata.metadata.AllColumnIncluderator;
import org.apache.druid.query.metadata.metadata.ColumnAnalysis;
import org.apache.druid.query.metadata.metadata.ColumnIncluderator;
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.schema.SegmentMetadataHolder;
import org.apache.druid.sql.calcite.table.DruidTable;
import org.apache.druid.sql.calcite.table.RowSignature;
import org.apache.druid.sql.calcite.view.DruidViewMacro;
import org.apache.druid.sql.calcite.view.ViewManager;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.ReadablePeriod;

@ManageLifecycle
public class DruidSchema
extends AbstractSchema {
    private static final Comparator<DataSegment> SEGMENT_ORDER = Comparator.comparing(segment -> segment.getInterval().getStart()).reversed().thenComparing(java.util.function.Function.identity());
    public static final String NAME = "druid";
    private static final EmittingLogger log = new EmittingLogger(DruidSchema.class);
    private static final int MAX_SEGMENTS_PER_QUERY = 15000;
    private static final long DEFAULT_IS_PUBLISHED = 0L;
    private static final long DEFAULT_IS_AVAILABLE = 1L;
    private static final long DEFAULT_NUM_ROWS = 0L;
    private final QueryLifecycleFactory queryLifecycleFactory;
    private final PlannerConfig config;
    private final ViewManager viewManager;
    private final ExecutorService cacheExec;
    private final ConcurrentMap<String, DruidTable> tables;
    private final CountDownLatch initialized = new CountDownLatch(1);
    private final Object lock = new Object();
    @GuardedBy(value="lock")
    private final Map<String, TreeMap<DataSegment, SegmentMetadataHolder>> segmentMetadataInfo = new HashMap<String, TreeMap<DataSegment, SegmentMetadataHolder>>();
    private int totalSegments = 0;
    private final Set<DataSegment> mutableSegments = new TreeSet<DataSegment>(SEGMENT_ORDER);
    private final Set<String> dataSourcesNeedingRebuild = new HashSet<String>();
    private final TreeSet<DataSegment> segmentsNeedingRefresh = new TreeSet<DataSegment>(SEGMENT_ORDER);
    private final Escalator escalator;
    private boolean refreshImmediately = false;
    private long lastRefresh = 0L;
    private long lastFailure = 0L;
    private boolean isServerViewInitialized = false;

    @Inject
    public DruidSchema(QueryLifecycleFactory queryLifecycleFactory, TimelineServerView serverView, PlannerConfig config, ViewManager viewManager, Escalator escalator) {
        this.queryLifecycleFactory = (QueryLifecycleFactory)Preconditions.checkNotNull((Object)queryLifecycleFactory, (Object)"queryLifecycleFactory");
        Preconditions.checkNotNull((Object)serverView, (Object)"serverView");
        this.config = (PlannerConfig)Preconditions.checkNotNull((Object)config, (Object)"config");
        this.viewManager = (ViewManager)Preconditions.checkNotNull((Object)viewManager, (Object)"viewManager");
        this.cacheExec = ScheduledExecutors.fixed((int)1, (String)"DruidSchema-Cache-%d");
        this.tables = new ConcurrentHashMap<String, DruidTable>();
        this.escalator = escalator;
        serverView.registerTimelineCallback((Executor)Execs.directExecutor(), new TimelineServerView.TimelineCallback(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public ServerView.CallbackAction timelineInitialized() {
                Object object = DruidSchema.this.lock;
                synchronized (object) {
                    DruidSchema.this.isServerViewInitialized = true;
                    DruidSchema.this.lock.notifyAll();
                }
                return ServerView.CallbackAction.CONTINUE;
            }

            public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) {
                DruidSchema.this.addSegment(server, segment);
                return ServerView.CallbackAction.CONTINUE;
            }

            public ServerView.CallbackAction segmentRemoved(DataSegment segment) {
                DruidSchema.this.removeSegment(segment);
                return ServerView.CallbackAction.CONTINUE;
            }

            public ServerView.CallbackAction serverSegmentRemoved(DruidServerMetadata server, DataSegment segment) {
                DruidSchema.this.removeServerSegment(server, segment);
                return ServerView.CallbackAction.CONTINUE;
            }
        });
    }

    @LifecycleStart
    public void start() throws InterruptedException {
        this.cacheExec.submit(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                block23: {
                    while (true) {
                        try {
                            Iterator iterator;
                            if (Thread.currentThread().isInterrupted()) break block23;
                            TreeSet<DataSegment> segmentsToRefresh = new TreeSet<DataSegment>();
                            TreeSet dataSourcesToRebuild = new TreeSet();
                            try {
                                Object object = DruidSchema.this.lock;
                                synchronized (object) {
                                    long nextRefreshNoFuzz = DateTimes.utc((long)DruidSchema.this.lastRefresh).plus((ReadablePeriod)DruidSchema.this.config.getMetadataRefreshPeriod()).getMillis();
                                    long nextRefresh = nextRefreshNoFuzz + (long)((double)(nextRefreshNoFuzz - DruidSchema.this.lastRefresh) * 0.1);
                                    while (true) {
                                        boolean wasRecentFailure = DateTimes.utc((long)DruidSchema.this.lastFailure).plus((ReadablePeriod)DruidSchema.this.config.getMetadataRefreshPeriod()).isAfterNow();
                                        if (DruidSchema.this.isServerViewInitialized && !wasRecentFailure && (!DruidSchema.this.segmentsNeedingRefresh.isEmpty() || !DruidSchema.this.dataSourcesNeedingRebuild.isEmpty()) && (DruidSchema.this.refreshImmediately || nextRefresh < System.currentTimeMillis())) break;
                                        if (DruidSchema.this.isServerViewInitialized) {
                                            DruidSchema.this.initialized.countDown();
                                        }
                                        DruidSchema.this.lock.wait(Math.max(1L, nextRefresh - System.currentTimeMillis()));
                                    }
                                    segmentsToRefresh.addAll(DruidSchema.this.segmentsNeedingRefresh);
                                    DruidSchema.this.segmentsNeedingRefresh.clear();
                                    DruidSchema.this.segmentsNeedingRefresh.addAll(DruidSchema.this.mutableSegments);
                                    DruidSchema.this.lastFailure = 0L;
                                    DruidSchema.this.lastRefresh = System.currentTimeMillis();
                                    DruidSchema.this.refreshImmediately = false;
                                }
                                Set<DataSegment> refreshed = DruidSchema.this.refreshSegments(segmentsToRefresh);
                                iterator = DruidSchema.this.lock;
                                synchronized (iterator) {
                                    DruidSchema.this.segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed));
                                    dataSourcesToRebuild.addAll(DruidSchema.this.dataSourcesNeedingRebuild);
                                    refreshed.forEach(segment -> dataSourcesToRebuild.add(segment.getDataSource()));
                                    DruidSchema.this.dataSourcesNeedingRebuild.clear();
                                    DruidSchema.this.lock.notifyAll();
                                }
                                for (String dataSource : dataSourcesToRebuild) {
                                    DruidTable druidTable = DruidSchema.this.buildDruidTable(dataSource);
                                    DruidTable oldTable = DruidSchema.this.tables.put(dataSource, druidTable);
                                    if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
                                        log.debug("Table for dataSource[%s] has new signature[%s].", new Object[]{dataSource, druidTable.getRowSignature()});
                                        continue;
                                    }
                                    log.debug("Table for dataSource[%s] signature is unchanged.", new Object[]{dataSource});
                                }
                                DruidSchema.this.initialized.countDown();
                            }
                            catch (InterruptedException e) {
                                throw e;
                            }
                            catch (Exception e) {
                                log.warn((Throwable)e, "Metadata refresh failed, trying again soon.", new Object[0]);
                                iterator = DruidSchema.this.lock;
                                synchronized (iterator) {
                                    DruidSchema.this.segmentsNeedingRefresh.addAll(segmentsToRefresh);
                                    DruidSchema.this.dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild);
                                    DruidSchema.this.lastFailure = System.currentTimeMillis();
                                    DruidSchema.this.lock.notifyAll();
                                }
                            }
                        }
                        catch (InterruptedException segmentsToRefresh) {
                            break block23;
                        }
                        catch (Throwable e) {
                            log.makeAlert(e, "Metadata refresh failed permanently", new Object[0]).emit();
                            throw e;
                        }
                    }
                    finally {
                        log.info("Metadata refresh stopped.", new Object[0]);
                    }
                }
            }
        });
        if (this.config.isAwaitInitializationOnStart()) {
            long startMillis = System.currentTimeMillis();
            log.info("%s waiting for initialization.", new Object[]{((Object)((Object)this)).getClass().getSimpleName()});
            this.awaitInitialization();
            log.info("%s initialized in [%,d] ms.", new Object[]{((Object)((Object)this)).getClass().getSimpleName(), System.currentTimeMillis() - startMillis});
        }
    }

    @LifecycleStop
    public void stop() {
        this.cacheExec.shutdownNow();
    }

    public void awaitInitialization() throws InterruptedException {
        this.initialized.await();
    }

    protected Map<String, Table> getTableMap() {
        return ImmutableMap.copyOf(this.tables);
    }

    protected Multimap<String, Function> getFunctionMultimap() {
        ImmutableMultimap.Builder builder = ImmutableMultimap.builder();
        for (Map.Entry<String, DruidViewMacro> entry : this.viewManager.getViews().entrySet()) {
            builder.put(entry);
        }
        return builder.build();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    void addSegment(DruidServerMetadata server, DataSegment segment) {
        Object object = this.lock;
        synchronized (object) {
            SegmentMetadataHolder holder;
            Map knownSegments = this.segmentMetadataInfo.get(segment.getDataSource());
            SegmentMetadataHolder segmentMetadataHolder = holder = knownSegments != null ? (SegmentMetadataHolder)knownSegments.get(segment) : null;
            if (holder == null) {
                long isRealtime = server.segmentReplicatable() ? 0L : 1L;
                ImmutableSet servers = ImmutableSet.of((Object)server.getName());
                holder = SegmentMetadataHolder.builder(segment.getId(), 0L, 1L, isRealtime, (Set<String>)servers, null, 0L).build();
                this.setSegmentMetadataHolder(segment, holder);
                this.segmentsNeedingRefresh.add(segment);
                if (!server.segmentReplicatable()) {
                    log.debug("Added new mutable segment[%s].", new Object[]{segment.getId()});
                    this.mutableSegments.add(segment);
                } else {
                    log.debug("Added new immutable segment[%s].", new Object[]{segment.getId()});
                }
            } else {
                Set<String> segmentServers = holder.getReplicas();
                ImmutableSet servers = new ImmutableSet.Builder().addAll(segmentServers).add((Object)server.getName()).build();
                SegmentMetadataHolder holderWithNumReplicas = SegmentMetadataHolder.from(holder).withReplicas((Set<String>)servers).build();
                knownSegments.put(segment, holderWithNumReplicas);
                if (server.segmentReplicatable()) {
                    this.mutableSegments.remove(segment);
                    log.debug("Segment[%s] has become immutable.", new Object[]{segment.getId()});
                }
            }
            if (!this.tables.containsKey(segment.getDataSource())) {
                this.refreshImmediately = true;
            }
            this.lock.notifyAll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    void removeSegment(DataSegment segment) {
        Object object = this.lock;
        synchronized (object) {
            log.debug("Segment[%s] is gone.", new Object[]{segment.getId()});
            this.dataSourcesNeedingRebuild.add(segment.getDataSource());
            this.segmentsNeedingRefresh.remove(segment);
            this.mutableSegments.remove(segment);
            Map dataSourceSegments = this.segmentMetadataInfo.get(segment.getDataSource());
            if (dataSourceSegments.remove(segment) != null) {
                --this.totalSegments;
            }
            if (dataSourceSegments.isEmpty()) {
                this.segmentMetadataInfo.remove(segment.getDataSource());
                this.tables.remove(segment.getDataSource());
                log.info("Removed all metadata for dataSource[%s].", new Object[]{segment.getDataSource()});
            }
            this.lock.notifyAll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void removeServerSegment(DruidServerMetadata server, DataSegment segment) {
        Object object = this.lock;
        synchronized (object) {
            log.debug("Segment[%s] is gone from server[%s]", new Object[]{segment.getId(), server.getName()});
            Map knownSegments = this.segmentMetadataInfo.get(segment.getDataSource());
            SegmentMetadataHolder holder = (SegmentMetadataHolder)knownSegments.get(segment);
            Set<String> segmentServers = holder.getReplicas();
            ImmutableSet servers = FluentIterable.from(segmentServers).filter(Predicates.not((Predicate)Predicates.equalTo((Object)server.getName()))).toSet();
            SegmentMetadataHolder holderWithNumReplicas = SegmentMetadataHolder.from(holder).withReplicas((Set<String>)servers).build();
            knownSegments.put(segment, holderWithNumReplicas);
            this.lock.notifyAll();
        }
    }

    @VisibleForTesting
    Set<DataSegment> refreshSegments(Set<DataSegment> segments) throws IOException {
        HashSet<DataSegment> retVal = new HashSet<DataSegment>();
        TreeMap<String, TreeSet> segmentMap = new TreeMap<String, TreeSet>();
        for (DataSegment dataSegment : segments) {
            segmentMap.computeIfAbsent(dataSegment.getDataSource(), x -> new TreeSet<DataSegment>(SEGMENT_ORDER)).add(dataSegment);
        }
        for (Map.Entry entry : segmentMap.entrySet()) {
            String dataSource = (String)entry.getKey();
            retVal.addAll(this.refreshSegmentsForDataSource(dataSource, (Set)entry.getValue()));
        }
        return retVal;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Set<DataSegment> refreshSegmentsForDataSource(String dataSource, Set<DataSegment> segments) throws IOException {
        log.debug("Refreshing metadata for dataSource[%s].", new Object[]{dataSource});
        long startTime = System.currentTimeMillis();
        ImmutableMap segmentMap = Maps.uniqueIndex(segments, segment -> segment.getId().toString());
        HashSet<DataSegment> retVal = new HashSet<DataSegment>();
        Sequence<SegmentAnalysis> sequence = DruidSchema.runSegmentMetadataQuery(this.queryLifecycleFactory, Iterables.limit(segments, (int)15000), this.escalator.createEscalatedAuthenticationResult());
        try (Yielder yielder = Yielders.each(sequence);){
            while (!yielder.isDone()) {
                SegmentAnalysis analysis = (SegmentAnalysis)yielder.get();
                DataSegment segment2 = (DataSegment)segmentMap.get(analysis.getId());
                if (segment2 == null) {
                    log.warn("Got analysis for segment[%s] we didn't ask for, ignoring.", new Object[]{analysis.getId()});
                } else {
                    Object object = this.lock;
                    synchronized (object) {
                        RowSignature rowSignature = DruidSchema.analysisToRowSignature(analysis);
                        log.debug("Segment[%s] has signature[%s].", new Object[]{segment2.getId(), rowSignature});
                        Map dataSourceSegments = this.segmentMetadataInfo.get(segment2.getDataSource());
                        if (dataSourceSegments == null) {
                            log.warn("No segment map found with datasource[%s], skipping refresh", new Object[]{segment2.getDataSource()});
                        } else {
                            SegmentMetadataHolder holder = (SegmentMetadataHolder)dataSourceSegments.get(segment2);
                            if (holder == null) {
                                log.warn("No segment[%s] found, skipping refresh", new Object[]{segment2.getId()});
                            } else {
                                SegmentMetadataHolder updatedHolder = SegmentMetadataHolder.from(holder).withRowSignature(rowSignature).withNumRows(analysis.getNumRows()).build();
                                dataSourceSegments.put(segment2, updatedHolder);
                                this.setSegmentMetadataHolder(segment2, updatedHolder);
                                retVal.add(segment2);
                            }
                        }
                    }
                }
                yielder = yielder.next(null);
            }
        }
        log.info("Refreshed metadata for dataSource[%s] in %,d ms (%d segments queried, %d segments left).", new Object[]{dataSource, System.currentTimeMillis() - startTime, retVal.size(), segments.size() - retVal.size()});
        return retVal;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    void setSegmentMetadataHolder(DataSegment segment, SegmentMetadataHolder segmentMetadataHolder) {
        Object object = this.lock;
        synchronized (object) {
            TreeMap dataSourceSegments = this.segmentMetadataInfo.computeIfAbsent(segment.getDataSource(), x -> new TreeMap(SEGMENT_ORDER));
            if (dataSourceSegments.put(segment, segmentMetadataHolder) == null) {
                ++this.totalSegments;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private DruidTable buildDruidTable(String dataSource) {
        Object object = this.lock;
        synchronized (object) {
            Map segmentMap = this.segmentMetadataInfo.get(dataSource);
            TreeMap<String, ValueType> columnTypes = new TreeMap<String, ValueType>();
            if (segmentMap != null) {
                for (SegmentMetadataHolder segmentMetadataHolder : segmentMap.values()) {
                    RowSignature rowSignature = segmentMetadataHolder.getRowSignature();
                    if (rowSignature == null) continue;
                    for (String column : rowSignature.getRowOrder()) {
                        columnTypes.putIfAbsent(column, rowSignature.getColumnType(column));
                    }
                }
            }
            RowSignature.Builder builder = RowSignature.builder();
            columnTypes.forEach(builder::add);
            return new DruidTable((DataSource)new TableDataSource(dataSource), builder.build());
        }
    }

    private static Sequence<SegmentAnalysis> runSegmentMetadataQuery(QueryLifecycleFactory queryLifecycleFactory, Iterable<DataSegment> segments, AuthenticationResult authenticationResult) {
        String dataSource = (String)Iterables.getOnlyElement((Iterable)StreamSupport.stream(segments.spliterator(), false).map(DataSegment::getDataSource).collect(Collectors.toSet()));
        MultipleSpecificSegmentSpec querySegmentSpec = new MultipleSpecificSegmentSpec(StreamSupport.stream(segments.spliterator(), false).map(DataSegment::toDescriptor).collect(Collectors.toList()));
        SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery((DataSource)new TableDataSource(dataSource), (QuerySegmentSpec)querySegmentSpec, (ColumnIncluderator)new AllColumnIncluderator(), Boolean.valueOf(false), (Map)ImmutableMap.of(), EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class), Boolean.valueOf(false), Boolean.valueOf(false));
        return queryLifecycleFactory.factorize().runSimple((Query)segmentMetadataQuery, authenticationResult, null);
    }

    private static RowSignature analysisToRowSignature(SegmentAnalysis analysis) {
        RowSignature.Builder rowSignatureBuilder = RowSignature.builder();
        for (Map.Entry entry : analysis.getColumns().entrySet()) {
            ValueType valueType;
            if (((ColumnAnalysis)entry.getValue()).isError()) continue;
            try {
                valueType = ValueType.valueOf((String)StringUtils.toUpperCase((String)((ColumnAnalysis)entry.getValue()).getType()));
            }
            catch (IllegalArgumentException e) {
                valueType = ValueType.COMPLEX;
            }
            rowSignatureBuilder.add((String)entry.getKey(), valueType);
        }
        return rowSignatureBuilder.build();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    Map<DataSegment, SegmentMetadataHolder> getSegmentMetadata() {
        HashMap<DataSegment, SegmentMetadataHolder> segmentMetadata = new HashMap<DataSegment, SegmentMetadataHolder>();
        Object object = this.lock;
        synchronized (object) {
            for (TreeMap<DataSegment, SegmentMetadataHolder> val : this.segmentMetadataInfo.values()) {
                segmentMetadata.putAll(val);
            }
        }
        return segmentMetadata;
    }

    int getTotalSegments() {
        return this.totalSegments;
    }
}

