#!/usr/bin/env oo-ruby
#--
# Copyright 2013-2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#++

require 'rubygems'
require 'forwardable'
require 'open4'
require 'parseconfig'
require 'syslog'
require 'thread'
require 'date'
require 'openshift-origin-node'
require 'openshift-origin-node/utils/environ'
require 'openshift-origin-node/utils/node_logger'
require 'openshift-origin-node/utils/shell_exec'
require 'openshift-origin-node/model/watchman/watchman_plugin'

# Built-in defaults. Most are only fallbacks: node.conf (via config.get) or the
# environment may override them at runtime. Strings are frozen so a caller
# cannot accidentally mutate a shared default in place.

# Fallback for the GEAR_BASE_DIR node.conf key (root directory holding gear homes).
GEAR_BASE_DIR       = '/var/lib/openshift'.freeze
# Fallback for the GEAR_GECOS node.conf key (text grepped for in the passwd file).
GEAR_GECOS          = 'OpenShift guest'.freeze
# Fallback for the OPENSHIFT_CONF_DIR environment variable (directory of node.conf).
OPENSHIFT_CONF_DIR  = '/etc/openshift'.freeze
# Fallback for the WATCHMAN_PLUGIN_DIR node.conf key (directory scanned for *.rb plugins).
WATCHMAN_PLUGIN_DIR = '/etc/openshift/watchman/plugins.d'.freeze
# Command used to restart, stop and idle gears.
OO_ADMIN_CTL_GEARS  = '/usr/sbin/oo-admin-ctl-gears'.freeze
# Timeout handed to Utils.oo_spawn for gear operations (presumably seconds -- confirm against oo_spawn).
OP_TIMEOUT          = 300

module OpenShift
  module Runtime

    # Provides framework for monitoring gears on node.  Plugins provide the actual business logic.
    #
    # @!attribute [r] logger
    #   @return [NodeLogger] logger instance being used
    # @!attribute [r] config
    #   @return [Config] elements from node.conf
    # @!attribute [r] gears
    #   @return [CachedGears] collection of running gears on node
    # @!attribute [r] incidents
    #   @return [Hash<String, DateTime>] record last restart for gear by uuid
    # @!attribute [r] epoch
    #   @return [DateTime] timestamp of when Watchman was started
    #
    # @!attribute [rw] retry_delay
    #  @return [Rational] time to wait before accepting another gear restart. Assigned in seconds; the setter stores it as a fraction of a day. Default: 5 minutes
    # @!attribute [rw] retry_period
    #  @return [Rational] time to wait before resetting retries. Assigned in seconds; the setter stores it as a fraction of a day. Default: 8 hours
    # @!attribute [rw] gear_retries
    #  @return [Integer] number of restarts to attempt before waiting +retry_period+. Default: 3
    class Watchman
      # DateTime subtraction yields a Rational number of days, so the retry
      # windows are kept as fractions of a day to stay comparable with it.
      SECONDS_PER_DAY = 86_400

      attr_reader :config, :gears, :incidents, :epoch
      attr_reader :retry_delay, :retry_period, :gear_retries

      # Set up logging, then load and instantiate every Watchman plugin.
      #
      # @param config [Config] elements from node.conf
      # @param gears [CachedGears] collection of running gears on node
      # @param period_seconds [Integer] amount of time to wait between checks
      # @param epoch [DateTime] timestamp of when Watchman was started
      # @raise [RuntimeError] when no plugin could be instantiated
      def initialize(config, gears, period_seconds = 20, epoch = DateTime.now)
        @config = config
        @period = period_seconds
        @gears  = gears
        @epoch  = epoch

        # Defaults: 5 minutes and 8 hours, expressed in days like the values the
        # setters produce. Assigned directly because the setters write to Syslog,
        # which is only opened a few lines further down.
        @retry_delay  = Rational(300, SECONDS_PER_DAY)
        @retry_period = Rational(28_800, SECONDS_PER_DAY)
        @gear_retries = 3
        @logger       = logger(config)
        @incidents    = {}

        Syslog.open(File.basename($0), Syslog::LOG_PID, Syslog::LOG_DAEMON) unless Syslog.opened?

        plugin_dir   = @config.get('WATCHMAN_PLUGIN_DIR', WATCHMAN_PLUGIN_DIR)
        plugin_files = Dir.glob(PathUtils.join(plugin_dir, '*.rb')).find_all { |e| File.file?(e) }
        plugin_files.each { |p| require p }

        # Plugins that fail to instantiate are reported and dropped.
        @plugins = WatchmanPlugin.repository.collect { |plugin| instantiate_plugin(plugin) }.to_a.compact

        raise "No valid plugins for Watchman found in #{plugin_dir}. Watchman shutting down." if @plugins.empty?
        log_event(:info, "Watchman found plugins: #{@plugins.join(', ')}")

        log_event(:info, "Starting Watchman => iteration delay: #{period_seconds}s")
      end

      # Override setter to auto-convert seconds into a Rational fraction of a day
      # @param seconds [Integer, Rational] seconds of delay between restarts;
      #   a Rational is stored untouched
      def retry_delay=(seconds)
        log_event(:info, "Watchman retry delay #{seconds}s")
        @retry_delay = day_fraction(seconds)
      end

      # Override setter to auto-convert seconds into a Rational fraction of a day
      # @param seconds [Integer, Rational] seconds of delay before resetting retries;
      #   a Rational is stored untouched
      def retry_period=(seconds)
        log_event(:info, "Watchman retry period #{seconds}s")
        @retry_period = day_fraction(seconds)
      end

      # @param retries [Integer] number of restarts to attempt before waiting +retry_period+
      def gear_retries=(retries)
        log_event(:info, "Watchman gear_retries #{retries}")
        @gear_retries = retries
      end

      # Get name for gear from gear's environment
      # @param uuid [String] gear login
      # @return [String] gear's name or '(Unknown)' when it cannot be read
      def gear_name(uuid)
        file = PathUtils.join(@config.get('GEAR_BASE_DIR', GEAR_BASE_DIR), uuid, '.env', 'OPENSHIFT_GEAR_NAME')
        begin
          IO.read(file).chomp
        rescue StandardError
          '(Unknown)'
        end
      end

      # Run every plugin once per iteration until stopped
      # @param daemon [true, false] Loop more than once?
      def apply(daemon = true)
        iteration = Iteration.new(@epoch, DateTime.now)
        loop do
          iteration = Iteration.new(@epoch, iteration.current_run)

          @plugins.each { |plugin| run_plugin(plugin, iteration) }

          break unless daemon

          sleep(iteration_delay)
        end
      end

      # Return the logger in use, or build a new one when given a configuration.
      #
      # Called without an argument this is the plain attribute reader. Called
      # with +config+ it instantiates a NodeLogger, registers it through
      # NodeLogger.set_logger and returns it.
      #
      # @param config [Config, nil] elements from node.conf
      # @return [NodeLogger] Properly configured NodeLogger for Watchman to use
      def logger(config = nil)
        return @logger if config.nil?

        logger_profiles = {
            standard: {
                file_config:   'WATCHMAN_CGROUPS_LOG_FILE',
                level_config:  'WATCHMAN_CGROUPS_LOG_LEVEL',
                default_file:  PathUtils.join(File::SEPARATOR, %w{var log openshift node cgroups.log}),
                default_level: Logger::INFO
            },
            trace:    {
                file_config:   'WATCHMAN_CGROUPS_TRACE_LOG_FILE',
                level_config:  'WATCHMAN_CGROUPS_TRACE_LOG_LEVEL',
                default_file:  PathUtils.join(File::SEPARATOR, %w{var log openshift node cgroups-trace.log}),
                default_level: Logger::ERROR
            }
        }

        built = NodeLogger::SplitTraceLogger.new(config, logger_profiles)

        NodeLogger.set_logger(built)
        built
      end

      # Record incident and run the requested gear operation if appropriate
      #
      # The first incident for a gear always runs +op+. Later incidents are
      # ignored while inside +retry_delay+, and once +gear_retries+ is exceeded
      # they are ignored until +retry_period+ has passed without an operation.
      #
      # @param op [Symbol] name of method to call with (uuid, retries):
      #   :restart, :stop or :idle
      # @param uuid [String] gear identifier
      def cache_incident(op, uuid)
        incident = @incidents[uuid]

        if incident.nil?
          # First death. Run the operation and move on...
          @incidents[uuid] = {last_updated: DateTime.now, retries: 0}
          return send(op, uuid, 0)
        end

        # Rational number of days since the last operation on this gear
        delta = DateTime.now - incident[:last_updated]

        # Have we tried in the last @retry_delay?
        return if delta < @retry_delay

        # A quiet spell longer than @retry_period resets the retry counter
        incident[:retries] = (delta > @retry_period) ? 0 : incident[:retries].succ

        # Have we tried more than @gear_retries times?
        return if incident[:retries] > @gear_retries

        # Either repeat death or failed start...
        send(op, uuid, incident[:retries])
        incident[:last_updated] = DateTime.now
      end

      # Conditionally restart gear
      # Don't restart if application has been marked down
      #
      # @param uuid [String] gear identifier
      # @param retries [Integer] number of retries for restart
      def restart(uuid, retries)
        restart_gear = lambda { control_gear('restartgear', 'restarted', 'restart', uuid, retries) }

        begin
          restart_gear.call if @gears.running?(uuid)
        rescue Errno::ENOENT
          # .state file missing is there a stop_lock?
          # NOTE(review): this rescue also covers an ENOENT raised by the restart
          # call itself, in which case the restart is attempted a second time.
          restart_gear.call unless @gears.stop_lock?(uuid)
        end
      end

      # Conditionally stop gear
      # Don't kill off ssh and bash sessions
      #
      # @param uuid [String] gear identifier
      # @param retries [Integer] number of retries for stop
      def stop(uuid, retries)
        control_gear('condstopgear', 'stopped', 'stop', uuid, retries)
      end

      # Idle gear
      #
      # @param uuid [String] gear identifier
      # @param retries [Integer] number of retries for idle
      def idle(uuid, retries)
        control_gear('idlegear', 'idled', 'idle', uuid, retries)
      end

      private

      # Write one pre-built message to Syslog. Syslog treats its first argument
      # as a format string, so the message is passed as data for '%s'; otherwise
      # a '%' in gear output or an exception message would raise while logging.
      def log_event(level, message)
        Syslog.public_send(level, '%s', message)
      end

      # Convert seconds into a Rational fraction of a day; Rationals pass through.
      def day_fraction(seconds)
        seconds.instance_of?(Rational) ? seconds : Rational(seconds, SECONDS_PER_DAY)
      end

      # Instantiate one plugin class; returns nil (after a warning) when it fails.
      def instantiate_plugin(plugin)
        plugin.new(@config,
                   @logger,
                   @gears,
                   lambda { |op, uuid| cache_incident(op, uuid) })
      rescue SystemExit, SignalException
        # Shutdown requests must not be mistaken for a broken plugin
        raise
      rescue Exception => e
        log_event(:warning, "Failed to instantiate Watchman Plugin #{plugin}: #{e.message}")
        nil
      end

      # Apply one plugin, containing its failures so the remaining plugins still run.
      def run_plugin(plugin, iteration)
        plugin.apply(iteration)
      rescue SystemExit => e
        log_event(:notice, "SystemExit raised from Watchman plugin #{plugin}:\n#{e.backtrace.join("\n")}")
        raise
      rescue SignalException
        # A signal delivered while a plugin runs must still terminate the loop
        raise
      rescue Exception => e
        msg = "Unhandled exception (#{e}) from Watchman plugin #{plugin}: #{e.message}"
        log_event(:notice, msg)
        @logger.debug(%Q{#{msg}\n#{e.backtrace.join("\n")}})
      end

      # Seconds to sleep between iterations: three periods when the integer
      # quotient total gears / running gears exceeds 2 (running gears are a third
      # or less of all gears), one period otherwise.
      # NOTE(review): the original comment read "more than half the gears running
      # on node", which does not match this condition -- confirm the intent.
      def iteration_delay
        total   = @gears.total_count
        running = @gears.count
        return @period if total == 0 || running == 0

        (total / running) > 2 ? @period * 3 : @period
      end

      # Run an oo-admin-ctl-gears subcommand for a gear and report the outcome.
      # The info line is written whatever the exit code; a non-zero exit code
      # adds a warning carrying the command's output.
      def control_gear(subcommand, past_tense, operation, uuid, retries)
        out, err, rc = Utils.oo_spawn(%Q{#{OO_ADMIN_CTL_GEARS} #{subcommand} #{uuid}}, timeout: OP_TIMEOUT)

        log_event(:info, "watchman #{past_tense} user #{uuid}: application #{gear_name(uuid)} (retries: #{retries})")
        return if 0 == rc

        log_event(:warning, "watchman results for #{uuid} #{operation}: exitcode: #{rc}\nstdout: #{out}\nstderr: #{err}")
      end
    end


    # Provide a read-through cache of node.conf file
    class CachedConfig
      include Enumerable
      extend Forwardable

      # @!attribute [r] last_updated
      #   @return [Time] modification time of the file when the cache was last refreshed
      attr_reader :last_updated

      def_delegators :@config, :keys, :has_key?

      # @param path [String] path to node.conf
      def initialize(path)
        @path   = path
        @config = {}
        @mutex  = Mutex.new
        import_config
      end

      # Iterate over the cached key/value pairs, refreshing the cache first.
      # Provides the +each+ that the included Enumerable relies on.
      #
      # @yieldparam key [String] configuration key
      # @yieldparam value [Object] cleaned value for the key
      # @return [Enumerator] when no block is given
      def each(&block)
        return enum_for(__method__) if block.nil?

        import_config
        @config.each(&block)
      end

      # get value for key from node.conf element
      # @param key [String] the key to retrieve
      # @param default [String] the default value if key not found
      # @return [String] the value for key
      def get(key, default = nil)
        import_config
        @config.fetch(key, default)
      end

      # update cache if node.conf has changed
      # @param force [true, false] Force cache update
      def import_config(force = false)
        updated = File.stat(@path).mtime
        @mutex.synchronize do
          refresh(updated) if force || @last_updated != updated
        end
      end

      private

      # Re-read the file into a fresh hash so keys removed from the file also
      # disappear from the cache. The timestamp is recorded only after a
      # successful parse, so a failed read is retried on the next access.
      def refresh(updated)
        parsed = ParseConfig.new(@path)
        fresh  = {}
        parsed.get_params.each { |key| fresh[key] = normalize(parsed[key]) }

        @config       = fresh
        @last_updated = updated
      end

      # Clean one raw value: unescape '\:', drop a trailing '# comment', then
      # strip one layer of double quotes and one of single quotes. The closing
      # quote is not checked; the last character is dropped whenever the value
      # starts with a quote. Non-string values are stored as they are.
      def normalize(raw)
        return raw unless raw.is_a?(String)

        val = raw.gsub(/\\:/, ':').gsub(/[ \t]*#[^\n]*/, '')
        val = val[1..-2] if val.start_with?('"')
        val = val[1..-2] if val.start_with?("'")
        val
      end
    end

    # Provide a read-through cache of running login id's for gears from passwd file
    #
    # @see Array
    # @see Enumerable
    # @see Forwardable
    class CachedGears
      include Enumerable
      extend Forwardable

      # @!attribute [r] last_updated
      #   @return [Time] modification time of the passwd file when the cache was last refreshed
      attr_reader :last_updated

      # @!attribute [r] ids
      #   @return [Array<String>] login ids of every gear found in the passwd file
      attr_reader :ids

      # @!attribute [r] total_count
      #   @return [Integer] total number of gears on node
      attr_reader :total_count

      # These operate on the running gears as of the last import; unlike #each
      # they do not refresh the cache first.
      def_delegators :@gears, :length, :empty?, :include?, :[], :count

      # @param config [Config] node configuration
      # @param passwd [String] path to passwd file
      def initialize(config, passwd = '/etc/passwd')
        @config      = config
        @gears       = []
        @ids         = []
        @total_count = 0
        @passwd      = passwd
        @mutex       = Mutex.new
        import_gears
      end

      # Iterate over the login ids of running gears, refreshing the cache first.
      #
      # @param block [Block] block to execute for each gear
      # @return [Enumerator] when no block is given
      def each(&block)
        return enum_for(__method__) if block.nil?

        import_gears
        @gears.each(&block)
      end

      # Does gear have a stop lock file?
      # @param uuid [String] gear login id to check
      # @return [true, false] true if stop lock exists
      def stop_lock?(uuid)
        # File.exists? is deprecated and was removed in Ruby 3.2
        File.exist?(stop_lock(uuid))
      end

      # @param uuid [String] gear login id
      # @return [String] Path to gear's stop lock file
      def stop_lock(uuid)
        PathUtils.join(@config.get('GEAR_BASE_DIR', GEAR_BASE_DIR), uuid, 'app-root', 'runtime', '.stop_lock')
      end

      # Is gear running?
      #
      # Every state other than 'idle' and 'stopped' counts as running,
      # including the 'unknown' reported when the state file cannot be read.
      #
      # @param uuid [String] gear login id to check
      # @return [true, false] return true if gear status is running
      def running?(uuid)
        current = state(uuid)
        !('idle' == current || 'stopped' == current)
      end

      # get gear state
      # @param uuid [String] gear login id to check
      # @return [String] downcased gear state, or 'unknown' when the state file cannot be read
      def state(uuid)
        file = PathUtils.join(@config.get('GEAR_BASE_DIR', GEAR_BASE_DIR), uuid, 'app-root', 'runtime', '.state')
        begin
          IO.read(file).chomp.downcase
        rescue StandardError
          'unknown'
        end
      end

      # update cache if passwd has changed
      #
      # The list of gear ids is re-read only when the passwd file's mtime
      # changed (or +force+ is set); the running subset is recomputed on every call.
      #
      # @param force [true, false] Force cache update
      def import_gears(force = false)
        updated = File.stat(@passwd).mtime
        @mutex.synchronize do
          if force || @last_updated != updated
            gecos         = @config.get('GEAR_GECOS', GEAR_GECOS)
            results, _, _ = Utils.oo_spawn(%Q{/bin/grep '#{gecos}' #{@passwd} | cut -d: -f1},
                                           quiet:               true,
                                           expected_exitstatus: 0)

            # nil or empty output means no gears
            @ids         = results.to_s.split("\n")
            @total_count = @ids.length
            # Recorded last so a failed lookup is retried on the next call
            @last_updated = updated
          end
        end
        @gears = @ids.find_all { |gear| running?(gear) }
      end
    end
  end
end

# Guard starting daemons to allow automated testing
if ENV['DAEMONS_ARGV']
  # Everything below runs inside the PathUtils.flock block on the watchman lock file.
  PathUtils.flock('/var/lock/oo-watchman.lock') do

    # Override Daemons gem explicit 0000 mask
    [%q{chmod -f 0600 /var/log/watchman.*},
     %q{chmod -f 0644 /var/run/watchman.pid}].each do |chmod|
      ::OpenShift::Runtime::Utils.oo_spawn chmod
    end

    # node.conf lives in $OPENSHIFT_CONF_DIR, falling back to the built-in default
    conf_root  = ENV['OPENSHIFT_CONF_DIR'] || OPENSHIFT_CONF_DIR
    node_conf  = OpenShift::Runtime::CachedConfig.new(PathUtils.join(conf_root, 'node.conf'))
    node_gears = OpenShift::Runtime::CachedGears.new(node_conf, '/etc/passwd')
    daemon     = OpenShift::Runtime::Watchman.new(node_conf, node_gears)

    # Optional tuning taken from the environment; unset variables keep the defaults
    [['GEAR_RETRIES', :gear_retries=],
     ['RETRY_DELAY',  :retry_delay=],
     ['RETRY_PERIOD', :retry_period=]].each do |variable, setter|
      raw = ENV[variable]
      daemon.public_send(setter, raw.to_i) unless raw.nil?
    end

    daemon.apply
  end
end
