#!/usr/bin/env oo-ruby
#--
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#++

require 'rubygems'
require 'openshift-origin-node/utils/node_logger'
require 'openshift-origin-node/utils/application_state'
require 'openshift-origin-node/model/application_container'
require 'openshift-origin-node/model/frontend_httpd'
require 'openshift-origin-common/utils/file_needs_sync'
require 'syslog'

module OpenShift
  module Runtime
    # Public: Runs a lifecycle operation (start, stop, restart, status, idle,
    # unidle) against every gear on the node or against an explicit list of
    # gear UUIDs.
    #
    # Every gear is processed in its own forked child process. At most
    # @nslots children have an open output pipe at any time. The parent
    # buffers each child's output and prints it once the child closes its
    # pipe. A child that is still running after @gear_timeout seconds is sent
    # SIGUSR1; the child then reports a timeout and detaches from the parent.
    class AdminGearsControl
      # ANSI escape sequences used to colour the pass/fail markers.
      @@RED = "\033[31m"
      @@GREEN = "\033[32m"
      @@NORMAL = "\033[0m"

      # Default number of gear tasks that may have an open output pipe at once.
      @@DEFAULT_SLOTS = 5
      # Default number of seconds a gear task may run before it is sent SIGUSR1.
      @@DEFAULT_GEAR_TIMEOUT = 200

      # Public: Create a controller.
      #
      # container_uuids - Array of gear UUID strings to operate on, or nil
      #                   (the default) to operate on all gears.
      def initialize(container_uuids=nil)
        @uuids = container_uuids

        @nslots = @@DEFAULT_SLOTS

        @gear_timeout = @@DEFAULT_GEAR_TIMEOUT

        if container_uuids.nil?
          NodeLogger.logger.debug("AdminGearsControl: initialized for all gears with #{@nslots} parallel")
        else
          NodeLogger.logger.debug("AdminGearsControl: initialized for gear(s) #{container_uuids.join(' ')}")
        end

        # @readers maps a pipe file descriptor number to the bookkeeping hash
        # of the child writing to it (:pid, :reader, :buf, :started). It is
        # shared between the main thread and the generator thread.
        @readers_mutex = Mutex.new
        @readers = Hash.new

        @generator = nil
      end

      # Public: Start gears, skipping gears that hold a stop lock.
      #
      # Returns the bitwise OR of the child exit statuses (0 when all passed).
      def start
        run_gear_action(true, "Starting", "Started", "Failed to start") do |gear|
          gear.start_gear
        end
      end

      # Public: Stop gears.
      #
      # force - when true, stop_gear is additionally given :force => true and
      #         :term_delay => 10 (default: false).
      #
      # Returns the bitwise OR of the child exit statuses (0 when all passed).
      def stop(force=false)
        run_gear_action(false, "Stopping", "Stopped", "Failed to stop") do |gear|
          options = { :user_initiated => false }
          if force
            options[:force] = true
            options[:term_delay] = 10
          end
          gear.stop_gear(options)
        end
      end

      # Public: Stop and then start gears.
      #
      # Returns the bitwise OR of the child exit statuses (0 when all passed).
      def restart
        run_gear_action(false, "Restarting", "Restarted", "Failed to restart") do |gear|
          out = ""
          out << gear.stop_gear
          out << gear.start_gear
          out
        end
      end

      # Public: Print the status of every cartridge of every gear.
      #
      # ATTR: lines and CLIENT_RESULT: prefixes are stripped from the
      # cartridge output before it is printed. Errors are reported on stderr
      # and do not abort the remaining gears.
      #
      # Returns the bitwise OR of the child exit statuses.
      def status
        p_runner(false) do |gear|
          $stdout.puts("Checking application #{gear.container_name} (#{gear.uuid}) status:")
          $stdout.puts("-----------------------------------------------")
          if gear.stop_lock?
            $stdout.puts("Gear #{gear.uuid} is locked.")
            $stdout.puts("")
          end
          begin
            gear.cartridge_model.each_cartridge do |cart|
              $stdout.puts("Cartridge: #{cart.name}...")
              output = gear.status(cart)
              output.gsub!(/^ATTR:.*$/, '')
              output.gsub!(/^CLIENT_RESULT:\s+/, '')
              output.strip!
              $stdout.puts(output)
              $stdout.puts("")
            end
          rescue => e
            $stderr.puts("Gear #{gear.container_name} Exception: #{e}")
            $stderr.puts("Gear #{gear.container_name} Backtrace: #{e.backtrace}")
          end
          $stdout.puts("")
        end
      end

      # Public: Idle gears.
      #
      # Returns the bitwise OR of the child exit statuses (0 when all passed).
      def idle
        run_gear_action(false, "Idling", "Idled", "Failed to idle") do |gear|
          gear.idle_gear
        end
      end

      # Public: Unidle (restore) gears.
      #
      # Returns the bitwise OR of the child exit statuses (0 when all passed).
      def unidle
        run_gear_action(false, "Unidling", "Restored", "Failed to restore") do |gear|
          gear.unidle_gear
        end
      end

      # Public: Run the given block once per gear, each in its own forked
      # child process.
      #
      # skip_stopped - when true (the default), gears holding a stop lock are
      #                not processed.
      #
      # Yields each gear inside the child process; the block's return value
      # (converted with to_i) becomes the child's exit status.
      #
      # Returns the bitwise OR of the child exit statuses.
      def p_runner(skip_stopped=true)
        parallelize do |p|
          gears(skip_stopped).each do |gear|
            p.call do
              yield(gear) if block_given?
            end
          end
        end
      end

      # Public: Enumerate the UUIDs of the gears returned by #gears.
      #
      # skip_stopped - passed through to #gears (default: true).
      #
      # Returns an Enumerator of UUIDs.
      def gear_uuids(skip_stopped=true)
        Enumerator.new do |yielder|
          gears(skip_stopped).each do |gear|
            yielder.yield(gear.uuid)
          end
        end
      end


      # Public: Enumerate the gears this controller operates on.
      #
      # When the controller was created with explicit UUIDs those gears are
      # looked up individually; otherwise ApplicationContainer.all supplies
      # the set.
      #
      # skip_stopped - when true (the default), gears holding a stop lock are
      #                left out. The lock is only consulted when this is true.
      #
      # Returns an Enumerator of gears.
      # Raises ArgumentError (on iteration) if skip_stopped is true and an
      # explicitly requested gear holds a stop lock.
      def gears(skip_stopped=true)
        Enumerator.new do |yielder|
          gear_set = []
          if @uuids
            @uuids.each do |uuid|
              gear = ApplicationContainer.from_uuid(uuid)
              if skip_stopped && gear.stop_lock?
                raise ArgumentError, "Gear is locked: #{uuid}"
              end
              gear_set << gear
            end
          else
            gear_set = ApplicationContainer.all(nil, false)
          end
          gear_set.each do |gear|
            begin
              yielder.yield(gear) unless skip_stopped && gear.stop_lock?
            rescue => e
              # A failure for one gear is logged and must not stop the
              # enumeration of the remaining gears.
              NodeLogger.logger.error("Gear evaluation failed for: #{gear.uuid}")
              NodeLogger.logger.error("Exception: #{e}")
              NodeLogger.logger.error("#{e.backtrace}")
            end
          end
        end
      end

      private

      # Private: Shared implementation of the pass/fail style actions.
      #
      # skip_stopped - passed through to #p_runner.
      # progress_msg - verb printed and logged while the action runs.
      # pass_msg     - syslog message prefix on success.
      # fail_msg     - syslog message prefix on failure.
      #
      # Yields each gear (inside its child process) to perform the action.
      #
      # Returns the bitwise OR of the child exit statuses.
      def run_gear_action(skip_stopped, progress_msg, pass_msg, fail_msg)
        p_runner(skip_stopped) do |gear|
          syslog_pass_fail(pass_msg, fail_msg, gear) do
            output_pass_fail(progress_msg, gear) do
              yield(gear)
            end
          end
        end
      end

      # Private: yield a function which can be used to parallelize gear
      # method calls.
      #
      # The block runs in a separate generator thread while the calling
      # thread collects the children's output.
      #
      # Returns the value of #collect_output.
      def parallelize

        # Re-deliver SIGUSR1 to all child processes.
        #
        # The handler must not lock @readers_mutex: collect_output locks it on
        # the main thread, and a Mutex cannot be (re)acquired from a trap
        # handler, so a signal arriving at that moment would raise
        # ThreadError. Iterate over a snapshot instead.
        # NOTE(review): relies on Hash#values not interleaving with the
        # generator thread's insert (expected under MRI's GVL) - confirm if
        # this ever runs on another interpreter.
        Signal.trap("USR1") do
          @readers.values.each do |child_data|
            pid = child_data[:pid]
            next if pid.nil?
            begin
              Process.kill("USR1", pid)
            rescue
              # Best effort: the child may already be gone.
            end
          end
        end

        @generator = Thread.new do
          yield(gen_background_task) if block_given?
        end

        rv = collect_output
        @generator.join
        rv
      end

      # Private: Syslog status (passed/failed)
      # Intended to wrap output_pass_fail and use its return code.
      #
      # pass_msg - message prefix logged at NOTICE when the block returns 0.
      # fail_msg - message prefix logged at WARNING otherwise.
      # gear     - gear whose uuid is appended to the message.
      #
      # Returns the block's return code (0 when no block is given).
      def syslog_pass_fail(pass_msg, fail_msg, gear)
        rc = 0
        rc = yield if block_given?

        # The block form closes syslog even if logging raises. The message is
        # passed as an argument to a "%s" format so that it is never
        # interpreted as a format string itself.
        Syslog.open('oo-admin-ctl-gears', 0, Syslog::LOG_LOCAL0) do |syslog|
          if rc == 0
            syslog.log(Syslog::LOG_NOTICE, "%s", "#{pass_msg}: #{gear.uuid}")
          else
            syslog.log(Syslog::LOG_WARNING, "%s", "#{fail_msg}: #{gear.uuid}")
          end
        end

        rc
      end

      # Private: Respond with pass/fail status depending on whether an exception was raised.
      #
      # message - progress verb, e.g. "Starting".
      # gear    - gear being operated on.
      #
      # Yields to perform the operation; the block's return value is ignored.
      #
      # Returns 0 if the block completed, 254 if it raised a StandardError.
      def output_pass_fail(message, gear)
        rc = 0
        # Registered so the child's USR1 handler can report the timeout.
        # @timeout_handlers is normally set up by the forked child; create it
        # on demand so this method also works outside of one.
        @timeout_handlers ||= []
        @timeout_handlers << lambda { $stderr.puts "Timeout #{message} gear #{gear.uuid}...  [ #{@@RED}FAILED#{@@NORMAL} ]" }
        begin
          $stdout.write("#{message} gear #{gear.uuid}... ")
          yield if block_given?
          NodeLogger.logger.info("#{message} gear #{gear.uuid}... [ OK ]")
          $stdout.write("[ #{@@GREEN}OK#{@@NORMAL} ]\n")
        rescue => e
          NodeLogger.logger.debug("Gear: #{gear.uuid} Exception #{e.inspect}")
          NodeLogger.logger.debug("Gear: #{gear.uuid} Backtrace #{e.backtrace}")
          NodeLogger.logger.error("#{message} gear #{gear.uuid}: [ FAIL ]")
          NodeLogger.logger.error("#{message} gear #{gear.uuid}: Error: #{e}")
          $stdout.write("[ #{@@RED}FAILED#{@@NORMAL} ]\n")
          e.to_s.lines.each do |el|
            $stdout.write("      #{el}\n")
          end
          rc = 254
        end
        rc
      end

      # Private: Generate a backgrounded task
      #
      # Returns a lambda. Calling it with a block waits for a free slot,
      # forks a child that runs the block with stdout/stderr redirected into
      # a pipe, and registers the pipe's read end in @readers.
      def gen_background_task
        lambda do |&block|

          # Wait for a free slot before continuing. A bounded sleep is used
          # instead of Thread.stop: collect_output calls wakeup to end the
          # sleep early, but a wakeup that arrives between the check and the
          # sleep can no longer leave this thread blocked forever.
          while true
            nchildren = @readers_mutex.synchronize { @readers.length }
            break if nchildren < @nslots
            sleep(0.1)
          end

          reader, writer = IO::pipe

          pid = Process.fork do
            reader.close
            rc = 0
            begin
              # Unbuffered, so output reaches the pipe in the order written.
              writer.sync = true
              $stdin.reopen("/dev/null", 'r')
              $stdout.reopen(writer)
              $stdout.sync = true
              $stderr.reopen(writer)
              $stderr.sync = true

              # Around 1/16 of the forked processes deadlock somewhere
              # if we don't close FDs and reinitialize logging.
              keep = [$stdin, $stdout, $stderr, writer].map { |f| f.fileno }
              ObjectSpace.each_object(IO) do |i|
                next if i.closed?
                next if keep.include?(i.fileno)
                i.close
              end
              NodeLogger.logger.reinitialize

              # Signal USR1 causes us to report a timeout error and detach
              @timeout_handlers = []
              Signal.trap("USR1") do
                NodeLogger.logger.debug("Received USR1, detaching from main process.")
                @timeout_handlers.each do |handler|
                  handler.call
                end
                $stdout.reopen("/dev/null", "a")
                $stderr.reopen("/dev/null", "a")
                writer.close
                Process.setsid
              end

              rc = block.call unless block.nil?

            rescue => e
              # An error escaping the task must not be reported as success.
              rc = 254
              NodeLogger.logger.error("AdminGearsControl: gear task failed: #{e.inspect}")
            ensure
              # Always exit here so the child never continues running the
              # parent's code.
              exit(rc.to_i)
            end
          end

          writer.close

          # Add to the listener list
          @readers_mutex.synchronize do
            @readers[reader.fileno] = {:pid => pid, :reader => reader, :buf => "", :started => Time.now }
          end

        end
      end

      # Private: Collect output and status for children and send to stdout.
      #
      # Note: This will buffer stdout indefinitely to avoid slowing down
      # the children.
      #
      # Note: to allow for new processes and a changing descriptor set,
      # the set of file descriptors are re-checked every 0.1 seconds.
      #
      # Returns the bitwise OR of the exit statuses of all reaped children.
      def collect_output
        outbuf = ""
        retcode = 0

        begin
          readers = []
          @readers_mutex.synchronize do
            @readers.each_pair do |_fileno, child_data|
              readers << child_data[:reader]
            end
          end

          writers = outbuf.empty? ? [] : [$stdout]

          # select returns nil when the 0.1s timeout expires. Treat that as
          # "nothing ready" rather than skipping the iteration, so exited
          # children are still reaped and gear timeouts are still enforced
          # while every child is silent.
          fds = select(readers, writers, nil, 0.1) || [[], [], []]

          # Process waiting pipe reads
          fds[0].uniq.each do |fd|
            begin
              buf = fd.read_nonblock(32768)
              @readers_mutex.synchronize do
                @readers[fd.fileno][:buf] << buf
              end
            rescue IO::WaitReadable
              # Nothing to read after all; try again on the next pass.
            rescue EOFError
              # The child closed its pipe: release its output and its slot.
              fileno = fd.fileno
              @readers_mutex.synchronize do
                outbuf << @readers[fileno][:buf]
                @readers[fileno][:reader].close
                @readers.delete(fileno)
              end
              begin
                @generator.wakeup
              rescue ThreadError
                # The generator thread has already finished.
              end
            end
          end

          # Process output to stdout
          fds[1].each do |fd|
            begin
              outbytes = fd.write_nonblock(outbuf)
              outbuf = outbuf[outbytes..-1]
            rescue IO::WaitWritable
            end
          end

          # Process exited children
          begin
            while true
              cpid, status = Process.waitpid2(-1, Process::WNOHANG)
              break if cpid.nil?
              # exitstatus is nil when the child was killed by a signal;
              # count that as a failure instead of raising on nil.
              exit_code = status.exitstatus
              exit_code = 254 if exit_code.nil?
              retcode |= exit_code
              NodeLogger.logger.debug("Finished: #{cpid} Status: #{status.exitstatus}")
              @readers_mutex.synchronize do
                @readers.each_pair do |_fileno, child_data|
                  child_data.delete(:pid) if child_data[:pid] == cpid
                end
              end
            end
          rescue Errno::ECHILD
            # No children left to wait for.
          end

          # Process timeouts
          @readers_mutex.synchronize do
            @readers.each_pair do |_fileno, child_data|
              next if child_data[:pid].nil?
              if (Time.now - child_data[:started]) > @gear_timeout
                begin
                  Process.kill("USR1", child_data[:pid])
                rescue
                  # Best effort: the child may already be gone.
                end
                # Forget the pid so the child is only signalled once.
                child_data.delete(:pid)
              end
            end
          end

        end until !@generator.alive? && @readers.empty?

        $stdout.write(outbuf)

        retcode
      end

    end
  end
end


# Path of the lock file: created by lock_if_good when its block succeeds,
# removed by unlock_if_good when its block succeeds, and checked by the
# 'condrestartall' action to decide whether to restart.
$lockfile = "/var/lock/subsys/openshift-gears"

# Run the given block and create (or truncate) the file at $lockfile when
# the block's result converts to 0 via to_i.
#
# Returns the block's result, or nil when no block is given.
def lock_if_good
  return nil unless block_given?

  result = yield
  File.open($lockfile, 'w') {} if result.to_i.zero?
  result
end

# Run the given block and delete the file at $lockfile when the block's
# result converts to 0 via to_i. A lock file that does not exist is not an
# error.
#
# Returns the block's result, or nil when no block is given.
def unlock_if_good
  return nil unless block_given?

  result = yield
  return result unless result.to_i.zero?

  begin
    File.unlink($lockfile)
  rescue Errno::ENOENT
    # Already unlocked.
  end
  result
end


# Command-line dispatch: ARGV[0] selects the action and, for the *gear
# actions, the remaining arguments are gear UUIDs. The process exits with the
# action's return code, or with 1 after printing the message of any
# StandardError to stderr.
exitval = 0
begin
  # ARGV[1..-1] is nil for an empty ARGV and [] when only the action name was
  # given, so normalise to an array and test for emptiness.
  gear_args = ARGV[1..-1] || []

  # Builds a controller for the UUIDs given on the command line, refusing to
  # run when none were supplied.
  control_for_args = lambda do
    raise "Requires a gear uuid." if gear_args.empty?
    OpenShift::Runtime::AdminGearsControl.new(gear_args)
  end

  case ARGV[0]
  when 'startall'
    # Start in a detached background process so the caller returns
    # immediately; progress is written to the node log.
    cpid = fork do
      Dir.chdir('/')
      $stdin.reopen('/dev/null', 'r')
      $stderr.reopen($stdout)
      keep = [$stdin, $stdout, $stderr].map { |f| f.fileno }
      ObjectSpace.each_object(IO) do |i|
        next if i.closed?
        next if keep.include?(i.fileno)
        i.close
      end
      Process.setsid
      OpenShift::Runtime::NodeLogger.logger.reinitialize
      exitval = lock_if_good do
        OpenShift::Runtime::AdminGearsControl.new.start
      end
      exit(exitval)
    end
    OpenShift::Runtime::NodeLogger.logger.info("Background start initiated - process id = #{cpid}")
    $stdout.puts "Background start initiated - process id = #{cpid}"
    $stdout.puts "Check /var/log/openshift/node/platform.log for more details."
    $stdout.puts
    $stdout.puts "Note: In the future, if you wish to start the OpenShift services in the"
    $stdout.puts "      foreground (waited), use:  service openshift-gears waited-start"
    $stdout.flush
    exit!(0)

  when 'stopall'
    exitval = unlock_if_good do
      OpenShift::Runtime::AdminGearsControl.new.stop
    end

  when 'forcestopall'
    exitval = unlock_if_good do
      OpenShift::Runtime::AdminGearsControl.new.stop(true)
    end

  when 'restartall'
    exitval = lock_if_good do
      OpenShift::Runtime::AdminGearsControl.new.restart
    end

  when 'condrestartall'
    # Only restart when a previous start/restart left the lock file behind.
    # File.exist? is used because File.exists? was removed in Ruby 3.2.
    if File.exist?($lockfile)
      exitval = OpenShift::Runtime::AdminGearsControl.new.restart
    end

  when 'waited-startall'
    exitval = lock_if_good do
      OpenShift::Runtime::AdminGearsControl.new.start
    end

  when 'status'
    $stdout.puts("Checking OpenShift Services: ")
    $stdout.puts("")
    exitval = OpenShift::Runtime::AdminGearsControl.new.status

  when 'startgear'
    exitval = control_for_args.call.start

  when 'stopgear'
    exitval = control_for_args.call.stop

  when 'forcestopgear'
    exitval = control_for_args.call.stop(true)

  when 'restartgear'
    exitval = control_for_args.call.restart

  when 'statusgear'
    exitval = control_for_args.call.status

  when 'idlegear'
    exitval = control_for_args.call.idle

  when 'unidlegear'
    exitval = control_for_args.call.unidle

  when 'list'
    OpenShift::Runtime::AdminGearsControl.new.gear_uuids(false).each do |uuid|
      $stdout.puts uuid
    end

  when 'listidle'
    OpenShift::Runtime::AdminGearsControl.new.gear_uuids(false).each do |uuid|
      if OpenShift::Runtime::FrontendHttpServer.new(OpenShift::Runtime::ApplicationContainer.from_uuid(uuid)).idle?
        $stdout.puts "#{uuid} is idled"
      end
    end


  else
    raise "Usage: #{$0} {startall|stopall|forcestopall|status|restartall|waited-startall|condrestartall|startgear [uuid]|stopgear [uuid]|forcestopgear [uuid]|restartgear [uuid]|statusgear [uuid]|idlegear [gear]|unidlegear [gear]|list|listidle}"
  end

rescue => e
  $stderr.puts(e.to_s)
  exit 1
end

exit(exitval.to_i)
