package org.apache.druid.segment.realtime.plumber;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.common.guava.ThreadRenamingCallable;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.DateTime;
import org.joda.time.Duration;

/* loaded from: input_file:org/apache/druid/segment/realtime/plumber/FlushingPlumber.class */
public class FlushingPlumber extends RealtimePlumber {
    private static final EmittingLogger log = new EmittingLogger(FlushingPlumber.class);
    private final DataSchema schema;
    private final RealtimeTuningConfig config;
    private final Duration flushDuration;
    private volatile ScheduledExecutorService flushScheduledExec;
    private volatile boolean stopped;

    public FlushingPlumber(Duration duration, DataSchema dataSchema, RealtimeTuningConfig realtimeTuningConfig, FireDepartmentMetrics fireDepartmentMetrics, ServiceEmitter serviceEmitter, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, DataSegmentAnnouncer dataSegmentAnnouncer, QueryProcessingPool queryProcessingPool, JoinableFactory joinableFactory, IndexMerger indexMerger, IndexIO indexIO, Cache cache, CacheConfig cacheConfig, CachePopulatorStats cachePopulatorStats, ObjectMapper objectMapper) {
        super(dataSchema, realtimeTuningConfig, fireDepartmentMetrics, serviceEmitter, queryRunnerFactoryConglomerate, dataSegmentAnnouncer, queryProcessingPool, joinableFactory, null, null, null, indexMerger, indexIO, cache, cacheConfig, cachePopulatorStats, objectMapper);
        this.flushScheduledExec = null;
        this.stopped = false;
        this.flushDuration = duration;
        this.schema = dataSchema;
        this.config = realtimeTuningConfig;
    }

    @Override // org.apache.druid.segment.realtime.plumber.RealtimePlumber, org.apache.druid.segment.realtime.plumber.Plumber
    public Object startJob() {
        log.info("Starting job for %s", new Object[]{getSchema().getDataSource()});
        try {
            FileUtils.mkdirp(computeBaseDir(getSchema()));
            initializeExecutors();
            if (this.flushScheduledExec == null) {
                this.flushScheduledExec = Execs.scheduledSingleThreaded("flushing_scheduled_%d");
            }
            Object bootstrapSinksFromDisk = bootstrapSinksFromDisk();
            startFlushThread();
            return bootstrapSinksFromDisk;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    protected void flushAfterDuration(final long j, final Sink sink) {
        log.info("Abandoning segment %s at %s", new Object[]{sink.getSegment().getId(), DateTimes.nowUtc().plusMillis((int) this.flushDuration.getMillis())});
        ScheduledExecutors.scheduleWithFixedDelay(this.flushScheduledExec, this.flushDuration, new Callable<ScheduledExecutors.Signal>() { // from class: org.apache.druid.segment.realtime.plumber.FlushingPlumber.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public ScheduledExecutors.Signal call() {
                FlushingPlumber.log.info("Abandoning segment %s", new Object[]{sink.getSegment().getId()});
                FlushingPlumber.this.abandonSegment(j, sink);
                return ScheduledExecutors.Signal.STOP;
            }
        });
    }

    private void startFlushThread() {
        final Granularity segmentGranularity = this.schema.getGranularitySpec().getSegmentGranularity();
        DateTime bucketStart = segmentGranularity.bucketStart(DateTimes.nowUtc());
        final long millis = this.config.getWindowPeriod().toStandardDuration().getMillis();
        log.info("Expect to run at [%s]", new Object[]{DateTimes.nowUtc().plus(new Duration(System.currentTimeMillis(), this.schema.getGranularitySpec().getSegmentGranularity().increment(bucketStart).getMillis() + millis))});
        ScheduledExecutors.scheduleAtFixedRate(this.flushScheduledExec, new Duration(System.currentTimeMillis(), this.schema.getGranularitySpec().getSegmentGranularity().increment(bucketStart).getMillis() + millis), new Duration(bucketStart, segmentGranularity.increment(bucketStart)), new ThreadRenamingCallable<ScheduledExecutors.Signal>(StringUtils.format("%s-flusher-%d", new Object[]{getSchema().getDataSource(), Integer.valueOf(getConfig().getShardSpec().getPartitionNum())})) { // from class: org.apache.druid.segment.realtime.plumber.FlushingPlumber.2
            /* renamed from: doCall, reason: merged with bridge method [inline-methods] */
            public ScheduledExecutors.Signal m201doCall() {
                if (FlushingPlumber.this.stopped) {
                    FlushingPlumber.log.info("Stopping flusher thread", new Object[0]);
                    return ScheduledExecutors.Signal.STOP;
                }
                long millis2 = segmentGranularity.bucketStart(FlushingPlumber.this.getRejectionPolicy().getCurrMaxTime().minus(millis)).getMillis();
                ArrayList<Map.Entry> arrayList = new ArrayList();
                for (Map.Entry<Long, Sink> entry : FlushingPlumber.this.getSinks().entrySet()) {
                    if (entry.getKey().longValue() < millis2) {
                        FlushingPlumber.log.info("Adding entry[%s] to flush.", new Object[]{entry});
                        arrayList.add(entry);
                    }
                }
                for (Map.Entry entry2 : arrayList) {
                    FlushingPlumber.this.flushAfterDuration(((Long) entry2.getKey()).longValue(), (Sink) entry2.getValue());
                }
                if (!FlushingPlumber.this.stopped) {
                    return ScheduledExecutors.Signal.REPEAT;
                }
                FlushingPlumber.log.info("Stopping flusher thread", new Object[0]);
                return ScheduledExecutors.Signal.STOP;
            }
        });
    }

    @Override // org.apache.druid.segment.realtime.plumber.RealtimePlumber, org.apache.druid.segment.realtime.plumber.Plumber
    public void finishJob() {
        log.info("Stopping job", new Object[0]);
        for (Map.Entry<Long, Sink> entry : getSinks().entrySet()) {
            abandonSegment(entry.getKey().longValue(), entry.getValue());
        }
        shutdownExecutors();
        if (this.flushScheduledExec != null) {
            this.flushScheduledExec.shutdown();
        }
        this.stopped = true;
    }
}
