package org.apache.gobblin.converter;

import com.google.common.base.Optional;
import io.reactivex.Flowable;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.initializer.ConverterInitializer;
import org.apache.gobblin.converter.initializer.NoopConverterInitializer;
import org.apache.gobblin.metadata.GlobalMetadata;
import org.apache.gobblin.records.ControlMessageHandler;
import org.apache.gobblin.records.RecordStreamProcessor;
import org.apache.gobblin.records.RecordStreamWithMetadata;
import org.apache.gobblin.source.workunit.WorkUnitStream;
import org.apache.gobblin.stream.ControlMessage;
import org.apache.gobblin.stream.MetadataUpdateControlMessage;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.util.FinalState;

/* loaded from: input_file:WEB-INF/lib/gobblin-api-0.12.0.jar:org/apache/gobblin/converter/Converter.class */
/**
 * Base class for converters that turn an input schema into an output schema and each input
 * record into zero or more output records.
 *
 * <p>Besides the two abstract conversion methods, this class implements
 * {@link RecordStreamProcessor#processStream} so that a converter can be applied to a whole
 * record stream, including the control messages travelling in it.
 *
 * @param <SI> input schema type
 * @param <SO> output schema type
 * @param <DI> input record type
 * @param <DO> output record type
 */
public abstract class Converter<SI, SO, DI, DO> implements Closeable, FinalState, RecordStreamProcessor<SI, SO, DI, DO> {
    /**
     * Output metadata derived from the most recently seen input metadata. Set at the start of
     * {@link #processStream} and replaced whenever a {@link MetadataUpdateControlMessage} is
     * encountered in the stream.
     */
    private GlobalMetadata<SO> outputGlobalMetadata;

    /**
     * Initializes this converter. The default implementation does nothing.
     *
     * @param workUnitState state of the work unit being processed
     * @return this converter
     */
    public Converter<SI, SO, DI, DO> init(WorkUnitState workUnitState) {
        return this;
    }

    /**
     * Releases resources held by this converter. The default implementation does nothing.
     * {@link #processStream} registers this method to run when the converted stream completes.
     *
     * @throws IOException declared for overriding implementations
     */
    @Override
    public void close() throws IOException {
    }

    /**
     * Converts an input schema into the output schema.
     *
     * @param si input schema
     * @param workUnitState state of the work unit being processed
     * @return the converted schema; {@link #processStream} tolerates {@code null} here
     * @throws SchemaConversionException if the schema cannot be converted
     */
    public abstract SO convertSchema(SI si, WorkUnitState workUnitState) throws SchemaConversionException;

    /**
     * Converts one input record into zero or more output records.
     *
     * @param so output schema, as produced by {@link #convertSchema}
     * @param di input record
     * @param workUnitState state of the work unit being processed
     * @return the converted records; an empty iterable drops the input record
     * @throws DataConversionException if the record cannot be converted
     */
    public abstract Iterable<DO> convertRecord(SO so, DI di, WorkUnitState workUnitState) throws DataConversionException;

    /**
     * Returns the final state of this converter.
     *
     * @return a newly created {@link State} in the default implementation
     */
    @Override
    public State getFinalState() {
        return new State();
    }

    /**
     * Applies this converter to a record stream.
     *
     * <p>The converter is initialized, the input schema is converted, and every entity of the
     * input stream is mapped as follows:
     * <ul>
     *   <li>control messages are handed to {@link #getMessageHandler()} and forwarded; a
     *       {@link MetadataUpdateControlMessage} additionally triggers a schema re-conversion
     *       and is replaced by a new message carrying the updated output metadata;</li>
     *   <li>record envelopes are converted with {@link #convertRecord}; an empty result acks
     *       the input envelope and emits nothing;</li>
     *   <li>any other entity causes an {@link UnsupportedOperationException}.</li>
     * </ul>
     * {@link #close()} is invoked when the resulting stream completes.
     *
     * @param recordStreamWithMetadata input stream together with its global metadata
     * @param workUnitState state of the work unit being processed
     * @return the converted stream, paired with the output metadata computed from the initial
     *     input metadata
     * @throws SchemaConversionException if the initial schema cannot be converted
     */
    @Override
    public RecordStreamWithMetadata<DO, SO> processStream(RecordStreamWithMetadata<DI, SI> recordStreamWithMetadata, WorkUnitState workUnitState) throws SchemaConversionException {
        init(workUnitState);
        this.outputGlobalMetadata = GlobalMetadata.builderWithInput(
                recordStreamWithMetadata.getGlobalMetadata(),
                Optional.fromNullable(convertSchema(recordStreamWithMetadata.getGlobalMetadata().getSchema(), workUnitState)))
            .build();

        // maxConcurrency = 1: at most one inner Flowable is subscribed at a time.
        Flowable<?> outputStream = recordStreamWithMetadata.getRecordStream().flatMap(entity -> {
            if (entity instanceof ControlMessage) {
                return convertControlMessage((ControlMessage<?>) entity, workUnitState);
            }
            if (entity instanceof RecordEnvelope) {
                return convertRecordEnvelope((RecordEnvelope<DI>) entity, workUnitState);
            }
            throw new UnsupportedOperationException(
                "Unsupported stream entity type: " + (entity == null ? "null" : entity.getClass().getName()));
        }, 1).doOnComplete(this::close);

        // The stream only ever carries ControlMessage<DO> and RecordEnvelope<DO> instances (see the
        // two helpers below), but that common element type is not spelled out here, so the stream
        // is handed over through a raw cast.
        @SuppressWarnings({"unchecked", "rawtypes"})
        RecordStreamWithMetadata<DO, SO> converted =
            recordStreamWithMetadata.withRecordStream((Flowable) outputStream, this.outputGlobalMetadata);
        return converted;
    }

    /**
     * Handles one control message: notifies the message handler and, for metadata updates,
     * refreshes {@link #outputGlobalMetadata} and substitutes a message carrying it.
     */
    @SuppressWarnings("unchecked") // generic arguments of stream entities are erased at runtime
    private Flowable<ControlMessage<DO>> convertControlMessage(ControlMessage<?> message, WorkUnitState workUnitState) throws SchemaConversionException {
        getMessageHandler().handleMessage(message);
        if (!(message instanceof MetadataUpdateControlMessage)) {
            return Flowable.just((ControlMessage<DO>) message);
        }
        MetadataUpdateControlMessage<?, ?> update = (MetadataUpdateControlMessage<?, ?>) message;
        SO convertedSchema = convertSchema((SI) update.getGlobalMetadata().getSchema(), workUnitState);
        this.outputGlobalMetadata =
            GlobalMetadata.builderWithInput(update.getGlobalMetadata(), Optional.fromNullable(convertedSchema)).build();
        ControlMessage<DO> replacement = new MetadataUpdateControlMessage<SO, DO>(this.outputGlobalMetadata);
        return Flowable.just(replacement);
    }

    /**
     * Converts the record of one envelope and wraps each result in an envelope derived from the
     * input envelope.
     */
    private Flowable<RecordEnvelope<DO>> convertRecordEnvelope(RecordEnvelope<DI> envelope, WorkUnitState workUnitState) throws DataConversionException {
        Iterator<DO> results =
            convertRecord(this.outputGlobalMetadata.getSchema(), envelope.getRecord(), workUnitState).iterator();
        if (!results.hasNext()) {
            // Nothing will be emitted downstream for this record, so it is acked here.
            envelope.ack();
            return Flowable.empty();
        }
        DO first = results.next();
        if (!results.hasNext()) {
            // Exactly one output record: derive the output envelope directly from the input one.
            return Flowable.just(envelope.withRecord(first));
        }
        // Several output records: create one child envelope per record through a fork builder,
        // which is closed once all children have been emitted.
        RecordEnvelope<DI>.ForkRecordBuilder<DO> fork = envelope.forkRecordBuilder();
        return Flowable.just(first)
            // One-shot Iterable: resumes the already-advanced iterator after the first element.
            .concatWith(Flowable.fromIterable(() -> results))
            .map(fork::childRecord)
            .doOnComplete(fork::close);
    }

    /**
     * Returns the handler that is notified of every control message seen by
     * {@link #processStream}.
     *
     * @return {@link ControlMessageHandler#NOOP} in the default implementation
     */
    public ControlMessageHandler getMessageHandler() {
        return ControlMessageHandler.NOOP;
    }

    /**
     * Returns the initializer for this converter. None of the arguments are used by the default
     * implementation.
     *
     * @param state not used here
     * @param workUnitStream not used here
     * @param i not used here (NOTE(review): presumably the number of branches; confirm against callers)
     * @param i2 not used here (NOTE(review): presumably the branch id; confirm against callers)
     * @return {@link NoopConverterInitializer#INSTANCE} in the default implementation
     */
    public ConverterInitializer getInitializer(State state, WorkUnitStream workUnitStream, int i, int i2) {
        return NoopConverterInitializer.INSTANCE;
    }
}
