# Copyright 1999-2009 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 # $Id$ import formatter import sys import time try: from cStringIO import StringIO except ImportError: from StringIO import StringIO import portage from portage import os from portage import _encodings from portage.output import xtermTitle from _emerge.getloadavg import getloadavg class JobStatusDisplay(object): _bound_properties = ("curval", "failed", "running") _jobs_column_width = 48 # Don't update the display unless at least this much # time has passed, in units of seconds. _min_display_latency = 2 _default_term_codes = { 'cr' : '\r', 'el' : '\x1b[K', 'nel' : '\n', } _termcap_name_map = { 'carriage_return' : 'cr', 'clr_eol' : 'el', 'newline' : 'nel', } def __init__(self, quiet=False, xterm_titles=True): object.__setattr__(self, "quiet", quiet) object.__setattr__(self, "xterm_titles", xterm_titles) object.__setattr__(self, "maxval", 0) object.__setattr__(self, "merges", 0) object.__setattr__(self, "_changed", False) object.__setattr__(self, "_displayed", False) object.__setattr__(self, "_last_display_time", 0) object.__setattr__(self, "width", 80) self.reset() isatty = hasattr(self.out, "isatty") and self.out.isatty() object.__setattr__(self, "_isatty", isatty) if not isatty or not self._init_term(): term_codes = {} for k, capname in self._termcap_name_map.iteritems(): term_codes[k] = self._default_term_codes[capname] object.__setattr__(self, "_term_codes", term_codes) encoding = sys.getdefaultencoding() for k, v in self._term_codes.items(): if not isinstance(v, basestring): self._term_codes[k] = v.decode(encoding, 'replace') @property def out(self): """Use a lazy reference to sys.stdout, in case the API consumer has temporarily overridden stdout.""" return sys.stdout def _write(self, s): if sys.hexversion < 0x3000000 and isinstance(s, unicode): # avoid potential UnicodeEncodeError s = s.encode(_encodings['stdio'], errors='backslashreplace') self.out.write(s) self.out.flush() def _init_term(self): """ Initialize term control codes. @rtype: bool @returns: True if term codes were successfully initialized, False otherwise. """ term_type = os.environ.get("TERM", "vt100") tigetstr = None try: import curses try: curses.setupterm(term_type, self.out.fileno()) tigetstr = curses.tigetstr except curses.error: pass except ImportError: pass if tigetstr is None: return False term_codes = {} for k, capname in self._termcap_name_map.iteritems(): code = tigetstr(capname) if code is None: code = self._default_term_codes[capname] term_codes[k] = code object.__setattr__(self, "_term_codes", term_codes) return True def _format_msg(self, msg): return ">>> %s" % msg def _erase(self): self._write( self._term_codes['carriage_return'] + \ self._term_codes['clr_eol']) self._displayed = False def _display(self, line): self._write(line) self._displayed = True def _update(self, msg): if not self._isatty: self._write(self._format_msg(msg) + self._term_codes['newline']) self._displayed = True return if self._displayed: self._erase() self._display(self._format_msg(msg)) def displayMessage(self, msg): was_displayed = self._displayed if self._isatty and self._displayed: self._erase() self._write(self._format_msg(msg) + self._term_codes['newline']) self._displayed = False if was_displayed: self._changed = True self.display() def reset(self): self.maxval = 0 self.merges = 0 for name in self._bound_properties: object.__setattr__(self, name, 0) if self._displayed: self._write(self._term_codes['newline']) self._displayed = False def __setattr__(self, name, value): old_value = getattr(self, name) if value == old_value: return object.__setattr__(self, name, value) if name in self._bound_properties: self._property_change(name, old_value, value) def _property_change(self, name, old_value, new_value): self._changed = True self.display() def _load_avg_str(self): try: avg = getloadavg() except OSError: return 'unknown' max_avg = max(avg) if max_avg < 10: digits = 2 elif max_avg < 100: digits = 1 else: digits = 0 return ", ".join(("%%.%df" % digits ) % x for x in avg) def display(self): """ Display status on stdout, but only if something has changed since the last call. """ if self.quiet: return current_time = time.time() time_delta = current_time - self._last_display_time if self._displayed and \ not self._changed: if not self._isatty: return if time_delta < self._min_display_latency: return self._last_display_time = current_time self._changed = False self._display_status() def _display_status(self): # Don't use len(self._completed_tasks) here since that also # can include uninstall tasks. curval_str = str(self.curval) maxval_str = str(self.maxval) running_str = str(self.running) failed_str = str(self.failed) load_avg_str = self._load_avg_str() color_output = StringIO() plain_output = StringIO() style_file = portage.output.ConsoleStyleFile(color_output) style_file.write_listener = plain_output style_writer = portage.output.StyleWriter(file=style_file, maxcol=9999) style_writer.style_listener = style_file.new_styles f = formatter.AbstractFormatter(style_writer) number_style = "INFORM" f.add_literal_data("Jobs: ") f.push_style(number_style) f.add_literal_data(curval_str) f.pop_style() f.add_literal_data(" of ") f.push_style(number_style) f.add_literal_data(maxval_str) f.pop_style() f.add_literal_data(" complete") if self.running: f.add_literal_data(", ") f.push_style(number_style) f.add_literal_data(running_str) f.pop_style() f.add_literal_data(" running") if self.failed: f.add_literal_data(", ") f.push_style(number_style) f.add_literal_data(failed_str) f.pop_style() f.add_literal_data(" failed") padding = self._jobs_column_width - len(plain_output.getvalue()) if padding > 0: f.add_literal_data(padding * " ") f.add_literal_data("Load avg: ") f.add_literal_data(load_avg_str) # Truncate to fit width, to avoid making the terminal scroll if the # line overflows (happens when the load average is large). plain_output = plain_output.getvalue() if self._isatty and len(plain_output) > self.width: # Use plain_output here since it's easier to truncate # properly than the color output which contains console # color codes. self._update(plain_output[:self.width]) else: self._update(color_output.getvalue()) if self.xterm_titles: xtermTitle(" ".join(plain_output.split()))