package org.apache.avro.ipc.trace;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.mortbay.util.URIUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:lib/avro-ipc-1.6.2.jar:org/apache/avro/ipc/trace/FileSpanStorage.class
 */
/* loaded from: input_file:lib/cdap-etl-batch-4.1.3.jar:lib/avro-ipc-1.6.2.jar:org/apache/avro/ipc/trace/FileSpanStorage.class */
public class FileSpanStorage implements SpanStorage {
    private static final String FILE_SUFFIX = ".av";
    private static final SpecificDatumWriter<Span> SPAN_WRITER = new SpecificDatumWriter<>(Span.class);
    private static final SpecificDatumReader<Span> SPAN_READER = new SpecificDatumReader<>(Span.class);
    private static final Logger LOG = LoggerFactory.getLogger(FileSpanStorage.class);
    private int secondsPerFile;
    private String traceFileDir;
    private Thread writer;
    private long maxSpans = SpanStorage.DEFAULT_MAX_SPANS;
    private TreeMap<Long, File> files = new TreeMap<>();
    LinkedBlockingQueue<Span> outstanding = new LinkedBlockingQueue<>();

    /* JADX WARN: Classes with same name are omitted:
      input_file:lib/avro-ipc-1.6.2.jar:org/apache/avro/ipc/trace/FileSpanStorage$DiskWriterThread.class
     */
    /* loaded from: input_file:lib/cdap-etl-batch-4.1.3.jar:lib/avro-ipc-1.6.2.jar:org/apache/avro/ipc/trace/FileSpanStorage$DiskWriterThread.class */
    private class DiskWriterThread implements Runnable {
        private BlockingQueue<Span> outstanding;
        private TreeMap<Long, File> files;
        private long spansSoFar;
        private DataFileWriter<Span> currentWriter;
        private boolean doBuffer;
        private int compressionLevel;
        private HashMap<File, Long> spansPerFile = new HashMap<>();
        private long currentTimestamp = 0;

        public DiskWriterThread(BlockingQueue<Span> blockingQueue, TreeMap<Long, File> treeMap, boolean z, int i) {
            this.outstanding = blockingQueue;
            this.files = treeMap;
            this.doBuffer = z;
            this.compressionLevel = i;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                Span span = null;
                try {
                    span = this.outstanding.take();
                } catch (InterruptedException e) {
                    FileSpanStorage.LOG.warn("Thread interrupted");
                    Thread.currentThread().interrupt();
                }
                try {
                    assureCurrentWriter();
                    this.currentWriter.append(span);
                    if (!this.doBuffer) {
                        this.currentWriter.flush();
                    }
                    this.spansSoFar++;
                    File value = this.files.lastEntry().getValue();
                    this.spansPerFile.put(value, Long.valueOf(this.spansPerFile.get(value).longValue() + 1));
                } catch (IOException e2) {
                    FileSpanStorage.LOG.warn("Error setting span file: " + e2.getMessage());
                }
            }
        }

        private void assureCurrentWriter() throws IOException {
            File remove;
            boolean z = false;
            while (this.spansSoFar >= FileSpanStorage.this.maxSpans) {
                synchronized (this.files) {
                    remove = this.files.remove(this.files.firstKey());
                }
                this.spansSoFar -= this.spansPerFile.get(remove).longValue();
                this.spansPerFile.remove(remove);
                remove.delete();
            }
            if (this.files.size() == 0) {
                this.currentTimestamp = 0L;
                this.currentWriter = null;
            }
            long floorSecond = FileSpanStorage.this.floorSecond(System.currentTimeMillis() / 1000);
            if (this.currentWriter == null) {
                z = true;
            } else if (floorSecond >= this.currentTimestamp + FileSpanStorage.this.secondsPerFile) {
                this.currentWriter.close();
                z = true;
            }
            if (z) {
                File file = new File(FileSpanStorage.this.traceFileDir + URIUtil.SLASH + Thread.currentThread().getId() + "_" + floorSecond + FileSpanStorage.FILE_SUFFIX);
                synchronized (this.files) {
                    this.files.put(Long.valueOf(floorSecond), file);
                }
                this.spansPerFile.put(file, 0L);
                this.currentWriter = new DataFileWriter<>(FileSpanStorage.SPAN_WRITER);
                this.currentWriter.setCodec(CodecFactory.deflateCodec(this.compressionLevel));
                this.currentWriter.create(Span.SCHEMA$, file);
                this.currentTimestamp = floorSecond;
            }
        }
    }

    private static void readFileSpans(File file, List<Span> list) throws IOException {
        Iterator<D> it = new DataFileReader(file, SPAN_READER).iterator();
        ArrayList arrayList = new ArrayList();
        while (it.hasNext()) {
            arrayList.add(it.next());
        }
        list.addAll(arrayList);
    }

    private static void readFileSpans(File file, List<Span> list, long j, long j2) throws IOException {
        Iterator<D> it = new DataFileReader(file, SPAN_READER).iterator();
        ArrayList arrayList = new ArrayList();
        while (it.hasNext()) {
            Span span = (Span) it.next();
            if (Util.spanInRange(span, j, j2)) {
                arrayList.add(span);
            }
        }
        list.addAll(arrayList);
    }

    public FileSpanStorage(boolean z, TracePluginConfiguration tracePluginConfiguration) {
        this.secondsPerFile = 600;
        this.traceFileDir = "/tmp";
        this.writer = new Thread(new DiskWriterThread(this.outstanding, this.files, z, tracePluginConfiguration.compressionLevel));
        this.secondsPerFile = tracePluginConfiguration.fileGranularitySeconds;
        this.traceFileDir = tracePluginConfiguration.spanStorageDir;
        this.writer.start();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public long floorSecond(long j) {
        return j - (j % this.secondsPerFile);
    }

    @Override // org.apache.avro.ipc.trace.SpanStorage
    public void addSpan(Span span) {
        this.outstanding.add(span);
    }

    @Override // org.apache.avro.ipc.trace.SpanStorage
    public List<Span> getAllSpans() {
        ArrayList arrayList = new ArrayList();
        synchronized (this.files) {
            for (File file : this.files.values()) {
                try {
                    readFileSpans(file, arrayList);
                } catch (IOException e) {
                    LOG.warn("Error reading file: " + file.getAbsolutePath());
                }
            }
        }
        return arrayList;
    }

    public void clear() {
        synchronized (this.files) {
            Iterator it = new LinkedList(this.files.keySet()).iterator();
            while (it.hasNext()) {
                this.files.remove((Long) it.next()).delete();
            }
        }
    }

    @Override // org.apache.avro.ipc.trace.SpanStorage
    public void setMaxSpans(long j) {
        this.maxSpans = j;
    }

    @Override // org.apache.avro.ipc.trace.SpanStorage
    public List<Span> getSpansInRange(long j, long j2) {
        ArrayList arrayList = new ArrayList();
        LinkedList<Long> linkedList = new LinkedList();
        long j3 = j / SpanStorage.NANOS_PER_SECOND;
        long j4 = j2 / SpanStorage.NANOS_PER_SECOND;
        int i = ((int) (j4 - j3)) / this.secondsPerFile;
        for (int i2 = 1; i2 < i; i2++) {
            linkedList.add(Long.valueOf(j3 + (i2 * this.secondsPerFile)));
        }
        synchronized (this.files) {
            for (Long l : linkedList) {
                if (this.files.containsKey(l)) {
                    try {
                        readFileSpans(this.files.get(l), arrayList);
                    } catch (IOException e) {
                        LOG.warn("Error reading file: " + this.files.get(l).getAbsolutePath());
                    }
                }
            }
            if (this.files.containsKey(Long.valueOf(j3))) {
                try {
                    readFileSpans(this.files.get(Long.valueOf(j3)), arrayList, j, j2);
                } catch (IOException e2) {
                    LOG.warn("Error reading file: " + this.files.get(Long.valueOf(j3)).getAbsolutePath());
                }
            }
            if (this.files.containsKey(Long.valueOf(j4))) {
                try {
                    readFileSpans(this.files.get(Long.valueOf(j4)), arrayList, j, j2);
                } catch (IOException e3) {
                    LOG.warn("Error reading file: " + this.files.get(Long.valueOf(j4)).getAbsolutePath());
                }
            }
        }
        return arrayList;
    }
}
