/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import joptsimple.OptionException;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandDefaultOptions;
import org.apache.kafka.server.util.CommandLineUtils;
import org.apache.kafka.tools.LineMessageReader;
import org.apache.kafka.tools.ToolsUtils;
import org.apache.kafka.tools.api.RecordReader;

/**
 * Command-line tool that reads records from standard input through a pluggable
 * {@link RecordReader} and publishes them with a {@link KafkaProducer}.
 */
public class ConsoleProducer {
    /**
     * Program entry point; delegates to {@link #start(String[])}.
     *
     * @param args raw command-line arguments
     */
    public static void main(String[] args) {
        new ConsoleProducer().start(args);
    }

    /**
     * Parses the arguments, builds the reader and the producer, and pumps records
     * from standard input until the reader is exhausted.
     *
     * <p>Any failure is reported on standard error and followed by {@code Exit.exit(1)};
     * otherwise {@code Exit.exit(0)} is invoked once the loop completes.
     *
     * @param args raw command-line arguments
     */
    void start(String[] args) {
        try {
            ConsoleProducerOptions opts = new ConsoleProducerOptions(args);
            RecordReader reader = messageReader(opts);
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(opts.producerProps());

            // The producer is closed from the shutdown hook rather than inline.
            Exit.addShutdownHook("producer-shutdown-hook", producer::close);

            loopReader(producer, reader, opts.sync());
        } catch (OptionException e) {
            // Option errors are user mistakes: print the message only, no stack trace.
            System.err.println(e.getMessage());
            Exit.exit(1);
        } catch (Exception e) {
            e.printStackTrace();
            Exit.exit(1);
        }
        Exit.exit(0);
    }

    /**
     * Reflectively instantiates the reader class named by the options (via its no-arg
     * constructor) and configures it with {@link ConsoleProducerOptions#readerProps()}.
     *
     * @param opts parsed command-line options
     * @return the configured reader
     * @throws IllegalArgumentException if the class does not implement {@link RecordReader}
     * @throws Exception if the class cannot be loaded or instantiated, or the reader
     *                   properties cannot be read
     */
    RecordReader messageReader(ConsoleProducerOptions opts) throws Exception {
        Object candidate = Class.forName(opts.readerClass()).getDeclaredConstructor().newInstance();
        if (!(candidate instanceof RecordReader)) {
            throw new IllegalArgumentException("The reader must implement " + RecordReader.class.getName() + " interface");
        }
        RecordReader reader = (RecordReader) candidate;
        reader.configure(opts.readerProps());
        return reader;
    }

    /**
     * Sends every record the reader produces from {@code System.in}.
     *
     * <p>The reader is always closed when this method exits, including when
     * {@code readRecords} itself fails.
     *
     * @param producer producer used to publish the records
     * @param reader   source of records
     * @param sync     when {@code true}, wait for each send to complete before reading the next record
     * @throws Exception if reading, sending or closing fails
     */
    void loopReader(Producer<byte[], byte[]> producer, RecordReader reader, boolean sync) throws Exception {
        try {
            Iterator<ProducerRecord<byte[], byte[]>> records = reader.readRecords(System.in);
            while (records.hasNext()) {
                send(producer, records.next(), sync);
            }
        } finally {
            reader.close();
        }
    }

    /**
     * Publishes a single record.
     *
     * @param producer producer used to publish the record
     * @param record   record to send
     * @param sync     when {@code true}, block on the returned future so a failed send
     *                 surfaces as an exception; otherwise send with an
     *                 {@link ErrorLoggingCallback} and return immediately
     * @throws Exception if a synchronous send fails or the wait is interrupted
     */
    private void send(Producer<byte[], byte[]> producer, ProducerRecord<byte[], byte[]> record, boolean sync) throws Exception {
        if (sync) {
            producer.send(record).get();
            return;
        }
        producer.send(record, new ErrorLoggingCallback(record.topic(), record.key(), record.value(), false));
    }

    /**
     * Command-line option definitions for the console producer, plus the translation
     * of the parsed options into reader and producer configuration.
     */
    static final class ConsoleProducerOptions
    extends CommandDefaultOptions {
        private final OptionSpec<String> topicOpt;
        private final OptionSpec<String> bootstrapServerOpt;
        private final OptionSpec<Void> syncOpt;
        private final OptionSpec<String> compressionCodecOpt;
        private final OptionSpec<Integer> batchSizeOpt;
        private final OptionSpec<Integer> messageSendMaxRetriesOpt;
        private final OptionSpec<Long> retryBackoffMsOpt;
        private final OptionSpec<Long> sendTimeoutOpt;
        private final OptionSpec<String> requestRequiredAcksOpt;
        private final OptionSpec<Integer> requestTimeoutMsOpt;
        private final OptionSpec<Long> metadataExpiryMsOpt;
        private final OptionSpec<Long> maxBlockMsOpt;
        private final OptionSpec<Long> maxMemoryBytesOpt;
        private final OptionSpec<Integer> maxPartitionMemoryBytesOpt;
        private final OptionSpec<String> messageReaderOpt;
        private final OptionSpec<Integer> socketBufferSizeOpt;
        private final OptionSpec<String> propertyOpt;
        private final OptionSpec<String> readerConfigOpt;
        private final OptionSpec<String> producerPropertyOpt;
        private final OptionSpec<String> producerConfigOpt;

        /**
         * Declares every supported option, parses {@code args} and validates the result.
         * A parse failure is reported through {@code CommandLineUtils.printUsageAndExit}.
         *
         * @param args raw command-line arguments
         */
        public ConsoleProducerOptions(String[] args) {
            super(args);
            topicOpt = parser.accepts("topic", "REQUIRED: The topic name to produce messages to.")
                    .withRequiredArg()
                    .describedAs("topic")
                    .ofType(String.class);
            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The server(s) to connect to. The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
                    .withRequiredArg()
                    .describedAs("server to connect to")
                    .ofType(String.class);
            syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.");
            compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'zstd'.If specified without value, then it defaults to 'gzip'")
                    .withOptionalArg()
                    .describedAs("compression-codec")
                    .ofType(String.class);
            batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously. please note that this option will be replaced if max-partition-memory-bytes is also set")
                    .withRequiredArg()
                    .describedAs("size")
                    .ofType(Integer.class)
                    .defaultsTo(16384);
            messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retries before the producer give up and drop this message. This is the option to control `retries` in producer configs.")
                    .withRequiredArg()
                    .ofType(Integer.class)
                    .defaultsTo(3);
            retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. This is the option to control `retry.backoff.ms` in producer configs.")
                    .withRequiredArg()
                    .ofType(Long.class)
                    .defaultsTo(100L);
            sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time a message will queue awaiting sufficient batch size. The value is given in ms. This is the option to control `linger.ms` in producer configs.")
                    .withRequiredArg()
                    .describedAs("timeout_ms")
                    .ofType(Long.class)
                    .defaultsTo(1000L);
            requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required `acks` of the producer requests")
                    .withRequiredArg()
                    .describedAs("request required acks")
                    .ofType(String.class)
                    .defaultsTo("-1");
            requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. Value must be non-negative and non-zero.")
                    .withRequiredArg()
                    .describedAs("request timeout ms")
                    .ofType(Integer.class)
                    .defaultsTo(1500);
            metadataExpiryMsOpt = parser.accepts("metadata-expiry-ms", "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes. This is the option to control `metadata.max.age.ms` in producer configs.")
                    .withRequiredArg()
                    .describedAs("metadata expiration interval")
                    .ofType(Long.class)
                    .defaultsTo(300000L);
            maxBlockMsOpt = parser.accepts("max-block-ms", "The max time that the producer will block for during a send request.")
                    .withRequiredArg()
                    .describedAs("max block on send")
                    .ofType(Long.class)
                    .defaultsTo(60000L);
            maxMemoryBytesOpt = parser.accepts("max-memory-bytes", "The total memory used by the producer to buffer records waiting to be sent to the server. This is the option to control `buffer.memory` in producer configs.")
                    .withRequiredArg()
                    .describedAs("total memory in bytes")
                    .ofType(Long.class)
                    .defaultsTo(32 * 1024 * 1024L);
            maxPartitionMemoryBytesOpt = parser.accepts("max-partition-memory-bytes", "The buffer size allocated for a partition. When records are received which are smaller than this size the producer will attempt to optimistically group them together until this size is reached. This is the option to control `batch.size` in producer configs.")
                    .withRequiredArg()
                    .describedAs("memory in bytes per partition")
                    .ofType(Integer.class)
                    .defaultsTo(16384);
            messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. By default each line is read as a separate message.")
                    .withRequiredArg()
                    .describedAs("reader_class")
                    .ofType(String.class)
                    .defaultsTo(LineMessageReader.class.getName());
            // NOTE(review): the help text says "RECV" although the option is merged into
            // `send.buffer.bytes` below - confirm the intended wording before changing it.
            socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size. This is the option to control `send.buffer.bytes` in producer configs.")
                    .withRequiredArg()
                    .describedAs("size")
                    .ofType(Integer.class)
                    .defaultsTo(102400);
            propertyOpt = parser.accepts("property",
                    "A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user-defined message reader.\n"
                            + "Default properties include:\n"
                            + " parse.key=false\n"
                            + " parse.headers=false\n"
                            + " ignore.error=false\n"
                            + " key.separator=\\t\n"
                            + " headers.delimiter=\\t\n"
                            + " headers.separator=,\n"
                            + " headers.key.separator=:\n"
                            + " null.marker=   When set, any fields (key, value and headers) equal to this will be replaced by null\n"
                            + "Default parsing pattern when:\n"
                            + " parse.headers=true and parse.key=true:\n"
                            + "  \"h1:v1,h2:v2...\\tkey\\tvalue\"\n"
                            + " parse.key=true:\n"
                            + "  \"key\\tvalue\"\n"
                            + " parse.headers=true:\n"
                            + "  \"h1:v1,h2:v2...\\tvalue\"")
                    .withRequiredArg()
                    .describedAs("prop")
                    .ofType(String.class);
            readerConfigOpt = parser.accepts("reader-config", "Config properties file for the message reader. Note that " + propertyOpt + " takes precedence over this config.")
                    .withRequiredArg()
                    .describedAs("config file")
                    .ofType(String.class);
            producerPropertyOpt = parser.accepts("producer-property", "A mechanism to pass user-defined properties in the form key=value to the producer. ")
                    .withRequiredArg()
                    .describedAs("producer_prop")
                    .ofType(String.class);
            producerConfigOpt = parser.accepts("producer.config", "Producer config properties file. Note that " + producerPropertyOpt + " takes precedence over this config.")
                    .withRequiredArg()
                    .describedAs("config file")
                    .ofType(String.class);

            try {
                options = parser.parse(args);
            } catch (OptionException e) {
                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
            }
            checkArgs();
        }

        /**
         * Handles help/version requests, requires {@code --topic}, and validates the
         * {@code --bootstrap-server} value. An invalid bootstrap server is reported
         * through {@code CommandLineUtils.printUsageAndExit}.
         */
        void checkArgs() {
            CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read data from standard input and publish it to Kafka.");
            CommandLineUtils.checkRequiredArgs(parser, options, topicOpt);
            try {
                ToolsUtils.validateBootstrapServer(options.valueOf(bootstrapServerOpt));
            } catch (IllegalArgumentException e) {
                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
            }
        }

        /**
         * @return {@code true} if {@code --sync} was given on the command line
         */
        boolean sync() {
            return options.has(syncOpt);
        }

        /**
         * Resolves the compression codec name from {@code --compression-codec}.
         *
         * @return the codec given on the command line; {@code CompressionType.GZIP.name}
         *         when the option is present without a (non-empty) value; and
         *         {@code CompressionType.NONE.name} when the option is absent
         */
        String compressionCodec() {
            if (!options.has(compressionCodecOpt)) {
                return CompressionType.NONE.name;
            }
            String codec = options.valueOf(compressionCodecOpt);
            if (codec == null || codec.isEmpty()) {
                return CompressionType.GZIP.name;
            }
            return codec;
        }

        /**
         * @return the value of {@code --line-reader} (defaults to {@link LineMessageReader})
         */
        String readerClass() {
            return options.valueOf(messageReaderOpt);
        }

        /**
         * Builds the configuration handed to the record reader. Later sources override
         * earlier ones: the {@code --reader-config} file, then the {@code topic} entry,
         * then each {@code --property key=value} pair.
         *
         * @return mutable map of reader properties
         * @throws IOException if the reader config file cannot be loaded
         */
        Map<String, String> readerProps() throws IOException {
            Map<String, String> readerConfig = new HashMap<>();
            if (options.has(readerConfigOpt)) {
                Properties fromFile = Utils.loadProps(options.valueOf(readerConfigOpt));
                readerConfig.putAll(Utils.propsToStringMap(fromFile));
            }
            readerConfig.put("topic", options.valueOf(topicOpt));
            Properties fromCommandLine = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt));
            readerConfig.putAll(Utils.propsToStringMap(fromCommandLine));
            return readerConfig;
        }

        /**
         * Builds the producer configuration from the {@code --producer.config} file, the
         * {@code --producer-property} pairs and the dedicated command-line options.
         *
         * <p>{@code bootstrap.servers} and both serializers are always set from here.
         * {@code compression.type} is set from {@code --compression-codec} when that option
         * is present, or when no value was supplied through the file or
         * {@code --producer-property}; a user-supplied {@code compression.type} is otherwise
         * left untouched. {@code client.id} defaults to {@code console-producer}.
         *
         * @return the producer properties
         * @throws IOException if the producer config file cannot be loaded
         */
        Properties producerProps() throws IOException {
            Properties props = new Properties();
            if (options.has(producerConfigOpt)) {
                props.putAll(Utils.loadProps(options.valueOf(producerConfigOpt)));
            }
            props.putAll(CommandLineUtils.parseKeyValueArgs(options.valuesOf(producerPropertyOpt)));

            props.put("bootstrap.servers", options.valueOf(bootstrapServerOpt));
            // Do not clobber a compression.type supplied via the config file or
            // --producer-property unless --compression-codec was given explicitly.
            if (options.has(compressionCodecOpt) || !props.containsKey("compression.type")) {
                props.put("compression.type", compressionCodec());
            }
            if (props.getProperty("client.id") == null) {
                props.put("client.id", "console-producer");
            }
            props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

            // NOTE(review): the exact precedence between a dedicated option and a value
            // already present in props is decided by maybeMergeOptions - confirm there.
            CommandLineUtils.maybeMergeOptions(props, "linger.ms", options, sendTimeoutOpt);
            CommandLineUtils.maybeMergeOptions(props, "acks", options, requestRequiredAcksOpt);
            CommandLineUtils.maybeMergeOptions(props, "request.timeout.ms", options, requestTimeoutMsOpt);
            CommandLineUtils.maybeMergeOptions(props, "retries", options, messageSendMaxRetriesOpt);
            CommandLineUtils.maybeMergeOptions(props, "retry.backoff.ms", options, retryBackoffMsOpt);
            CommandLineUtils.maybeMergeOptions(props, "send.buffer.bytes", options, socketBufferSizeOpt);
            CommandLineUtils.maybeMergeOptions(props, "buffer.memory", options, maxMemoryBytesOpt);
            // batch-size and max-partition-memory-bytes both feed `batch.size`; the order
            // of these two calls is preserved from the original.
            CommandLineUtils.maybeMergeOptions(props, "batch.size", options, batchSizeOpt);
            CommandLineUtils.maybeMergeOptions(props, "batch.size", options, maxPartitionMemoryBytesOpt);
            CommandLineUtils.maybeMergeOptions(props, "metadata.max.age.ms", options, metadataExpiryMsOpt);
            CommandLineUtils.maybeMergeOptions(props, "max.block.ms", options, maxBlockMsOpt);
            return props;
        }
    }
}

