#!/usr/bin/env oo-ruby
require 'rubygems'
require 'pp'
require 'thread'
require 'getoptlong'
require 'stringio'
require 'set'
require 'json'
require 'thread'
require 'thor'

# This tool is a compilation of oo-admin-upgrade from origin-server
# encompassing the various revisions corresponding to OpenShift Online
# releases spanning the duration between RHOSE 1.2 and RHOSE 2.0.  In
# addition to code from oo-admin-upgrade, this tool incorporates
# additional hotfixes from the internal Libra release tickets.

module OpenShift
  module Upgrader
    ##
    # A simple base class for objects which are likely to be marshalled to and
    # from JSON files. Allows for consistent construction with both symbol-based
    # option hashes and JSON-derived hashes with string keys.
    #
    class UpgradePersistable
      def initialize(args)
        args.each{|k, v| send("#{k.to_s}=", v)}
      end

      def jsonish_self
        hash = {}
        instance_variables.each {|v| hash[v.to_s[1..-1]] = instance_variable_get(v)}
        hash
      end

      def to_json
        JSON.dump(jsonish_self)
      end

      def self.from_json(json)
        self.new(JSON.load(json))
      end

      def self.from_json_file(file)
        self.new(JSON.parse(IO.read(file)))
      end

      def self.entries_from_json_array_file(file)
        entries = []
        File.readlines(file).each do |json|
          entries << self.from_json(json)
        end
        entries
      end
    end

    ##
    # High level metadata about the portion of the cluster
    # being upgraded by this upgrader (e.g. the subset of 
    # nodes to be upgraded).
    #
    class ClusterMetadata < UpgradePersistable
      attr_accessor :logins_count,
                    :upgrader_position_nodes,
                    :times

      def initialize(args = {})
        super(args)

        @logins_count ||= 0
        @times ||= {}
        @upgrader_position_nodes ||= []
      end
    end

    ##
    # An individual node queue entry representing a specific
    # node on which an upgrade should be performed.
    #
    class NodeQueueEntry < UpgradePersistable
      attr_accessor :server_identity,
                    :active_gear_length,
                    :inactive_gear_length,
                    :gears_length,
                    :version

      def initialize(args)
        super(args)

        @gears_length ||= 0
        @active_gear_length ||= 0
        @inactive_gear_length ||= 0
      end
    end

    ##
    # An individual gear queue entry representing a specific
    # gear on a node to be upgraded, and all the metadata
    # necessary to feed back into a separate Upgrader instance
    # to execute the work.
    #
    class GearQueueEntry < UpgradePersistable
      attr_accessor :server_identity,
                    :version,
                    :gear_uuid,
                    :gear_name,
                    :app_name,
                    :login,
                    :active

      def initialize(args)
        super(args)
      end
    end

    ##
    # A summary object containing the overall cluster upgrade
    # result metadata across all nodes for a given upgrade
    # attempt.
    #
    class ClusterUpgradeResult < UpgradePersistable
      attr_accessor :times,
                    :gear_count,
                    :gear_counts,
                    :cluster_metadata,
                    :starting_node_queue,
                    :incomplete_node_queue,
                    :leftover_gears_per_node,
                    :failed_gears_per_node

      def initialize(args = {})
        super(args)

        @times ||= {}
        @gear_count ||= 0
        @gear_counts ||= []
        @starting_node_queue ||= []
        @incomplete_node_queue ||= []
        @leftover_gears_per_node ||= {}
        @failed_gears_per_node ||= {}
      end
    end

    ##
    # A summary object containing the upgrade result metadata
    # from a single gear upgrade attempt.
    #
    class GearUpgradeResult < UpgradePersistable
      attr_accessor :login,
                    :app_name,
                    :gear_uuid,
                    :hostname,
                    :version,
                    :errors,
                    :warnings,
                    :times,
                    :remote_upgrade_result,
                    :remote_exit_code

      def initialize(args = {})
        super(args)

        @errors ||= []
        @warnings ||= []
        @times ||= {}
      end
    end
  end
end

class Upgrader
  WORK_DIR = '/tmp/oo-upgrade'

  ##
  # The primary upgrade method. Executes cluster-wide upgrades using the following process:
  #
  # 1. Create or reuses an node queue representing the ordered list of nodes in the cluster to upgrade,
  #    as well as high level metadata about the cluster.
  #
  #    Part of the node queue creation includes the creation of gear queues for each node containing
  #    gears to upgrade.
  #
  # 2. Create worker threads to process each node in the node queue. These workers process gear queues
  #    to perform the actual gear upgrades. Results are written to files on disk per node. Errors are
  #    written to a different file, and also to a re-run gear queue.
  #
  # 3. Recreate the node queue with any incompletely upgraded nodes. A node upgrade is considered incomplete
  #    if any gearss in the node's gear queue weren't processed at all, or if any of the processed gears
  #    resulted in errors.
  #
  # 4. Generate reports based on all output and emit them to stdout.
  #
  # The upgrade process is meant to be re-entrant. If an existing node queue exists, existing gear queues
  # for those nodes will continue processing. If only rerun gear queues exist, processing will proceed using
  # the rerun queues (effectively re-running the errors from the previous run). If no gear queues at all exist
  # for the node, the upgrade for that node is considered complete and no processing will occur for the node.
  #
  # The upgrade is considered a success if there are no nodes left in the node queue.
  #
  # Expects an argument hash:
  #
  #  :version                  - The target upgrade version.
  #
  #  :ignore_cartridge_version - Passed through to gear level upgrades; if +true+, upgrades
  #                              will be performed regardless of the source and target versions.
  #                              Default is +false+.
  #
  #  :target_server_identity   - If present, the node queue will be forced to contain only the
  #                              specified node regardless of any computations.
  #
  #  :num_upgraders            - The total number of upgrader processes intended to be run. Must be used in
  #                              conjunction with +upgrade_position+; +num_upgraders+ defines the number of
  #                              slots which each upgrader process must assign itself to. All positions must
  #                              be taken to upgrade.
  #
  #  :upgrade_position         - Postion of this upgrader (1 based) amongst the num of upgraders (+num_upgraders+)
  #
  #  :max_threads              - The maximum numbers of threads to use when processing nodes from the node queue.
  #                              Default is +12+.
  #
  # Returns a +ClusterUpgradeResult+ containing information about this upgrade execution.
  #
  def upgrade(args={})
    defaults = {
      version: nil,
      ignore_cartridge_version: false,
      target_server_identity: nil,
      upgrade_position: 1,
      num_upgraders: 1,
      max_threads: 12,
      gear_whitelist: []
    }
    opts = defaults.merge(args) {|k, default, arg| arg.nil? ? default : arg}

    version = opts[:version]
    ignore_cartridge_version = opts[:ignore_cartridge_version]
    target_server_identity = opts[:target_server_identity]
    upgrade_position = opts[:upgrade_position]
    num_upgraders = opts[:num_upgraders]
    max_threads = opts[:max_threads]
    gear_whitelist = opts[:gear_whitelist]

    puts "Upgrader started with options: #{opts.inspect}"

    FileUtils.mkdir_p WORK_DIR if not Dir.exists?(WORK_DIR)

    upgrade_result = OpenShift::Upgrader::ClusterUpgradeResult.new

    start_time = current_time
    upgrade_result.times["start"] = start_time

    # only rebuild the queues and meta if none exists
    if File.exists?(cluster_metadata_path)
      puts "Reusing existing queues and cluster metadata"
    else
      puts "Building new upgrade queues and cluster metadata"
      create_upgrade_queues(target_server_identity: target_server_identity,
                            upgrade_position: upgrade_position,
                            num_upgraders: num_upgraders,
                            version: version,
                            gear_whitelist: gear_whitelist)
    end

    raise "Couldn't find node queue at #{node_queue_path}" unless File.exists?(node_queue_path)
    raise "Couldn't find cluster metadata at #{cluster_metadata_path}" unless File.exists?(cluster_metadata_path)
    
    puts "Loading cluster metadata from #{cluster_metadata_path}"
    metadata = OpenShift::Upgrader::ClusterMetadata.from_json_file(cluster_metadata_path)

    puts "Loading node queue from #{node_queue_path}"
    node_queue = OpenShift::Upgrader::NodeQueueEntry.entries_from_json_array_file(node_queue_path)

    # Bail out if there's nothing to do
    if node_queue.empty?
      puts "Node queue is empty; there's nothing for the upgrader to do. Exiting."
      return upgrade_result
    end

    upgrade_result.starting_node_queue = OpenShift::Upgrader::NodeQueueEntry.entries_from_json_array_file(node_queue_path)
    upgrade_result.cluster_metadata = metadata

    if !metadata.upgrader_position_nodes.empty?
      puts "#####################################################"
      puts 'Nodes this upgrader is handling:'
      puts metadata.upgrader_position_nodes.pretty_inspect
      puts "#####################################################"
    end

    # build a work queue that ensures active gears are processed across all
    # nodes prior to any inactive gears being upgraded
    work_queue = Queue.new
    node_queue.each {|node| work_queue << { node: node, active_only: true }}
    
    node_threads = []
    completed_node_count = 0

    # consume the work across the max number of threads
    max_threads.times do |thread_num|
      node_threads << Thread.new do
        while completed_node_count != node_queue.length
          begin
            work = work_queue.pop(true)
          rescue ThreadError => e
            # wait for more work
            sleep 1
            next
          end

          node = work[:node]
          active_only = work[:active_only]

          upgrade_result.gear_counts[thread_num] = node.gears_length

          # perform the node upgrade
          begin
            upgrade_node(node.server_identity, active_only, ignore_cartridge_version)
          rescue Exception => e
            puts e.message
            puts e.backtrace.join("\n")
          end

          # queue the remaining (inactive) work for the node if we
          # already did a pass at just the active gears
          if active_only && File.exists?(gear_queue_path(node.server_identity))
            work_queue << { node: node, active_only: false }
          else
            completed_node_count += 1
            upgrade_result.gear_count += node.gears_length
            puts "#{completed_node_count} of #{node_queue.length} nodes completed"
          end
        end
      end
    end

    # wait for all the threads to finish
    node_threads.each {|t| t.join}

    # rewrite the node queue with any leftover work
    node_queue.each do |node|
      upgrade_result.incomplete_node_queue << node if node_has_remaining_work?(node.server_identity)
    end

    puts "Writing updated node queue to #{node_queue_path}"

    timestamp = Time.now.strftime("%Y-%m-%d-%H%M%S")
    FileUtils.mv(node_queue_path, "#{node_queue_path}-#{timestamp}")
    FileUtils.touch node_queue_path
    upgrade_result.incomplete_node_queue.each do |node|
      append_to_file(node_queue_path, node.to_json)
    end

    upgrade_result.times["total"] = current_time - start_time

    leftover_gears_per_node = {}
    failed_gears_per_node = {}

    upgrade_result.starting_node_queue.each do |node|
      server_identity = node.server_identity
      gear_queue_file = gear_queue_path(server_identity)
      gear_rerun_queue_file = gear_rerun_queue_path(server_identity)

      if File.exists?(gear_queue_file)
        leftover_gears_per_node[server_identity] = `wc -l #{gear_queue_file}`.to_i
      end

      if File.exists?(gear_rerun_queue_file)
        failed_gears_per_node[server_identity] = `wc -l #{gear_rerun_queue_file}`.to_i
      end
    end

    upgrade_result.leftover_gears_per_node = leftover_gears_per_node
    upgrade_result.failed_gears_per_node = failed_gears_per_node

    # print a report
    puts build_text_summary(upgrade_result)

    upgrade_result
  end

  ##
  # Returns +true+ if the given node has any outstanding gear queues.
  #
  def node_has_remaining_work?(server_identity)
    return File.exists?(gear_queue_path(server_identity)) || File.exists?(gear_rerun_queue_path(server_identity))
  end

  ##
  # Finds all the nodes containing gears to be upgraded, and writes:
  #
  #   1. A JSON dump of a +ClusterMetadata+ instance
  #
  #   2. A node queue file where each line contains JSON dump of a +NodeQueueEntry+ representing
  #      the ordered list of nodes to be upgraded
  #
  #   3. A gear queue file per node, where each line in the queue file is a JSON dump of a
  #      +GearQueueEntry+ represnting a gear to be upgraded.
  #
  # Expects an argument hash with data to store along with each node queue entry used to construct
  # the gear level node queues. Each argument is typically just passed through from +upgrade+; see
  # that method for details.
  #
  #  :version
  #  :ignore_cartridge_version
  #  :target_server_itentity
  #  :upgrade_position
  #  :num_upgraders
  #  :max_threads
  #
  def create_upgrade_queues(opts)
    raise "Node queue file already exists at #{node_queue_path}" if File.exists?(node_queue_path)

    target_server_identity = opts[:target_server_identity]
    upgrade_position = opts[:upgrade_position]
    num_upgraders = opts[:num_upgraders]
    version = opts[:version]
    gear_whitelist = opts[:gear_whitelist]

    metadata = OpenShift::Upgrader::ClusterMetadata.new

    puts "Getting all active gears..."
    gather_active_gears_start_time = current_time
    active_gears_map = OpenShift::ApplicationContainerProxy.get_all_active_gears
    metadata.times["gather_active_gears_total_time"] = current_time - gather_active_gears_start_time

    puts "Getting all logins..."
    gather_users_start_time = current_time
    query = {"group_instances.gears.0" => {"$exists" => true}}
    options = {:fields => [ "uuid",
                "domain_id",
                "name",
                "created_at",
                "component_instances.cartridge_name",
                "component_instances.group_instance_id",
                "group_instances._id",
                "group_instances.gears.uuid",
                "group_instances.gears.server_identity",
                "group_instances.gears.name"], 
               :timeout => false}

    ret = []
    user_map = {}
    OpenShift::DataStore.find(:cloud_users, {}, {:fields => ["_id", "uuid", "login"], :timeout => false}) do |hash|
      metadata.logins_count += 1
      user_uuid = hash['uuid']
      user_login = hash['login']
      user_map[hash['_id'].to_s] = [user_uuid, user_login]
    end

    domain_map = {}
    OpenShift::DataStore.find(:domains, {}, {:fields => ["_id" , "owner_id"], :timeout => false}) do |hash|
      domain_map[hash['_id'].to_s] = hash['owner_id'].to_s
    end

    node_to_gears = {}
    OpenShift::DataStore.find(:applications, query, options) do |app|
      user_id = domain_map[app['domain_id'].to_s]
      if user_id.nil?
        relocated_domain = Domain.where(_id: Moped::BSON::ObjectId(app['domain_id'])).first
        next if relocated_domain.nil?
        user_id = relocated_domain.owner._id.to_s
        user_uuid = user_id
        user_login = relocated_domain.owner.login
      else
        if user_map.has_key? user_id
          user_uuid,user_login = user_map[user_id]
        else
          relocated_user = CloudUser.where(_id: Moped::BSON::ObjectId(user_id)).first
          next if relocated_user.nil?
          user_uuid = relocated_user._id.to_s
          user_login = relocated_user.login
        end
      end

      app['group_instances'].each do |gi|
        gi['gears'].each do |gear|
          server_identity = gear['server_identity']
          if server_identity && (!target_server_identity || (server_identity == target_server_identity))
            node_to_gears[server_identity] = [] unless node_to_gears[server_identity] 
            node_to_gears[server_identity] << {:server_identity => server_identity, :uuid => gear['uuid'], :name => gear['name'], :app_name => app['name'], :login => user_login}
          end
        end
      end
    end

    metadata.times["gather_users_total_time"] = current_time - gather_users_start_time

    position = upgrade_position - 1
    if num_upgraders > 1
      server_identities = node_to_gears.keys.sort
      server_identities.each_with_index do |server_identity, index|
        if index == position
          metadata.upgrader_position_nodes << server_identity
          position += num_upgraders
        else
          node_to_gears.delete(server_identity)
        end
      end
    end

    # populate the node queue and for persistence
    node_queue = []
    node_to_gears.each do |server_identity, gears|
      node_to_gears[server_identity] = nil
      break if gears.empty?

      # build the node for the queue with just metadata about the
      # node and gears
      node = OpenShift::Upgrader::NodeQueueEntry.new({
        server_identity: server_identity,
        version: version,
        active_gear_length: 0,
        inactive_gear_length: 0
      })

      # sort gears by active/inactive
      active_gears = []
      inactive_gears = []

      gears.each do |gear|
        if gear_whitelist && gear_whitelist.length > 0
          puts "Gear #{gear[:uuid]} is not in the whitelist and will be skipped"
          next unless gear_whitelist.include?(gear[:uuid])
        end

        if active_gears_map.include?(server_identity) && active_gears_map[server_identity].include?(gear[:uuid])
          gear[:active] = true
          active_gears << gear
        else
          gear[:active] = false
          inactive_gears << gear
        end
      end

      node.active_gear_length = active_gears.length
      node.inactive_gear_length = inactive_gears.length
      node.gears_length = gears.length

      node_queue << node

      # ensure we can process active gears first
      write_gear_queue(node, active_gears) unless active_gears.empty?
      write_gear_queue(node, inactive_gears) unless inactive_gears.empty?
    end
    node_to_gears.clear

    # process the largest nodes first
    node_queue = node_queue.sort_by { |node| node.active_gear_length }

    puts "Writing node queue to #{node_queue_path}"
    # Touch to make sure the file gets created even if node_queue is empty.
    FileUtils.touch node_queue_path
    node_queue.each do |node|
      append_to_file(node_queue_path, node.to_json)
    end

    puts "Writing cluster metadata to #{cluster_metadata_path}"
    append_to_file(cluster_metadata_path, metadata.to_json)
  end


  ##
  # Writes a gear queue to a file in a consolidated format containing
  # one line per gear queue entry, where each line is a JSON dump of
  # a +GearQueueEntry+ instance.
  #
  def write_gear_queue(node, gears)
    gear_queue_file = gear_queue_path(node.server_identity)
    puts "Writing #{gears.length} entries to gear queue for node #{node.server_identity} at #{gear_queue_file}"
    
    gears.each do |gear|
      queue_entry = OpenShift::Upgrader::GearQueueEntry.new({
        server_identity: node.server_identity,
        version: node.version,
        gear_uuid: gear[:uuid],
        gear_name: gear[:name],
        app_name: gear[:app_name],
        login: gear[:login],
        active: gear[:active]
      })
      append_to_file(gear_queue_file, queue_entry.to_json)
    end
  end

  ##
  # Performs the a node upgrade of the specified +server_identity+ by setting up the output
  # files and forking a call back to +upgrade_node_from_gear_queue_file+.
  #
  # The upgrade logic will continue or retry failures depending on the presence of the appropriate
  # gear queue files, and will do nothing if there are no queues to process.
  #
  def upgrade_node(server_identity, active_only, ignore_cartridge_version = false)
    raise "No server identity specified" unless server_identity

    gear_queue_file = gear_queue_path(server_identity)
    rerun_queue_file = gear_rerun_queue_path(server_identity)
    error_file = error_file_path(server_identity)

    # if there's no queue of any sort, there's nothing to do on this node
    unless File.exists?(gear_queue_file) || File.exists?(rerun_queue_file)
      puts "Nothing to migrate for node #{server_identity}; no gear queue or rerun queue files exist"
      return
    end

    if File.exists?(gear_queue_file)
      puts "Upgrading node #{server_identity} from gear queue file at #{gear_queue_file}"
    elsif File.exists?(rerun_queue_file)
      puts "Re-running failures from previous upgrade of node #{server_identity} from rerun gear queue at #{rerun_queue_file}"
      
      timestamp = Time.now.strftime("%Y-%m-%d-%H%M%S")
      archive_error_file = "#{error_file}_#{timestamp}"
      puts "Archiving previous error file from #{error_file} to #{archive_error_file}"
      FileUtils.mv error_file, archive_error_file

      # convert the rerun queue into the new normal queue
      FileUtils.mv rerun_queue_file, gear_queue_file
    end

    # fork the call to +upgrade_node_from_gear_queue_file+ (TODO: revisit this later; the
    # fork is apparently used to work around long-forgotten MCollective threading issues)
    upgrade_node_cmd = "#{__FILE__} upgrade-from-file --upgrade-file '#{gear_queue_file}' --active-only #{active_only}"

    upgrade_node_cmd += " --ignore-cartridge-version" if ignore_cartridge_version

    execute_script(upgrade_node_cmd)
  end

  ##
  # Processes JSON-serialized +GearQueueEntry+, one per line in the given gear queue +file+.
  #
  # Successful upgrade results are written as JSON dumps of +GearUpgradeResult+ instances
  # to +results_file+.
  #
  # Results containing errors are written as JSON dumps of +GearUpgradeResult+ instances to
  # +error_file+ only after two retries have been attempted. In addition, when a result
  # contains errors, the source +GearQueueEntry+ is written as JSON to +gear_rerun_queue_path+
  # to facilitate reruns of past errors.
  #
  # Each time an result is written to disk, the source line from +file+ is deleted. When
  # all lines are processed, the input file is deleted.
  #
  # This method returns nothing; callers must inspect the result file contents for
  # upgrade details.
  #
  def upgrade_node_from_gear_queue_file(file, active_only = false, ignore_cartridge_version = false)
    File.open(file, 'r').each_line do |queue_entry_json|
      next if queue_entry_json.nil? || queue_entry_json.empty?

      gear = OpenShift::Upgrader::GearQueueEntry.from_json(queue_entry_json)

      next if active_only && !gear.active

      log_file = log_file_path(gear.server_identity)
      results_file = results_file_path(gear.server_identity)
      error_file = error_file_path(gear.server_identity)
      rerun_file = gear_rerun_queue_path(gear.server_identity)

      append_to_file(log_file, "Migrating app '#{gear.app_name}' gear '#{gear.gear_name}' with uuid '#{gear.gear_uuid}' on node '#{gear.server_identity}' for user: #{gear.login}")

      # start the upgrades in a retry loop
      num_tries = 2
      (1..num_tries).each do |i|
        # perform the upgrade
        gear_result = upgrade_gear(gear.login, gear.app_name, gear.gear_uuid, gear.version, ignore_cartridge_version)

        # write the results if all is well
        if gear_result.errors.empty?
          append_to_file(results_file, gear_result.to_json)
          break
        end

        # dump the error to disk if the retry limit is hit and we're still failing
        if i == num_tries
          gear_result.errors << "Failed upgrade after #{num_tries} tries"
          append_to_file(error_file, gear_result.to_json)
          append_to_file(rerun_file, gear.to_json)
          break
        end

        # verify the user still exists
        user = nil
        begin
          user = CloudUser.with(consistency: :eventual).find_by(login: gear.login)
        rescue Mongoid::Errors::DocumentNotFound
        end

        # if not, throw a warning and move on
        unless user && Application.find_by_user(user, gear.app_name)
          gear_result.warnings << "App '#{gear.app_name}' no longer found in datastore with uuid '#{gear.gear_uuid}'.  Ignoring..." 
          append_to_file(results_file, gear_result.to_json)
          break
        end

        # if so, we're ready for a retry
        sleep 4
      end

      # The gear has been processed; delete the entry from the file. Inefficient, but
      # it only happens between upgrades which are measured in minutes, and the last
      # non-skipped gear should be near the head of the file anyway. Consider using
      # a database rather than a flat file at a later time.
      gear_line = 1
      File.open(file, 'r').each_line do |line|
        gear_to_delete = OpenShift::Upgrader::GearQueueEntry.from_json(line)
        if gear_to_delete.gear_uuid == gear.gear_uuid
          `sed -i '#{gear_line},#{gear_line}d' #{file}`
          break
        end
        gear_line += 1
      end
    end

    # double-check to ensure all lines were processed, and remove
    # the input file if we're all done.
    FileUtils.rm_f file if `wc -l #{file}`.to_i == 0
  end


  ##
  # Performs a gear upgrade via an RPC call to MCollective on a remote node.
  #
  # Returns a +GearUpgradeResult+, which also including the (decoded) remote
  # upgrade JSON containing the gear level upgrade details.
  #
  # NOTE: All exceptions are trapped and added to the +GearUpgradeResult.errors+.
  # This method should always return the hash and should never bubble exceptions.
  #
  def upgrade_gear(login, app_name, gear_uuid, version, ignore_cartridge_version=false)
    gear_result = OpenShift::Upgrader::GearUpgradeResult.new({
      login: login,
      app_name: app_name,
      gear_uuid: gear_uuid,
      version: version
    })

    total_upgrade_gear_start_time = current_time

    begin
      user = nil
      begin
        user = CloudUser.with(consistency: :eventual).find_by(login: login)
      rescue Mongoid::Errors::DocumentNotFound
      end

      raise "User not found: #{login}" unless user

      app, gear = Application.find_by_gear_uuid(gear_uuid)

      gear_result.warnings << "App '#{app_name}' not found" unless app
      gear_result.warnings << "Gear not found with uuid #{gear_uuid} for app '#{app_name}' and user '#{login}'" unless gear

      if app && gear
        server_identity = gear.server_identity
        gear_result.hostname = server_identity

        remote_result_json = nil
        remote_output = nil
        response_processed = false

        upgrade_on_node_start_time = current_time

        upgrade_args = {
          :uuid => gear_uuid,
          :namespace => app.domain.namespace,
          :app_uuid => app.uuid,
          :secret_token => app.secret_token,
          :version => version,
          :ignore_cartridge_version => ignore_cartridge_version.to_s
        }

        unless app.scalable
          bk = OpenShift::Auth::BrokerKey.new
          auth_iv, auth_token = bk.generate_broker_key(app)
          upgrade_args[:auth_token] = auth_token
          upgrade_args[:auth_iv] = auth_iv
        end

        Timeout::timeout(420) do
          OpenShift::MCollectiveApplicationContainerProxy.rpc_exec('openshift', server_identity) do |client|
            client.upgrade(upgrade_args) do |response|
              remote_result_json = response[:body][:data][:upgrade_result_json]
              remote_output = response[:body][:data][:output]
              response_processed = true
            end
          end
        end

        if response_processed
          if remote_result_json.nil?
            gear_result.errors << "No upgrade result JSON was returned in the upgrade response. The outcome of the upgrade request is unknown. "\
                                  "The following output was returned: #{remote_output}"
          else
            gear_result.remote_upgrade_result = JSON.load(remote_result_json)

            unless gear_result.remote_upgrade_result['upgrade_complete']
              gear_result.errors << "Gear upgrade result is marked incomplete"
            end
          end
        else
          gear_result.errors << "The MCollective upgrade execution didn't explicitly fail, but the response handler was not invoked. "\
                                "The outcome of the upgrade request is unknown."
        end

        gear_result.times["time_upgrade_on_node_measured_from_broker"] = current_time - upgrade_on_node_start_time
      end
    rescue Timeout::Error => e
      gear_result.errors << "Timed out waiting on a response from the MCollective upgrade call"
    rescue Exception => e
      gear_result.errors << "Upgrade failed with an unhandled exception: #{e.message}\n#{e.backtrace.join("\n")}"
    end

    total_upgrade_gear_time = current_time - total_upgrade_gear_start_time
    gear_result.times["time_total_upgrade_gear_measured_from_broker"] = total_upgrade_gear_start_time

    gear_result
  end

  ##
  # Builds a simply textual summary of a +ClusterUpgradeResult+.
  #
  # Returns the report as a string.
  #
  def build_text_summary(upgrade_result)
    buf = []

    failed_gears_total = upgrade_result.failed_gears_per_node.values.inject(0, :+)
    leftover_gears_total = upgrade_result.leftover_gears_per_node.values.inject(0, :+)

    if upgrade_result.leftover_gears_per_node.length > 0 || upgrade_result.failed_gears_per_node.length > 0
      buf << "!!!!!!!!!!WARNING!!!!!!!!!!!!!WARNING!!!!!!!!!!!!WARNING!!!!!!!!!!"
      buf << "The upgrade was incomplete due to unprocessed or failed gears"
      buf << "remaining in node gear queues:"
      unless upgrade_result.leftover_gears_per_node.empty?
        buf << ""
        buf << "#{leftover_gears_total} unprocessed gears:"
        upgrade_result.leftover_gears_per_node.each do |server_identity, count|
          buf << "  #{server_identity}: #{count}"
        end
      end

      unless upgrade_result.failed_gears_per_node.empty?
        buf << ""
        buf << "#{failed_gears_total} failed gears:"
        upgrade_result.failed_gears_per_node.each do |server_identity, count|
          buf << "  #{server_identity}: #{count}"
        end
      end
      buf << ""
      buf << "You can run the upgrade again with the same arguments to continue."
      buf << "!!!!!!!!!!WARNING!!!!!!!!!!!!!WARNING!!!!!!!!!!!!WARNING!!!!!!!!!!"
    end

    upgrader_position_nodes = upgrade_result.cluster_metadata.upgrader_position_nodes

    buf << "#####################################################"
    buf << "Summary:"
    buf << "# of users: #{upgrade_result.cluster_metadata.logins_count}"
    buf << "# of gears: #{upgrade_result.gear_count}"
    buf << "# of failures: #{failed_gears_total}"
    buf << "# of leftovers: #{leftover_gears_total}"
    buf << "Gear counts per thread: #{upgrade_result.gear_counts.pretty_inspect}"
    buf << "Nodes upgraded: #{upgrader_position_nodes.pretty_inspect}" if !upgrader_position_nodes.empty?
    buf << "Timings:"
    upgrade_result.times.each do |topic, time_in_millis|
      buf << "    #{topic}=#{time_in_millis.to_f/1000}s"
    end
    buf << "Additional timings:"
    upgrade_result.cluster_metadata.times.each do |topic, time_in_millis|
      buf << "    #{topic}=#{time_in_millis.to_f/1000}s"
    end
    buf << "#####################################################"

    buf.join("\n")
  end

  def log_file_path(server_identity)
    "#{WORK_DIR}/upgrade_log_#{server_identity}"
  end

  def gear_queue_path(server_identity)
    "#{WORK_DIR}/gear_queue_#{server_identity}"
  end

  def gear_rerun_queue_path(server_identity)
    "#{WORK_DIR}/gear_rerun_queue_#{server_identity}"
  end

  def results_file_path(server_identity)
    "#{WORK_DIR}/upgrade_results_#{server_identity}"
  end

  def error_file_path(server_identity)
    "#{WORK_DIR}/upgrade_errors_#{server_identity}"
  end

  def node_queue_path
    "#{WORK_DIR}/node_queue"
  end

  def cluster_metadata_path
    "#{WORK_DIR}/cluster_metadata"
  end

  ##
  # Appends +value+ to +filename+.
  #
  def append_to_file(filename, value)
    FileUtils.touch filename unless File.exists?(filename)

    file = File.open(filename, 'a')
    begin
      file.puts value
    ensure
      file.close
    end
  end

  ##
  # Executes +cmd+ up to +num_tries+ times waiting for a zero exitcode up with
  # a timeout of +timeout+.
  #
  # Returns [+output+, +exitcode+] of the process.
  #
  def execute_script(cmd, num_tries=1, timeout=28800)
    exitcode = nil
    output = ''
    (1..num_tries).each do |i|
      pid = nil
      begin
        Timeout::timeout(timeout) do
          read, write = IO.pipe
          pid = fork {
            # child
            $stdout.reopen write
            read.close
            exec(cmd)
          }
          # parent
          write.close
          read.each do |line|
            output << line
          end
          Process.waitpid(pid)
          exitcode = $?.exitstatus
        end
        break
      rescue Timeout::Error
        begin
          Process.kill("TERM", pid) if pid
        rescue Exception => e
          puts "execute_script: WARNING - Failed to kill cmd: '#{cmd}' with message: #{e.message}"
        end
        puts "Command '#{cmd}' timed out"
        raise if i == num_tries
      end
    end
    return output, exitcode
  end

  def current_time
    (Time.now.to_f * 1000).to_i
  end
end

class UpgraderCli < Thor
  no_tasks do
    def with_upgrader
      STDOUT.sync, STDERR.sync = true

      # Disable analytics for admin scripts
      require '/var/www/openshift/broker/config/environment'
      Rails.configuration.analytics[:enabled] = false
      Rails.configuration.msg_broker[:rpc_options][:disctimeout] = 20

      begin
        upgrader = Upgrader.new
        yield upgrader
      rescue => e
        puts e.message
        puts e.backtrace.join("\n")
        raise
      end
    end
  end

  desc "archive", "Archives existing upgrade data in order to begin a completely new upgrade attempt"
  def archive
    files = Dir.glob(File.join(Upgrader::WORK_DIR, '*')).select{|fn| File.file?(fn)}
    if files.empty?
      puts "No upgrade files found to archive at #{Upgrader::WORK_DIR}"
    else
      timestamp = Time.now.strftime("%Y-%m-%d-%H%M%S")
      archive_dir = File.join(Upgrader::WORK_DIR, "archive_#{timestamp}")
      FileUtils.mkdir(archive_dir)
      puts "Archiving upgrade output to #{archive_dir}"
      FileUtils.mv files, archive_dir
    end
  end

  desc "upgrade-gear", "Upgrades only the specified gear"
  method_option :login, :type => :string, :required => true, :desc => "User login"
  method_option :app_name, :type => :string, :required => true, :desc => "App name of the gear to upgrade"
  method_option :upgrade_gear, :type => :string, :required => true, :desc => "Gear uuid of the single gear to upgrade"
  method_option :version, :type => :string, :required => true, :desc => "Target version number"
  method_option :ignore_cartridge_version, :type => :boolean, :default => false, :desc => "Force cartridge upgrade even if cartridge versions match"
  def upgrade_gear
    with_upgrader do |upgrader|
      gear_result = upgrader.upgrade_gear(options.login, options.app_name, options.upgrade_gear, options.version, options.ignore_cartridge_version?)
      puts gear_result.to_json
    end
  end

  desc "upgrade-from-file", "Upgrades gears from a gear queue file"
  method_option :upgrade_file, :type => :string, :required => true, :desc => "The gear queue file to upgrade from"
  method_option :active_only, :type => :boolean, :default => false, :desc => "Skip inactive gears when processing the file"
  method_option :ignore_cartridge_version, :type => :boolean, :default => false, :desc => "Force cartridge upgrade even if cartridge versions match"
  def upgrade_from_file
    with_upgrader do |upgrader|
      upgrader.upgrade_node_from_gear_queue_file(options.upgrade_file, options.active_only?, options.ignore_cartridge_version?)
    end
  end

  desc "upgrade-node", "Upgrades one or all nodes to the specified version"
  method_option :version, :type => :string, :required => true, :desc => "Target version number"
  method_option :ignore_cartridge_version, :type => :boolean, :default => false, :desc => "Force cartridge upgrade even if cartridge versions match"
  method_option :upgrade_node, :type => :string, :required => false, :desc => "Server identity of the node to upgrade"
  method_option :upgrade_position, :type => :numeric, :required => false, :desc => "Postion of this upgrader (1 based) amongst the num of upgraders"
  method_option :num_upgraders, :type => :numeric, :required => false, :desc => "The total number of upgraders to be run.  Each upgrade-position will be a "\
                                                                                "upgrade-position of num-upgraders.  All positions must to taken to upgrade "\
                                                                                "all gears."
  method_option :gear_whitelist, :type => :array, :required => false, :desc => "Upgrade only the specified gear UUIDs"
  method_option :max_threads, :type => :numeric, :required => false, :desc =>  "Indicates the number of processing queues"
  def upgrade_node
    if (options.num_upgraders && !options.upgrade_position) || (options.upgrade_position && !options.num_upgraders)
      puts "--num-upgraders and --upgrade-position must be specified together"
      exit 1
    end

    with_upgrader do |upgrader|
      args = {
        version: options.version,
        ignore_cartridge_version: options.ignore_cartridge_version?,
        target_server_identity: options.upgrade_node,
        upgrade_position: options.upgrade_position,
        num_upgraders: options.num_upgraders,
        max_threads: options.max_threads,
        gear_whitelist: options.gear_whitelist
      }

      upgrade_result = upgrader.upgrade(args)

      # Return an error iff any gears were not upgraded.
      upgrade_result.leftover_gears_per_node.length > 0 || upgrade_result.failed_gears_per_node.length > 0
    end
  end
end

UpgraderCli.start if __FILE__ == $0
