/**
 * Copyright 2009 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS-IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package net.appjet.oui;

import java.util.Date;

import scala.collection.mutable.{HashMap, HashSet, Set, Map, ArrayBuffer};
import scala.util.Sorting;

/**
 * Mixin for a mutable map from an integer key to a BucketedLastHits counter.
 * Looking up a key that is not present creates a counter for it on the fly.
 */
trait BucketMap extends scala.collection.mutable.Map[Int, BucketedLastHits] {
  /** Window size, in milliseconds, passed to every counter this map creates. */
  def t = 1000*60;

  /**
   * Returns the counter stored under `s`, creating and storing a new
   * BucketedLastHits(t) first if the key is absent. Runs under this map's lock.
   */
  override def apply(s: Int) = synchronized {
    getOrElseUpdate(s, new BucketedLastHits(t))
  };

  /**
   * A read-only projection of this map in which every counter is replaced by
   * its current `count`.
   */
  def counts = {
    val p = this;
    new scala.collection.Map.Projection[Int, Int] {
      def size = p.size;
      def get(s: Int) = p.get(s).map(_.count);
      def elements = p.elements.map(o => (o._1, o._2.count));
    }
  };
}

/**
 * A ring of `numbuckets` buckets that together cover `size` milliseconds.
 *
 * Subclasses decide what a bucket holds (type A), how a bucket is reset
 * (`bucketClear`) and how a run of buckets is summarised (`mergeBuckets`,
 * producing a B).
 *
 * @param size       total window length in milliseconds
 * @param numbuckets number of slots in the ring
 * @param noUpdate   when true, `updateBuckets` does nothing, so the ring is
 *                   never rotated or cleared after construction
 */
abstract class BucketKeeper[A, B](val size: Long, val numbuckets: Int, val noUpdate: Boolean) {
  /**
   * Picks the bucket count automatically: at least 100, or one bucket per
   * 60000 ms of `size` when that is larger and `noUpdate` is false.
   */
  def this(size: Long, noUpdate: Boolean) =
    this(size, Math.max(100, if (noUpdate) 1 else (size/60000).toInt), noUpdate)
  def this(size: Long) = this(size, false);

  val buckets = new Array[A](numbuckets);

  // Never let a bucket be 0 ms wide: with size < numbuckets the plain integer
  // division yields 0, which makes bucketAtTime divide by zero and keeps
  // updateBuckets from ever advancing lastSwitch.
  val millisPerBucket = Math.max(1L, size/numbuckets);

  /** Timestamp (ms) at which `currentBucket` became the current bucket. */
  var lastSwitch = System.currentTimeMillis();
  var currentBucket = 0;

  /**
   * Runs `block` while holding this object's lock, after rotating the ring up
   * to the present time.
   *
   * `block` is taken by name on purpose: a by-value parameter is evaluated at
   * the call site, i.e. before the lock is taken and before `updateBuckets()`
   * runs, so a hit could be written to a bucket that is cleared right after.
   */
  def withSyncUpdate[E](block: => E): E = synchronized {
    updateBuckets();
    block;
  }

  /**
   * Maps a timestamp to a ring index, or None when it lies `numbuckets` or
   * more bucket widths before `lastSwitch`.
   *
   * The division is integral (Long / Long) and truncates toward zero, so a
   * timestamp less than one bucket width on either side of `lastSwitch` maps
   * to the current bucket. Timestamps after `lastSwitch` give a negative
   * `bucketsAgo` and therefore index forward around the ring.
   */
  protected def bucketAtTime(d: Date) = {
    val msAgo = lastSwitch - d.getTime();
    val bucketsAgo = Math.floor(msAgo/millisPerBucket).asInstanceOf[Int];
    if (bucketsAgo < numbuckets) {
      val bucket = (currentBucket - bucketsAgo + numbuckets) % numbuckets
      // println("Applying to old bucket: "+bucket+" / current: "+currentBucket+", old count: "+count);
      Some(bucket);
    } else {
      // println("No bucket found for: "+d);
      None;
    }
  }

  /**
   * Advances the ring one bucket at a time until `lastSwitch` is within one
   * bucket width of now, clearing each bucket as it becomes current.
   * Does nothing when `noUpdate` is set.
   */
  protected def updateBuckets(): Unit = {
    if (! noUpdate) {
      val now = System.currentTimeMillis();
      while (now > lastSwitch + millisPerBucket) {
        lastSwitch += millisPerBucket;
        currentBucket = (currentBucket + 1) % numbuckets;
        bucketClear(currentBucket);
      }
    }
  }

  /** Resets the bucket at `index` to its empty value. */
  protected def bucketClear(index: Int);

  /**
   * The buckets other than the current one, in ring order: first those after
   * `currentBucket` up to the end of the array, then those from index 0 up to
   * (not including) `currentBucket`.
   *
   * When the current bucket is the last slot there is nothing after it; the
   * index must not be wrapped to 0 there, or the whole ring (current bucket
   * included) would be emitted and then the leading buckets a second time.
   */
  protected def bucketsInOrder: Seq[A] = {
    val beforeCurrent: Seq[A] = buckets.slice(0, currentBucket);
    val next = currentBucket + 1;
    if (next >= numbuckets) {
      beforeCurrent;
    } else {
      val afterCurrent: Seq[A] = buckets.slice(next, numbuckets);
      afterCurrent ++ beforeCurrent;
    }
  }

  /** Summarises a sequence of buckets into a single result. */
  def mergeBuckets(b: Seq[A]): B;

  /**
   * Builds up to `numSamples` samples, each the merge of `bucketsPerSample`
   * consecutive buckets taken from `bucketsInOrder`. The samples are cut
   * starting from the end of that ordering, and the returned array is reversed
   * again so it follows the same direction as `bucketsInOrder`.
   */
  def history(bucketsPerSample: Int, numSamples: Int): Array[B] = withSyncUpdate {
    val bseq = bucketsInOrder.reverse.take(bucketsPerSample*numSamples);
    val sampleCount = Math.min(numSamples, bseq.length);
    val samples =
      for (i <- 0 until sampleCount) yield {
        mergeBuckets(bseq.slice(i*bucketsPerSample, (i+1)*bucketsPerSample));
      }
    samples.reverse.toArray;
  }

  /** The single most recent sample of `bucketsPerSample` buckets. */
  def latest(bucketsPerSample: Int): B = history(bucketsPerSample, 1)(0);

  /** The merge of every bucket in the ring, current bucket included. */
  def count: B = withSyncUpdate {
    mergeBuckets(buckets);
  }

  // Start with every bucket in its empty state. This calls the subclass's
  // bucketClear from the base-class constructor.
  for (i <- 0 until numbuckets) {
    bucketClear(i);
  }
}

/**
 * Tracks distinct values per bucket; the merged result is the number of
 * distinct values across the merged buckets.
 */
class BucketedUniques(size: Long, noUpdate: Boolean)
extends BucketKeeper[Set[Any], Int](size, noUpdate) {
  def this(size: Long) = this(size, false);

  override protected def bucketClear(index: Int): Unit = {
    buckets(index) = new HashSet[Any];
  }

  /** Size of the union of all given buckets. */
  override def mergeBuckets(b: Seq[Set[Any]]) = {
    b.foldLeft(scala.collection.immutable.Set[Any]())(_ ++ _).size;
  }

  /** Records `value` in the bucket for time `d`; dropped if no bucket matches. */
  def hit(d: Date, value: Any): Unit = withSyncUpdate {
    for (bucket <- bucketAtTime(d)) {
      buckets(bucket) += value;
    }
  }
}

/**
 * Tracks a per-string tally in each bucket; the merged result is the grand
 * total together with the summed tally per string.
 */
class BucketedValueCounts(size: Long, noUpdate: Boolean)
extends BucketKeeper[HashMap[String, Int], (Int, Map[String, Int])](size, noUpdate) {
  def this(size: Long) = this(size, false);

  override protected def bucketClear(index: Int): Unit = {
    buckets(index) = new HashMap[String, Int];
  }

  /** Sums the tallies of all given buckets; returns (total, per-key sums). */
  override def mergeBuckets(b: Seq[HashMap[String, Int]]) = {
    val out = new HashMap[String, Int];
    var total = 0;
    for (m <- b) {
      for ((k, v) <- m) {
        out(k) = out.getOrElse(k, 0) + v;
        total += v;
      }
    }
    (total, out);
  }

  /** Adds `increment` to `value`'s tally in the bucket for time `d`. */
  def hit(d: Date, value: String, increment: Int): Unit = withSyncUpdate {
    for (bucket <- bucketAtTime(d)) {
      buckets(bucket)(value) =
        buckets(bucket).getOrElse(value, 0)+increment;
    }
  }

  def hit(d: Date, value: String): Unit = hit(d, value, 1);
}

/**
 * Keeps track of how many "hits" in the last size milliseconds.
 * Has granularity specified by numbuckets.
 */
class BucketedLastHits(size: Long, noUpdate: Boolean)
extends BucketKeeper[Int, Int](size, noUpdate) {
  def this(size: Long) = this(size, false);

  override protected def bucketClear(index: Int): Unit = {
    buckets(index) = 0;
  }

  /** Sum of the given bucket counts. */
  override def mergeBuckets(b: Seq[Int]) = {
    b.foldLeft(0)(_+_);
  }

  def hit(d: Date): Unit = hit(d, 1);

  /** Adds `n` to the bucket for time `d`; dropped if no bucket matches. */
  def hit(d: Date, n: Int): Unit = withSyncUpdate {
    for (bucket <- bucketAtTime(d)) {
      buckets(bucket) = buckets(bucket) + n;
    }
  }
}

/**
 * Stores every recorded value per bucket; the merged result is a function
 * from a percentile (0 to 100) to the value found at that rank.
 */
class BucketedLastHitsHistogram(size: Long, noUpdate: Boolean)
extends BucketKeeper[ArrayBuffer[Int], Function1[Float, Int]](size, noUpdate) {
  def this(size: Long) = this(size, false);

  override protected def bucketClear(index: Int): Unit = {
    buckets(index) = new ArrayBuffer[Int];
  }

  // elements will end up sorted.
  /**
   * Sorts `elements` in place and returns a percentile lookup over it.
   * The lookup yields 0 for an empty array. The computed rank is clamped to
   * the valid index range, so a percentile below 0 or above 100 returns the
   * smallest or largest element instead of indexing outside the array.
   */
  protected def histogramFunction(elements: Array[Int]): Function1[Float, Int] = {
    Sorting.quickSort(elements);
    (percentile: Float) => {
      if (elements.length == 0) {
        0
      } else {
        val lastIndex = elements.length-1;
        val rank = Math.round(percentile/100.0f*lastIndex);
        elements(Math.max(0, Math.min(lastIndex, rank)));
      }
    }
  }

  /** Concatenates all given buckets into one array and wraps it in a lookup. */
  override def mergeBuckets(b: Seq[ArrayBuffer[Int]]) = {
    val elements = new Array[Int](b.foldLeft(0)(_ + _.size));
    var currentIndex = 0;
    for (bucket <- b if bucket.length > 0) {
      // copyToArray is broken through scala 2.7.5, fixed in trunk.
      // bucket.copyToArray(allElements, currentIndex);
      val bucketArray = bucket.toArray;
      System.arraycopy(bucketArray, 0, elements, currentIndex, bucketArray.length);
      currentIndex += bucket.size
    }
    histogramFunction(elements);
  }

  def hit(d: Date): Unit = hit(d, 1);

  /** Appends the value `n` to the bucket for time `d`; dropped if no bucket matches. */
  def hit(d: Date, n: Int): Unit = withSyncUpdate {
    for (bucket <- bucketAtTime(d)) {
      buckets(bucket) += n;
    }
  }
}

/**
 * Four BucketMap instances whose counters use windows of one minute, one
 * hour, one day and one week respectively.
 */
object appstats {
  val minutelyStatus = new HashMap[Int, BucketedLastHits] with BucketMap;
  val hourlyStatus = new HashMap[Int, BucketedLastHits] with BucketMap {
    override val t = 1000*60*60
  };
  val dailyStatus = new HashMap[Int, BucketedLastHits] with BucketMap {
    override val t = 1000*60*60*24
  };
  val weeklyStatus = new HashMap[Int, BucketedLastHits] with BucketMap {
    override val t = 1000*60*60*24*7
  };
  /** All four maps, ordered from the shortest window to the longest. */
  val stati = Array(minutelyStatus, hourlyStatus, dailyStatus, weeklyStatus);
}