306 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			306 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
| "use strict";
 | |
| 
 | |
| // external modules
 | |
| var Sequelize = require("sequelize");
 | |
| var async = require('async');
 | |
| var moment = require('moment');
 | |
| var childProcess = require('child_process');
 | |
| var shortId = require('shortid');
 | |
| 
 | |
| // core
 | |
| var config = require("../config.js");
 | |
| var logger = require("../logger.js");
 | |
| 
 | |
// Handle to the forked dmp worker process. The worker's 'close' handler
// resets it to null and sendDmpWorker() forks a replacement on demand.
// NOTE(review): "dmp" presumably stands for diff-match-patch - confirm
// against lib/workers/dmpWorker.js.
| var dmpWorker = createDmpWorker();
 | |
// Callbacks waiting for a worker reply, keyed by the cacheKey that
// sendDmpWorker() attaches to every outgoing message.
| var dmpCallbackCache = {};
 | |
| 
 | |
/**
 * Fork the dmp worker process and wire up its IPC handlers.
 *
 * Replies from the worker are routed to the callback stored in
 * dmpCallbackCache under the reply's cacheKey:
 *   - msg 'error' -> callback(data.error, null)
 *   - msg 'check' -> callback(null, data.result)
 *   - anything else -> callback(Error, null), so the caller never hangs
 *
 * When the worker closes, the module-level dmpWorker handle is reset to null
 * (sendDmpWorker forks a new one on the next request) and every callback
 * still pending is failed with an Error, because no reply can arrive anymore.
 *
 * @returns {ChildProcess} the forked worker
 */
function createDmpWorker() {
    var worker = childProcess.fork("./lib/workers/dmpWorker.js", {
        stdio: 'ignore'
    });
    if (config.debug) logger.info('dmp worker process started');

    worker.on('message', function (data) {
        if (!data || !data.msg || !data.cacheKey) {
            return logger.error('dmp worker error: not enough data on message');
        }
        var cacheKey = data.cacheKey;
        var callback = dmpCallbackCache[cacheKey];
        // remove the entry first so it is released even if the callback throws
        delete dmpCallbackCache[cacheKey];
        if (typeof callback !== 'function') {
            // unknown or already answered key: calling it would throw a TypeError
            return logger.error('dmp worker error: no pending callback for ' + cacheKey);
        }
        switch (data.msg) {
            case 'error':
                callback(data.error, null);
                break;
            case 'check':
                callback(null, data.result);
                break;
            default:
                callback(new Error('dmp worker error: unknown message type ' + data.msg), null);
                break;
        }
    });

    worker.on('close', function (code) {
        dmpWorker = null;
        if (config.debug) logger.info('dmp worker process exited with code ' + code);
        // the worker is gone, so outstanding requests can never be answered
        Object.keys(dmpCallbackCache).forEach(function (cacheKey) {
            var callback = dmpCallbackCache[cacheKey];
            delete dmpCallbackCache[cacheKey];
            if (typeof callback === 'function') {
                callback(new Error('dmp worker process exited with code ' + code), null);
            }
        });
    });

    return worker;
}
 | |
| 
 | |
/**
 * Send a request to the dmp worker and register the callback for its reply.
 *
 * A worker is forked on demand when none is alive. The request is tagged with
 * a unique cacheKey; the worker's reply carries the same key and is routed
 * back to `callback` by the worker's message handler.
 *
 * @param {Object} data - request payload (must include `msg`); it is copied,
 *     not modified
 * @param {function(*, *)} callback - node-style callback for the worker reply,
 *     or for a failure to deliver the request
 */
function sendDmpWorker(data, callback) {
    if (!dmpWorker) dmpWorker = createDmpWorker();
    var cacheKey = Date.now() + '_' + shortId.generate();
    dmpCallbackCache[cacheKey] = callback;
    // build a fresh message so the caller's object is left untouched
    var message = Object.assign({}, data, {
        cacheKey: cacheKey
    });
    // Without a completion callback a failed send is emitted as an 'error'
    // event on the child process and the pending callback is never released.
    dmpWorker.send(message, function (err) {
        if (!err) return;
        var pending = dmpCallbackCache[cacheKey];
        if (typeof pending !== 'function') return; // already settled elsewhere
        delete dmpCallbackCache[cacheKey];
        pending(err, null);
    });
}
 | |
| 
 | |
| module.exports = function (sequelize, DataTypes) {
 | |
|     var Revision = sequelize.define("Revision", {
 | |
|         id: {
 | |
|             type: DataTypes.UUID,
 | |
|             primaryKey: true,
 | |
|             defaultValue: Sequelize.UUIDV4
 | |
|         },
 | |
|         patch: {
 | |
|             type: DataTypes.TEXT,
 | |
|             get: function () {
 | |
|                 return sequelize.processData(this.getDataValue('patch'), "");
 | |
|             },
 | |
|             set: function (value) {
 | |
|                 this.setDataValue('patch', sequelize.stripNullByte(value));
 | |
|             }
 | |
|         },
 | |
|         lastContent: {
 | |
|             type: DataTypes.TEXT,
 | |
|             get: function () {
 | |
|                 return sequelize.processData(this.getDataValue('lastContent'), "");
 | |
|             },
 | |
|             set: function (value) {
 | |
|                 this.setDataValue('lastContent', sequelize.stripNullByte(value));
 | |
|             }
 | |
|         },
 | |
|         content: {
 | |
|             type: DataTypes.TEXT,
 | |
|             get: function () {
 | |
|                 return sequelize.processData(this.getDataValue('content'), "");
 | |
|             },
 | |
|             set: function (value) {
 | |
|                 this.setDataValue('content', sequelize.stripNullByte(value));
 | |
|             }
 | |
|         },
 | |
|         length: {
 | |
|             type: DataTypes.INTEGER
 | |
|         },
 | |
|         authorship: {
 | |
|             type: DataTypes.TEXT,
 | |
|             get: function () {
 | |
|                 return sequelize.processData(this.getDataValue('authorship'), [], JSON.parse);
 | |
|             },
 | |
|             set: function (value) {
 | |
|                 this.setDataValue('authorship', value ? JSON.stringify(value) : value);
 | |
|             }
 | |
|         }
 | |
|     }, {
 | |
|         classMethods: {
 | |
|             associate: function (models) {
 | |
|                 Revision.belongsTo(models.Note, {
 | |
|                     foreignKey: "noteId",
 | |
|                     as: "note",
 | |
|                     constraints: false
 | |
|                 });
 | |
|             },
 | |
|             getNoteRevisions: function (note, callback) {
 | |
|                 Revision.findAll({
 | |
|                     where: {
 | |
|                         noteId: note.id
 | |
|                     },
 | |
|                     order: '"createdAt" DESC'
 | |
|                 }).then(function (revisions) {
 | |
|                     var data = [];
 | |
|                     for (var i = 0, l = revisions.length; i < l; i++) {
 | |
|                         var revision = revisions[i];
 | |
|                         data.push({
 | |
|                             time: moment(revision.createdAt).valueOf(),
 | |
|                             length: revision.length
 | |
|                         });
 | |
|                     }
 | |
|                     callback(null, data);
 | |
|                 }).catch(function (err) {
 | |
|                     callback(err, null);
 | |
|                 });
 | |
|             },
 | |
|             getPatchedNoteRevisionByTime: function (note, time, callback) {
 | |
|                 // find all revisions to prepare for all possible calculation
 | |
|                 Revision.findAll({
 | |
|                     where: {
 | |
|                         noteId: note.id
 | |
|                     },
 | |
|                     order: '"createdAt" DESC'
 | |
|                 }).then(function (revisions) {
 | |
|                     if (revisions.length <= 0) return callback(null, null);
 | |
|                     // measure target revision position 
 | |
|                     Revision.count({
 | |
|                         where: {
 | |
|                             noteId: note.id,
 | |
|                             createdAt: {
 | |
|                                 $gte: time
 | |
|                             }
 | |
|                         },
 | |
|                         order: '"createdAt" DESC'
 | |
|                     }).then(function (count) {
 | |
|                         if (count <= 0) return callback(null, null);
 | |
|                         sendDmpWorker({
 | |
|                             msg: 'get revision',
 | |
|                             revisions: revisions,
 | |
|                             count: count
 | |
|                         }, callback);
 | |
|                     }).catch(function (err) {
 | |
|                         return callback(err, null);
 | |
|                     });
 | |
|                 }).catch(function (err) {
 | |
|                     return callback(err, null);
 | |
|                 });
 | |
|             },
 | |
|             checkAllNotesRevision: function (callback) {
 | |
|                 Revision.saveAllNotesRevision(function (err, notes) {
 | |
|                     if (err) return callback(err, null);
 | |
|                     if (!notes || notes.length <= 0) {
 | |
|                         return callback(null, notes);
 | |
|                     } else {
 | |
|                         Revision.checkAllNotesRevision(callback);
 | |
|                     }
 | |
|                 });
 | |
|             },
 | |
|             saveAllNotesRevision: function (callback) {
 | |
|                 sequelize.models.Note.findAll({
 | |
|                     // query all notes that need to save for revision
 | |
|                     where: {
 | |
|                         $and: [
 | |
|                             {
 | |
|                                 lastchangeAt: {
 | |
|                                     $or: {
 | |
|                                         $eq: null,
 | |
|                                         $and: {
 | |
|                                             $ne: null,
 | |
|                                             $gt: sequelize.col('createdAt')
 | |
|                                         }
 | |
|                                     }
 | |
|                                 }
 | |
|                             },
 | |
|                             {
 | |
|                                 savedAt: {
 | |
|                                     $or: {
 | |
|                                         $eq: null,
 | |
|                                         $lt: sequelize.col('lastchangeAt')
 | |
|                                     }
 | |
|                                 }
 | |
|                             }
 | |
|                         ]
 | |
|                     }
 | |
|                 }).then(function (notes) {
 | |
|                     if (notes.length <= 0) return callback(null, notes);
 | |
|                     var savedNotes = [];
 | |
|                     async.each(notes, function (note, _callback) {
 | |
|                         // revision saving policy: note not been modified for 5 mins or not save for 10 mins
 | |
|                         if (note.lastchangeAt && note.savedAt) {
 | |
|                             var lastchangeAt = moment(note.lastchangeAt);
 | |
|                             var savedAt = moment(note.savedAt);
 | |
|                             if (moment().isAfter(lastchangeAt.add(5, 'minutes'))) {
 | |
|                                 savedNotes.push(note);
 | |
|                                 Revision.saveNoteRevision(note, _callback);
 | |
|                             } else if (lastchangeAt.isAfter(savedAt.add(10, 'minutes'))) {
 | |
|                                 savedNotes.push(note);
 | |
|                                 Revision.saveNoteRevision(note, _callback);
 | |
|                             } else {
 | |
|                                 return _callback(null, null);
 | |
|                             }
 | |
|                         } else {
 | |
|                             savedNotes.push(note);
 | |
|                             Revision.saveNoteRevision(note, _callback);
 | |
|                         }
 | |
|                     }, function (err) {
 | |
|                         if (err) return callback(err, null);
 | |
|                         // return null when no notes need saving at this moment but have delayed tasks to be done 
 | |
|                         var result = ((savedNotes.length == 0) && (notes.length > savedNotes.length)) ? null : savedNotes;
 | |
|                         return callback(null, result);
 | |
|                     });
 | |
|                 }).catch(function (err) {
 | |
|                     return callback(err, null);
 | |
|                 });
 | |
|             },
 | |
|             saveNoteRevision: function (note, callback) {
 | |
|                 Revision.findAll({
 | |
|                     where: {
 | |
|                         noteId: note.id
 | |
|                     },
 | |
|                     order: '"createdAt" DESC'
 | |
|                 }).then(function (revisions) {
 | |
|                     if (revisions.length <= 0) {
 | |
|                         // if no revision available
 | |
|                         Revision.create({
 | |
|                             noteId: note.id,
 | |
|                             lastContent: note.content,
 | |
|                             length: note.content.length,
 | |
|                             authorship: note.authorship
 | |
|                         }).then(function (revision) {
 | |
|                             Revision.finishSaveNoteRevision(note, revision, callback);
 | |
|                         }).catch(function (err) {
 | |
|                             return callback(err, null);
 | |
|                         });
 | |
|                     } else {
 | |
|                         var latestRevision = revisions[0];
 | |
|                         var lastContent = latestRevision.content || latestRevision.lastContent;
 | |
|                         var content = note.content;
 | |
|                         sendDmpWorker({
 | |
|                             msg: 'create patch',
 | |
|                             lastDoc: lastContent,
 | |
|                             currDoc: content,
 | |
|                         }, function (err, patch) {
 | |
|                             if (err) logger.error('save note revision error', err);
 | |
|                             if (!patch) {
 | |
|                                 // if patch is empty (means no difference) then just update the latest revision updated time 
 | |
|                                 latestRevision.changed('updatedAt', true); 
 | |
|                                 latestRevision.update({
 | |
|                                     updatedAt: Date.now()
 | |
|                                 }).then(function (revision) {
 | |
|                                     Revision.finishSaveNoteRevision(note, revision, callback);
 | |
|                                 }).catch(function (err) {
 | |
|                                     return callback(err, null);
 | |
|                                 });
 | |
|                             } else {
 | |
|                                 Revision.create({
 | |
|                                     noteId: note.id,
 | |
|                                     patch: patch,
 | |
|                                     content: note.content,
 | |
|                                     length: note.content.length,
 | |
|                                     authorship: note.authorship
 | |
|                                 }).then(function (revision) {
 | |
|                                     // clear last revision content to reduce db size
 | |
|                                     latestRevision.update({
 | |
|                                         content: null
 | |
|                                     }).then(function () {
 | |
|                                         Revision.finishSaveNoteRevision(note, revision, callback);
 | |
|                                     }).catch(function (err) {
 | |
|                                         return callback(err, null);
 | |
|                                     });
 | |
|                                 }).catch(function (err) {
 | |
|                                     return callback(err, null);
 | |
|                                 });
 | |
|                             }
 | |
|                         });
 | |
|                     }
 | |
|                 }).catch(function (err) {
 | |
|                     return callback(err, null);
 | |
|                 });
 | |
|             },
 | |
|             finishSaveNoteRevision: function (note, revision, callback) {
 | |
|                 note.update({
 | |
|                     savedAt: revision.updatedAt
 | |
|                 }).then(function () {
 | |
|                     return callback(null, revision);
 | |
|                 }).catch(function (err) {
 | |
|                     return callback(err, null);
 | |
|                 });
 | |
|             }
 | |
|         }
 | |
|     });
 | |
| 
 | |
|     return Revision;
 | |
| }; |