Add workers for history to leverage CPU intensive work loading
This commit is contained in:
		
							parent
							
								
									4ccfdfa538
								
							
						
					
					
						commit
						b5920fbbd1
					
				| @ -2,6 +2,7 @@ | ||||
| //external modules
 | ||||
| var async = require('async'); | ||||
| var moment = require('moment'); | ||||
| var childProcess = require('child_process'); | ||||
| 
 | ||||
| //core
 | ||||
| var config = require("./config.js"); | ||||
| @ -9,6 +10,9 @@ var logger = require("./logger.js"); | ||||
| var response = require("./response.js"); | ||||
| var models = require("./models"); | ||||
| 
 | ||||
| // workers
 | ||||
| var historyUpdater = require("./workers/historyUpdater"); | ||||
| 
 | ||||
| //public
 | ||||
| var History = { | ||||
|     historyGet: historyGet, | ||||
| @ -20,49 +24,50 @@ var History = { | ||||
| 
 | ||||
| var caches = {}; | ||||
| //update when the history is dirty
 | ||||
| var updaterIsBusy = false; | ||||
| var updater = setInterval(function () { | ||||
|     if (updaterIsBusy) return; | ||||
|     var deleted = []; | ||||
|     async.each(Object.keys(caches), function (key, callback) { | ||||
|     var _caches = {}; | ||||
|     Object.keys(caches).forEach(function (key) { | ||||
|         var cache = caches[key]; | ||||
|         if (cache.isDirty) { | ||||
|             if (config.debug) logger.info("history updater found dirty history: " + key); | ||||
|             var history = parseHistoryToArray(cache.history); | ||||
|             finishUpdateHistory(key, history, function (err, count) { | ||||
|                 if (err) return callback(err, null); | ||||
|                 if (!count) return callback(null, null); | ||||
|                 cache.isDirty = false; | ||||
|                 cache.updateAt = Date.now(); | ||||
|                 return callback(null, null); | ||||
|             }); | ||||
|             _caches[key] = cache.history; | ||||
|             cache.isDirty = false; | ||||
|         } else { | ||||
|             if (moment().isAfter(moment(cache.updateAt).add(5, 'minutes'))) { | ||||
|                 deleted.push(key); | ||||
|             } | ||||
|             return callback(null, null); | ||||
|         } | ||||
|     }, function (err) { | ||||
|         if (err) return logger.error('history updater error', err); | ||||
|     }); | ||||
|     // delete specified caches
 | ||||
|     for (var i = 0, l = deleted.length; i < l; i++) { | ||||
|         caches[deleted[i]].history = {}; | ||||
|         delete caches[deleted[i]]; | ||||
|     } | ||||
| }, 1000); | ||||
| 
 | ||||
| function finishUpdateHistory(userid, history, callback) { | ||||
|     models.User.update({ | ||||
|         history: JSON.stringify(history) | ||||
|     }, { | ||||
|         where: { | ||||
|             id: userid | ||||
|         } | ||||
|     }).then(function (count) { | ||||
|         return callback(null, count); | ||||
|     }).catch(function (err) { | ||||
|         return callback(err, null); | ||||
|     if (Object.keys(_caches).length <= 0) return; | ||||
|     updaterIsBusy = true; | ||||
|     var worker = childProcess.fork("./lib/workers/historyUpdater.js"); | ||||
|     if (config.debug) logger.info('history updater worker process started'); | ||||
|     worker.send({ | ||||
|         msg: 'update history', | ||||
|         caches: _caches | ||||
|     }); | ||||
| } | ||||
|     worker.on('message', function (data) { | ||||
|         if (!data || !data.msg || !data.userid) return; | ||||
|         var cache = caches[data.userid]; | ||||
|         if (!cache) return; | ||||
|         switch(data.msg) { | ||||
|             case 'check': | ||||
|                 cache.updateAt = Date.now(); | ||||
|                 break; | ||||
|         } | ||||
|     }); | ||||
|     worker.on('close', function (code) { | ||||
|         updaterIsBusy = false; | ||||
|         if (config.debug) logger.info('history updater worker process exited with code ' + code); | ||||
|     }); | ||||
| }, 1000); | ||||
| 
 | ||||
| function isReady() { | ||||
|     var dirtyCount = 0; | ||||
| @ -101,7 +106,7 @@ function getHistory(userid, callback) { | ||||
| } | ||||
| 
 | ||||
| function setHistory(userid, history) { | ||||
|     if (Array.isArray(history)) history = parseHistoryToObject(history); | ||||
|     if (Array.isArray(history)) history = historyUpdater.parseHistoryToObject(history); | ||||
|     if (!caches[userid]) { | ||||
|         caches[userid] = { | ||||
|             history: {}, | ||||
| @ -130,31 +135,13 @@ function updateHistory(userid, noteId, document) { | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| function parseHistoryToArray(history) { | ||||
|     var _history = []; | ||||
|     Object.keys(history).forEach(function (key) { | ||||
|         var item = history[key]; | ||||
|         _history.push(item); | ||||
|     }); | ||||
|     return _history; | ||||
| } | ||||
| 
 | ||||
| function parseHistoryToObject(history) { | ||||
|     var _history = {}; | ||||
|     for (var i = 0, l = history.length; i < l; i++) { | ||||
|         var item = history[i]; | ||||
|         _history[item.id] = item; | ||||
|     } | ||||
|     return _history; | ||||
| } | ||||
| 
 | ||||
| function historyGet(req, res) { | ||||
|     if (req.isAuthenticated()) { | ||||
|         getHistory(req.user.id, function (err, history) { | ||||
|             if (err) return response.errorInternalError(res); | ||||
|             if (!history) return response.errorNotFound(res); | ||||
|             res.send({ | ||||
|                 history: parseHistoryToArray(history) | ||||
|                 history: historyUpdater.parseHistoryToArray(history) | ||||
|             }); | ||||
|         }); | ||||
|     } else { | ||||
|  | ||||
							
								
								
									
										66
									
								
								lib/workers/historyUpdater.js
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										66
									
								
								lib/workers/historyUpdater.js
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,66 @@ | ||||
| // external modules
 | ||||
| var async = require('async'); | ||||
| 
 | ||||
| // core
 | ||||
| var config = require("../config.js"); | ||||
| var logger = require("../logger.js"); | ||||
| var models = require("../models"); | ||||
| 
 | ||||
| process.on('message', function (data) { | ||||
|     if (!data || !data.msg || data.msg !== 'update history' || !data.caches) return process.exit(); | ||||
|     var caches = data.caches; | ||||
|     async.each(Object.keys(caches), function (key, callback) { | ||||
|         var cache = caches[key]; | ||||
|         if (config.debug) logger.info("history updater found dirty history: " + key); | ||||
|         var history = parseHistoryToArray(cache); | ||||
|         finishUpdateHistory(key, history, function (err, count) { | ||||
|             if (err) return callback(err, null); | ||||
|             if (!count) return callback(null, null); | ||||
|             process.send({ | ||||
|                 msg: 'check', | ||||
|                 userid: key | ||||
|             }); | ||||
|             return callback(null, null); | ||||
|         }); | ||||
|     }, function (err) { | ||||
|         if (err) logger.error('history updater error', err); | ||||
|         process.exit(); | ||||
|     }); | ||||
| }); | ||||
| 
 | ||||
| function finishUpdateHistory(userid, history, callback) { | ||||
|     models.User.update({ | ||||
|         history: JSON.stringify(history) | ||||
|     }, { | ||||
|         where: { | ||||
|             id: userid | ||||
|         } | ||||
|     }).then(function (count) { | ||||
|         return callback(null, count); | ||||
|     }).catch(function (err) { | ||||
|         return callback(err, null); | ||||
|     }); | ||||
| } | ||||
| 
 | ||||
| function parseHistoryToArray(history) { | ||||
|     var _history = []; | ||||
|     Object.keys(history).forEach(function (key) { | ||||
|         var item = history[key]; | ||||
|         _history.push(item); | ||||
|     }); | ||||
|     return _history; | ||||
| } | ||||
| 
 | ||||
| function parseHistoryToObject(history) { | ||||
|     var _history = {}; | ||||
|     for (var i = 0, l = history.length; i < l; i++) { | ||||
|         var item = history[i]; | ||||
|         _history[item.id] = item; | ||||
|     } | ||||
|     return _history; | ||||
| } | ||||
| 
 | ||||
| module.exports = { | ||||
|     parseHistoryToArray: parseHistoryToArray, | ||||
|     parseHistoryToObject: parseHistoryToObject | ||||
| }; | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user