package org.apache.druid.data.input.impl;

import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.joda.time.DateTime;

/* loaded from: input_file:org/apache/druid/data/input/impl/TimedShutoffInputSourceReader.class */
/**
 * An {@link InputSourceReader} decorator that stops handing out rows from the wrapped reader once
 * a fixed shutoff time has been reached.
 *
 * <p>Every iterator returned by {@link #read(InputStats)} or {@link #sample()} gets its own scheduled
 * executor. A task scheduled on that executor closes the iterator when the shutoff time arrives;
 * closing the iterator (by the task or by the caller) also shuts the executor down.
 */
public class TimedShutoffInputSourceReader implements InputSourceReader {
    private static final Logger LOG = new Logger(TimedShutoffInputSourceReader.class);

    /** Thread-name format passed to {@code Execs.scheduledSingleThreaded} for the shutoff executor. */
    private static final String EXECUTOR_NAME_FORMAT = "timed-shutoff-reader-%d";

    private final InputSourceReader delegate;
    private final DateTime shutoffTime;

    /**
     * @param inputSourceReader the reader whose iterators are decorated
     * @param dateTime          the instant at which returned iterators are closed automatically
     */
    public TimedShutoffInputSourceReader(InputSourceReader inputSourceReader, DateTime dateTime) {
        this.delegate = inputSourceReader;
        this.shutoffTime = dateTime;
    }

    /**
     * Reads from the delegate and wraps the resulting iterator with the timed shutoff.
     *
     * @throws IOException if the delegate fails to open its iterator
     */
    @Override
    public CloseableIterator<InputRow> read(InputStats inputStats) throws IOException {
        // Open the delegate first so that no executor is created when the delegate throws.
        final CloseableIterator<InputRow> rows = this.delegate.read(inputStats);
        return decorateShutdownTimeout(Execs.scheduledSingleThreaded(EXECUTOR_NAME_FORMAT), rows);
    }

    /**
     * Samples from the delegate and wraps the resulting iterator with the timed shutoff.
     *
     * @throws IOException if the delegate fails to open its iterator
     */
    @Override
    public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException {
        // Open the delegate first so that no executor is created when the delegate throws.
        final CloseableIterator<InputRowListPlusRawValues> rows = this.delegate.sample();
        return decorateShutdownTimeout(Execs.scheduledSingleThreaded(EXECUTOR_NAME_FORMAT), rows);
    }

    /**
     * Wraps {@code closeableIterator} so that it is closed at {@link #shutoffTime} by a task scheduled
     * on {@code scheduledExecutorService}.
     *
     * <p>Closing the returned iterator closes the delegate iterator and calls {@code shutdownNow()} on
     * the executor. If the shutoff task cannot be scheduled, both resources are closed before the
     * failure is rethrown.
     *
     * @param scheduledExecutorService executor that runs the shutoff task; owned by the returned iterator
     * @param closeableIterator        iterator to decorate; owned by the returned iterator
     * @return an iterator that reports no further elements once it has been closed
     */
    private <T> CloseableIterator<T> decorateShutdownTimeout(
            ScheduledExecutorService scheduledExecutorService,
            final CloseableIterator<T> closeableIterator) {
        final Closer closer = Closer.create();
        closer.register(closeableIterator);
        // The method reference null-checks its receiver, so a null executor fails fast here.
        closer.register(scheduledExecutorService::shutdownNow);

        final CloseableIterator<T> wrapper = new CloseableIterator<T>() {
            /**
             * Set by {@link #close()}. Volatile because the shutoff task calls close() from the
             * executor thread while hasNext() runs on the consuming thread.
             */
            volatile boolean closed;

            /** Element fetched ahead of time by hasNext(); only meaningful while hasCached is true. */
            T cached;

            /** Explicit flag so that a cached null element is not mistaken for "nothing cached". */
            boolean hasCached;

            @Override
            public boolean hasNext() {
                if (this.hasCached) {
                    return true;
                }
                if (this.closed || !closeableIterator.hasNext()) {
                    return false;
                }
                this.cached = closeableIterator.next();
                this.hasCached = true;
                return true;
            }

            @Override
            public T next() {
                // Going through hasNext() lets next() work without a preceding hasNext() call,
                // as the Iterator contract requires.
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                final T element = this.cached;
                this.cached = null;
                this.hasCached = false;
                return element;
            }

            @Override
            public void close() throws IOException {
                this.closed = true;
                closer.close();
            }
        };

        try {
            // A zero or negative delay (shutoff time already passed) requests immediate execution.
            scheduledExecutorService.schedule(() -> {
                LOG.info("Closing delegate inputSource.");
                try {
                    wrapper.close();
                } catch (IOException e) {
                    LOG.warn(e, "Failed to close delegate inputSource, ignoring.");
                }
            }, this.shutoffTime.getMillis() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        } catch (RuntimeException e) {
            // Scheduling failed: release the delegate iterator and the executor instead of leaking them.
            try {
                wrapper.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        return wrapper;
    }
}
