package org.apache.beam.sdk.extensions.python.transforms;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.UsesPythonExpansionService;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.util.construction.BaseExternalTest;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/extensions/python/transforms/RunInferenceTransformTest.class */
public class RunInferenceTransformTest extends BaseExternalTest {

    /* loaded from: input_file:org/apache/beam/sdk/extensions/python/transforms/RunInferenceTransformTest$KVFn.class */
    static class KVFn extends SimpleFunction<Iterable<Long>, KV<Long, Iterable<Long>>> {
        KVFn() {
        }

        public KV<Long, Iterable<Long>> apply(Iterable<Long> iterable) {
            return KV.of((Long) ((List) iterable).get(0), iterable);
        }
    }

    @Test
    @Category({ValidatesRunner.class, UsesPythonExpansionService.class})
    public void testRunInference() {
        String str = (String) Optional.ofNullable(System.getProperty("semiPersistDir")).orElse("/tmp");
        Schema of = Schema.of(new Schema.Field[]{Schema.Field.of("example", Schema.FieldType.array(Schema.FieldType.INT64)), Schema.Field.of("inference", Schema.FieldType.INT32)});
        PAssert.that(this.testPipeline.apply(Create.of(Arrays.asList(0L, 0L), new Iterable[]{Arrays.asList(1L, 1L)})).setCoder(IterableCoder.of(VarLongCoder.of())).apply(RunInference.of("apache_beam.ml.inference.sklearn_inference.SklearnModelHandlerNumpy", of).withKwarg("model_uri", String.format("%s/staged/sklearn_model", str)).withExpansionService(expansionAddr))).containsInAnyOrder(new Row[]{Row.withSchema(of).addArray(new Object[]{0L, 0L}).addValue(0).build(), Row.withSchema(of).addArray(new Object[]{1L, 1L}).addValue(1).build()});
    }

    private String getModelLoaderScriptWithKVs() {
        return (("from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy\nfrom apache_beam.ml.inference.base import KeyedModelHandler\n") + "def get_model_handler(model_uri):\n") + "  return KeyedModelHandler(SklearnModelHandlerNumpy(model_uri))\n";
    }

    @Test
    @Category({ValidatesRunner.class, UsesPythonExpansionService.class})
    public void testRunInferenceWithKVs() {
        String str = (String) Optional.ofNullable(System.getProperty("semiPersistDir")).orElse("/tmp");
        Schema of = Schema.of(new Schema.Field[]{Schema.Field.of("example", Schema.FieldType.array(Schema.FieldType.INT64)), Schema.Field.of("inference", Schema.FieldType.INT32)});
        PAssert.that(this.testPipeline.apply(Create.of(Arrays.asList(0L, 0L), new Iterable[]{Arrays.asList(1L, 1L)})).apply(MapElements.via(new KVFn())).setCoder(KvCoder.of(VarLongCoder.of(), IterableCoder.of(VarLongCoder.of()))).apply(RunInference.ofKVs(getModelLoaderScriptWithKVs(), of, VarLongCoder.of()).withKwarg("model_uri", String.format("%s/staged/sklearn_model", str)).withExpansionService(expansionAddr)).apply(Values.create())).containsInAnyOrder(new Row[]{Row.withSchema(of).addArray(new Object[]{0L, 0L}).addValue(0).build(), Row.withSchema(of).addArray(new Object[]{1L, 1L}).addValue(1).build()});
    }
}
