Globals.js contains configuration
use var winston = require('winston'); //a multi-transport async logging library for node.js
config
var logger = new (winston.Logger)({
transports: [
new (winston.transports.Console)({ json: true, timestamp: true }),
new winston.transports.File({ filename: __dirname + '/logs/backup.log', json: false })
],
exceptionHandlers: [
new (winston.transports.Console)({ json: true, timestamp: true }),
new winston.transports.File({ filename: __dirname + '/logs/exceptions.log', json: false })
],
exitOnError: false
});
//Store configuration as objects for example
var config = {};
// MySQL configuration
config.mysql = {};
config.mysql.client_qa = {
user:'user',
password:'mipass',
host:'localhost'
};
//In Node, things are only visible to other things in the same file. By things, I mean variables, functions, classes and class members. Exports allow to access the exported variables in other file with require statement.
exports.header = header;
exports.config = config;
exports.logger = logger;
//exports is a module
Main.js //Is the node executed file contains all the instructions to be executed.
var chokidar = require('chokidar'); //A neat wrapper around node.js fs.watch / fs.watchFile.
var sleep = require('sleep'); //Add sleep()
and usleep()
to nodejs. This is mainly useful for debugging.
var header = require('./Globals').header();
var config = require('./Globals').config;
var xmlHandler = require('./XMLHandler'); //The XMLandler.js
var veo = chokidar.watch(config.xml_deposit(config.msos[0]), {ignored: /^\./, persistent: true});
//chokidar manage actions for add, change, unlink
veo
.on('add', function(path) {
console.log('File', path, 'has been added');
if (path.substr(-3) == "xml" ){
if (path.indexOf(config.msos[0]) >= 0){
var msoS = config.msos[0];
console.log("New File Received ("+new Date()+") : [MSO] "+msoS+" [Path] "+config.xml_deposit(msoS)+" [FileName] "+path.substr(path.lastIndexOf("/")+1) );
sleep.sleep(5);
xmlHandler.handleFile(msoS, path.substr(path.lastIndexOf("/")+1)); //Here we are going to handle the xml File
}
}
})
.on('change', function(path) {
console.log('File', path, 'has been changed');
})
.on('unlink', function(path) {
console.log('File', path, 'has been removed');
})
.on('error', function(error) {
console.error('Error happened', error);
})
XML Handler.js
var mysql = require('mysql');
var fs = require('fs');
var xml2js = require('xml2js');
var config = require('./Globals').config;
var logger = require('./Globals').logger;
var parser = new xml2js.Parser();
var system = require('child_process');
var filename = "";
var mso = "";
var error = false;
var mysql_config;
var client;
var http =require("http");
function handleFile(m, fn){
filename = fn;
mso = m;
fs.readFile(config.xml_deposit(mso)+filename, handleXML);
}
function handleXML(err, data) {
if (err){
system.spawn('cp', ['-r', config.xml_deposit(mso)+filename, config.xml_error(mso)+filename]);
system.spawn('cp', ['-r', config.xml_deposit(mso)+filename, config.xml_backup()+filename]);
system.spawn('rm', [config.xml_deposit(mso)+filename]);
throw new Error('['+mso+'][file_error] - Error en archivo '+filename+' marca tener un error de lectura de archivo eviando archivo a '+config.xml_error(mso)+filename+" e ignorando flujo");
} else {
parser.parseString(data, handleParsedData);
}
}
function handleParsedData(err, objectXML){
if (err){
system.spawn('cp', ['-r', config.xml_deposit(mso)+filename, config.xml_error(mso)+filename]);
system.spawn('cp', ['-r', config.xml_deposit(mso)+filename, config.xml_backup()+filename]);
system.spawn('rm', [config.xml_deposit(mso)+filename]);
throw new Error('['+mso+'][sintaxis] - Error en archivo '+filename+' marca tener un error de sintaxis XML enviando archivo a '+config.xml_error(mso)+filename+" e ignorando flujo");
} else {
var proveedor = objectXML.root.encabezado[0].proveedor[0];
var inicial = objectXML.root.encabezado[0].periodo[0].$.inicial;
var final = objectXML.root.encabezado[0].periodo[0].$.final;
var movimientos = objectXML.root.encabezado[0].numero_movimientos[0];
//Step 0: Validate that the number of "movimientos" reported on the XML is actually the number that exist in it.
if (movimientos != objectXML.root.movimiento.length){
system.spawn('cp', ['-r', config.xml_deposit(mso)+filename, config.xml_error(mso)+filename]);
system.spawn('cp', ['-r', config.xml_deposit(mso)+filename, config.xml_backup()+filename]);
system.spawn('rm', [config.xml_deposit(mso)+filename]);
throw new Error('['+mso+']['+movimientos+']['+objectXML.root.movimiento.length+'] - Error en archivo '+filename+' marca tener '+movimientos+' movimiento(s) cuando en realidad hay '+objectXML.root.movimiento.length+' movimiento(s) enviando archivo a '+config.xml_error(mso)+filename+" e ignorando flujo");
} else {
//Step 1: Insert the XML header in MySQL DB
mysql_config = config.mysql;
client = mysql.createPool(mysql_config.client_qa);
insertaXML(proveedor, inicial, final, movimientos, objectXML);
}
}
}
function insertaXML (proveedor, inicial, final, movimientos, objectXML) {
var queryStr = "INSERT INTO `msodb`.`xml` (`id` ,`proveedor` ,`inicial` ,`final` ,`movimientos`) VALUES (NULL , '"+proveedor+"', '"+inicial+"', '"+final+"', "+movimientos+");";
client.getConnection(function (err0, connection) {
if (err0){
system.spawn('cp', ['-r', config.xml_deposit(mso)+filename, config.xml_backup()+filename]);
system.spawn('rm', [config.xml_deposit(mso)+filename]);
throw new Error('['+mso+']['+filename+'] - Error en MySQL no hay conexión, se requiere ingestar nuevamente el archivo.');
} else {
connection.query('USE ' + mysql_config.database);
connection.query(queryStr, function (err, results, fields) {
if (err) {
logger.info(queryStr);
console.log(err);
} else {
if (results.affectedRows == 1){ //Step 2: Insert the movements in the MySQL DB
getEmail(results.insertId, proveedor, objectXML, connection);
// insertaMovimientos(results.insertId, proveedor, objectXML, connection);
} else {
logger.info(queryStr);
}
}
});
}
});
}
function getEmail(xml,mso,objeto,connection){
var contracts= [];
var msoId = {
"VEO" : 1,
"CABLEVISION" : 2,
"CABLEMAS" : 3,
"SKY" : 4,
};
for (var x = 0; x < objeto.root.movimiento.length; x++ ){
contracts[x]= objeto.root.movimiento[x].id[0];
}
jsonObject = JSON.stringify({"contracts":contracts.join(), "mso":msoId[mso]});
var emails="";
var postheaders = {
'Content-Type' : 'application/json',
'Content-Length' : Buffer.byteLength(jsonObject, 'utf8')
};
var options = {
host: config.ws.host,
port: config.ws.port,
path: config.ws.path,
method: config.ws.method,
headers: postheaders
};
// do the POST call
var reqPost = http.request(options, function(res) {
res.on('data', function(d) {
emails+=d;
insertaMovimientos(xml,mso,objeto,connection,emails);
});
});
// write the json data
reqPost.write(jsonObject);
reqPost.end();
reqPost.on('error', function(e) {
console.error(e);
});
}
function insertaMovimientos(xml, mso, objeto, connection,emails){
var emailsArray=JSON.parse(emails);
error = false;
for (var x = 0; x < objeto.root.movimiento.length; x++ ){
var idmso = objeto.root.movimiento[x].id;
var nombre = objeto.root.movimiento[x].usuario[0].nombre;
if (objeto.root.movimiento[x].usuario[0].direccion != null){
var calle = objeto.root.movimiento[x].usuario[0].direccion[0].calle;
var ciudad = objeto.root.movimiento[x].usuario[0].direccion[0].ciudad;
var colonia = objeto.root.movimiento[x].usuario[0].direccion[0].colonia;
var municipio = objeto.root.movimiento[x].usuario[0].direccion[0].municipio;
var delegacion = objeto.root.movimiento[x].usuario[0].direccion[0].delegacion;
var numero_exterior = objeto.root.movimiento[x].usuario[0].direccion[0].numero_exterior;
var numero_interior = objeto.root.movimiento[x].usuario[0].direccion[0].numero_interior;
var estado = objeto.root.movimiento[x].usuario[0].direccion[0].estado
var pais = objeto.root.movimiento[x].usuario[0].direccion[0].pais
var cp = objeto.root.movimiento[x].usuario[0].direccion[0].cp;
} else {
var calle, ciudad, colonia, municipio, delegacion, numero_exterior, numero_interior, estado, pais, cp;
calle = ciudad = colonia = municipio = delegacion = numero_exterior = numero_interior = estado = pais = cp = 'n/a';
cp = '-1';
}
//var email = objeto.root.movimiento[x].usuario[0].email;
var fase = objeto.root.movimiento[x].fase+"";
var tipo = objeto.root.movimiento[x].tipo;
if (fase.toUpperCase() == "COBRADO"){
var monto_facturado = objeto.root.movimiento[x].monto_facturado;
} else if (fase.toUpperCase() == "PAGADO"){
var monto_facturado = objeto.root.movimiento[x].monto_pagado;
} else if (fase.toUpperCase() == "EMITIDO"){
var monto_facturado = objeto.root.movimiento[x].monto_facturado;
} else {
var monto_facturado = objeto.root.movimiento[x].monto_facturado;
}
var fecha = objeto.root.movimiento[x].fecha;
var sc = "0";
var email=findEmail(emailsArray,idmso);
var queryStr = "INSERT INTO `msodb`.`movimiento` (`id_xml`, `sc`, `mso`, `idmso`, `nombre`, `calle`, `ciudad`, `colonia`, `municipio`, `delegacion`, `numero_exterior`, `numero_interior`, `estado`, `pais`, `cp`, `email`, `fase`, `tipo`, `monto`, `concepto`, `fecha`) VALUES ("+xml+", '"+sc+"', '"+mso.toUpperCase()+"', '"+idmso+"', '"+nombre+"', '"+calle+"', '"+ciudad+"', '"+colonia+"', '"+municipio+"', '"+delegacion+"', '"+numero_exterior+"', '"+numero_interior+"', '"+estado+"', '"+pais+"', '"+cp+"', '"+email+"', '"+fase.toUpperCase()+"', '"+tipo+"', "+monto_facturado+", 'Suscripción Mensual SVOD', '"+fecha+"');";
queryStr = queryStr.split("[object Object]").join("n/a");
connection.query(queryStr, function (err, results, fields) {
if (err) {
if (err.code == "ER_DUP_ENTRY"){
var queryUpdate = queryStr.substr(queryStr.indexOf("VALUES (")+8);
queryUpdate = queryUpdate.substr(0,queryUpdate.length-2);
var items = queryUpdate.split(',');
console.log(err);
var queryUpdate = "id_xml="+items[0]+",sc="+items[1]+",mso="+items[2]+",idmso="+items[3]+",nombre="+items[4]+",calle="+items[5]+",ciudad="+items[6]+",colonia="+items[7]+",municipio="+items[8]+",delegacion="+items[9]+",numero_exterior="+items[10]+",numero_interior="+items[11]+",estado="+items[12]+",pais="+items[13]+",cp="+items[14]+",email="+items[15]+",fase="+items[16]+",tipo="+items[17]+",monto="+items[18]+",concepto="+items[19]+",fecha="+items[20];
queryUpdate = "UPDATE movimiento SET "+queryUpdate+" WHERE mso = '"+mso.toUpperCase()+"' AND idmso = "+items[3]+";";
actualizaMovimientos(queryUpdate, connection, objeto.root.movimiento.length, x, error);
} else {
logger.info(queryStr);
console.log(err);
error = true;
finalStep( objeto.root.movimiento.length, x, error, connection);
}
} else {
if (results.affectedRows == 1){ error = false;
finalStep( objeto.root.movimiento.length, x, error, connection);
} else {
error = true;
logger.info(queryStr);
finalStep( objeto.root.movimiento.length, x, error, connection);
}
}
});
}
}
function actualizaMovimientos(queryStr, connection, original, count, error){
connection.query(queryStr, function (err, results, fields) {
if (err) {
logger.info(queryStr);
console.log(err);
error = true;
finalStep( original, count, error, connection);
} else {
if (results.affectedRows == 1){
error = false;
finalStep( original, count, error, connection);
} else {
error = true;
logger.info(queryStr);
finalStep( original, count, error, connection);
}
}
});
}
function finalStep(original, count, error, connection){
if (original == count){
if (error){
system.spawn('cp', ['-r', config.xml_deposit(mso)+filename, config.xml_error(mso)+filename]);
system.spawn('cp', ['-r', config.xml_deposit(mso)+filename, config.xml_backup()+filename]);
system.spawn('rm', [config.xml_deposit(mso)+filename]);
} else {
system.spawn('cp', ['-r', config.xml_deposit(mso)+filename, config.xml_backup()+"/ok/"+filename]);
system.spawn('rm', [config.xml_deposit(mso)+filename]);
}
connection.end();
}
}
function findEmail(emails,contract){
for (var x=0; x< emails.length; x++){
if (emails[x].contract==contract)
return emails[x].email;
}
return null;
}
exports.handleFile = handleFile;