/*
 * Decompiled with CFR 0.152.
 */
package org.logstash.config.ir.compiler;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyClass;
import org.jruby.RubyFixnum;
import org.jruby.RubyHash;
import org.jruby.RubyModule;
import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.internal.runtime.methods.DynamicMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.execution.ExecutionContextExt;
import org.logstash.plugins.factory.ContextualizerExt;

public final class OutputStrategyExt {
    /**
     * Private constructor: this class only acts as a holder for the nested
     * output strategy classes and is never instantiated within this file.
     */
    private OutputStrategyExt() {
    }

    /**
     * Output strategy exposed to Ruby as {@code Shared}.
     *
     * <p>Every batch is handed straight to {@code doOutput}; this class itself
     * performs no locking around the call.
     */
    @JRubyClass(name = "Shared", parent = "SimpleAbstractStrategy")
    public static final class SharedOutputStrategyExt extends SimpleAbstractOutputStrategyExt {

        private static final long serialVersionUID = 1L;

        public SharedOutputStrategyExt(final Ruby runtime, final RubyClass metaClass) {
            super(runtime, metaClass);
        }

        /**
         * Forwards the batch to the wrapped output without additional coordination.
         *
         * @param context current JRuby thread context
         * @param events  batch of events to pass to the output
         * @return whatever {@code doOutput} returns
         */
        @Override
        protected IRubyObject output(final ThreadContext context, final IRubyObject events) {
            return doOutput(context, events);
        }
    }

    /**
     * Output strategy exposed to Ruby as {@code Single}.
     *
     * <p>Calls into the wrapped output are serialized by holding this
     * strategy's own monitor for the duration of each call.
     */
    @JRubyClass(name = "Single", parent = "SimpleAbstractStrategy")
    public static final class SingleOutputStrategyExt extends SimpleAbstractOutputStrategyExt {

        private static final long serialVersionUID = 1L;

        public SingleOutputStrategyExt(final Ruby runtime, final RubyClass metaClass) {
            super(runtime, metaClass);
        }

        /**
         * Forwards the batch to the wrapped output while holding the monitor of
         * this strategy instance, so only one thread at a time is inside
         * {@code doOutput} for this instance.
         *
         * @param context current JRuby thread context
         * @param events  batch of events to pass to the output
         * @return whatever {@code doOutput} returns
         */
        @Override
        protected IRubyObject output(final ThreadContext context, final IRubyObject events) {
            synchronized (this) {
                return doOutput(context, events);
            }
        }
    }

    /**
     * Base class, exposed to Ruby as {@code SimpleAbstractStrategy}, for
     * strategies that wrap exactly one output plugin instance.
     */
    @JRubyClass(name = "SimpleAbstractStrategy", parent = "AbstractStrategy")
    public abstract static class SimpleAbstractOutputStrategyExt extends AbstractOutputStrategyExt {

        private static final long serialVersionUID = 1L;

        /** The single plugin instance created by {@code initialize}. */
        private IRubyObject output;

        protected SimpleAbstractOutputStrategyExt(final Ruby runtime, final RubyClass metaClass) {
            super(runtime, metaClass);
        }

        /**
         * Ruby {@code initialize}: builds the wrapped plugin instance.
         *
         * @param context current JRuby thread context
         * @param args    exactly four arguments, in order: the output plugin class
         *                ({@code RubyClass}), the metric object, the execution context
         *                ({@code ExecutionContextExt}) and the plugin arguments ({@code RubyHash})
         * @return this strategy
         */
        @JRubyMethod(required = 4)
        public IRubyObject initialize(final ThreadContext context, final IRubyObject[] args) {
            final RubyClass pluginClass = (RubyClass) args[0];
            final IRubyObject pluginMetric = args[1];
            final ExecutionContextExt execContext = (ExecutionContextExt) args[2];
            final RubyHash pluginArgs = (RubyHash) args[3];

            output = ContextualizerExt.initializePlugin(context, execContext, pluginClass, pluginArgs);
            // Cache the plugin class' multi_receive method for later invokeOutput calls.
            initOutputCallsite(pluginClass);
            output.callMethod(context, "metric=", pluginMetric);
            return this;
        }

        /** Calls {@code do_close} on the wrapped plugin and returns its result. */
        @Override
        protected final IRubyObject close(final ThreadContext context) {
            return output.callMethod(context, "do_close");
        }

        /** Calls {@code register} on the wrapped plugin and returns its result. */
        @Override
        protected final IRubyObject reg(final ThreadContext context) {
            return output.callMethod(context, "register");
        }

        /**
         * Passes the batch to the wrapped plugin through the cached output method.
         *
         * @param context current JRuby thread context
         * @param events  batch of events to pass to the output
         * @return always {@code context.nil}; the plugin's own return value is discarded
         */
        protected final IRubyObject doOutput(final ThreadContext context, final IRubyObject events) {
            invokeOutput(context, events, output);
            return context.nil;
        }
    }

    /**
     * Output strategy exposed to Ruby as {@code Legacy}.
     *
     * <p>Creates a fixed pool of plugin instances ("workers"). Each call to
     * {@code output} borrows one worker from a blocking queue, runs the batch
     * through it and hands the worker back afterwards.
     */
    @JRubyClass(name = "Legacy", parent = "AbstractStrategy")
    public static final class LegacyOutputStrategyExt extends AbstractOutputStrategyExt {

        private static final long serialVersionUID = 1L;

        /** Pool of currently idle workers; its capacity equals the worker count. */
        private BlockingQueue<IRubyObject> workerQueue;

        /** Worker count as a Ruby value, as returned by {@code worker_count}. */
        private IRubyObject workerCount;

        /** All workers ever created, idle or busy; used by {@code register} and {@code do_close}. */
        private RubyArray workers;

        public LegacyOutputStrategyExt(final Ruby runtime, final RubyClass metaClass) {
            super(runtime, metaClass);
        }

        /**
         * Ruby {@code initialize}: creates the worker pool.
         *
         * <p>The pool size is read from the {@code "workers"} entry of the plugin
         * arguments and defaults to one when that entry is nil.
         *
         * @param context current JRuby thread context
         * @param args    exactly four arguments, in order: the output plugin class
         *                ({@code RubyClass}), the metric object, the execution context
         *                ({@code ExecutionContextExt}) and the plugin arguments ({@code RubyHash})
         * @return this strategy
         * @throws IllegalArgumentException if the resolved worker count is less than one
         *                                  (raised by the {@code ArrayBlockingQueue} constructor)
         */
        @JRubyMethod(required = 4)
        public IRubyObject initialize(final ThreadContext context, final IRubyObject[] args) {
            final RubyClass outputClass = (RubyClass) args[0];
            final IRubyObject metric = args[1];
            final ExecutionContextExt executionContext = (ExecutionContextExt) args[2];
            final RubyHash pluginArgs = (RubyHash) args[3];

            workerCount = pluginArgs.op_aref(context, context.runtime.newString("workers"));
            if (workerCount.isNil()) {
                workerCount = RubyFixnum.one(context.runtime);
            }
            final int count = workerCount.convertToInteger().getIntValue();

            workerQueue = new ArrayBlockingQueue<>(count);
            workers = context.runtime.newArray(count);
            for (int i = 0; i < count; ++i) {
                final IRubyObject output =
                    ContextualizerExt.initializePlugin(context, executionContext, outputClass, pluginArgs);
                // Looked up after each plugin is initialized, exactly as before.
                initOutputCallsite(outputClass);
                output.callMethod(context, "metric=", metric);
                workers.append(output);
                workerQueue.add(output);
            }
            return this;
        }

        /** @return the configured worker count as a Ruby value */
        @JRubyMethod(name = "worker_count")
        public IRubyObject workerCount() {
            return workerCount;
        }

        /** @return the Ruby array holding every worker instance */
        @JRubyMethod
        public IRubyObject workers() {
            return workers;
        }

        /**
         * Borrows an idle worker, sends the batch through it and returns the
         * worker to the pool.
         *
         * @param context current JRuby thread context
         * @param events  batch of events to pass to the output
         * @return always {@code context.nil}
         * @throws InterruptedException if the thread is interrupted while waiting
         *                              for an idle worker
         */
        @Override
        protected IRubyObject output(final ThreadContext context, final IRubyObject events)
            throws InterruptedException {
            final IRubyObject worker = workerQueue.take();
            try {
                invokeOutput(context, events, worker);
                return context.nil;
            } finally {
                // Return the worker with add() rather than put(): put() is
                // interruptible and would throw before re-inserting if the thread
                // was interrupted during the plugin call, permanently shrinking
                // the pool and masking any exception from invokeOutput. A slot is
                // guaranteed to be free because this thread just took one, so
                // add() cannot fail on capacity here.
                workerQueue.add(worker);
            }
        }

        /** Calls {@code do_close} on every worker and returns this strategy. */
        @Override
        protected IRubyObject close(final ThreadContext context) {
            workers.forEach(worker -> ((IRubyObject) worker).callMethod(context, "do_close"));
            return this;
        }

        /** Calls {@code register} on every worker and returns this strategy. */
        @Override
        protected IRubyObject reg(final ThreadContext context) {
            workers.forEach(worker -> ((IRubyObject) worker).callMethod(context, "register"));
            return this;
        }
    }

    /**
     * Common base, exposed to Ruby as {@code AbstractStrategy}, for all output
     * strategies in this file.
     *
     * <p>It publishes the Ruby-facing {@code register}, {@code do_close} and
     * {@code multi_receive} methods, each delegating to an abstract hook, and
     * caches the plugin class' {@code multi_receive} method for direct calls.
     */
    @JRubyClass(name = "AbstractStrategy")
    public abstract static class AbstractOutputStrategyExt extends RubyObject {

        private static final long serialVersionUID = 1L;

        /** The {@code multi_receive} method found on {@link #outputClass}. */
        private DynamicMethod outputMethod;

        /** Plugin class on which {@link #outputMethod} was looked up. */
        private RubyClass outputClass;

        public AbstractOutputStrategyExt(final Ruby runtime, final RubyClass metaClass) {
            super(runtime, metaClass);
        }

        /** Ruby {@code register}: delegates to {@link #reg(ThreadContext)}. */
        @JRubyMethod
        public final IRubyObject register(final ThreadContext context) {
            return reg(context);
        }

        /** Ruby {@code do_close}: delegates to {@link #close(ThreadContext)}. */
        @JRubyMethod(name = "do_close")
        public final IRubyObject doClose(final ThreadContext context) {
            return close(context);
        }

        /**
         * Ruby {@code multi_receive}: delegates to
         * {@link #output(ThreadContext, IRubyObject)}.
         *
         * @throws InterruptedException propagated from the strategy's {@code output}
         */
        @JRubyMethod(name = "multi_receive")
        public final IRubyObject multiReceive(final ThreadContext context, final IRubyObject events)
            throws InterruptedException {
            return output(context, events);
        }

        /**
         * Looks up {@code multi_receive} on the given plugin class and stores both
         * the method and the class for later use by {@code invokeOutput}.
         *
         * @param outputClass plugin class to search
         */
        protected final void initOutputCallsite(final RubyClass outputClass) {
            this.outputClass = outputClass;
            this.outputMethod = outputClass.searchMethod("multi_receive");
        }

        /**
         * Calls the cached {@code multi_receive} method on a plugin instance.
         *
         * @param context        current JRuby thread context
         * @param batch          events to pass as the single argument
         * @param pluginInstance receiver of the call
         */
        protected final void invokeOutput(final ThreadContext context, final IRubyObject batch,
            final IRubyObject pluginInstance) {
            outputMethod.call(context, pluginInstance, outputClass, "multi_receive", batch);
        }

        /** Strategy-specific handling of a batch of events. */
        protected abstract IRubyObject output(ThreadContext context, IRubyObject events)
            throws InterruptedException;

        /** Strategy-specific shutdown of the wrapped plugin instance(s). */
        protected abstract IRubyObject close(ThreadContext context);

        /** Strategy-specific registration of the wrapped plugin instance(s). */
        protected abstract IRubyObject reg(ThreadContext context);
    }

    /**
     * Registry, exposed to Ruby as {@code OutputDelegatorStrategyRegistry},
     * that maps a strategy type to the strategy class registered for it.
     */
    @JRubyClass(name = "OutputDelegatorStrategyRegistry")
    public static final class OutputStrategyRegistryExt extends RubyObject {

        private static final long serialVersionUID = 1L;

        /** Lazily created shared instance returned by {@code instance}. */
        private static OutputStrategyRegistryExt instance;

        /** Registered strategies: type => class. */
        private RubyHash map;

        public OutputStrategyRegistryExt(final Ruby runtime, final RubyClass metaClass) {
            super(runtime, metaClass);
        }

        /**
         * Returns the shared registry, creating and initializing it on first use.
         * The method is synchronized on the class, so creation happens once.
         *
         * @param context current JRuby thread context
         * @param recv    receiver of the Ruby class-level call (unused)
         * @return the shared registry instance
         */
        @JRubyMethod(meta = true)
        public static synchronized OutputStrategyRegistryExt instance(final ThreadContext context,
            final IRubyObject recv) {
            if (instance == null) {
                instance = new OutputStrategyRegistryExt(context.runtime, RubyUtil.OUTPUT_STRATEGY_REGISTRY);
                instance.init(context);
            }
            return instance;
        }

        /** Ruby {@code initialize}: replaces the registry contents with an empty hash. */
        @JRubyMethod(name = "initialize")
        public IRubyObject init(final ThreadContext context) {
            map = RubyHash.newHash(context.runtime);
            return this;
        }

        /** @return Ruby array of all registered strategy classes */
        @JRubyMethod
        public IRubyObject classes(final ThreadContext context) {
            return map.values(context);
        }

        /** @return Ruby array of all registered strategy types */
        @JRubyMethod
        public IRubyObject types() {
            return map.keys();
        }

        /**
         * Registers (or replaces) the class for a strategy type.
         *
         * @return the result of the underlying hash assignment
         */
        @JRubyMethod
        public IRubyObject register(final ThreadContext context, final IRubyObject type,
            final IRubyObject klass) {
            return map.op_aset(context, type, klass);
        }

        /**
         * Looks up the strategy class registered for a type.
         *
         * @param context current JRuby thread context
         * @param type    strategy type to look up
         * @return the registered class
         * @throws IllegalArgumentException if no truthy value is registered for
         *                                  {@code type}; the message lists the registered types
         */
        @JRubyMethod(name = "class_for")
        public RubyClass classFor(final ThreadContext context, final IRubyObject type) {
            final IRubyObject klass = map.op_aref(context, type);
            if (!klass.isTrue()) {
                // List the registered type keys (what a caller can actually pass)
                // and stringify through asString(): asJavaString() raises a Ruby
                // TypeError for non-string objects such as the registered classes,
                // which used to hide this exception behind an unrelated error.
                throw new IllegalArgumentException(String.format(
                    "Could not find output delegator strategy of type '%s'. Valid strategies: %s",
                    type.asString().asJavaString(),
                    map.keys().stream()
                        .map(key -> ((IRubyObject) key).asString().asJavaString())
                        .collect(Collectors.joining(", "))));
            }
            return (RubyClass) klass;
        }
    }
}

