package net.quasardb.kinesis;

import java.io.IOException;
import java.time.Instant;
import java.util.AbstractCollection;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import net.quasardb.qdb.Session;
import net.quasardb.qdb.ts.Table;
import net.quasardb.qdb.ts.Tables;
import net.quasardb.qdb.ts.WritableRow;
import net.quasardb.qdb.ts.Writer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:net/quasardb/kinesis/Relay.class */
/**
 * Relays rows into a QuasarDB batch {@link Writer}.
 *
 * <p>A relay owns a {@link Session}, a fixed collection of table names and a writer created for
 * those tables. Callers are expected to call {@link #initWriter()} (or {@link #resetWriter()})
 * before {@link #enqueue(List)} or {@link #flush()}; these preconditions are checked with
 * {@code assert} only, so they are enforced only when assertions are enabled.
 *
 * <p>This class performs no synchronization of its own.
 */
public class Relay {
    private static final Logger logger = LoggerFactory.getLogger(Relay.class);

    /** Number of work items passed to {@link #enqueue(List)} since the last {@link #flush()}. */
    int currentSize;

    /** Session used for table lookups and writer creation; set to {@code null} by {@link #close()}. */
    Session session;

    /** Per-table metadata (including the writer offset), rebuilt by every {@link #initWriter()}. */
    TableRegistry tableRegistry;

    /** Current batch writer, or {@code null} when not initialized or after {@link #close()}. */
    Writer writer;

    /** Names of the tables the writer is created for. */
    AbstractCollection<String> tableNames;

    /** Selects which kind of writer {@link #initWriter()} creates. */
    Writer.PushMode pushMode;

    /** Wall-clock time (epoch milliseconds) at which the current writer was initialized. */
    long initializedAtEpochMillis;

    /** A single row together with the name of the table it should be appended to. */
    public static class WorkItem {
        String tableName;
        WritableRow row;

        /**
         * @param str name of the destination table
         * @param writableRow row to append to that table
         */
        public WorkItem(String str, WritableRow writableRow) {
            this.tableName = str;
            this.row = writableRow;
        }
    }

    /**
     * Creates a relay. No writer is created here; call {@link #initWriter()} before use.
     *
     * @param i currently unused by this class
     *     (NOTE(review): presumably a batch-size hint; confirm against callers)
     * @param session session used for table lookups and for creating the writer
     * @param abstractCollection names of the tables to write to
     * @param pushMode writer flavour to create
     */
    public Relay(int i, Session session, AbstractCollection<String> abstractCollection, Writer.PushMode pushMode) {
        this.session = session;
        this.tableNames = abstractCollection;
        this.pushMode = pushMode;
    }

    /**
     * Closes the writer (if any) and then the session (if any).
     *
     * <p>The session is closed even when closing the writer fails, so a writer failure does not
     * leak the session. If both close calls throw, the exception from the session close is the
     * one that propagates.
     *
     * @throws IOException if closing the writer or the session fails
     */
    public void close() throws IOException {
        try {
            logger.info("Closing writer");
            if (this.writer != null) {
                this.writer.close();
                this.writer = null;
            }
        } finally {
            logger.info("Closing session");
            if (this.session != null) {
                this.session.close();
                this.session = null;
            }
        }
        logger.info("Relay closed");
    }

    /**
     * @return the number of work items enqueued since the last {@link #flush()}
     */
    public int size() {
        return this.currentSize;
    }

    /**
     * @return milliseconds elapsed (wall clock) since the current writer was initialized
     */
    public long ageMillis() {
        return Instant.now().toEpochMilli() - this.initializedAtEpochMillis;
    }

    /**
     * Appends every work item to the writer and adds the list's size to {@link #size()}.
     *
     * <p>Failures on individual rows are logged and skipped (see {@code processRow}).
     * NOTE(review): rows that failed to append are still counted in the size; confirm this is
     * the intended accounting.
     *
     * @param list work items to append
     */
    public void enqueue(List<WorkItem> list) {
        assert this.writer != null;
        assert this.tableRegistry != null;

        for (WorkItem item : list) {
            processRow(item.row, item.tableName);
        }
        this.currentSize += list.size();
    }

    /**
     * Flushes the writer and resets the enqueued-row counter to zero.
     *
     * @throws IOException if the writer fails to flush; the counter is left unchanged in that case
     */
    public void flush() throws IOException {
        assert this.writer != null;

        this.writer.flush();
        this.currentSize = 0;
    }

    /**
     * Closes the current writer, if any, and creates a fresh one via {@link #initWriter()}.
     *
     * @throws IOException if closing the old writer or creating the new one fails
     */
    public void resetWriter() throws IOException {
        if (this.writer != null) {
            this.writer.close();
            this.writer = null;
        }
        initWriter();
    }

    /**
     * Creates the writer matching {@code pushMode} for the given tables.
     *
     * @throws RuntimeException if {@code pushMode} is neither {@code ASYNC} nor {@code FAST}
     */
    private static Writer createWriter(Session session, Table[] tableArr, Writer.PushMode pushMode) throws IOException {
        switch (pushMode) {
            case ASYNC:
                return Tables.pinnedAsyncWriter(session, tableArr);
            case FAST:
                return Tables.pinnedFastWriter(session, tableArr);
            default:
                throw new RuntimeException("Unsupported pushMode: " + pushMode.toString());
        }
    }

    /**
     * Builds a new table registry and a new writer for {@link #tableNames}.
     *
     * <p>Must only be called while no writer is set (checked by assertion). The writer field and
     * the initialization timestamp are assigned last, after every table's offset was recorded.
     *
     * @throws IOException if creating the writer fails
     * @throws RuntimeException if the registry returns no info for a table, or the push mode is
     *     unsupported
     */
    public void initWriter() throws IOException {
        assert this.tableNames != null;
        assert this.session != null;
        assert this.writer == null;

        logger.info("resetting batch writer");
        this.tableRegistry = new TableRegistry();

        logger.debug("Looking up metadata for {} tables", this.tableNames.size());
        Table[] tables = new Table[this.tableNames.size()];
        // Only the first name gets a Table constructed from the session; every entry (the first
        // included) is then derived from that instance through Table.likeOther.
        Table template = null;
        int index = 0;
        for (String tableName : this.tableNames) {
            if (template == null) {
                template = new Table(this.session, tableName);
            }
            tables[index++] = Table.likeOther(template, tableName);
        }

        logger.debug("Initializing writer");
        Writer newWriter = createWriter(this.session, tables, this.pushMode);

        logger.debug("Recording table info metadata");
        for (Table table : tables) {
            assert this.tableRegistry.get(table.getName()) == null;

            TableInfo info = this.tableRegistry.put(table);
            if (info == null) {
                throw new RuntimeException("Unable to initialize batch writer state: table not yet created: " + table.getName());
            }
            assert !info.hasOffset();

            info.setOffset(newWriter.tableIndexByName(table.getName()));
        }

        this.initializedAtEpochMillis = Instant.now().toEpochMilli();
        this.writer = newWriter;
        logger.info("Relay initialized");
    }

    /**
     * Appends one row to the writer at the offset registered for the given table.
     *
     * <p>Best effort: any {@link Exception} is logged and swallowed so that one bad row does not
     * abort the rest of the batch. Assertion failures are {@code Error}s and still propagate.
     */
    private void processRow(WritableRow writableRow, String str) {
        try {
            TableInfo tableInfo = this.tableRegistry.get(str);
            assert tableInfo != null;
            assert tableInfo.hasOffset();

            this.writer.append(Integer.valueOf(tableInfo.getOffset()), writableRow);
        } catch (Exception e) {
            // The logger already records the full stack trace; no separate printStackTrace().
            logger.error("An error occured while writing row: ", e);
        }
    }
}
