From 98e2821b38a775737e42a2479a6bc65107210859 Mon Sep 17 00:00:00 2001 From: Elliot Kroo Date: Thu, 11 Mar 2010 15:21:30 -0800 Subject: reorganizing the first level of folders (trunk/branch folders are not the git way :) --- infrastructure/net.appjet.oui/logging.scala | 530 ++++++++++++++++++++++++++++ 1 file changed, 530 insertions(+) create mode 100644 infrastructure/net.appjet.oui/logging.scala (limited to 'infrastructure/net.appjet.oui/logging.scala') diff --git a/infrastructure/net.appjet.oui/logging.scala b/infrastructure/net.appjet.oui/logging.scala new file mode 100644 index 0000000..9c384d2 --- /dev/null +++ b/infrastructure/net.appjet.oui/logging.scala @@ -0,0 +1,530 @@ +/** + * Copyright 2009 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS-IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.appjet.oui; + +import java.text.SimpleDateFormat; +import java.io.{File, FileWriter, StringWriter, PrintWriter}; +import java.util.Date; +import java.util.concurrent.{ConcurrentLinkedQueue, ConcurrentHashMap, CopyOnWriteArraySet}; +import java.util.concurrent.atomic.AtomicInteger; + +import scala.util.Sorting; +import scala.ref.WeakReference; +import scala.collection.mutable.{Map, HashMap}; +import scala.collection.jcl.{SetWrapper, Conversions}; + +import org.json.{JSONObject, JSONArray}; +import org.mozilla.javascript.{Scriptable, Context}; + +import Util.iteratorToRichIterator; +import scala.collection.jcl.Conversions._; + +trait LoggablePropertyBag { + def date: Date; + def `type`: String = value("type").asInstanceOf[String]; + def json: String; + def tabDelimited: String; + def keys: Array[String]; + def value(k: String): Any; +} + +class LoggableFromScriptable( + scr: Scriptable, + extra: Option[scala.collection.Map[String, String]]) + extends LoggablePropertyBag { + def this(scr: Scriptable) = this(scr, None); + if (extra.isDefined) { + for ((k, v) <- extra.get if (! scr.has(k, scr))) { + scr.put(k, scr, v); + } + } + + val keys = + scr.getIds() + .map(_.asInstanceOf[String]) + .filter(scr.get(_, scr) != Context.getUndefinedValue()); + Sorting.quickSort(keys); + if (! scr.has("date", scr)) { + scr.put("date", scr, System.currentTimeMillis()); + } + val date = new Date(scr.get("date", scr).asInstanceOf[Number].longValue); + val json = FastJSON.stringify(scr); + val tabDelimited = GenericLoggerUtils.dateString(date) + "\t" + + keys.filter("date" != _).map(value(_)).mkString("\t"); + + def value(k: String) = { + scr.get(k, scr); + } +} + +class LoggableFromMap[T]( + map: scala.collection.Map[String, T], + extra: Option[scala.collection.Map[String, String]]) + extends LoggablePropertyBag { + def this(map: scala.collection.Map[String, T]) = this(map, None); + val keys = map.keys.collect.toArray ++ + extra.map(_.keys.collect.toArray).getOrElse(Array[String]()); + Sorting.quickSort(keys); + + def fillJson(json: JSONObject, + map: scala.collection.Map[String, T]): JSONObject = { + for ((k, v) <- map) { + v match { + case b: Boolean => json.put(k, b); + case d: Double => json.put(k, d); + case i: Int => json.put(k, i); + case l: Long => json.put(k, l); + case m: java.util.Map[_,_] => json.put(k, m); + case m: scala.collection.Map[String,T] => + json.put(k, fillJson(new JSONObject(), m)); + case c: java.util.Collection[_] => json.put(k, c); + case o: Object => json.put(k, o); + case _ => {}; + } + } + json; + } + val json0 = fillJson(new JSONObject(), map); + if (extra.isDefined) { + for ((k, v) <- extra.get if (! json0.has(k))) { + json0.put(k, v); + } + } + if (! json0.has("date")) { + json0.put("date", System.currentTimeMillis()); + } + val date = new Date(json0.getLong("date")); + val json = json0.toString; + val tabDelimited = + GenericLoggerUtils.dateString(date) + "\t" + + keys.filter("date" != _).map(value(_)).mkString("\t"); + + def value(k: String) = { + map.orElse(extra.getOrElse(Map[String, Any]()))(k); + } +} + +class LoggableFromJson(val json: String) extends LoggablePropertyBag { + val obj = new JSONObject(json); + val date = new Date(obj.getLong("date")); + val keys = obj.sortedKeys().map(String.valueOf(_)).collect.toArray; + def value(k: String) = obj.get(k); + val tabDelimited = + GenericLoggerUtils.dateString(date) + "\t"+ + keys.filter("date" != _).map(value(_)).mkString("\t"); +} + +object GenericLoggerUtils { + lazy val df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSZ"); + def dateString(date: Date) = df.format(date); + var extraPropertiesFunction: Option[() => Map[String, String]] = None; + def setExtraPropertiesFunction(f: () => Map[String, String]) { + extraPropertiesFunction = Some(() => { + try { + f(); + } catch { + case e => withoutExtraProperties { + exceptionlog(e); + Map[String, String](); + } + } + }); + } + def getExtraProperties: Option[Map[String, String]] = { + if (shouldGetExtraProperties) { + withoutExtraProperties(extraPropertiesFunction.map(_())); + } else { + None; + } + } + + val registeredWranglers = + new ConcurrentHashMap[String, SetWrapper[WeakReference[LogWrangler]]]; + def registerWrangler(name: String, wrangler: LogWrangler) { + wranglers(name) += wrangler.ref; + } + def clearWrangler(name: String, wrangler: LogWrangler) { + wranglers(name) -= wrangler.ref; + } + def wranglers(name: String) = { + if (! registeredWranglers.containsKey(name)) { + val set1 = Conversions.convertSet( + new CopyOnWriteArraySet[WeakReference[LogWrangler]]); + val set2 = registeredWranglers.putIfAbsent( + name, set1); + if (set2 == null) { + set1 + } else { + set2 + } + } else { + registeredWranglers.get(name); + } + } + def tellWranglers(name: String, lpb: LoggablePropertyBag) { + for (w <- wranglers(name)) { + w.get.foreach(_.tell(lpb)); + if (! w.isValid) { + wranglers(name) -= w; + } + } + } + + val shouldGetExtraProperties_var = + new NoninheritedDynamicVariable[Boolean](true); + def withoutExtraProperties[E](block: => E): E = { + shouldGetExtraProperties_var.withValue(false)(block); + } + def shouldGetExtraProperties = shouldGetExtraProperties_var.value; +} + +class GenericLogger(path: String, logName: String, rotateDaily: Boolean) { + val queue = new ConcurrentLinkedQueue[LoggablePropertyBag]; + + var loggerThread: Thread = null; + var currentLogDay:Date = null; + var logWriter: FileWriter = null; + var logBase = config.logDir; + def setLogBase(p: String) { logBase = p } + + var echoToStdOut = false; + def setEchoToStdOut(e: Boolean) { + echoToStdOut = e; + } + def stdOutPrefix = logName+": " + + def initLogWriter(logDay: Date) { + currentLogDay = logDay; + + // if rotating, log filename is logBase/[path/]logName/logName-.jslog + // otherwise, log filename is logBase/[path/]logName.jslog + var fileName = + if (rotateDaily) { + val df = new SimpleDateFormat("yyyy-MM-dd"); + logName + "/" + logName + "-" + df.format(logDay) + ".jslog"; + } else { + logName + ".jslog"; + } + if (path != null && path.length > 0) { + fileName = path + "/" + fileName; + } + val f = new File(logBase+"/"+fileName); + if (! f.getParentFile.exists) { + f.getParentFile().mkdirs(); + } + logWriter = new FileWriter(f, true); + } + + def rotateIfNecessary(messageDate: Date) { + if (rotateDaily) { + if (!((messageDate.getYear == currentLogDay.getYear) && + (messageDate.getMonth == currentLogDay.getMonth) && + (messageDate.getDate == currentLogDay.getDate))) { + logWriter.flush(); + logWriter.close(); + initLogWriter(messageDate); + } + } + } + + def flush() { + flush(java.lang.Integer.MAX_VALUE); + } + def close() { + logWriter.close(); + } + + def flush(n: Int) = synchronized { + var count = 0; + while (count < n && ! queue.isEmpty()) { + val lpb = queue.poll(); + rotateIfNecessary(lpb.date); + logWriter.write(lpb.json+"\n"); + if (echoToStdOut) + print(lpb.tabDelimited.split("\n").mkString(stdOutPrefix, "\n"+stdOutPrefix, "\n")); + count += 1; + } + if (count > 0) { + logWriter.flush(); + } + count; + } + + def start() { + initLogWriter(new Date()); + + loggerThread = new Thread("GenericLogger "+logName) { + this.setDaemon(true); + override def run() { + while (true) { + if (queue.isEmpty()) { + Thread.sleep(500); + } else { + flush(1000); + } + } + } + } + main.loggers += this; + loggerThread.start(); + } + + def log(lpb: LoggablePropertyBag) { + if (loggerThread != null) { + queue.offer(lpb); + GenericLoggerUtils.tellWranglers(logName, lpb); + } + } + def logObject(scr: Scriptable) { + log(new LoggableFromScriptable( + scr, GenericLoggerUtils.getExtraProperties)); + } + def log[T](m: scala.collection.Map[String, T]) { + log(new LoggableFromMap( + m, GenericLoggerUtils.getExtraProperties)); + } + def log(s: String) { + log(Map("message" -> s)); + } + def apply(s: String) { + log(s); + } + def apply(scr: Scriptable) { + logObject(scr); + } + def apply[T](m: scala.collection.Map[String, T]) { + log(m); + } +} + +object profiler extends GenericLogger("backend", "profile", false) { + def apply(id: String, op: String, method: String, path: String, countAndNanos: (Long, Long)) { + if (loggerThread != null) + log(id+":"+op+":"+method+":"+path+":"+ + Math.round(countAndNanos._2/1000)+ + (if (countAndNanos._1 > 1) ":"+countAndNanos._1 else "")); + } +// def apply(state: RequestState, op: String, nanos: long) { +// apply(state.requestId, op, state.req.getMethod(), state.req.getRequestURI(), nanos); +// } + + def time = + System.nanoTime(); + + // thread-specific stuff. + val map = new ThreadLocal[HashMap[String, Any]] { + override def initialValue = new HashMap[String, Any]; + } + val idGen = new java.util.concurrent.atomic.AtomicLong(0); + val id = new ThreadLocal[Long] { + override def initialValue = idGen.getAndIncrement(); + } + def reset() = { + map.remove(); + id.remove(); + } + + def record(key: String, time: Long) { + map.get()(key) = (1L, time); + } + def recordCumulative(key: String, time: Long) { + map.get()(key) = map.get().getOrElse(key, (0L, 0L)) match { + case (count: Long, time0: Long) => (count+1, time0+time); + case _ => { } // do nothing, but maybe shoud error. + } + } + def print() { + for ((k, t) <- map.get()) { + profiler(""+id.get(), k, "/", "/", t match { + case (count: Long, time0: Long) => (count, time0); + case _ => (-1L, -1L); + }); + } + } + + def printTiming[E](name: String)(block: => E): E = { + val startTime = time; + val r = block; + val endTime = time; + println(name+": "+((endTime - startTime)/1000)+" us."); + r; + } +} + +object eventlog extends GenericLogger("backend", "server-events", true) { + start(); +} + +object streaminglog extends GenericLogger("backend", "streaming-events", true) { + start(); +} + +object exceptionlog extends GenericLogger("backend", "exceptions", true) { + def apply(e: Throwable) { + val s = new StringWriter; + e.printStackTrace(new PrintWriter(s)); + log(Map( + "description" -> e.toString(), + "trace" -> s.toString())); + } + + echoToStdOut = config.devMode + override def stdOutPrefix = "(exlog): "; + + start(); +} + +// object dprintln extends GenericLogger("backend", "debug", true) { +// echoToStdOut = config.devMode; +// } + +class STFULogger extends org.mortbay.log.Logger { + def debug(m: String, a0: Object, a1: Object) { } + def debug(m: String, t: Throwable) { } + def getLogger(m: String) = { this } + def info(m: String, a0: Object, a2: Object) { } + def isDebugEnabled() = { false } + def setDebugEnabled(t: Boolean) { } + def warn(m: String, a0: Object, a1: Object) { } + def warn(m: String, t: Throwable) { } +} + +case class Percentile(count: Int, p50: Int, p90: Int, p95: Int, p99: Int, max: Int); + +object cometlatencies { + var latencies = new java.util.concurrent.ConcurrentLinkedQueue[Int]; + def register(t: Int) = latencies.offer(t); + + var loggerThread: Thread = null; + var lastCount: Option[Map[String, Int]] = None; + var lastStats: Option[Percentile] = None; + def start() { + loggerThread = new Thread("latencies logger") { + this.setDaemon(true); + override def run() { + while(true) { + Thread.sleep(60*1000); // every minute + try { + val oldLatencies = latencies; + latencies = new java.util.concurrent.ConcurrentLinkedQueue[Int]; + val latArray = oldLatencies.toArray().map(_.asInstanceOf[int]); + Sorting.quickSort(latArray); + def pct(p: Int) = + if (latArray.length > 0) + latArray(Math.floor((p/100.0)*latArray.length).toInt); + else + 0; + def s(a: Any) = String.valueOf(a); + lastStats = Some(Percentile(latArray.length, + pct(50), pct(90), pct(95), pct(99), + if (latArray.length > 0) latArray.last else 0)); + eventlog.log(Map( + "type" -> "streaming-message-latencies", + "count" -> s(lastStats.get.count), + "p50" -> s(lastStats.get.p50), + "p90" -> s(lastStats.get.p90), + "p95" -> s(lastStats.get.p95), + "p99" -> s(lastStats.get.p99), + "max" -> s(lastStats.get.max))); + lastCount = Some({ + val c = Class.forName("net.appjet.ajstdlib.Comet$"); + c.getDeclaredMethod("connectionStatus") + .invoke(c.getDeclaredField("MODULE$").get(null)) + }.asInstanceOf[Map[String, Int]]); + eventlog.log( + Map("type" -> "streaming-connection-count") ++ + lastCount.get.elements.map(p => (p._1, String.valueOf(p._2)))); + } catch { + case e: Exception => { + exceptionlog(e); + } + } + } + } + } + loggerThread.start(); + } + + start(); +} + +object executionlatencies extends GenericLogger("backend", "latency", true) { + start(); + + def time = System.currentTimeMillis(); +} + +abstract class LogWrangler { + def tell(lpb: LoggablePropertyBag); + def tell(json: String) { tell(new LoggableFromJson(json)); } + lazy val ref = new WeakReference(this); + + def watch(logName: String) { + GenericLoggerUtils.registerWrangler(logName, this); + } +} + +// you probably want to subclass this, or at least set data. +class FilterWrangler( + `type`: String, + filter: LoggablePropertyBag => Boolean, + field: String) extends LogWrangler { + def tell(lpb: LoggablePropertyBag) { + if ((`type` == null || lpb.`type` == `type`) && + (filter == null || filter(lpb))) { + val entry = lpb.value(field); + data(lpb.date, entry); + } + } + var data: (Date, Any) => Unit = null; + def setData(data0: (Date, Any) => Unit) { + data = data0; + } +} + +class TopNWrangler(n: Int, `type`: String, + filter: LoggablePropertyBag => Boolean, + field: String) + extends FilterWrangler(`type`, filter, field) { + val entries = new ConcurrentHashMap[String, AtomicInteger](); + def sortedEntries = { + Sorting.stableSort( + convertMap(entries).toSeq, + (p1: (String, AtomicInteger), p2: (String, AtomicInteger)) => + p1._2.get() > p2._2.get()); + } + def count = { + (convertMap(entries) :\ 0) { (x, y) => x._2.get() + y } + } + + def topNItems(n: Int): Array[(String, Int)] = + sortedEntries.take(n).map(p => (p._1, p._2.get())).toArray; + def topNItems: Array[(String, Int)] = topNItems(n); + + data = (date: Date, value: Any) => { + val entry = value.asInstanceOf[String]; + val i = + if (! entries.containsKey(entry)) { + val newInt = new AtomicInteger(0); + val oldInt = entries.putIfAbsent(entry, newInt); + if (oldInt == null) { newInt } else { oldInt } + } else { + entries.get(entry); + } + i.incrementAndGet(); + } +} -- cgit v1.2.3-1-g7c22