diff options
Diffstat (limited to 'trunk/etherpad/src/etherpad/pad/dbwriter.js')
-rw-r--r-- | trunk/etherpad/src/etherpad/pad/dbwriter.js | 338 |
1 files changed, 338 insertions, 0 deletions
diff --git a/trunk/etherpad/src/etherpad/pad/dbwriter.js b/trunk/etherpad/src/etherpad/pad/dbwriter.js new file mode 100644 index 0000000..233622b --- /dev/null +++ b/trunk/etherpad/src/etherpad/pad/dbwriter.js @@ -0,0 +1,338 @@ +/** + * Copyright 2009 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS-IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import("execution"); +import("profiler"); + +import("etherpad.pad.model"); +import("etherpad.pad.model.accessPadGlobal"); +import("etherpad.log"); +import("etherpad.utils"); + +jimport("net.appjet.oui.exceptionlog"); +jimport("java.util.concurrent.ConcurrentHashMap"); +jimport("java.lang.System.out.println"); + +var MIN_WRITE_INTERVAL_MS = 2000; // 2 seconds +var MIN_WRITE_DELAY_NOTIFY_MS = 2000; // 2 seconds +var AGE_FOR_PAD_FLUSH_MS = 5*60*1000; // 5 minutes +var DBUNWRITABLE_WRITE_DELAY_MS = 30*1000; // 30 seconds + +// state is { constant: true }, { constant: false }, { trueAfter: timeInMs } +function setWritableState(state) { + _dbwriter().dbWritable = state; +} + +function getWritableState() { + return _dbwriter().dbWritable; +} + +function isDBWritable() { + return _isDBWritable(); +} + +function _isDBWritable() { + var state = _dbwriter().dbWritable; + if (typeof state != "object") { + return true; + } + else if (state.constant !== undefined) { + return !! state.constant; + } + else if (state.trueAfter !== undefined) { + return (+new Date()) > state.trueAfter; + } + else return true; +} + +function getWritableStateDescription(state) { + var v = _isDBWritable(); + var restOfMessage = ""; + if (state.trueAfter !== undefined) { + var now = +new Date(); + var then = state.trueAfter; + var diffSeconds = java.lang.String.format("%.1f", Math.abs(now - then)/1000); + if (now < then) { + restOfMessage = " until "+diffSeconds+" seconds from now"; + } + else { + restOfMessage = " since "+diffSeconds+" seconds ago"; + } + } + return v+restOfMessage; +} + +function _dbwriter() { + return appjet.cache.dbwriter; +} + +function onStartup() { + appjet.cache.dbwriter = {}; + var dbwriter = _dbwriter(); + dbwriter.pendingWrites = new ConcurrentHashMap(); + dbwriter.scheduledFor = new ConcurrentHashMap(); // padId --> long + dbwriter.dbWritable = { constant: true }; + + execution.initTaskThreadPool("dbwriter", 4); + // we don't wait for scheduled tasks in the infreq pool to run and complete + execution.initTaskThreadPool("dbwriter_infreq", 1); + + _scheduleCheckForStalePads(); +} + +function _scheduleCheckForStalePads() { + execution.scheduleTask("dbwriter_infreq", "checkForStalePads", AGE_FOR_PAD_FLUSH_MS, []); +} + +function onShutdown() { + log.info("Doing final DB writes before shutdown..."); + var success = execution.shutdownAndWaitOnTaskThreadPool("dbwriter", 10000); + if (! success) { + log.warn("ERROR! DB WRITER COULD NOT SHUTDOWN THREAD POOL!"); + } +} + +function _logException(e) { + var exc = utils.toJavaException(e); + log.warn("writeAllToDB: Error writing to SQL! Written to exceptions.log: "+exc); + log.logException(exc); + exceptionlog.apply(exc); +} + +function taskFlushPad(padId, reason) { + var dbwriter = _dbwriter(); + if (! _isDBWritable()) { + // DB is unwritable, delay + execution.scheduleTask("dbwriter_infreq", "flushPad", DBUNWRITABLE_WRITE_DELAY_MS, [padId, reason]); + return; + } + + model.accessPadGlobal(padId, function(pad) { + writePadNow(pad, true); + }, "r"); + + log.info("taskFlushPad: flushed "+padId+(reason?(" (reason: "+reason+")"):'')); +} + +function taskWritePad(padId) { + var dbwriter = _dbwriter(); + if (! _isDBWritable()) { + // DB is unwritable, delay + dbwriter.scheduledFor.put(padId, (+(new Date)+DBUNWRITABLE_WRITE_DELAY_MS)); + execution.scheduleTask("dbwriter", "writePad", DBUNWRITABLE_WRITE_DELAY_MS, [padId]); + return; + } + + profiler.reset(); + var t1 = profiler.rcb("lock wait"); + model.accessPadGlobal(padId, function(pad) { + t1(); + _dbwriter().pendingWrites.remove(padId); // do this first + + var success = false; + try { + var t2 = profiler.rcb("write"); + writePadNow(pad); + t2(); + + success = true; + } + finally { + if (! success) { + log.warn("DB WRITER FAILED TO WRITE PAD: "+padId); + } + profiler.print(); + } + }, "r"); +} + +function taskCheckForStalePads() { + // do this first + _scheduleCheckForStalePads(); + + if (! _isDBWritable()) return; + + // get "active" pads into an array + var padIter = appjet.cache.pads.meta.keySet().iterator(); + var padList = []; + while (padIter.hasNext()) { padList.push(padIter.next()); } + + var numStale = 0; + + for (var i = 0; i < padList.length; i++) { + if (! _isDBWritable()) break; + var p = padList[i]; + if (model.isPadLockHeld(p)) { + // skip it, don't want to lock up stale pad flusher + } + else { + accessPadGlobal(p, function(pad) { + if (pad.exists()) { + var padAge = (+new Date()) - pad._meta.status.lastAccess; + if (padAge > AGE_FOR_PAD_FLUSH_MS) { + writePadNow(pad, true); + numStale++; + } + } + }, "r"); + } + } + + log.info("taskCheckForStalePads: flushed "+numStale+" stale pads"); +} + +function notifyPadDirty(padId) { + var dbwriter = _dbwriter(); + if (! dbwriter.pendingWrites.containsKey(padId)) { + dbwriter.pendingWrites.put(padId, "pending"); + dbwriter.scheduledFor.put(padId, (+(new Date)+MIN_WRITE_INTERVAL_MS)); + execution.scheduleTask("dbwriter", "writePad", MIN_WRITE_INTERVAL_MS, [padId]); + } +} + +function scheduleFlushPad(padId, reason) { + execution.scheduleTask("dbwriter_infreq", "flushPad", 0, [padId, reason]); +} + +/*function _dbwriterLoopBody(executor) { + try { + var info = writeAllToDB(executor); + if (!info.boring) { + log.info("DB writer: "+info.toSource()); + } + java.lang.Thread.sleep(Math.max(0, MIN_WRITE_INTERVAL_MS - info.elapsed)); + } + catch (e) { + _logException(e); + java.lang.Thread.sleep(MIN_WRITE_INTERVAL_MS); + } +} + +function _startInThread(name, func) { + (new Thread(new Runnable({ + run: function() { + func(); + } + }), name)).start(); +} + +function killDBWriterThreadAndWait() { + appjet.cache.abortDBWriter = true; + while (appjet.cache.runningDBWriter) { + java.lang.Thread.sleep(100); + } +}*/ + +/*function writeAllToDB(executor, andFlush) { + if (!executor) { + executor = new ScheduledThreadPoolExecutor(NUM_WRITER_THREADS); + } + + profiler.reset(); + var startWriteTime = profiler.time(); + var padCount = new AtomicInteger(0); + var writeCount = new AtomicInteger(0); + var removeCount = new AtomicInteger(0); + + // get pads into an array + var padIter = appjet.cache.pads.meta.keySet().iterator(); + var padList = []; + while (padIter.hasNext()) { padList.push(padIter.next()); } + + var latch = new CountDownLatch(padList.length); + + for (var i = 0; i < padList.length; i++) { + _spawnCall(executor, function(p) { + try { + var padWriteResult = {}; + accessPadGlobal(p, function(pad) { + if (pad.exists()) { + padCount.getAndIncrement(); + padWriteResult = writePad(pad, andFlush); + if (padWriteResult.didWrite) writeCount.getAndIncrement(); + if (padWriteResult.didRemove) removeCount.getAndIncrement(); + } + }, "r"); + } catch (e) { + _logException(e); + } finally { + latch.countDown(); + } + }, padList[i]); + } + + // wait for them all to finish + latch.await(); + + var endWriteTime = profiler.time(); + var elapsed = Math.round((endWriteTime - startWriteTime)/1000)/1000; + var interesting = (writeCount.get() > 0 || removeCount.get() > 0); + + var obj = {padCount:padCount.get(), writeCount:writeCount.get(), elapsed:elapsed, removeCount:removeCount.get()}; + if (! interesting) obj.boring = true; + if (interesting) { + profiler.record("writeAll", profiler.time()-startWriteTime); + profiler.print(); + } + + return obj; +}*/ + +function writePadNow(pad, andFlush) { + var didWrite = false; + var didRemove = false; + + if (pad.exists()) { + var dbUpToDate = false; + if (pad._meta.status.dirty) { + /*log.info("Writing pad "+pad.getId());*/ + pad._meta.status.dirty = false; + //var t1 = +new Date(); + pad.writeToDB(); + //var t2 = +new Date(); + didWrite = true; + + //log.info("Wrote pad "+pad.getId()+" in "+(t2-t1)+" ms."); + + var now = +(new Date); + var sched = _dbwriter().scheduledFor.get(pad.getId()); + if (sched) { + var delay = now - sched; + if (delay > MIN_WRITE_DELAY_NOTIFY_MS) { + log.warn("dbwriter["+pad.getId()+"] behind schedule by "+delay+"ms"); + } + _dbwriter().scheduledFor.remove(pad.getId()); + } + } + if (andFlush) { + // remove from cache + model.removeFromMemory(pad); + didRemove = true; + } + } + return {didWrite:didWrite, didRemove:didRemove}; +} + +/*function _spawnCall(executor, func, varargs) { + var args = Array.prototype.slice.call(arguments, 2); + var that = this; + executor.schedule(new Runnable({ + run: function() { + func.apply(that, args); + } + }), 0, TimeUnit.MICROSECONDS); +}*/ + |