package co.decodable.sdk.pipeline.testing;

import co.decodable.sdk.pipeline.EnvironmentAccess;
import co.decodable.sdk.pipeline.util.Incubating;
import java.lang.System;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Test helper that wires a {@link TestEnvironment} to Kafka so that tests can write records to,
 * and read records from, the topics backing named streams.
 *
 * <p>Creating a context installs the given environment via
 * {@link EnvironmentAccess#setEnvironment}; {@link #close()} resets it again and releases the
 * producer, the consumers created by {@link #stream(String)}, and the executor used for
 * asynchronous work.
 *
 * <p>NOTE(review): the stream cache is a plain {@code HashMap} and each stream polls its consumer
 * from executor threads without synchronization, so this class appears to be intended for use
 * from a single test thread with at most one pending {@code take} per stream — confirm.
 */
@Incubating
public class PipelineTestContext implements AutoCloseable {
    private static final System.Logger LOGGER = System.getLogger(PipelineTestContext.class.getName());
    private final TestEnvironment testEnvironment;
    private final KafkaProducer<String, String> producer;
    private final Map<String, DecodableStreamImpl> streams;
    private final ExecutorService executorService;

    /**
     * {@link DecodableStream} backed by the context's shared producer (for writes) and a dedicated
     * consumer (for reads) on the topic the test environment maps the stream name to.
     */
    private class DecodableStreamImpl implements DecodableStream<String> {
        private final String streamName;
        private final KafkaConsumer<String, String> consumer;
        /** Records already polled from Kafka but not yet handed out by {@link #take(int)}. */
        private final List<ConsumerRecord<String, String>> consumed = new ArrayList<>();

        public DecodableStreamImpl(String streamName, KafkaConsumer<String, String> consumer) {
            this.streamName = streamName;
            this.consumer = consumer;
        }

        /**
         * Sends the record's value to the stream's topic and blocks until the send completes.
         *
         * @param streamRecord the record whose value is sent (no key is set)
         * @throws RuntimeException if the send fails or the calling thread is interrupted while
         *     waiting; in the latter case the interrupt status is restored first
         */
        @Override // co.decodable.sdk.pipeline.testing.DecodableStream
        public void add(StreamRecord<String> streamRecord) {
            String topic = PipelineTestContext.this.testEnvironment.topicFor(this.streamName);
            try {
                PipelineTestContext.this.producer
                        .send(new ProducerRecord<String, String>(topic, streamRecord.value()))
                        .get();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
                throw new RuntimeException("Couldn't send record", e);
            } catch (ExecutionException e) {
                throw new RuntimeException("Couldn't send record", e);
            }
        }

        /**
         * Asynchronously retrieves the next record of this stream.
         *
         * @return a future completing with the first record returned by {@code take(1)}
         */
        @Override // co.decodable.sdk.pipeline.testing.DecodableStream
        public Future<StreamRecord<String>> takeOne() {
            return takeAsync(1).thenApply(records -> records.get(0));
        }

        /**
         * Asynchronously retrieves the next {@code i} records of this stream.
         *
         * @param i the number of records to retrieve
         * @return a future completing with exactly {@code i} records, in consumption order
         */
        @Override // co.decodable.sdk.pipeline.testing.DecodableStream
        public Future<List<StreamRecord<String>>> take(int i) {
            return takeAsync(i);
        }

        /** Polls on the context's executor until {@code count} records are buffered, then hands them out. */
        private CompletableFuture<List<StreamRecord<String>>> takeAsync(int count) {
            return CompletableFuture.supplyAsync(() -> {
                // Keep polling in short intervals until enough records are buffered; surplus
                // records stay in the buffer for subsequent calls.
                while (this.consumed.size() < count) {
                    for (ConsumerRecord<String, String> polled : this.consumer.poll(Duration.ofMillis(20L))) {
                        this.consumed.add(polled);
                    }
                }
                List<ConsumerRecord<String, String>> head = this.consumed.subList(0, count);
                List<StreamRecord<String>> result = head.stream()
                        .map(polled -> new StreamRecord<String>(polled.value()))
                        .collect(Collectors.toList());
                // Clearing the sub-list view removes the handed-out records from the buffer.
                head.clear();
                return result;
            }, PipelineTestContext.this.executorService);
        }
    }

    /**
     * A consumer whose {@link #accept} may throw a checked exception; used to pass a job's entry
     * point to {@link PipelineTestContext#runJobAsync}.
     *
     * @param <T> the type of the accepted argument
     */
    @FunctionalInterface
    public interface ThrowingConsumer<T> {
        void accept(T t) throws Exception;
    }

    /**
     * Creates a context for the given environment and installs that environment via
     * {@link EnvironmentAccess#setEnvironment}.
     *
     * @param testEnvironment supplies the Kafka bootstrap servers and the stream-to-topic mapping
     */
    public PipelineTestContext(TestEnvironment testEnvironment) {
        EnvironmentAccess.setEnvironment(testEnvironment);
        try {
            this.testEnvironment = testEnvironment;
            this.streams = new HashMap<>();
            this.executorService = Executors.newCachedThreadPool();
            this.producer = new KafkaProducer<>(producerProperties(testEnvironment.bootstrapServers()));
        } catch (RuntimeException e) {
            // close() will never run for a half-constructed context, so undo the global change here.
            EnvironmentAccess.resetEnvironment();
            throw e;
        }
    }

    /** Builds the configuration of the String/String producer used for writing to streams. */
    private static Properties producerProperties(String bootstrapServers) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    /**
     * Builds the configuration of the String/String consumers used for reading from streams;
     * consumers start from the earliest offset and share a fixed group id.
     */
    private static Properties consumerProperties(String bootstrapServers) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("auto.offset.reset", "earliest");
        properties.put("group.id", "my-group");
        return properties;
    }

    /**
     * Returns the stream with the given name, creating it (and its Kafka consumer) on first use.
     * Repeated calls with the same name return the same instance.
     *
     * @param str the stream name
     * @return the stream handle for reading and writing records
     */
    public DecodableStream<String> stream(String str) {
        // The consumer is created inside the mapping function so that a cache hit does not
        // allocate (and leak) a second, never-closed consumer.
        return this.streams.computeIfAbsent(str, name -> {
            String topic = this.testEnvironment.topicFor(name);
            KafkaConsumer<String, String> consumer =
                    new KafkaConsumer<>(consumerProperties(this.testEnvironment.bootstrapServers()));
            try {
                consumer.subscribe(Collections.singleton(topic));
            } catch (RuntimeException e) {
                // The consumer is not tracked yet, so it must be released here.
                consumer.close();
                throw e;
            }
            return new DecodableStreamImpl(name, consumer);
        });
    }

    /**
     * Submits the given job entry point to the context's executor and returns immediately.
     * Failures of the job are logged, not propagated to the caller.
     *
     * @param throwingConsumer the job's entry point, invoked with {@code strArr}
     * @param strArr the arguments passed to the job
     */
    public void runJobAsync(ThrowingConsumer<String[]> throwingConsumer, String... strArr) throws Exception {
        this.executorService.submit(() -> {
            try {
                throwingConsumer.accept(strArr);
            } catch (InterruptedException e) {
                // Restore the flag for the executor's worker thread before finishing the task.
                Thread.currentThread().interrupt();
                LOGGER.log(System.Logger.Level.INFO, "Job aborted");
            } catch (Exception e) {
                LOGGER.log(System.Logger.Level.ERROR, "Job failed", e);
            }
        });
    }

    /**
     * Closes the producer, stops the executor (interrupting running tasks and waiting up to
     * 100 ms for them), closes all stream consumers and resets the environment.
     *
     * <p>Every step is attempted even if an earlier one fails; the first failure is reported as
     * the cause and later ones are attached as suppressed exceptions.
     *
     * @throws RuntimeException if any resource could not be closed
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        Exception failure = null;
        boolean interrupted = false;
        try {
            try {
                this.producer.close();
            } catch (Exception e) {
                failure = recordFailure(failure, e);
            }

            this.executorService.shutdownNow();
            try {
                this.executorService.awaitTermination(100L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Defer re-interrupting until the consumers have been closed, so the pending
                // interrupt does not abort the remaining cleanup.
                interrupted = true;
                failure = recordFailure(failure, e);
            }

            for (DecodableStreamImpl stream : this.streams.values()) {
                try {
                    stream.consumer.close();
                } catch (Exception e) {
                    failure = recordFailure(failure, e);
                }
            }
        } finally {
            EnvironmentAccess.resetEnvironment();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        if (failure != null) {
            throw new RuntimeException("Couldn't close testing context", failure);
        }
    }

    /** Returns the primary failure, attaching {@code next} as suppressed if one already exists. */
    private static Exception recordFailure(Exception primary, Exception next) {
        if (primary == null) {
            return next;
        }
        primary.addSuppressed(next);
        return primary;
    }
}
