package org.apache.iotdb.trigger;

import java.util.Collections;
import java.util.Date;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.trigger.api.Trigger;
import org.apache.iotdb.trigger.api.TriggerAttributes;
import org.apache.iotdb.trigger.api.enums.FailureStrategy;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/trigger/StatisticsUpdaterTrigger.class */
/**
 * Stateful trigger that counts the non-null data points of every tablet it is fired with and
 * periodically writes the running total to {@code root.__system.statistics.total_count}.
 *
 * <p>The connection target is taken from the {@code ip} and {@code port} trigger attributes. The
 * session and the periodic updater are created lazily on the first call to {@link #fire(Tablet)}
 * or {@link #restore()}.
 */
public class StatisticsUpdaterTrigger implements Trigger {
    private static final Logger LOGGER = LoggerFactory.getLogger(StatisticsUpdaterTrigger.class);
    private static final String TARGET_DEVICE = "root.__system.statistics";
    private static final String TARGET_SERIES = "total_count";

    /** Initial delay and delay between two counter write-backs, in milliseconds. */
    private static final long UPDATE_INTERVAL = 20000;

    private String ip;
    private int port;
    private Session session;
    private Future<?> updateFuture;

    /**
     * Running total of counted data points. The reference is final so that the updater thread
     * always reads the same instance that {@link #fire(Tablet)} and {@link #restore()} modify.
     */
    private final AtomicLong cnt = new AtomicLong(0);

    private final ScheduledExecutorService triggerInformationUpdateExecutor =
            IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Stateful-Trigger-Statistics-Updater");

    /**
     * Reads the mandatory {@code ip} and {@code port} attributes.
     *
     * @param triggerAttributes attributes supplied when the trigger is created
     * @throws RuntimeException if {@code ip} or {@code port} is missing
     * @throws NumberFormatException if {@code port} is not a parsable integer
     */
    @Override
    public void onCreate(TriggerAttributes triggerAttributes) throws Exception {
        if (!triggerAttributes.hasAttribute("ip")) {
            throw new RuntimeException("ip is required");
        }
        this.ip = triggerAttributes.getString("ip");
        if (!triggerAttributes.hasAttribute("port")) {
            throw new RuntimeException("port is required");
        }
        this.port = Integer.parseInt(triggerAttributes.getString("port"));
    }

    /**
     * Adds the number of non-null cells of {@code tablet} to the running total.
     *
     * <p>A column without a bitmap contributes {@code rowSize} points; otherwise every row whose
     * bit is not marked is counted.
     *
     * @param tablet the tablet being inserted
     * @return always {@code true}
     * @throws Exception if the session cannot be opened
     */
    @Override
    public boolean fire(Tablet tablet) throws Exception {
        ensureSession();
        final int columnCount = tablet.getSchemas().size();
        final int rowCount = tablet.rowSize;
        if (tablet.bitMaps == null) {
            // Widen before multiplying so a large tablet cannot overflow int arithmetic.
            this.cnt.addAndGet((long) rowCount * columnCount);
            return true;
        }
        for (int column = 0; column < columnCount; column++) {
            BitMap bitMap = tablet.bitMaps[column];
            if (bitMap == null) {
                this.cnt.addAndGet(rowCount);
                continue;
            }
            long present = 0;
            for (int row = 0; row < rowCount; row++) {
                if (!bitMap.isMarked(row)) {
                    present++;
                }
            }
            // One atomic update per column instead of one per cell.
            this.cnt.addAndGet(present);
        }
        return true;
    }

    /**
     * Re-initialises the running total from the last value stored in the target series.
     *
     * <p>Failures while querying are logged and otherwise ignored, leaving the counter unchanged.
     *
     * @throws Exception if the session cannot be opened
     */
    @Override
    public void restore() throws Exception {
        ensureSession();
        try {
            SessionDataSet dataSet =
                    this.session.executeQueryStatement(
                            String.format("select last %s from %s", TARGET_SERIES, TARGET_DEVICE));
            try {
                if (dataSet.hasNext()) {
                    // NOTE(review): field index 0 is kept from the original code; confirm it is the
                    // value column of the "select last" result.
                    this.cnt.set(dataSet.next().getFields().get(0).getLongV());
                }
            } finally {
                // Release the server-side query handle even when reading the row fails.
                dataSet.closeOperationHandle();
            }
        } catch (Exception e) {
            LOGGER.warn("Error occurred when trying to restore stateful trigger", e);
        }
        LOGGER.info("###### restore ##########");
    }

    /**
     * Stops the periodic updater, shuts down its executor and closes the session.
     *
     * @throws Exception if closing the session fails
     */
    @Override
    public void onDrop() throws Exception {
        LOGGER.info("********** onDrop() ***********");
        // Stop the updater first so it can neither outlive the trigger nor use a closed session.
        if (this.updateFuture != null) {
            this.updateFuture.cancel(true);
        }
        this.triggerInformationUpdateExecutor.shutdownNow();
        if (this.session != null) {
            this.session.close();
        }
    }

    /**
     * Returns the failure strategy of this trigger.
     *
     * @return {@link FailureStrategy#OPTIMISTIC}
     */
    @Override
    public FailureStrategy getFailureStrategy() {
        return FailureStrategy.OPTIMISTIC;
    }

    /**
     * Opens the session and schedules the periodic updater if that has not happened yet.
     *
     * <p>The session field is only assigned after {@code open} succeeded, so a failed attempt is
     * retried on the next call instead of leaving an unopened session behind.
     *
     * <p>NOTE(review): this method is not synchronized; confirm whether the trigger framework can
     * invoke {@code fire} concurrently on one instance.
     *
     * @throws IoTDBConnectionException if the session cannot be opened
     */
    private void ensureSession() throws IoTDBConnectionException {
        if (this.session != null) {
            return;
        }
        Session newSession = new Session.Builder().host(this.ip).port(this.port).build();
        newSession.open(false);
        this.session = newSession;
        this.updateFuture =
                ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
                        this.triggerInformationUpdateExecutor,
                        this::updateTask,
                        UPDATE_INTERVAL,
                        UPDATE_INTERVAL,
                        TimeUnit.MILLISECONDS);
        LOGGER.info("Stateful-Trigger-Statistics-Updater is successfully started.");
    }

    /** Writes the current running total to the target series; failures are logged and ignored. */
    private void updateTask() {
        try {
            this.session.insertRecord(
                    TARGET_DEVICE,
                    System.currentTimeMillis(),
                    Collections.singletonList(TARGET_SERIES),
                    Collections.singletonList(TSDataType.INT64),
                    new Object[] {Long.valueOf(this.cnt.get())});
        } catch (Exception e) {
            LOGGER.warn("Error occurred in updateTask", e);
        }
    }
}
