summaryrefslogtreecommitdiffstats
path: root/pym/_emerge/__init__.py
diff options
context:
space:
mode:
authorZac Medico <zmedico@gentoo.org>2009-06-22 18:21:56 +0000
committerZac Medico <zmedico@gentoo.org>2009-06-22 18:21:56 +0000
commit69500d28402817fb70c104501b5c8f4f54650f4a (patch)
treeb718c8028eb09ad8e8fb5e1503df58abb502b712 /pym/_emerge/__init__.py
parent13d9a8d5f95575c814570cd4e5713541ed022f37 (diff)
downloadportage-69500d28402817fb70c104501b5c8f4f54650f4a.tar.gz
portage-69500d28402817fb70c104501b5c8f4f54650f4a.tar.bz2
portage-69500d28402817fb70c104501b5c8f4f54650f4a.zip
Bug #275047 - Split _emerge/__init__.py into smaller pieces (part 2).
Thanks to Sebastian Mingramm (few) <s.mingramm@gmx.de> for this patch. svn path=/main/trunk/; revision=13667
Diffstat (limited to 'pym/_emerge/__init__.py')
-rw-r--r--pym/_emerge/__init__.py365
1 files changed, 4 insertions, 361 deletions
diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py
index 0a2bc6d2c..3a3dac25e 100644
--- a/pym/_emerge/__init__.py
+++ b/pym/_emerge/__init__.py
@@ -83,86 +83,10 @@ from _emerge.UseFlagDisplay import UseFlagDisplay
from _emerge.PollSelectAdapter import PollSelectAdapter
from _emerge.SequentialTaskQueue import SequentialTaskQueue
from _emerge.ProgressHandler import ProgressHandler
-
-try:
- from cStringIO import StringIO
-except ImportError:
- from StringIO import StringIO
-
-class stdout_spinner(object):
- scroll_msgs = [
- "Gentoo Rocks ("+platform.system()+")",
- "Thank you for using Gentoo. :)",
- "Are you actually trying to read this?",
- "How many times have you stared at this?",
- "We are generating the cache right now",
- "You are paying too much attention.",
- "A theory is better than its explanation.",
- "Phasers locked on target, Captain.",
- "Thrashing is just virtual crashing.",
- "To be is to program.",
- "Real Users hate Real Programmers.",
- "When all else fails, read the instructions.",
- "Functionality breeds Contempt.",
- "The future lies ahead.",
- "3.1415926535897932384626433832795028841971694",
- "Sometimes insanity is the only alternative.",
- "Inaccuracy saves a world of explanation.",
- ]
-
- twirl_sequence = "/-\\|/-\\|/-\\|/-\\|\\-/|\\-/|\\-/|\\-/|"
-
- def __init__(self):
- self.spinpos = 0
- self.update = self.update_twirl
- self.scroll_sequence = self.scroll_msgs[
- int(time.time() * 100) % len(self.scroll_msgs)]
- self.last_update = 0
- self.min_display_latency = 0.05
-
- def _return_early(self):
- """
- Flushing ouput to the tty too frequently wastes cpu time. Therefore,
- each update* method should return without doing any output when this
- method returns True.
- """
- cur_time = time.time()
- if cur_time - self.last_update < self.min_display_latency:
- return True
- self.last_update = cur_time
- return False
-
- def update_basic(self):
- self.spinpos = (self.spinpos + 1) % 500
- if self._return_early():
- return
- if (self.spinpos % 100) == 0:
- if self.spinpos == 0:
- sys.stdout.write(". ")
- else:
- sys.stdout.write(".")
- sys.stdout.flush()
-
- def update_scroll(self):
- if self._return_early():
- return
- if(self.spinpos >= len(self.scroll_sequence)):
- sys.stdout.write(darkgreen(" \b\b\b" + self.scroll_sequence[
- len(self.scroll_sequence) - 1 - (self.spinpos % len(self.scroll_sequence))]))
- else:
- sys.stdout.write(green("\b " + self.scroll_sequence[self.spinpos]))
- sys.stdout.flush()
- self.spinpos = (self.spinpos + 1) % (2 * len(self.scroll_sequence))
-
- def update_twirl(self):
- self.spinpos = (self.spinpos + 1) % len(self.twirl_sequence)
- if self._return_early():
- return
- sys.stdout.write("\b\b " + self.twirl_sequence[self.spinpos])
- sys.stdout.flush()
-
- def update_quiet(self):
- return
+from _emerge.stdout_spinner import stdout_spinner
+from _emerge.UninstallFailure import UninstallFailure
+from _emerge.JobStatusDisplay import JobStatusDisplay
+from _emerge.getloadavg import getloadavg
def userquery(prompt, responses=None, colours=None):
"""Displays a prompt and a set of responses, then waits for a response
@@ -6569,29 +6493,6 @@ def create_poll_instance():
return select.poll()
return PollSelectAdapter()
-getloadavg = getattr(os, "getloadavg", None)
-if getloadavg is None:
- def getloadavg():
- """
- Uses /proc/loadavg to emulate os.getloadavg().
- Raises OSError if the load average was unobtainable.
- """
- try:
- loadavg_str = open('/proc/loadavg').readline()
- except IOError:
- # getloadavg() is only supposed to raise OSError, so convert
- raise OSError('unknown')
- loadavg_split = loadavg_str.split()
- if len(loadavg_split) < 3:
- raise OSError('unknown')
- loadavg_floats = []
- for i in xrange(3):
- try:
- loadavg_floats.append(float(loadavg_split[i]))
- except ValueError:
- raise OSError('unknown')
- return tuple(loadavg_floats)
-
class PollScheduler(object):
class _sched_iface_class(SlotObject):
@@ -6879,253 +6780,6 @@ class TaskScheduler(object):
def add(self, task):
self._queue.add(task)
-class JobStatusDisplay(object):
-
- _bound_properties = ("curval", "failed", "running")
- _jobs_column_width = 48
-
- # Don't update the display unless at least this much
- # time has passed, in units of seconds.
- _min_display_latency = 2
-
- _default_term_codes = {
- 'cr' : '\r',
- 'el' : '\x1b[K',
- 'nel' : '\n',
- }
-
- _termcap_name_map = {
- 'carriage_return' : 'cr',
- 'clr_eol' : 'el',
- 'newline' : 'nel',
- }
-
- def __init__(self, out=sys.stdout, quiet=False, xterm_titles=True):
- object.__setattr__(self, "out", out)
- object.__setattr__(self, "quiet", quiet)
- object.__setattr__(self, "xterm_titles", xterm_titles)
- object.__setattr__(self, "maxval", 0)
- object.__setattr__(self, "merges", 0)
- object.__setattr__(self, "_changed", False)
- object.__setattr__(self, "_displayed", False)
- object.__setattr__(self, "_last_display_time", 0)
- object.__setattr__(self, "width", 80)
- self.reset()
-
- isatty = hasattr(out, "isatty") and out.isatty()
- object.__setattr__(self, "_isatty", isatty)
- if not isatty or not self._init_term():
- term_codes = {}
- for k, capname in self._termcap_name_map.iteritems():
- term_codes[k] = self._default_term_codes[capname]
- object.__setattr__(self, "_term_codes", term_codes)
- encoding = sys.getdefaultencoding()
- for k, v in self._term_codes.items():
- if not isinstance(v, basestring):
- self._term_codes[k] = v.decode(encoding, 'replace')
-
- def _init_term(self):
- """
- Initialize term control codes.
- @rtype: bool
- @returns: True if term codes were successfully initialized,
- False otherwise.
- """
-
- term_type = os.environ.get("TERM", "vt100")
- tigetstr = None
-
- try:
- import curses
- try:
- curses.setupterm(term_type, self.out.fileno())
- tigetstr = curses.tigetstr
- except curses.error:
- pass
- except ImportError:
- pass
-
- if tigetstr is None:
- return False
-
- term_codes = {}
- for k, capname in self._termcap_name_map.iteritems():
- code = tigetstr(capname)
- if code is None:
- code = self._default_term_codes[capname]
- term_codes[k] = code
- object.__setattr__(self, "_term_codes", term_codes)
- return True
-
- def _format_msg(self, msg):
- return ">>> %s" % msg
-
- def _erase(self):
- self.out.write(
- self._term_codes['carriage_return'] + \
- self._term_codes['clr_eol'])
- self.out.flush()
- self._displayed = False
-
- def _display(self, line):
- self.out.write(line)
- self.out.flush()
- self._displayed = True
-
- def _update(self, msg):
-
- out = self.out
- if not self._isatty:
- out.write(self._format_msg(msg) + self._term_codes['newline'])
- self.out.flush()
- self._displayed = True
- return
-
- if self._displayed:
- self._erase()
-
- self._display(self._format_msg(msg))
-
- def displayMessage(self, msg):
-
- was_displayed = self._displayed
-
- if self._isatty and self._displayed:
- self._erase()
-
- self.out.write(self._format_msg(msg) + self._term_codes['newline'])
- self.out.flush()
- self._displayed = False
-
- if was_displayed:
- self._changed = True
- self.display()
-
- def reset(self):
- self.maxval = 0
- self.merges = 0
- for name in self._bound_properties:
- object.__setattr__(self, name, 0)
-
- if self._displayed:
- self.out.write(self._term_codes['newline'])
- self.out.flush()
- self._displayed = False
-
- def __setattr__(self, name, value):
- old_value = getattr(self, name)
- if value == old_value:
- return
- object.__setattr__(self, name, value)
- if name in self._bound_properties:
- self._property_change(name, old_value, value)
-
- def _property_change(self, name, old_value, new_value):
- self._changed = True
- self.display()
-
- def _load_avg_str(self):
- try:
- avg = getloadavg()
- except OSError:
- return 'unknown'
-
- max_avg = max(avg)
-
- if max_avg < 10:
- digits = 2
- elif max_avg < 100:
- digits = 1
- else:
- digits = 0
-
- return ", ".join(("%%.%df" % digits ) % x for x in avg)
-
- def display(self):
- """
- Display status on stdout, but only if something has
- changed since the last call.
- """
-
- if self.quiet:
- return
-
- current_time = time.time()
- time_delta = current_time - self._last_display_time
- if self._displayed and \
- not self._changed:
- if not self._isatty:
- return
- if time_delta < self._min_display_latency:
- return
-
- self._last_display_time = current_time
- self._changed = False
- self._display_status()
-
- def _display_status(self):
- # Don't use len(self._completed_tasks) here since that also
- # can include uninstall tasks.
- curval_str = str(self.curval)
- maxval_str = str(self.maxval)
- running_str = str(self.running)
- failed_str = str(self.failed)
- load_avg_str = self._load_avg_str()
-
- color_output = StringIO()
- plain_output = StringIO()
- style_file = portage.output.ConsoleStyleFile(color_output)
- style_file.write_listener = plain_output
- style_writer = portage.output.StyleWriter(file=style_file, maxcol=9999)
- style_writer.style_listener = style_file.new_styles
- f = formatter.AbstractFormatter(style_writer)
-
- number_style = "INFORM"
- f.add_literal_data("Jobs: ")
- f.push_style(number_style)
- f.add_literal_data(curval_str)
- f.pop_style()
- f.add_literal_data(" of ")
- f.push_style(number_style)
- f.add_literal_data(maxval_str)
- f.pop_style()
- f.add_literal_data(" complete")
-
- if self.running:
- f.add_literal_data(", ")
- f.push_style(number_style)
- f.add_literal_data(running_str)
- f.pop_style()
- f.add_literal_data(" running")
-
- if self.failed:
- f.add_literal_data(", ")
- f.push_style(number_style)
- f.add_literal_data(failed_str)
- f.pop_style()
- f.add_literal_data(" failed")
-
- padding = self._jobs_column_width - len(plain_output.getvalue())
- if padding > 0:
- f.add_literal_data(padding * " ")
-
- f.add_literal_data("Load avg: ")
- f.add_literal_data(load_avg_str)
-
- # Truncate to fit width, to avoid making the terminal scroll if the
- # line overflows (happens when the load average is large).
- plain_output = plain_output.getvalue()
- if self._isatty and len(plain_output) > self.width:
- # Use plain_output here since it's easier to truncate
- # properly than the color output which contains console
- # color codes.
- self._update(plain_output[:self.width])
- else:
- self._update(color_output.getvalue())
-
- if self.xterm_titles:
- xtermTitle(" ".join(plain_output.split()))
-
class Scheduler(PollScheduler):
_opts_ignore_blockers = \
@@ -8875,17 +8529,6 @@ class MetadataRegen(PollScheduler):
self._schedule()
-class UninstallFailure(portage.exception.PortageException):
- """
- An instance of this class is raised by unmerge() when
- an uninstallation fails.
- """
- status = 1
- def __init__(self, *pargs):
- portage.exception.PortageException.__init__(self, pargs)
- if pargs:
- self.status = pargs[0]
-
def unmerge(root_config, myopts, unmerge_action,
unmerge_files, ldpath_mtimes, autoclean=0,
clean_world=1, clean_delay=1, ordered=0, raise_on_error=0,