package com.datatorrent.stram;

import com.datatorrent.api.StreamCodec;
import com.datatorrent.common.codec.JsonStreamCodec;
import com.datatorrent.netlet.util.Slice;
import com.datatorrent.stram.api.StramEvent;
import com.datatorrent.stram.client.EventsAgent;
import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
import com.datatorrent.stram.util.FSPartFileCollection;
import com.datatorrent.stram.util.SharedPubSubWebSocketClient;
import com.google.common.base.Throwables;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import net.engio.mbassy.listener.Handler;
import org.apache.commons.beanutils.BeanUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/FSEventRecorder.class */
public class FSEventRecorder implements EventRecorder {
    public static final String VERSION = "1.0";
    private static final Logger LOG = LoggerFactory.getLogger(FSEventRecorder.class);
    private FSPartFileCollection storage;
    private transient StreamCodec<Object> streamCodec;
    private SharedPubSubWebSocketClient wsClient;
    private final String pubSubTopic;
    private final BlockingQueue<StramEvent> queue = new LinkedBlockingQueue();
    private String basePath = LogicalPlanConfiguration.KEY_SEPARATOR;
    private final URI pubSubUrl = null;
    private int numSubscribers = 0;
    private final EventRecorderThread eventRecorderThread = new EventRecorderThread();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datatorrent/stram/FSEventRecorder$EventRecorderThread.class */
    public class EventRecorderThread extends Thread {
        private EventRecorderThread() {
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    FSEventRecorder.this.writeEvent((StramEvent) FSEventRecorder.this.queue.take());
                    yield();
                    if (FSEventRecorder.this.queue.isEmpty() && !FSEventRecorder.this.storage.flushData() && FSEventRecorder.this.wsClient != null) {
                        FSEventRecorder.this.wsClient.publish("_internal.lastIndex.event." + FSEventRecorder.this.storage.getBasePath(), FSEventRecorder.this.storage.getLatestIndexLine());
                    }
                } catch (InterruptedException e) {
                    return;
                } catch (Exception e2) {
                    FSEventRecorder.LOG.error("Caught Exception", e2);
                }
            }
        }
    }

    public FSEventRecorder(String str) {
        LOG.debug("Event recorder created for {}", str);
        this.pubSubTopic = "applications." + str + ".events";
    }

    public void setWebSocketClient(SharedPubSubWebSocketClient sharedPubSubWebSocketClient) {
        this.wsClient = sharedPubSubWebSocketClient;
    }

    public void setBasePath(String str) {
        this.basePath = str;
    }

    public void setup() {
        try {
            this.streamCodec = new JsonStreamCodec();
            this.storage = new FSPartFileCollection();
            this.storage.setBasePath(this.basePath);
            this.storage.setup();
            this.storage.writeMetaData("1.0\n".getBytes());
            if (this.wsClient != null) {
                try {
                    setupWsClient();
                } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
                    LOG.error("Cannot connect to gateway at {}", this.pubSubUrl);
                }
            }
            this.eventRecorderThread.start();
        } catch (Exception e2) {
            throw Throwables.propagate(e2);
        }
    }

    public void teardown() {
        this.eventRecorderThread.interrupt();
        try {
            this.eventRecorderThread.join();
        } catch (InterruptedException e) {
            LOG.warn("Event recorder thread join interrupted");
        }
        if (this.storage != null) {
            this.storage.teardown();
        }
    }

    @Override // com.datatorrent.stram.EventRecorder
    @Handler
    public void recordEventAsync(StramEvent stramEvent) {
        LOG.debug("Adding event {} to the queue", stramEvent.getType());
        this.queue.add(stramEvent);
    }

    public void writeEvent(StramEvent stramEvent) throws Exception {
        LOG.debug("Writing event {} to the storage", stramEvent.getType());
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        byteArrayOutputStream.write((stramEvent.getTimestamp() + ":").getBytes());
        byteArrayOutputStream.write((stramEvent.getType() + ":").getBytes());
        Map<String, String> describe = BeanUtils.describe(stramEvent);
        describe.remove("timestamp");
        describe.remove(LogicalPlanConfiguration.CLASS);
        describe.remove("type");
        Slice byteArray = this.streamCodec.toByteArray(describe);
        byteArrayOutputStream.write(byteArray.buffer, byteArray.offset, byteArray.length);
        byteArrayOutputStream.write("\n".getBytes());
        this.storage.writeDataItem(byteArrayOutputStream.toByteArray(), true);
        if (this.numSubscribers > 0) {
            LOG.debug("Publishing event {} through websocket to gateway", stramEvent.getType());
            EventsAgent.EventInfo eventInfo = new EventsAgent.EventInfo();
            eventInfo.id = stramEvent.getId();
            eventInfo.timestamp = stramEvent.getTimestamp();
            eventInfo.type = stramEvent.getType();
            eventInfo.data = describe;
            eventInfo.data.remove("id");
            this.wsClient.publish(this.pubSubTopic, eventInfo);
        }
    }

    private void setupWsClient() throws ExecutionException, IOException, InterruptedException, TimeoutException {
        this.wsClient.addHandler(this.pubSubTopic, true, new SharedPubSubWebSocketClient.Handler() { // from class: com.datatorrent.stram.FSEventRecorder.1
            @Override // com.datatorrent.stram.util.SharedPubSubWebSocketClient.Handler
            public void onMessage(String str, String str2, Object obj) {
                FSEventRecorder.this.numSubscribers = Integer.valueOf((String) obj).intValue();
                FSEventRecorder.LOG.info("Number of subscribers is now {}", Integer.valueOf(FSEventRecorder.this.numSubscribers));
            }

            @Override // com.datatorrent.stram.util.SharedPubSubWebSocketClient.Handler
            public void onClose() {
                FSEventRecorder.this.numSubscribers = 0;
            }
        });
    }

    public void requestSync() {
        this.storage.requestSync();
    }
}
