Revert "Add workers for history to leverage CPU intensive work loading"
This reverts commit b5920fbbd1ceb595456da18f7d458b63d1a960bf.
This commit is contained in:
		
							parent
							
								
									c5de552988
								
							
						
					
					
						commit
						7adb78aba8
					
				| @ -2,7 +2,6 @@ | |||||||
| //external modules
 | //external modules
 | ||||||
| var async = require('async'); | var async = require('async'); | ||||||
| var moment = require('moment'); | var moment = require('moment'); | ||||||
| var childProcess = require('child_process'); |  | ||||||
| 
 | 
 | ||||||
| //core
 | //core
 | ||||||
| var config = require("./config.js"); | var config = require("./config.js"); | ||||||
| @ -10,9 +9,6 @@ var logger = require("./logger.js"); | |||||||
| var response = require("./response.js"); | var response = require("./response.js"); | ||||||
| var models = require("./models"); | var models = require("./models"); | ||||||
| 
 | 
 | ||||||
| // workers
 |  | ||||||
| var historyUpdater = require("./workers/historyUpdater"); |  | ||||||
| 
 |  | ||||||
| //public
 | //public
 | ||||||
| var History = { | var History = { | ||||||
|     historyGet: historyGet, |     historyGet: historyGet, | ||||||
| @ -24,51 +20,50 @@ var History = { | |||||||
| 
 | 
 | ||||||
| var caches = {}; | var caches = {}; | ||||||
| //update when the history is dirty
 | //update when the history is dirty
 | ||||||
| var updaterIsBusy = false; |  | ||||||
| var updater = setInterval(function () { | var updater = setInterval(function () { | ||||||
|     if (updaterIsBusy) return; |  | ||||||
|     var deleted = []; |     var deleted = []; | ||||||
|     var _caches = {}; |     async.each(Object.keys(caches), function (key, callback) { | ||||||
|     Object.keys(caches).forEach(function (key) { |  | ||||||
|         var cache = caches[key]; |         var cache = caches[key]; | ||||||
|         if (cache.isDirty) { |         if (cache.isDirty) { | ||||||
|             _caches[key] = cache.history; |             if (config.debug) logger.info("history updater found dirty history: " + key); | ||||||
|             cache.isDirty = false; |             var history = parseHistoryToArray(cache.history); | ||||||
|  |             finishUpdateHistory(key, history, function (err, count) { | ||||||
|  |                 if (err) return callback(err, null); | ||||||
|  |                 if (!count) return callback(null, null); | ||||||
|  |                 cache.isDirty = false; | ||||||
|  |                 cache.updateAt = Date.now(); | ||||||
|  |                 return callback(null, null); | ||||||
|  |             }); | ||||||
|         } else { |         } else { | ||||||
|             if (moment().isAfter(moment(cache.updateAt).add(5, 'minutes'))) { |             if (moment().isAfter(moment(cache.updateAt).add(5, 'minutes'))) { | ||||||
|                 deleted.push(key); |                 deleted.push(key); | ||||||
|             } |             } | ||||||
|  |             return callback(null, null); | ||||||
|         } |         } | ||||||
|  |     }, function (err) { | ||||||
|  |         if (err) return logger.error('history updater error', err); | ||||||
|     }); |     }); | ||||||
|     // delete specified caches
 |     // delete specified caches
 | ||||||
|     for (var i = 0, l = deleted.length; i < l; i++) { |     for (var i = 0, l = deleted.length; i < l; i++) { | ||||||
|         caches[deleted[i]].history = {}; |         caches[deleted[i]].history = {}; | ||||||
|         delete caches[deleted[i]]; |         delete caches[deleted[i]]; | ||||||
|     } |     } | ||||||
|     if (Object.keys(_caches).length <= 0) return; |  | ||||||
|     updaterIsBusy = true; |  | ||||||
|     var worker = childProcess.fork("./lib/workers/historyUpdater.js"); |  | ||||||
|     if (config.debug) logger.info('history updater worker process started'); |  | ||||||
|     worker.send({ |  | ||||||
|         msg: 'update history', |  | ||||||
|         caches: _caches |  | ||||||
|     }); |  | ||||||
|     worker.on('message', function (data) { |  | ||||||
|         if (!data || !data.msg || !data.userid) return; |  | ||||||
|         var cache = caches[data.userid]; |  | ||||||
|         if (!cache) return; |  | ||||||
|         switch(data.msg) { |  | ||||||
|             case 'check': |  | ||||||
|                 cache.updateAt = Date.now(); |  | ||||||
|                 break; |  | ||||||
|         } |  | ||||||
|     }); |  | ||||||
|     worker.on('close', function (code) { |  | ||||||
|         updaterIsBusy = false; |  | ||||||
|         if (config.debug) logger.info('history updater worker process exited with code ' + code); |  | ||||||
|     }); |  | ||||||
| }, 1000); | }, 1000); | ||||||
| 
 | 
 | ||||||
|  | function finishUpdateHistory(userid, history, callback) { | ||||||
|  |     models.User.update({ | ||||||
|  |         history: JSON.stringify(history) | ||||||
|  |     }, { | ||||||
|  |         where: { | ||||||
|  |             id: userid | ||||||
|  |         } | ||||||
|  |     }).then(function (count) { | ||||||
|  |         return callback(null, count); | ||||||
|  |     }).catch(function (err) { | ||||||
|  |         return callback(err, null); | ||||||
|  |     }); | ||||||
|  | } | ||||||
|  | 
 | ||||||
| function isReady() { | function isReady() { | ||||||
|     var dirtyCount = 0; |     var dirtyCount = 0; | ||||||
|     async.each(Object.keys(caches), function (key, callback) { |     async.each(Object.keys(caches), function (key, callback) { | ||||||
| @ -106,7 +101,7 @@ function getHistory(userid, callback) { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| function setHistory(userid, history) { | function setHistory(userid, history) { | ||||||
|     if (Array.isArray(history)) history = historyUpdater.parseHistoryToObject(history); |     if (Array.isArray(history)) history = parseHistoryToObject(history); | ||||||
|     if (!caches[userid]) { |     if (!caches[userid]) { | ||||||
|         caches[userid] = { |         caches[userid] = { | ||||||
|             history: {}, |             history: {}, | ||||||
| @ -135,13 +130,31 @@ function updateHistory(userid, noteId, document) { | |||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | function parseHistoryToArray(history) { | ||||||
|  |     var _history = []; | ||||||
|  |     Object.keys(history).forEach(function (key) { | ||||||
|  |         var item = history[key]; | ||||||
|  |         _history.push(item); | ||||||
|  |     }); | ||||||
|  |     return _history; | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | function parseHistoryToObject(history) { | ||||||
|  |     var _history = {}; | ||||||
|  |     for (var i = 0, l = history.length; i < l; i++) { | ||||||
|  |         var item = history[i]; | ||||||
|  |         _history[item.id] = item; | ||||||
|  |     } | ||||||
|  |     return _history; | ||||||
|  | } | ||||||
|  | 
 | ||||||
| function historyGet(req, res) { | function historyGet(req, res) { | ||||||
|     if (req.isAuthenticated()) { |     if (req.isAuthenticated()) { | ||||||
|         getHistory(req.user.id, function (err, history) { |         getHistory(req.user.id, function (err, history) { | ||||||
|             if (err) return response.errorInternalError(res); |             if (err) return response.errorInternalError(res); | ||||||
|             if (!history) return response.errorNotFound(res); |             if (!history) return response.errorNotFound(res); | ||||||
|             res.send({ |             res.send({ | ||||||
|                 history: historyUpdater.parseHistoryToArray(history) |                 history: parseHistoryToArray(history) | ||||||
|             }); |             }); | ||||||
|         }); |         }); | ||||||
|     } else { |     } else { | ||||||
|  | |||||||
| @ -1,66 +0,0 @@ | |||||||
| // external modules
 |  | ||||||
| var async = require('async'); |  | ||||||
| 
 |  | ||||||
| // core
 |  | ||||||
| var config = require("../config.js"); |  | ||||||
| var logger = require("../logger.js"); |  | ||||||
| var models = require("../models"); |  | ||||||
| 
 |  | ||||||
| process.on('message', function (data) { |  | ||||||
|     if (!data || !data.msg || data.msg !== 'update history' || !data.caches) return process.exit(); |  | ||||||
|     var caches = data.caches; |  | ||||||
|     async.each(Object.keys(caches), function (key, callback) { |  | ||||||
|         var cache = caches[key]; |  | ||||||
|         if (config.debug) logger.info("history updater found dirty history: " + key); |  | ||||||
|         var history = parseHistoryToArray(cache); |  | ||||||
|         finishUpdateHistory(key, history, function (err, count) { |  | ||||||
|             if (err) return callback(err, null); |  | ||||||
|             if (!count) return callback(null, null); |  | ||||||
|             process.send({ |  | ||||||
|                 msg: 'check', |  | ||||||
|                 userid: key |  | ||||||
|             }); |  | ||||||
|             return callback(null, null); |  | ||||||
|         }); |  | ||||||
|     }, function (err) { |  | ||||||
|         if (err) logger.error('history updater error', err); |  | ||||||
|         process.exit(); |  | ||||||
|     }); |  | ||||||
| }); |  | ||||||
| 
 |  | ||||||
| function finishUpdateHistory(userid, history, callback) { |  | ||||||
|     models.User.update({ |  | ||||||
|         history: JSON.stringify(history) |  | ||||||
|     }, { |  | ||||||
|         where: { |  | ||||||
|             id: userid |  | ||||||
|         } |  | ||||||
|     }).then(function (count) { |  | ||||||
|         return callback(null, count); |  | ||||||
|     }).catch(function (err) { |  | ||||||
|         return callback(err, null); |  | ||||||
|     }); |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| function parseHistoryToArray(history) { |  | ||||||
|     var _history = []; |  | ||||||
|     Object.keys(history).forEach(function (key) { |  | ||||||
|         var item = history[key]; |  | ||||||
|         _history.push(item); |  | ||||||
|     }); |  | ||||||
|     return _history; |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| function parseHistoryToObject(history) { |  | ||||||
|     var _history = {}; |  | ||||||
|     for (var i = 0, l = history.length; i < l; i++) { |  | ||||||
|         var item = history[i]; |  | ||||||
|         _history[item.id] = item; |  | ||||||
|     } |  | ||||||
|     return _history; |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| module.exports = { |  | ||||||
|     parseHistoryToArray: parseHistoryToArray, |  | ||||||
|     parseHistoryToObject: parseHistoryToObject |  | ||||||
| }; |  | ||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user