/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.common.task;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentListUsedAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.AbstractFixedIntervalTask;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.ReadableInstant;

public abstract class MergeTaskBase
extends AbstractFixedIntervalTask {
    private static final EmittingLogger log = new EmittingLogger(MergeTaskBase.class);
    @JsonIgnore
    private final List<DataSegment> segments;
    @JsonIgnore
    @Nullable
    private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;

    protected MergeTaskBase(String id, final String dataSource, List<DataSegment> segments, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, Map<String, Object> context) {
        super(id != null ? id : StringUtils.format((String)"merge_%s_%s", (Object[])new Object[]{MergeTaskBase.computeProcessingID(dataSource, segments), DateTimes.nowUtc().toString()}), dataSource, MergeTaskBase.computeMergedInterval(segments), context);
        Preconditions.checkArgument((segments.size() > 0 ? 1 : 0) != 0, (Object)"segments nonempty");
        Preconditions.checkArgument((Iterables.size((Iterable)Iterables.filter(segments, (Predicate)new Predicate<DataSegment>(){

            public boolean apply(@Nullable DataSegment segment) {
                return segment == null || !segment.getDataSource().equalsIgnoreCase(dataSource);
            }
        })) == 0 ? 1 : 0) != 0, (Object)"segments in the wrong datasource");
        this.verifyInputSegments(segments);
        this.segments = segments;
        this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
    }

    protected void verifyInputSegments(List<DataSegment> segments) {
        Preconditions.checkArgument((Iterables.size((Iterable)Iterables.filter(segments, (Predicate)new Predicate<DataSegment>(){

            public boolean apply(@Nullable DataSegment segment) {
                return segment == null || !(segment.getShardSpec() instanceof NoneShardSpec);
            }
        })) == 0 ? 1 : 0) != 0, (Object)"segments without NoneShardSpec");
    }

    @Override
    public int getPriority() {
        return this.getContextValue("priority", 25);
    }

    @Override
    public TaskStatus run(TaskToolbox toolbox) throws Exception {
        TaskLock myLock = (TaskLock)Iterables.getOnlyElement(MergeTaskBase.getTaskLocks(toolbox.getTaskActionClient()));
        ServiceEmitter emitter = toolbox.getEmitter();
        ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
        DataSegment mergedSegment = MergeTaskBase.computeMergedSegment(this.getDataSource(), myLock.getVersion(), this.segments);
        File mergeDir = toolbox.getMergeDir();
        try {
            long startTime = System.currentTimeMillis();
            log.info("Starting merge of id[%s], segments: %s", new Object[]{this.getId(), Lists.transform(this.segments, DataSegment::getId)});
            Map<DataSegment, File> gettedSegments = toolbox.fetchSegments(this.segments);
            File fileToUpload = this.merge(toolbox, gettedSegments, mergeDir);
            emitter.emit(builder.build("merger/numMerged", (Number)this.segments.size()));
            emitter.emit(builder.build("merger/mergeTime", (Number)(System.currentTimeMillis() - startTime)));
            log.info("[%s] : Merged %d segments in %,d millis", new Object[]{mergedSegment.getDataSource(), this.segments.size(), System.currentTimeMillis() - startTime});
            long uploadStart = System.currentTimeMillis();
            DataSegment uploadedSegment = toolbox.getSegmentPusher().push(fileToUpload, mergedSegment, false);
            emitter.emit(builder.build("merger/uploadTime", (Number)(System.currentTimeMillis() - uploadStart)));
            emitter.emit(builder.build("merger/mergeSize", (Number)uploadedSegment.getSize()));
            toolbox.publishSegments((Iterable<DataSegment>)ImmutableList.of((Object)uploadedSegment));
            return TaskStatus.success((String)this.getId());
        }
        catch (Exception e) {
            log.makeAlert((Throwable)e, "Exception merging[%s]", new Object[]{mergedSegment.getDataSource()}).addData("interval", (Object)mergedSegment.getInterval()).emit();
            return TaskStatus.failure((String)this.getId());
        }
    }

    @Override
    public boolean isReady(TaskActionClient taskActionClient) throws Exception {
        Set requested;
        if (!super.isReady(taskActionClient)) {
            return false;
        }
        Set current = taskActionClient.submit(new SegmentListUsedAction(this.getDataSource(), this.getInterval(), null)).stream().map(DataSegment::getId).collect(Collectors.toSet());
        Sets.SetView missingFromRequested = Sets.difference(current, requested = this.segments.stream().map(DataSegment::getId).collect(Collectors.toSet()));
        if (!missingFromRequested.isEmpty()) {
            throw new ISE("Merge is invalid: current segment(s) are not in the requested set: %s", new Object[]{Joiner.on((String)", ").join((Iterable)missingFromRequested)});
        }
        Sets.SetView missingFromCurrent = Sets.difference(requested, current);
        if (!missingFromCurrent.isEmpty()) {
            throw new ISE("Merge is invalid: requested segment(s) are not in the current set: %s", new Object[]{Joiner.on((String)", ").join((Iterable)missingFromCurrent)});
        }
        return true;
    }

    protected abstract File merge(TaskToolbox var1, Map<DataSegment, File> var2, File var3) throws Exception;

    @JsonProperty
    public List<DataSegment> getSegments() {
        return this.segments;
    }

    @JsonProperty
    @Nullable
    public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() {
        return this.segmentWriteOutMediumFactory;
    }

    @Override
    public String toString() {
        return Objects.toStringHelper((Object)this).add("id", (Object)this.getId()).add("dataSource", (Object)this.getDataSource()).add("interval", (Object)this.getInterval()).add("segments", this.segments).add("segmentWriteOutMediumFactory", (Object)this.segmentWriteOutMediumFactory).toString();
    }

    private static String computeProcessingID(String dataSource, List<DataSegment> segments) {
        String segmentIDs = Joiner.on((String)"_").join(Iterables.transform((Iterable)Ordering.natural().sortedCopy(segments), (Function)new Function<DataSegment, String>(){

            public String apply(DataSegment x) {
                return StringUtils.format((String)"%s_%s_%s_%s", (Object[])new Object[]{x.getInterval().getStart(), x.getInterval().getEnd(), x.getVersion(), x.getShardSpec().getPartitionNum()});
            }
        }));
        return StringUtils.format((String)"%s_%s", (Object[])new Object[]{dataSource, Hashing.sha1().hashString((CharSequence)segmentIDs, StandardCharsets.UTF_8).toString()});
    }

    private static Interval computeMergedInterval(List<DataSegment> segments) {
        Preconditions.checkArgument((segments.size() > 0 ? 1 : 0) != 0, (Object)"segments.size() > 0");
        DateTime start = null;
        DateTime end = null;
        for (DataSegment segment : segments) {
            if (start == null || segment.getInterval().getStart().isBefore((ReadableInstant)start)) {
                start = segment.getInterval().getStart();
            }
            if (end != null && !segment.getInterval().getEnd().isAfter((ReadableInstant)end)) continue;
            end = segment.getInterval().getEnd();
        }
        return new Interval(start, end);
    }

    private static DataSegment computeMergedSegment(String dataSource, String version, List<DataSegment> segments) {
        Interval mergedInterval = MergeTaskBase.computeMergedInterval(segments);
        TreeSet mergedDimensions = new TreeSet(String.CASE_INSENSITIVE_ORDER);
        TreeSet mergedMetrics = new TreeSet(String.CASE_INSENSITIVE_ORDER);
        for (DataSegment segment : segments) {
            mergedDimensions.addAll(segment.getDimensions());
            mergedMetrics.addAll(segment.getMetrics());
        }
        return DataSegment.builder().dataSource(dataSource).interval(mergedInterval).version(version).binaryVersion(Integer.valueOf(9)).shardSpec((ShardSpec)NoneShardSpec.instance()).dimensions((List)Lists.newArrayList(mergedDimensions)).metrics((List)Lists.newArrayList(mergedMetrics)).build();
    }
}

