/**
 * Copyright 2009 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS-IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import("execution");
import("profiler");

import("etherpad.pad.model");
import("etherpad.pad.model.accessPadGlobal");
import("etherpad.log");
import("etherpad.utils");

jimport("net.appjet.oui.exceptionlog");
jimport("java.util.concurrent.ConcurrentHashMap");
jimport("java.lang.System.out.println");

// Delay between a pad being reported dirty and its scheduled write.
var MIN_WRITE_INTERVAL_MS = 2000; // 2 seconds
// A write that runs later than its scheduled time by more than this is logged.
var MIN_WRITE_DELAY_NOTIFY_MS = 2000; // 2 seconds
// Pads not accessed for longer than this are written and dropped from memory;
// also used as the delay between stale-pad checks.
var AGE_FOR_PAD_FLUSH_MS = 5*60*1000; // 5 minutes
// How long a write/flush task is postponed while the DB is marked unwritable.
var DBUNWRITABLE_WRITE_DELAY_MS = 30*1000; // 30 seconds

/**
 * Replaces the stored DB-writable state.
 *
 * @param state one of { constant: true }, { constant: false } or
 *     { trueAfter: timeInMs }
 */
function setWritableState(state) {
  _dbwriter().dbWritable = state;
}

/**
 * @return the state object last stored by setWritableState() (or the
 *     { constant: true } default installed by onStartup()).
 */
function getWritableState() {
  return _dbwriter().dbWritable;
}

/**
 * Public wrapper around _isDBWritable().
 *
 * @return {boolean} whether the stored state currently permits DB writes.
 */
function isDBWritable() {
  return _isDBWritable();
}

/**
 * Evaluates the stored writable state.
 *
 * - missing / null / non-object state: writable
 * - { constant: x }: writable iff x is truthy
 * - { trueAfter: t }: writable once the current time is past t
 * - any other object: writable
 *
 * @return {boolean}
 */
function _isDBWritable() {
  var state = _dbwriter().dbWritable;
  // typeof null is "object", so null is checked separately; without this a
  // null state would throw on the property reads below.
  if (!state || typeof state != "object") {
    return true;
  }
  else if (state.constant !== undefined) {
    return !! state.constant;
  }
  else if (state.trueAfter !== undefined) {
    return (+new Date()) > state.trueAfter;
  }
  else return true;
}

/**
 * Builds a human-readable description of the writable state.
 *
 * Note that the leading true/false value is computed from the state stored
 * in the cache (via _isDBWritable()), while the " until ..." / " since ..."
 * suffix is computed from the `state` argument.
 *
 * @param state a writable-state object (see setWritableState); a missing or
 *     non-object value simply yields no suffix.
 * @return {string} e.g. "false until 12.3 seconds from now"
 */
function getWritableStateDescription(state) {
  var v = _isDBWritable();
  var restOfMessage = "";
  if (state && typeof state == "object" && state.trueAfter !== undefined) {
    var now = +new Date();
    var then = state.trueAfter;
    var diffSeconds = java.lang.String.format("%.1f", Math.abs(now - then)/1000);
    if (now < then) {
      restOfMessage = " until "+diffSeconds+" seconds from now";
    }
    else {
      restOfMessage = " since "+diffSeconds+" seconds ago";
    }
  }
  return v+restOfMessage;
}

/**
 * @return the shared dbwriter state object kept in appjet.cache
 *     (created by onStartup()).
 */
function _dbwriter() {
  return appjet.cache.dbwriter;
}

/**
 * Initializes the shared dbwriter state, creates the two task thread pools
 * and schedules the first stale-pad check.
 */
function onStartup() {
  appjet.cache.dbwriter = {};
  var dbwriter = _dbwriter();
  dbwriter.pendingWrites = new ConcurrentHashMap();
  dbwriter.scheduledFor = new ConcurrentHashMap(); // padId --> long
  dbwriter.dbWritable = { constant: true };

  // NOTE(review): the second argument is presumably the pool's thread
  // count -- confirm against the execution module.
  execution.initTaskThreadPool("dbwriter", 4);
  // we don't wait for scheduled tasks in the infreq pool to run and complete
  execution.initTaskThreadPool("dbwriter_infreq", 1);

  _scheduleCheckForStalePads();
}

/**
 * Schedules the "checkForStalePads" task on the infrequent pool, to run
 * after AGE_FOR_PAD_FLUSH_MS.
 * NOTE(review): the task name presumably resolves to
 * taskCheckForStalePads() via the execution module -- confirm.
 */
function _scheduleCheckForStalePads() {
  execution.scheduleTask("dbwriter_infreq", "checkForStalePads", AGE_FOR_PAD_FLUSH_MS, []);
}

/**
 * Shuts down the "dbwriter" pool and waits for it, logging a warning if the
 * execution module reports failure. Only the "dbwriter" pool is waited on.
 */
function onShutdown() {
  log.info("Doing final DB writes before shutdown...");
  // NOTE(review): 10000 is presumably a timeout in milliseconds -- confirm.
  var success = execution.shutdownAndWaitOnTaskThreadPool("dbwriter", 10000);
  if (! success) {
    log.warn("ERROR! DB WRITER COULD NOT SHUTDOWN THREAD POOL!");
  }
}

/**
 * Converts `e` to a Java exception and reports it to the regular log and to
 * the exception log.
 *
 * @param e the caught JavaScript error/exception value
 */
function _logException(e) {
  var exc = utils.toJavaException(e);
  log.warn("writeAllToDB: Error writing to SQL!  Written to exceptions.log: "+exc);
  log.logException(exc);
  exceptionlog.apply(exc);
}

/**
 * Task body for "flushPad": writes the pad (if dirty) and removes it from
 * memory. While the DB is unwritable the task re-schedules itself after
 * DBUNWRITABLE_WRITE_DELAY_MS instead.
 *
 * @param padId id of the pad to flush
 * @param reason optional text appended to the log line
 */
function taskFlushPad(padId, reason) {
  if (! _isDBWritable()) {
    // DB is unwritable, delay
    execution.scheduleTask("dbwriter_infreq", "flushPad", DBUNWRITABLE_WRITE_DELAY_MS, [padId, reason]);
    return;
  }

  model.accessPadGlobal(padId, function(pad) {
    writePadNow(pad, true);
  }, "r");

  log.info("taskFlushPad: flushed "+padId+(reason?(" (reason: "+reason+")"):''));
}

/**
 * Task body for "writePad": writes the pad to the DB if it is dirty and
 * prints profiler timings. While the DB is unwritable the task re-schedules
 * itself (and updates the pad's scheduled time) instead.
 *
 * @param padId id of the pad to write
 */
function taskWritePad(padId) {
  var dbwriter = _dbwriter();
  if (! _isDBWritable()) {
    // DB is unwritable, delay
    dbwriter.scheduledFor.put(padId, (+(new Date)+DBUNWRITABLE_WRITE_DELAY_MS));
    execution.scheduleTask("dbwriter", "writePad", DBUNWRITABLE_WRITE_DELAY_MS, [padId]);
    return;
  }

  profiler.reset();
  var t1 = profiler.rcb("lock wait");
  model.accessPadGlobal(padId, function(pad) {
    t1();
    _dbwriter().pendingWrites.remove(padId); // do this first

    var success = false;
    try {
      var t2 = profiler.rcb("write");
      writePadNow(pad);
      t2();

      success = true;
    }
    finally {
      // No catch here: a failure is logged and then keeps propagating.
      if (! success) {
        log.warn("DB WRITER FAILED TO WRITE PAD: "+padId);
      }
      profiler.print();
    }
  }, "r");
}

/**
 * Task body for "checkForStalePads": re-schedules itself, then flushes every
 * cached pad whose last access is older than AGE_FOR_PAD_FLUSH_MS. Pads
 * whose lock is currently held are skipped, and the sweep stops early if the
 * DB becomes unwritable.
 */
function taskCheckForStalePads() {
  // do this first
  _scheduleCheckForStalePads();

  if (! _isDBWritable()) return;

  // get "active" pads into an array
  var padIter = appjet.cache.pads.meta.keySet().iterator();
  var padList = [];
  while (padIter.hasNext()) { padList.push(padIter.next()); }

  var numStale = 0;

  for (var i = 0; i < padList.length; i++) {
    if (! _isDBWritable()) break;
    var p = padList[i];
    if (model.isPadLockHeld(p)) {
      // skip it, don't want to lock up stale pad flusher
    }
    else {
      try {
        accessPadGlobal(p, function(pad) {
          if (pad.exists()) {
            var padAge = (+new Date()) - pad._meta.status.lastAccess;
            if (padAge > AGE_FOR_PAD_FLUSH_MS) {
              writePadNow(pad, true);
              numStale++;
            }
          }
        }, "r");
      }
      catch (e) {
        // One failing pad must not stop the remaining pads from being
        // flushed; record the failure and move on.
        _logException(e);
      }
    }
  }

  log.info("taskCheckForStalePads: flushed "+numStale+" stale pads");
}

/**
 * Records that a pad has unsaved changes. If no write is already pending for
 * the pad, marks one pending and schedules a "writePad" task after
 * MIN_WRITE_INTERVAL_MS.
 *
 * @param padId id of the modified pad
 */
function notifyPadDirty(padId) {
  var dbwriter = _dbwriter();
  if (! dbwriter.pendingWrites.containsKey(padId)) {
    dbwriter.pendingWrites.put(padId, "pending");
    dbwriter.scheduledFor.put(padId, (+(new Date)+MIN_WRITE_INTERVAL_MS));
    execution.scheduleTask("dbwriter", "writePad", MIN_WRITE_INTERVAL_MS, [padId]);
  }
}

/**
 * Schedules an immediate (zero-delay) "flushPad" task on the infrequent pool.
 *
 * @param padId id of the pad to flush
 * @param reason optional text that ends up in the flush log line
 */
function scheduleFlushPad(padId, reason) {
  execution.scheduleTask("dbwriter_infreq", "flushPad", 0, [padId, reason]);
}

/*function _dbwriterLoopBody(executor) {
  try {
    var info = writeAllToDB(executor);
    if (!info.boring) {
      log.info("DB writer: "+info.toSource());
    }
    java.lang.Thread.sleep(Math.max(0, MIN_WRITE_INTERVAL_MS - info.elapsed));
  }
  catch (e) {
    _logException(e);
    java.lang.Thread.sleep(MIN_WRITE_INTERVAL_MS);
  }
}

function _startInThread(name, func) {
  (new Thread(new Runnable({
    run: function() {
      func();
    }
  }), name)).start();
}

function killDBWriterThreadAndWait() {
  appjet.cache.abortDBWriter = true;
  while (appjet.cache.runningDBWriter) {
    java.lang.Thread.sleep(100);
  }
}*/

/*function writeAllToDB(executor, andFlush) {
  if (!executor) {
    executor = new ScheduledThreadPoolExecutor(NUM_WRITER_THREADS);
  }

  profiler.reset();
  var startWriteTime = profiler.time();

  var padCount = new AtomicInteger(0);
  var writeCount = new AtomicInteger(0);
  var removeCount = new AtomicInteger(0);

  // get pads into an array
  var padIter = appjet.cache.pads.meta.keySet().iterator();
  var padList = [];
  while (padIter.hasNext()) { padList.push(padIter.next()); }

  var latch = new CountDownLatch(padList.length);

  for (var i = 0; i < padList.length; i++) {
    _spawnCall(executor, function(p) {
      try {
        var padWriteResult = {};
        accessPadGlobal(p, function(pad) {
          if (pad.exists()) {
            padCount.getAndIncrement();
            padWriteResult = writePad(pad, andFlush);
            if (padWriteResult.didWrite) writeCount.getAndIncrement();
            if (padWriteResult.didRemove) removeCount.getAndIncrement();
          }
        }, "r");
      } catch (e) {
        _logException(e);
      } finally {
        latch.countDown();
      }
    }, padList[i]);
  }

  // wait for them all to finish
  latch.await();

  var endWriteTime = profiler.time();
  var elapsed = Math.round((endWriteTime - startWriteTime)/1000)/1000;
  var interesting = (writeCount.get() > 0 || removeCount.get() > 0);

  var obj = {padCount:padCount.get(), writeCount:writeCount.get(), elapsed:elapsed,
             removeCount:removeCount.get()};
  if (! interesting) obj.boring = true;
  if (interesting) {
    profiler.record("writeAll", profiler.time()-startWriteTime);
    profiler.print();
  }

  return obj;
}*/

/**
 * Writes `pad` to the DB if it exists and is marked dirty, and optionally
 * removes it from the in-memory cache afterwards.
 *
 * After a successful write, the pad's entry in the scheduledFor map (if any)
 * is compared with the current time; a warning is logged when the write ran
 * more than MIN_WRITE_DELAY_NOTIFY_MS late, and the entry is removed.
 *
 * If pad.writeToDB() throws, the dirty flag is restored before the exception
 * propagates, so the pad is still written by a later write/flush instead of
 * being treated as clean (and possibly evicted without ever being saved).
 *
 * @param pad the pad object; callers in this file obtain it via
 *     accessPadGlobal
 * @param andFlush if truthy, also remove the pad from memory
 * @return {{didWrite: boolean, didRemove: boolean}}
 */
function writePadNow(pad, andFlush) {
  var didWrite = false;
  var didRemove = false;

  if (pad.exists()) {
    if (pad._meta.status.dirty) {
      /*log.info("Writing pad "+pad.getId());*/
      // The flag is still cleared *before* the write, as in the original
      // ordering; it is only put back if the write fails.
      pad._meta.status.dirty = false;
      var writeSucceeded = false;
      try {
        //var t1 = +new Date();
        pad.writeToDB();
        //var t2 = +new Date();
        writeSucceeded = true;
      }
      finally {
        if (! writeSucceeded) {
          pad._meta.status.dirty = true;
        }
      }
      didWrite = true;

      //log.info("Wrote pad "+pad.getId()+" in "+(t2-t1)+" ms.");

      var now = +(new Date);
      var sched = _dbwriter().scheduledFor.get(pad.getId());
      if (sched) {
        var delay = now - sched;
        if (delay > MIN_WRITE_DELAY_NOTIFY_MS) {
          log.warn("dbwriter["+pad.getId()+"] behind schedule by "+delay+"ms");
        }
        _dbwriter().scheduledFor.remove(pad.getId());
      }
    }
    if (andFlush) {
      // remove from cache
      model.removeFromMemory(pad);
      didRemove = true;
    }
  }
  return {didWrite:didWrite, didRemove:didRemove};
}

/*function _spawnCall(executor, func, varargs) {
  var args = Array.prototype.slice.call(arguments, 2);
  var that = this;
  executor.schedule(new Runnable({
    run: function() {
      func.apply(that, args);
    }
  }), 0, TimeUnit.MICROSECONDS);
}*/