package gobblin.converter;

import gobblin.annotation.Alpha;
import gobblin.configuration.WorkUnitState;
import gobblin.records.RecordStreamWithMetadata;
import gobblin.stream.ControlMessage;
import gobblin.stream.RecordEnvelope;
import io.reactivex.Single;
import io.reactivex.SingleObserver;
import io.reactivex.annotations.NonNull;
import java.beans.ConstructorProperties;
import java.util.concurrent.CompletableFuture;

@Alpha
/* loaded from: input_file:WEB-INF/lib/gobblin-core-base-0.11.0.jar:gobblin/converter/AsyncConverter1to1.class */
public abstract class AsyncConverter1to1<SI, SO, DI, DO> extends Converter<SI, SO, DI, DO> {
    public static final String MAX_CONCURRENT_ASYNC_CONVERSIONS_KEY = "gobblin.converter.maxConcurrentAsyncConversions";
    public static final int DEFAULT_MAX_CONCURRENT_ASYNC_CONVERSIONS = 20;

    /* loaded from: input_file:WEB-INF/lib/gobblin-core-base-0.11.0.jar:gobblin/converter/AsyncConverter1to1$SingleAsync.class */
    private class SingleAsync extends Single<RecordEnvelope<DO>> {
        private final RecordEnvelope<DI> originalRecord;
        private final CompletableFuture<DO> completableFuture;

        @Override // io.reactivex.Single
        protected void subscribeActual(@NonNull SingleObserver<? super RecordEnvelope<DO>> singleObserver) {
            this.completableFuture.thenAccept(obj -> {
                singleObserver.onSuccess(this.originalRecord.withRecord(obj));
            }).exceptionally(th -> {
                singleObserver.onError(th);
                return null;
            });
        }

        @ConstructorProperties({"originalRecord", "completableFuture"})
        public SingleAsync(RecordEnvelope<DI> recordEnvelope, CompletableFuture<DO> completableFuture) {
            this.originalRecord = recordEnvelope;
            this.completableFuture = completableFuture;
        }
    }

    @Override // gobblin.converter.Converter
    public abstract SO convertSchema(SI si, WorkUnitState workUnitState) throws SchemaConversionException;

    @Override // gobblin.converter.Converter
    public final Iterable<DO> convertRecord(SO so, DI di, WorkUnitState workUnitState) throws DataConversionException {
        throw new UnsupportedOperationException("Async converters are only supported in stream mode. Make sure to set task.execution.synchronousExecutionModel to false.");
    }

    protected abstract CompletableFuture<DO> convertRecordAsync(SO so, DI di, WorkUnitState workUnitState) throws DataConversionException;

    /* JADX WARN: Multi-variable type inference failed */
    @Override // gobblin.converter.Converter, gobblin.records.RecordStreamProcessor
    public RecordStreamWithMetadata<DO, SO> processStream(RecordStreamWithMetadata<DI, SI> recordStreamWithMetadata, WorkUnitState workUnitState) throws SchemaConversionException {
        int propAsInt = workUnitState.getPropAsInt(MAX_CONCURRENT_ASYNC_CONVERSIONS_KEY, 20);
        SO convertSchema = convertSchema(recordStreamWithMetadata.getSchema(), workUnitState);
        return recordStreamWithMetadata.withRecordStream(recordStreamWithMetadata.getRecordStream().flatMapSingle(streamEntity -> {
            if (streamEntity instanceof ControlMessage) {
                getMessageHandler().handleMessage((ControlMessage) streamEntity);
                return Single.just((ControlMessage) streamEntity);
            }
            if (!(streamEntity instanceof RecordEnvelope)) {
                throw new IllegalStateException("Expected ControlMessage or RecordEnvelope.");
            }
            RecordEnvelope recordEnvelope = (RecordEnvelope) streamEntity;
            return new SingleAsync(recordEnvelope, convertRecordAsync(convertSchema, recordEnvelope.getRecord(), workUnitState));
        }, false, propAsInt), convertSchema);
    }
}
