package com.datatorrent.stram;

import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.annotation.RecordField;
import com.datatorrent.common.codec.JsonStreamCodec;
import com.datatorrent.netlet.util.Slice;
import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
import com.datatorrent.stram.util.FSPartFileCollection;
import com.datatorrent.stram.webapp.ContainerInfo;
import com.datatorrent.stram.webapp.OperatorInfo;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/FSStatsRecorder.class */
public class FSStatsRecorder implements StatsRecorder {
    /** Version tag; {@link #setup()} writes this value followed by a newline as the first metadata record. */
    public static final String VERSION = "1.0";
    private static final Logger LOG = LoggerFactory.getLogger(FSStatsRecorder.class);

    /** Storage that receives container meta and stats records; created in {@link #setup()}. */
    private FSPartFileCollection containersStorage;
    /** Codec used to serialize the extracted record fields; created in {@link #setup()}. */
    private transient StreamCodec<Object> streamCodec;
    /** Base path under which the {@code /containers} and {@code /operators/...} storages are created. */
    private String basePath = LogicalPlanConfiguration.KEY_SEPARATOR;
    /** One storage per operator name; also iterated by the writer thread when it flushes. */
    private final Map<String, FSPartFileCollection> logicalOperatorStorageMap = new ConcurrentHashMap<>();
    /** Maps each key seen in {@code recordContainers} to the index assigned to it on first sight. */
    private final Map<String, Integer> knownContainers = new HashMap<>();
    /** Operator ids for which a meta record has already been queued. */
    private final Set<String> knownOperators = new HashSet<>();
    /** Per-class cache of the fields selected for record type "meta". */
    private final Map<Class<?>, List<Field>> metaFields = new HashMap<>();
    /** Per-class cache of the fields selected for record type "stats". */
    private final Map<Class<?>, List<Field>> statsFields = new HashMap<>();
    /** Pending writes, produced by the record methods and consumed by the writer thread. */
    private final BlockingQueue<WriteOperation> queue = new LinkedBlockingQueue<>();
    private final StatsRecorderThread statsRecorderThread = new StatsRecorderThread();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datatorrent/stram/FSStatsRecorder$StatsRecorderThread.class */
    public class StatsRecorderThread extends Thread {
        private StatsRecorderThread() {
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    WriteOperation writeOperation = (WriteOperation) FSStatsRecorder.this.queue.take();
                    if (writeOperation.meta) {
                        writeOperation.storage.writeMetaData(writeOperation.bytes);
                    } else {
                        writeOperation.storage.writeDataItem(writeOperation.bytes, true);
                    }
                    Thread.yield();
                    if (FSStatsRecorder.this.queue.isEmpty()) {
                        FSStatsRecorder.this.containersStorage.flushData();
                        Iterator it = FSStatsRecorder.this.logicalOperatorStorageMap.values().iterator();
                        while (it.hasNext()) {
                            ((FSPartFileCollection) it.next()).flushData();
                        }
                    }
                } catch (InterruptedException e) {
                    return;
                } catch (Exception e2) {
                    FSStatsRecorder.LOG.error("Caught Exception", e2);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datatorrent/stram/FSStatsRecorder$WriteOperation.class */
    /**
     * One queued write: the bytes to write, the storage to write them to, and whether they are
     * written as metadata or as a data item.
     */
    public static class WriteOperation {
        /** Storage the bytes are written to. */
        FSPartFileCollection storage;
        /** Payload to write. */
        byte[] bytes;
        /** {@code true} to write as metadata, {@code false} to write as a data item. */
        boolean meta;

        WriteOperation(FSPartFileCollection targetStorage, byte[] payload, boolean isMeta) {
            storage = targetStorage;
            bytes = payload;
            meta = isMeta;
        }
    }

    /**
     * Sets the base path under which the containers storage and the per-operator storages are
     * created. The value is read when those storages are created, so it has to be set before
     * {@link #setup()} to affect the containers storage.
     *
     * @param str the base path
     */
    public void setBasePath(String str) {
        this.basePath = str;
    }

    /**
     * Creates the codec and the containers storage (under {@code basePath + "/containers"}),
     * writes the version line as the first metadata record and starts the writer thread.
     *
     * @throws RuntimeException wrapping any exception raised during initialization
     */
    public void setup() {
        try {
            this.streamCodec = new JsonStreamCodec();
            this.containersStorage = new FSPartFileCollection();
            this.containersStorage.setBasePath(this.basePath + "/containers");
            this.containersStorage.setup();
            // Explicit charset: the header must not depend on the platform default encoding.
            this.containersStorage.writeMetaData((VERSION + "\n").getBytes(StandardCharsets.UTF_8));
            this.statsRecorderThread.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Stops the writer thread, waits for it to finish, then tears down the containers storage
     * (if it was created) and every operator storage.
     *
     * <p>If the calling thread is interrupted while waiting, the storages are still torn down and
     * the caller's interrupt status is restored afterwards instead of being silently dropped.
     */
    public void teardown() {
        this.statsRecorderThread.interrupt();
        boolean interrupted = false;
        try {
            this.statsRecorderThread.join();
        } catch (InterruptedException e) {
            LOG.warn("Stats recorder thread join interrupted");
            interrupted = true;
        }
        try {
            if (this.containersStorage != null) {
                this.containersStorage.teardown();
            }
            for (FSPartFileCollection storage : this.logicalOperatorStorageMap.values()) {
                storage.teardown();
            }
        } finally {
            // Restore the flag only after the storages are closed so the close calls run uninterrupted.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Queues records for every container whose state is {@code "ACTIVE"}.
     *
     * <p>The first time a map key is seen it is assigned the next index and a meta record
     * {@code <index>:<encoded meta fields>\n} is queued. On every call a stats record
     * {@code <index>:<j>:<encoded stats fields>\n} is queued. Containers with a different or
     * missing state are skipped.
     *
     * @param map container agents keyed by the identifier used to assign indices
     * @param j value written as the second field of each stats record
     *          (presumably a timestamp — confirm against the caller)
     * @throws IOException if assembling a record fails
     */
    @Override // com.datatorrent.stram.StatsRecorder
    public void recordContainers(Map<String, StreamingContainerAgent> map, long j) throws IOException {
        for (Map.Entry<String, StreamingContainerAgent> entry : map.entrySet()) {
            ContainerInfo containerInfo = entry.getValue().getContainerInfo();
            // Constant-first comparison: a container without a state is skipped instead of throwing.
            if (!"ACTIVE".equals(containerInfo.state)) {
                continue;
            }
            String key = entry.getKey();
            Integer index = this.knownContainers.get(key);
            if (index == null) {
                // First sighting: assign the next index and queue the meta record.
                index = Integer.valueOf(this.knownContainers.size());
                this.knownContainers.put(key, index);
                Slice meta = this.streamCodec.toByteArray(extractRecordFields(containerInfo, "meta"));
                ByteArrayOutputStream metaRecord = new ByteArrayOutputStream();
                metaRecord.write((index + ":").getBytes(StandardCharsets.UTF_8));
                metaRecord.write(meta.buffer, meta.offset, meta.length);
                metaRecord.write("\n".getBytes(StandardCharsets.UTF_8));
                this.queue.add(new WriteOperation(this.containersStorage, metaRecord.toByteArray(), true));
            }
            Slice stats = this.streamCodec.toByteArray(extractRecordFields(containerInfo, "stats"));
            ByteArrayOutputStream statsRecord = new ByteArrayOutputStream();
            statsRecord.write((index + ":").getBytes(StandardCharsets.UTF_8));
            statsRecord.write((j + ":").getBytes(StandardCharsets.UTF_8));
            statsRecord.write(stats.buffer, stats.offset, stats.length);
            statsRecord.write("\n".getBytes(StandardCharsets.UTF_8));
            this.queue.add(new WriteOperation(this.containersStorage, statsRecord.toByteArray(), false));
        }
    }

    /**
     * Queues records for every operator in the list.
     *
     * <p>Each operator name gets its own storage under {@code basePath + "/operators/" + name},
     * created on first use with the version line as its first metadata record. The first time an
     * operator id is seen, a meta record {@code <encoded meta fields>\n} is queued. On every call
     * a stats record {@code <id>:<j>:<encoded stats fields>\n} is queued.
     *
     * @param list operators to record
     * @param j value written as the second field of each stats record
     *          (presumably a timestamp — confirm against the caller)
     * @throws IOException if creating a storage or assembling a record fails
     */
    @Override // com.datatorrent.stram.StatsRecorder
    public void recordOperators(List<OperatorInfo> list, long j) throws IOException {
        for (OperatorInfo operatorInfo : list) {
            FSPartFileCollection storage = this.logicalOperatorStorageMap.get(operatorInfo.name);
            if (storage == null) {
                storage = new FSPartFileCollection();
                storage.setBasePath(this.basePath + "/operators/" + operatorInfo.name);
                storage.setup();
                storage.writeMetaData((VERSION + "\n").getBytes(StandardCharsets.UTF_8));
                // Published only after setup so the writer thread never flushes a half-initialized storage.
                this.logicalOperatorStorageMap.put(operatorInfo.name, storage);
            }
            // Set.add returns true only for ids not seen before.
            if (this.knownOperators.add(operatorInfo.id)) {
                Slice meta = this.streamCodec.toByteArray(extractRecordFields(operatorInfo, "meta"));
                ByteArrayOutputStream metaRecord = new ByteArrayOutputStream();
                metaRecord.write(meta.buffer, meta.offset, meta.length);
                metaRecord.write("\n".getBytes(StandardCharsets.UTF_8));
                this.queue.add(new WriteOperation(storage, metaRecord.toByteArray(), true));
            }
            Slice stats = this.streamCodec.toByteArray(extractRecordFields(operatorInfo, "stats"));
            ByteArrayOutputStream statsRecord = new ByteArrayOutputStream();
            statsRecord.write((operatorInfo.id + ":").getBytes(StandardCharsets.UTF_8));
            statsRecord.write((j + ":").getBytes(StandardCharsets.UTF_8));
            statsRecord.write(stats.buffer, stats.offset, stats.length);
            statsRecord.write("\n".getBytes(StandardCharsets.UTF_8));
            this.queue.add(new WriteOperation(storage, statsRecord.toByteArray(), false));
        }
    }

    /**
     * Reads the values of all fields of {@code obj} that carry a {@link RecordField} annotation
     * whose {@code type()} equals {@code str}.
     *
     * <p>Fields declared on the object's class and on its superclasses (up to, but excluding,
     * {@code Object}) are considered. For the types {@code "meta"} and {@code "stats"} the
     * selected field list is cached per class; for any other type it is recomputed on every call.
     *
     * @param obj object to read the field values from
     * @param str record type to select, compared against the annotation's {@code type()}
     * @return map from field name to the field's current value in {@code obj}
     * @throws RuntimeException wrapping {@link IllegalAccessException} if a field cannot be read
     */
    public Map<String, Object> extractRecordFields(Object obj, String str) {
        Map<Class<?>, List<Field>> cache = null;
        if (str.equals("meta")) {
            cache = this.metaFields;
        } else if (str.equals("stats")) {
            cache = this.statsFields;
        }

        Class<?> type = obj.getClass();
        // Cached lists are never null, so a null result means "not cached yet".
        List<Field> fields = cache == null ? null : cache.get(type);
        if (fields == null) {
            fields = new ArrayList<>();
            for (Class<?> cls = type; cls != Object.class; cls = cls.getSuperclass()) {
                for (Field field : cls.getDeclaredFields()) {
                    RecordField annotation = field.getAnnotation(RecordField.class);
                    if (annotation != null && annotation.type().equals(str)) {
                        // Open up only the fields that are actually read, not every declared field.
                        field.setAccessible(true);
                        fields.add(field);
                    }
                }
            }
            if (cache != null) {
                cache.put(type, fields);
            }
        }

        Map<String, Object> values = new HashMap<>();
        try {
            for (Field field : fields) {
                values.put(field.getName(), field.get(obj));
            }
        } catch (IllegalAccessException e) {
            throw new RuntimeException(e);
        }
        return values;
    }

    /**
     * Forwards a sync request to the containers storage and to every operator storage.
     * The containers storage is skipped if it has not been created yet, matching the null
     * guard in {@code teardown()}.
     */
    public void requestSync() {
        if (this.containersStorage != null) {
            this.containersStorage.requestSync();
        }
        for (FSPartFileCollection storage : this.logicalOperatorStorageMap.values()) {
            storage.requestSync();
        }
    }
}
