/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.trogdor.workload;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.TextNode;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.apache.kafka.trogdor.workload.PartitionsSpec;
import org.apache.kafka.trogdor.workload.PayloadGenerator;
import org.apache.kafka.trogdor.workload.RoundTripWorkloadSpec;
import org.apache.kafka.trogdor.workload.SequentialPayloadGenerator;
import org.apache.kafka.trogdor.workload.Throttle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class RoundTripWorkerBase
implements TaskWorker {
    private static final int THROTTLE_PERIOD_MS = 100;
    private static final int LOG_INTERVAL_MS = 5000;
    private static final int LOG_NUM_MESSAGES = 10;
    private static final Logger log = LoggerFactory.getLogger(RoundTripWorkerBase.class);
    private static final PayloadGenerator KEY_GENERATOR = new SequentialPayloadGenerator(4, 0L);
    private ToReceiveTracker toReceiveTracker;
    protected String id;
    protected RoundTripWorkloadSpec spec;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final Lock lock = new ReentrantLock();
    private final Condition unackedSendsAreZero = this.lock.newCondition();
    private ScheduledExecutorService executor;
    private WorkerStatusTracker status;
    private KafkaFutureImpl<String> doneFuture;
    private KafkaProducer<byte[], byte[]> producer;
    private Long unackedSends;
    private ToSendTracker toSendTracker;

    @Override
    public void start(Platform platform, WorkerStatusTracker status, KafkaFutureImpl<String> doneFuture) throws Exception {
        if (!this.running.compareAndSet(false, true)) {
            throw new IllegalStateException("RoundTripWorker is already running.");
        }
        log.info("{}: Activating RoundTripWorker.", (Object)this.id);
        this.executor = Executors.newScheduledThreadPool(3, ThreadUtils.createThreadFactory((String)"RoundTripWorker%d", (boolean)false));
        this.status = status;
        this.doneFuture = doneFuture;
        this.producer = null;
        this.unackedSends = this.spec.maxMessages();
        this.executor.submit(new Prepare());
    }

    protected abstract void initializeConsumer(HashSet<TopicPartition> var1);

    protected abstract ConsumerRecords<byte[], byte[]> fetchRecords(Duration var1);

    protected abstract void shutdownConsumer();

    @Override
    public void stop(Platform platform) throws Exception {
        if (!this.running.compareAndSet(true, false)) {
            throw new IllegalStateException("RoundTripWorker is not running.");
        }
        log.info("{}: Deactivating RoundTripWorker.", (Object)this.id);
        this.doneFuture.complete((Object)"");
        this.executor.shutdownNow();
        this.executor.awaitTermination(1L, TimeUnit.DAYS);
        this.shutdownConsumer();
        Utils.closeQuietly(this.producer, (String)"producer");
        this.producer = null;
        this.unackedSends = null;
        this.executor = null;
        this.doneFuture = null;
        log.info("{}: Deactivated RoundTripWorker.", (Object)this.id);
    }

    class Prepare
    implements Runnable {
        Prepare() {
        }

        @Override
        public void run() {
            try {
                if (RoundTripWorkerBase.this.spec.targetMessagesPerSec() <= 0) {
                    throw new ConfigException("Can't have targetMessagesPerSec <= 0.");
                }
                HashMap<String, NewTopic> newTopics = new HashMap<String, NewTopic>();
                HashSet<TopicPartition> active = new HashSet<TopicPartition>();
                for (Map.Entry<String, PartitionsSpec> entry : RoundTripWorkerBase.this.spec.activeTopics().materialize().entrySet()) {
                    String topicName = entry.getKey();
                    PartitionsSpec partSpec = entry.getValue();
                    newTopics.put(topicName, partSpec.newTopic(topicName));
                    for (Integer partitionNumber : partSpec.partitionNumbers()) {
                        active.add(new TopicPartition(topicName, partitionNumber.intValue()));
                    }
                }
                if (active.isEmpty()) {
                    throw new RuntimeException("You must specify at least one active topic.");
                }
                RoundTripWorkerBase.this.status.update((JsonNode)new TextNode("Creating " + newTopics.keySet().size() + " topic(s)"));
                WorkerUtils.createTopics(log, RoundTripWorkerBase.this.spec.bootstrapServers(), RoundTripWorkerBase.this.spec.commonClientConf(), RoundTripWorkerBase.this.spec.adminClientConf(), newTopics, false);
                RoundTripWorkerBase.this.status.update((JsonNode)new TextNode("Created " + newTopics.keySet().size() + " topic(s)"));
                RoundTripWorkerBase.this.toSendTracker = new ToSendTracker(RoundTripWorkerBase.this.spec.maxMessages());
                RoundTripWorkerBase.this.toReceiveTracker = new ToReceiveTracker();
                RoundTripWorkerBase.this.executor.submit(new ProducerRunnable(active));
                RoundTripWorkerBase.this.executor.submit(new ConsumerRunnable(active));
                RoundTripWorkerBase.this.executor.submit(new StatusUpdater());
                RoundTripWorkerBase.this.executor.scheduleWithFixedDelay(new StatusUpdater(), 30L, 30L, TimeUnit.SECONDS);
            }
            catch (Throwable e) {
                WorkerUtils.abort(log, "Prepare", e, RoundTripWorkerBase.this.doneFuture);
            }
        }
    }

    public static class StatusData {
        private final long totalUniqueSent;
        private final long totalReceived;

        @JsonCreator
        public StatusData(@JsonProperty(value="totalUniqueSent") long totalUniqueSent, @JsonProperty(value="totalReceived") long totalReceived) {
            this.totalUniqueSent = totalUniqueSent;
            this.totalReceived = totalReceived;
        }

        @JsonProperty
        public long totalUniqueSent() {
            return this.totalUniqueSent;
        }

        @JsonProperty
        public long totalReceived() {
            return this.totalReceived;
        }
    }

    public class StatusUpdater
    implements Runnable {
        @Override
        public void run() {
            try {
                this.update();
            }
            catch (Exception e) {
                WorkerUtils.abort(log, "StatusUpdater", e, RoundTripWorkerBase.this.doneFuture);
            }
        }

        StatusData update() {
            StatusData statusData = new StatusData(RoundTripWorkerBase.this.toSendTracker.frontier(), RoundTripWorkerBase.this.toReceiveTracker.totalReceived());
            RoundTripWorkerBase.this.status.update(JsonUtil.JSON_SERDE.valueToTree((Object)statusData));
            return statusData;
        }
    }

    class ConsumerRunnable
    implements Runnable {
        ConsumerRunnable(HashSet<TopicPartition> partitions) {
            RoundTripWorkerBase.this.initializeConsumer(partitions);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * WARNING - Removed back jump from a try to a catch block - possible behaviour change.
         * Unable to fully structure code
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        @Override
        public void run() {
            block16: {
                uniqueMessagesReceived = 0L;
                messagesReceived = 0L;
                pollInvoked = 0L;
                RoundTripWorkerBase.log.debug("{}: Starting RoundTripWorker#ConsumerRunnable.", (Object)RoundTripWorkerBase.this.id);
                try {
                    lastLogTimeMs = Time.SYSTEM.milliseconds();
lbl7:
                    // 3 sources

                    while (true) {
                        try {
                            ++pollInvoked;
                            records = RoundTripWorkerBase.this.fetchRecords(Duration.ofMillis(50L));
                            for (ConsumerRecord record : records) {
                                messageIndex = ByteBuffer.wrap((byte[])record.key()).order(ByteOrder.LITTLE_ENDIAN).getInt();
                                ++messagesReceived;
                                if (!RoundTripWorkerBase.this.toReceiveTracker.removePending(messageIndex) || ++uniqueMessagesReceived < RoundTripWorkerBase.this.spec.maxMessages()) continue;
                                RoundTripWorkerBase.this.lock.lock();
                                try {
                                    RoundTripWorkerBase.log.info("{}: Consumer received the full count of {} unique messages.  Waiting for all {} sends to be acked...", new Object[]{RoundTripWorkerBase.this.id, RoundTripWorkerBase.this.spec.maxMessages(), RoundTripWorkerBase.this.unackedSends});
                                    while (RoundTripWorkerBase.this.unackedSends > 0L) {
                                        RoundTripWorkerBase.this.unackedSendsAreZero.await();
                                    }
                                }
                                finally {
                                    RoundTripWorkerBase.this.lock.unlock();
                                }
                                RoundTripWorkerBase.log.info("{}: all sends have been acked.", (Object)RoundTripWorkerBase.this.id);
                                new StatusUpdater().update();
                                RoundTripWorkerBase.this.doneFuture.complete((Object)"");
                                break block16;
                            }
                            ** GOTO lbl-1000
                        }
                        catch (WakeupException e) {
                            RoundTripWorkerBase.log.debug("{}: Consumer got WakeupException", (Object)RoundTripWorkerBase.this.id, (Object)e);
                        }
                        catch (TimeoutException e) {
                            RoundTripWorkerBase.log.debug("{}: Consumer got TimeoutException", (Object)RoundTripWorkerBase.this.id, (Object)e);
                        }
                        break;
                    }
                }
                catch (Throwable e) {
                    WorkerUtils.abort(RoundTripWorkerBase.log, "ConsumerRunnable", e, RoundTripWorkerBase.this.doneFuture);
                    RoundTripWorkerBase.log.info("{}: ConsumerRunnable is exiting.  Invoked poll {} time(s).  messagesReceived = {}; uniqueMessagesReceived = {}.", new Object[]{RoundTripWorkerBase.this.id, pollInvoked, messagesReceived, uniqueMessagesReceived});
                    return;
                }
                catch (Throwable var14_14) {
                    RoundTripWorkerBase.log.info("{}: ConsumerRunnable is exiting.  Invoked poll {} time(s).  messagesReceived = {}; uniqueMessagesReceived = {}.", new Object[]{RoundTripWorkerBase.this.id, pollInvoked, messagesReceived, uniqueMessagesReceived});
                    throw var14_14;
                }
            }
            RoundTripWorkerBase.log.info("{}: ConsumerRunnable is exiting.  Invoked poll {} time(s).  messagesReceived = {}; uniqueMessagesReceived = {}.", new Object[]{RoundTripWorkerBase.this.id, pollInvoked, messagesReceived, uniqueMessagesReceived});
            return;
lbl-1000:
            // 1 sources

            {
                curTimeMs = Time.SYSTEM.milliseconds();
                if (curTimeMs <= lastLogTimeMs + 5000L) ** GOTO lbl7
                RoundTripWorkerBase.this.toReceiveTracker.log();
                lastLogTimeMs = curTimeMs;
                ** continue;
            }
        }
    }

    private class ToReceiveTracker {
        private final TreeSet<Long> pending = new TreeSet();
        private long totalReceived = 0L;

        private ToReceiveTracker() {
        }

        synchronized void addPending(long messageIndex) {
            this.pending.add(messageIndex);
        }

        synchronized boolean removePending(long messageIndex) {
            if (this.pending.remove(messageIndex)) {
                ++this.totalReceived;
                return true;
            }
            return false;
        }

        synchronized long totalReceived() {
            return this.totalReceived;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void log() {
            long numToReceive;
            ArrayList<Long> list = new ArrayList<Long>(10);
            ToReceiveTracker toReceiveTracker = this;
            synchronized (toReceiveTracker) {
                numToReceive = this.pending.size();
                Iterator<Long> iter = this.pending.iterator();
                while (iter.hasNext() && list.size() < 10) {
                    Long i = iter.next();
                    list.add(i);
                }
            }
            log.info("{}: consumer waiting for {} message(s), starting with: {}", new Object[]{RoundTripWorkerBase.this.id, numToReceive, list.stream().map(Object::toString).collect(Collectors.joining(", "))});
        }
    }

    class ProducerRunnable
    implements Runnable {
        private final HashSet<TopicPartition> partitions;
        private final Throttle throttle;

        ProducerRunnable(HashSet<TopicPartition> partitions) {
            this.partitions = partitions;
            Properties props = new Properties();
            props.put("bootstrap.servers", RoundTripWorkerBase.this.spec.bootstrapServers());
            props.put("batch.size", (Object)16384);
            props.put("buffer.memory", (Object)65536L);
            props.put("max.block.ms", (Object)1000L);
            props.put("client.id", "producer." + RoundTripWorkerBase.this.id);
            props.put("acks", "all");
            props.put("request.timeout.ms", (Object)105000);
            WorkerUtils.addConfigsToProperties(props, RoundTripWorkerBase.this.spec.commonClientConf(), RoundTripWorkerBase.this.spec.producerConf());
            RoundTripWorkerBase.this.producer = new KafkaProducer(props, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());
            int perPeriod = WorkerUtils.perSecToPerPeriod(RoundTripWorkerBase.this.spec.targetMessagesPerSec(), 100L);
            this.throttle = new Throttle(perPeriod, 100);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Loose catch block
         */
        @Override
        public void run() {
            ToSendTrackerResult result;
            long messagesSent = 0L;
            long uniqueMessagesSent = 0L;
            log.debug("{}: Starting RoundTripWorker#ProducerRunnable.", (Object)RoundTripWorkerBase.this.id);
            Iterator<TopicPartition> iter = this.partitions.iterator();
            while ((result = RoundTripWorkerBase.this.toSendTracker.next()) != null) {
                this.throttle.increment();
                long messageIndex = result.index;
                if (result.firstSend) {
                    RoundTripWorkerBase.this.toReceiveTracker.addPending(messageIndex);
                    ++uniqueMessagesSent;
                }
                ++messagesSent;
                if (!iter.hasNext()) {
                    iter = this.partitions.iterator();
                }
                TopicPartition partition = iter.next();
                ProducerRecord record = new ProducerRecord(partition.topic(), Integer.valueOf(partition.partition()), (Object)KEY_GENERATOR.generate(messageIndex), (Object)RoundTripWorkerBase.this.spec.valueGenerator().generate(messageIndex));
                RoundTripWorkerBase.this.producer.send(record, (metadata, exception) -> {
                    if (exception == null) {
                        RoundTripWorkerBase.this.lock.lock();
                        try {
                            RoundTripWorkerBase.this.unackedSends = RoundTripWorkerBase.this.unackedSends - 1L;
                            if (RoundTripWorkerBase.this.unackedSends > 0L) return;
                            RoundTripWorkerBase.this.unackedSendsAreZero.signalAll();
                            return;
                        }
                        finally {
                            RoundTripWorkerBase.this.lock.unlock();
                        }
                    } else {
                        log.info("{}: Got exception when sending message {}: {}", new Object[]{RoundTripWorkerBase.this.id, messageIndex, exception.getMessage()});
                        RoundTripWorkerBase.this.toSendTracker.addFailed(messageIndex);
                    }
                });
            }
            RoundTripWorkerBase.this.lock.lock();
            try {
                log.info("{}: ProducerRunnable is exiting.  messagesSent={}; uniqueMessagesSent={}; ackedSends={}/{}.", new Object[]{RoundTripWorkerBase.this.id, messagesSent, uniqueMessagesSent, RoundTripWorkerBase.this.spec.maxMessages() - RoundTripWorkerBase.this.unackedSends, RoundTripWorkerBase.this.spec.maxMessages()});
            }
            finally {
                RoundTripWorkerBase.this.lock.unlock();
            }
            catch (Throwable e) {
                try {
                    WorkerUtils.abort(log, "ProducerRunnable", e, RoundTripWorkerBase.this.doneFuture);
                    RoundTripWorkerBase.this.lock.lock();
                }
                catch (Throwable throwable) {
                    RoundTripWorkerBase.this.lock.lock();
                    try {
                        log.info("{}: ProducerRunnable is exiting.  messagesSent={}; uniqueMessagesSent={}; ackedSends={}/{}.", new Object[]{RoundTripWorkerBase.this.id, messagesSent, uniqueMessagesSent, RoundTripWorkerBase.this.spec.maxMessages() - RoundTripWorkerBase.this.unackedSends, RoundTripWorkerBase.this.spec.maxMessages()});
                    }
                    finally {
                        RoundTripWorkerBase.this.lock.unlock();
                    }
                    throw throwable;
                }
                try {
                    log.info("{}: ProducerRunnable is exiting.  messagesSent={}; uniqueMessagesSent={}; ackedSends={}/{}.", new Object[]{RoundTripWorkerBase.this.id, messagesSent, uniqueMessagesSent, RoundTripWorkerBase.this.spec.maxMessages() - RoundTripWorkerBase.this.unackedSends, RoundTripWorkerBase.this.spec.maxMessages()});
                }
                finally {
                    RoundTripWorkerBase.this.lock.unlock();
                }
            }
        }
    }

    private static class ToSendTracker {
        private final long maxMessages;
        private final List<Long> failed = new ArrayList<Long>();
        private long frontier = 0L;

        ToSendTracker(long maxMessages) {
            this.maxMessages = maxMessages;
        }

        synchronized void addFailed(long index) {
            this.failed.add(index);
        }

        synchronized long frontier() {
            return this.frontier;
        }

        synchronized ToSendTrackerResult next() {
            if (this.failed.isEmpty()) {
                if (this.frontier >= this.maxMessages) {
                    return null;
                }
                return new ToSendTrackerResult(this.frontier++, true);
            }
            return new ToSendTrackerResult(this.failed.remove(0), false);
        }
    }

    private static class ToSendTrackerResult {
        final long index;
        final boolean firstSend;

        ToSendTrackerResult(long index, boolean firstSend) {
            this.index = index;
            this.firstSend = firstSend;
        }
    }
}

