package gobblin.converter;

import gobblin.configuration.State;
import gobblin.configuration.WorkUnitState;
import gobblin.converter.initializer.ConverterInitializer;
import gobblin.converter.initializer.NoopConverterInitializer;
import gobblin.records.ControlMessageHandler;
import gobblin.records.RecordStreamProcessor;
import gobblin.records.RecordStreamWithMetadata;
import gobblin.source.workunit.WorkUnitStream;
import gobblin.stream.ControlMessage;
import gobblin.stream.RecordEnvelope;
import gobblin.util.FinalState;
import io.reactivex.Flowable;
import java.io.Closeable;
import java.io.IOException;

/* loaded from: input_file:WEB-INF/lib/gobblin-api-0.11.0.jar:gobblin/converter/Converter.class */
public abstract class Converter<SI, SO, DI, DO> implements Closeable, FinalState, RecordStreamProcessor<SI, SO, DI, DO> {
    public Converter<SI, SO, DI, DO> init(WorkUnitState workUnitState) {
        return this;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
    }

    public abstract SO convertSchema(SI si, WorkUnitState workUnitState) throws SchemaConversionException;

    public abstract Iterable<DO> convertRecord(SO so, DI di, WorkUnitState workUnitState) throws DataConversionException;

    @Override // gobblin.util.FinalState
    public State getFinalState() {
        return new State();
    }

    public RecordStreamWithMetadata<DO, SO> processStream(RecordStreamWithMetadata<DI, SI> recordStreamWithMetadata, WorkUnitState workUnitState) throws SchemaConversionException {
        init(workUnitState);
        SO convertSchema = convertSchema(recordStreamWithMetadata.getSchema(), workUnitState);
        return recordStreamWithMetadata.withRecordStream(recordStreamWithMetadata.getRecordStream().flatMap(streamEntity -> {
            if (streamEntity instanceof ControlMessage) {
                getMessageHandler().handleMessage((ControlMessage) streamEntity);
                return Flowable.just((ControlMessage) streamEntity);
            }
            if (!(streamEntity instanceof RecordEnvelope)) {
                throw new UnsupportedOperationException();
            }
            RecordEnvelope recordEnvelope = (RecordEnvelope) streamEntity;
            Flowable fromIterable = Flowable.fromIterable(convertRecord(convertSchema, recordEnvelope.getRecord(), workUnitState));
            recordEnvelope.getClass();
            return fromIterable.map(recordEnvelope::withRecord);
        }, 1).doOnComplete(this::close), convertSchema);
    }

    public ControlMessageHandler getMessageHandler() {
        return ControlMessageHandler.NOOP;
    }

    public ConverterInitializer getInitializer(State state, WorkUnitStream workUnitStream, int i, int i2) {
        return NoopConverterInitializer.INSTANCE;
    }
}
