package org.apache.streampipes.sinks.internal.jvm.datalake;

import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataexplorer.commons.TimeSeriesStore;
import org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.datalake.DataLakeMeasureSchemaUpdateStrategy;
import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.SchemaInfo;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.wrapper.params.compat.SinkParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;

/* loaded from: input_file:org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.class */
public class DataLakeSink extends StreamPipesDataSink {
    private static final String DATABASE_MEASUREMENT_KEY = "db_measurement";
    private static final String TIMESTAMP_MAPPING_KEY = "timestamp_mapping";
    public static final String SCHEMA_UPDATE_KEY = "schema_update";
    public static final String SCHEMA_UPDATE_OPTION = "Update schema";
    public static final String EXTEND_EXISTING_SCHEMA_OPTION = "Extend existing schema";
    private TimeSeriesStore timeSeriesStore;

    public DataSinkDescription declareModel() {
        return DataSinkBuilder.create("org.apache.streampipes.sinks.internal.jvm.datalake", 1).withLocales(new Locales[]{Locales.EN}).withAssets(new String[]{"documentation.md", "icon.png"}).category(new DataSinkType[]{DataSinkType.INTERNAL}).requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithUnaryMapping(EpRequirements.timestampReq(), Labels.withId(TIMESTAMP_MAPPING_KEY), PropertyScope.NONE).build()).requiredTextParameter(Labels.withId(DATABASE_MEASUREMENT_KEY)).requiredSingleValueSelection(Labels.withId(SCHEMA_UPDATE_KEY), Options.from(new String[]{SCHEMA_UPDATE_OPTION, EXTEND_EXISTING_SCHEMA_OPTION})).build();
    }

    public void onInvocation(SinkParams sinkParams, EventSinkRuntimeContext eventSinkRuntimeContext) throws SpRuntimeException {
        IDataSinkParameterExtractor extractor = sinkParams.extractor();
        DataLakeMeasure dataLakeMeasure = new DataLakeMeasure((String) extractor.singleValueParameter(DATABASE_MEASUREMENT_KEY, String.class), extractor.mappingPropertyValue(TIMESTAMP_MAPPING_KEY), ((SchemaInfo) sinkParams.getInputSchemaInfos().get(0)).getEventSchema());
        if (((String) extractor.selectedSingleValue(SCHEMA_UPDATE_KEY, String.class)).equals(EXTEND_EXISTING_SCHEMA_OPTION)) {
            dataLakeMeasure.setSchemaUpdateStrategy(DataLakeMeasureSchemaUpdateStrategy.EXTEND_EXISTING_SCHEMA);
        } else {
            dataLakeMeasure.setSchemaUpdateStrategy(DataLakeMeasureSchemaUpdateStrategy.UPDATE_SCHEMA);
        }
        this.timeSeriesStore = new TimeSeriesStore(Environments.getEnvironment(), eventSinkRuntimeContext.getStreamPipesClient(), dataLakeMeasure, true);
    }

    public void onEvent(Event event) throws SpRuntimeException {
        this.timeSeriesStore.onEvent(event);
    }

    public void onDetach() throws SpRuntimeException {
        this.timeSeriesStore.close();
    }
}
