package io.mantisrx.sourcejobs.publish.stages;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import io.mantisrx.common.codec.Codecs;
import io.mantisrx.publish.proto.MantisEventEnvelope;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.ScalarToScalar;
import io.mantisrx.runtime.computation.ScalarComputation;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import io.mantisrx.runtime.parameter.type.IntParameter;
import io.mantisrx.runtime.parameter.validator.Validators;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import rx.Observable;

/* loaded from: input_file:io/mantisrx/sourcejobs/publish/stages/EchoStage.class */
/**
 * Scalar stage that unwraps published event envelopes and tags every event with the
 * name of the job cluster it passed through.
 *
 * <p>Each incoming string is parsed as a {@link MantisEventEnvelope}; the data payload of
 * every event in the envelope is emitted individually. Strings that cannot be parsed are
 * forwarded unchanged. Every emitted string that contains a <code>{</code> is rewritten so
 * that it starts with a {@code "mantis.meta.sourceName"} attribute holding the cluster name.
 */
public class EchoStage implements ScalarComputation<String, String> {
    private static final Logger LOGGER = Logger.getLogger(EchoStage.class);

    /** Name of the job parameter that controls the buffering window, in milliseconds. */
    private static final String BUFFER_DURATION_PARAM = "bufferDurationMillis";

    private String clusterName;
    private String sourceNamePrefix;
    private ObjectReader mantisEventEnvelopeReader;
    /** Buffering window in milliseconds; replaced by the job parameter in {@link #init}. */
    private int bufferDuration = 100;
    private final ObjectMapper mapper = new ObjectMapper();

    /**
     * Captures the job cluster name and buffer duration from the context and prepares the
     * JSON prefix and envelope reader used by {@link #call}.
     *
     * @param context context supplying the worker info and job parameters
     */
    public void init(Context context) {
        this.clusterName = context.getWorkerInfo().getJobClusterName();
        this.bufferDuration = (Integer) context.getParameters().get(BUFFER_DURATION_PARAM);
        // Replaces the opening brace of each event, see insertSourceJobName.
        this.sourceNamePrefix = "{\"mantis.meta.sourceName\":\"" + this.clusterName + "\",";
        this.mantisEventEnvelopeReader = this.mapper.readerFor(MantisEventEnvelope.class);
    }

    /**
     * Injects the source-name attribute at the start of a JSON object string.
     *
     * <p>Everything up to and including the first <code>{</code> is replaced by the prefix
     * built in {@link #init}. A string without any <code>{</code> is returned unchanged.
     *
     * @param event event payload to tag
     * @return the tagged payload, or {@code event} itself when it contains no brace
     */
    private String insertSourceJobName(String event) {
        int braceIndex = event.indexOf('{');
        if (braceIndex == -1) {
            return event;
        }
        return this.sourceNamePrefix + event.substring(braceIndex + 1);
    }

    /**
     * Expands one raw input string into the data payloads of the events it carries.
     *
     * @param rawEnvelope non-empty string expected to hold a serialized envelope
     * @return the data of each contained event, or {@code rawEnvelope} itself when it cannot
     *         be parsed as an envelope
     */
    private Observable<String> extractEventData(String rawEnvelope) {
        try {
            MantisEventEnvelope envelope = this.mantisEventEnvelopeReader.readValue(rawEnvelope);
            // NOTE(review): whether getEventList() can return null (e.g. a payload without
            // an events field) is not visible here - confirm. The guard is a no-op if it
            // cannot; if it can, it stops an NPE from reaching onErrorResumeNext below,
            // which would end this stage's output for all subsequent events.
            if (envelope.getEventList() == null) {
                return Observable.empty();
            }
            return Observable.from(envelope.getEventList()).map(event -> event.getData());
        } catch (IOException e) {
            // Pass the throwable so the cause and stack trace are not lost.
            LOGGER.error(e.getMessage(), e);
            // Could not parse as an envelope: send the raw string along.
            return Observable.just(rawEnvelope);
        }
    }

    /**
     * Builds the processing pipeline: buffer by time, flatten, drop empty strings, unwrap
     * envelopes into individual events and tag each one with the source job name.
     *
     * <p>Any error raised inside the pipeline is logged and replaced by an empty stream.
     *
     * @param context    job context (not used by this method)
     * @param observable stream of raw strings to process
     * @return stream of tagged event payloads
     */
    public Observable<String> call(Context context, Observable<String> observable) {
        return observable
                .buffer(this.bufferDuration, TimeUnit.MILLISECONDS)
                .flatMapIterable(batch -> batch)
                .filter(raw -> !raw.isEmpty())
                .flatMap(this::extractEventData)
                .map(this::insertSourceJobName)
                .onErrorResumeNext(error -> {
                    LOGGER.error("Exception occurred in : " + this.clusterName + " error is " + error.getMessage(), error);
                    return Observable.empty();
                });
    }

    /**
     * Declares the parameters accepted by this stage.
     *
     * @return a list with the single {@code bufferDurationMillis} integer parameter
     *         (validated to the range 100-10000, default 250)
     */
    public static List<ParameterDefinition<?>> getParameters() {
        List<ParameterDefinition<?>> params = new ArrayList<>();
        params.add(new IntParameter()
                .name(BUFFER_DURATION_PARAM)
                .description("buffer time in millis")
                .validator(Validators.range(100, 10000))
                .defaultValue(250)
                .build());
        return params;
    }

    /**
     * Creates the stage configuration: string codec, concurrent input, and the parameters
     * from {@link #getParameters()}.
     *
     * @return the configuration for this stage
     */
    public static ScalarToScalar.Config<String, String> config() {
        return new ScalarToScalar.Config<String, String>()
                .codec(Codecs.string())
                .concurrentInput()
                .withParameters(getParameters());
    }
}
