1 var Net = require('net'),
  2   EventEmitter = require('events').EventEmitter,
  3   Util = require('util'),
  4   ErrorMessages = require('./constants/ErrorMessages'),
  5   DATA_TYPES = require('./constants/DataTypes'),
  6   CASConstants = require('./constants/CASConstants'),
  7   PacketReader = require('./packets/PacketReader'),
  8   PacketWriter = require('./packets/PacketWriter'),
  9   ActionQueue = require('./utils/ActionQueue'),
 10   Helpers = require('../src/utils/Helpers'),
 11   Cache = require('../src/utils/Cache');
 12 
 13 //8.4.1 protocol packets definition
 14 var _8_4_1 = '8.4.1';
 15 var ClientInfoExchangePacket = require('./packets/' + _8_4_1 + '/ClientInfoExchangePacket'),
 16   OpenDatabasePacket = require('./packets/' + _8_4_1 + '/OpenDatabasePacket'),
 17   GetEngineVersionPacket = require('./packets/' + _8_4_1 + '/GetEngineVersionPacket'),
 18   ExecuteQueryPacket = require('./packets/' + _8_4_1 + '/ExecuteQueryPacket'),
 19   GetSchemaPacket = require('./packets/' + _8_4_1 + '/GetSchemaPacket.js'),
 20   CloseQueryPacket = require('./packets/' + _8_4_1 + '/CloseQueryPacket'),
 21   BatchExecuteNoQueryPacket = require('./packets/' + _8_4_1 + '/BatchExecuteNoQueryPacket'),
 22   CloseDatabasePacket = require('./packets/' + _8_4_1 + '/CloseDatabasePacket'),
 23   FetchPacket = require('./packets/' + _8_4_1 + '/FetchPacket'),
 24   SetAutoCommitModePacket = require('./packets/' + _8_4_1 + '/SetAutoCommitModePacket'),
 25   RollbackPacket = require('./packets/' + _8_4_1 + '/RollbackPacket'),
 26   CommitPacket = require('./packets/' + _8_4_1 + '/CommitPacket'),
 27   LOBReadPacket = require('./packets/' + _8_4_1 + '/LOBReadPacket');
 28 
 29 module.exports = CUBRIDConnection;
 30 
 31 //Support custom events
 32 Util.inherits(CUBRIDConnection, EventEmitter);
 33 
 34 /**
 35  * Create a new CUBRID connection instance
 36  * @param brokerServer
 37  * @param brokerPort
 38  * @param user
 39  * @param password
 40  * @param database
 41  * @param cacheTimeout
 42  * @constructor
 43  */
 44 function CUBRIDConnection(brokerServer, brokerPort, user, password, database, cacheTimeout) {
 45   // Using EventEmitter.call on an object will do the setup of instance methods / properties
 46   // (not inherited) of an EventEmitter.
 47   // It is similar in purpose to super(...) in Java or base(...) in C#, but it is not implicit in Javascript.
 48   // Because of this, we must manually call it ourselves:
 49   EventEmitter.call(this);
 50 
 51   this._queryCache = null;
 52   if (typeof cacheTimeout !== 'undefined' && cacheTimeout > 0) {
 53     this._queryCache = new Cache();
 54   }
 55 
 56   this._socket = new Net.Socket();
 57 
 58   // Connection parameters
 59   this.brokerServer = brokerServer || 'localhost';
 60   this.initialBrokerPort = brokerPort || 33000;
 61   this.connectionBrokerPort = -1;
 62   this.user = user || 'public';
 63   this.password = password || '';
 64   this.database = database || 'demodb';
 65 
 66   // Session public variables
 67   this.autoCommitMode = null; //will be initialized on connect
 68   this.sessionId = 0;
 69 
 70   // Execution semaphore variables; prevent double-connect-attempts, overlapping-queries etc.
 71   this.connectionOpened = false;
 72   this.connectionPending = false;
 73   this.queryPending = false;
 74 
 75   // Driver events
 76   this.EVENT_ERROR = 'error';
 77   this.EVENT_CONNECTED = 'connect';
 78   this.EVENT_ENGINE_VERSION_AVAILABLE = 'engine version';
 79   this.EVENT_BATCH_COMMANDS_COMPLETED = 'batch execute done';
 80   this.EVENT_QUERY_DATA_AVAILABLE = 'query data';
 81   this.EVENT_SCHEMA_DATA_AVAILABLE = 'schema data';
 82   this.EVENT_FETCH_DATA_AVAILABLE = 'fetch';
 83   this.EVENT_FETCH_NO_MORE_DATA_AVAILABLE = 'fetch done';
 84   this.EVENT_BEGIN_TRANSACTION = 'begin transaction';
 85   this.EVENT_SET_AUTOCOMMIT_MODE_COMPLETED = 'set autocommit mode';
 86   this.EVENT_COMMIT_COMPLETED = 'commit';
 87   this.EVENT_ROLLBACK_COMPLETED = 'rollback';
 88   this.EVENT_QUERY_CLOSED = 'close query';
 89   this.EVENT_CONNECTION_CLOSED = 'close';
 90   this.EVENT_LOB_READ_COMPLETED = 'lob read completed';
 91 
 92   //Auto-commit constants
 93   this.AUTOCOMMIT_ON = true;
 94   this.AUTOCOMMIT_OFF = false;
 95 
 96   //Database schema variables
 97   this.SCHEMA_TABLE = CASConstants.CUBRIDSchemaType.CCI_SCH_CLASS;
 98   this.SCHEMA_VIEW = CASConstants.CUBRIDSchemaType.CCI_SCH_VCLASS;
 99   this.SCHEMA_ATTRIBUTE = CASConstants.CUBRIDSchemaType.CCI_SCH_ATTRIBUTE;
100 
101   //Private variables
102   this._CASInfo = [0, 0xFF, 0xFF, 0xFF];
103   this._queriesPacketList = [];
104   this._INVALID_RESPONSE_LENGTH = -1;
105   this._PREVENT_CONCURRENT_REQUESTS = true;
106   this._LOB_MAX_IO_LENGTH = 128 * 1024;
107 
108   //Database engine version
109   this._DB_ENGINE_VER = '';
110 
111   //Uncomment the following lines if you will not always provide an 'error' listener in your consumer code,
112   //to avoid any unexpected exception. Be aware that:
113   //Error events are treated as a special case in node. If there is no listener for it,
114   //then the default action is to print a stack trace and exit the program.
115   //http://nodejs.org/api/events.html
116   //this.on('error',function(err){
117   //  Helpers.logError(err.message);
118   //  //... (add your own error-handling code)
119   //});
120 }
121 
/**
 * Open a socket to the initial broker port, send the client-info handshake and
 * store the port number the broker answers with in self.connectionBrokerPort.
 * A positive answer closes this socket (the login step opens a new one on that port);
 * zero keeps the current socket; a negative answer is reported as an error.
 * @param self the CUBRIDConnection instance being connected
 * @param callback invoked as callback.call(self, err); err is null on success
 * @private
 */
CUBRIDConnection.prototype._doGetBrokerPort = function (self, callback) {
  var hasCallback = (typeof callback === 'function');

  self._socket = Net.createConnection(self.initialBrokerPort, self.brokerServer);
  self._socket.setNoDelay(true);

  var packetWriter = new PacketWriter();
  var clientInfoExchangePacket = new ClientInfoExchangePacket();
  clientInfoExchangePacket.write(packetWriter);
  self._socket.write(packetWriter._buffer);

  self._socket.on('error', function (err) {
    // Inside a socket listener `this` is the socket, so the connection state
    // has to be reset through `self`.
    self.connectionOpened = false;
    self._queriesPacketList = [];
    if (hasCallback) {
      // The error must travel as an argument; the first parameter of call() is only `this`.
      callback.call(self, err);
    }
  });

  self._socket.once('data', function (data) {
    var packetReader = new PacketReader();
    packetReader.write(data);
    clientInfoExchangePacket.parse(packetReader);
    var newPort = clientInfoExchangePacket.newConnectionPort;
    self.connectionBrokerPort = newPort;
    if (newPort > 0) {
      // The session continues on another port; this socket is no longer needed
      self._socket.end();
    }
    if (hasCallback) {
      if (newPort >= 0) {
        callback.call(self, null);
      } else {
        callback.call(self, new Error(ErrorMessages.ERROR_NEW_BROKER_PORT));
      }
    }
  });
};
162 
/**
 * Log in to the database: send the open-database request and, once the whole
 * response has arrived, record the CAS info, session id and auto-commit mode.
 * When self.connectionBrokerPort is positive a new socket is opened on that port first;
 * otherwise the socket already stored in self._socket is reused.
 * @param self the CUBRIDConnection instance being connected
 * @param callback invoked as callback.call(self, err); err is null on success
 * @private
 */
CUBRIDConnection.prototype._doDatabaseLogin = function (self, callback) {
  var err = null;
  var hasCallback = (typeof callback === 'function');
  var responseData = new Buffer(0);
  var expectedResponseLength = self._INVALID_RESPONSE_LENGTH;

  if (self.connectionBrokerPort > 0) {
    self._socket = Net.createConnection(self.connectionBrokerPort, self.brokerServer);
    self._socket.setNoDelay(true);
  }

  var packetWriter = new PacketWriter();
  var openDatabasePacket = new OpenDatabasePacket(
    {
      database : self.database,
      user     : self.user,
      password : self.password,
      casInfo  : self._CASInfo
    }
  );
  openDatabasePacket.write(packetWriter);
  self._socket.write(packetWriter._buffer);

  self._socket.on('error', function (socketErr) {
    // `this` is the socket inside this listener; the connection state lives on `self`
    self.connectionOpened = false;
    self._queriesPacketList = [];
    if (hasCallback) {
      callback.call(self, socketErr);
    }
  });

  self._socket.on('data', function (data) {
    // The response may be split over several 'data' events; accumulate until complete
    responseData = Helpers._combineData(responseData, data);
    if (expectedResponseLength === self._INVALID_RESPONSE_LENGTH
      && responseData.length >= DATA_TYPES.DATA_LENGTH_SIZEOF) {
      expectedResponseLength = Helpers._getExpectedResponseLength(responseData);
    }
    if (responseData.length === expectedResponseLength) {
      self._socket.removeAllListeners('data');
      var packetReader = new PacketReader();
      packetReader.write(responseData);
      openDatabasePacket.parse(packetReader);
      self._CASInfo = openDatabasePacket.casInfo;
      var errorCode = openDatabasePacket.errorCode;
      var errorMsg = openDatabasePacket.errorMsg;
      if (errorCode !== 0) {
        err = new Error(errorCode + ':' + errorMsg);
      } else {
        self.sessionId = openDatabasePacket.sessionId;
        // The lowest bit of the fourth CAS info byte selects the auto-commit mode
        self.autoCommitMode = (self._CASInfo[3] & 0x01) ? self.AUTOCOMMIT_ON : self.AUTOCOMMIT_OFF;
      }
      if (hasCallback) {
        callback.call(self, err);
      }
    }
  });
};
221 
/**
 * Request the database engine version from the server and store it in self._DB_ENGINE_VER.
 * @param self the CUBRIDConnection instance to query
 * @param callback optional; invoked as callback.call(self, err), err is null on success
 * @private
 */
CUBRIDConnection.prototype._getEngineVersion = function (self, callback) {
  var err = null;
  var responseData = new Buffer(0);
  var expectedResponseLength = self._INVALID_RESPONSE_LENGTH;

  var packetWriter = new PacketWriter();
  var getEngineVersionPacket = new GetEngineVersionPacket(
    {
      casInfo : self._CASInfo
    }
  );

  getEngineVersionPacket.write(packetWriter);
  self._socket.write(packetWriter._buffer);

  self._socket.on('data', function (data) {
    // The response may be split over several 'data' events; accumulate until complete
    responseData = Helpers._combineData(responseData, data);
    if (expectedResponseLength === self._INVALID_RESPONSE_LENGTH
      && responseData.length >= DATA_TYPES.DATA_LENGTH_SIZEOF) {
      expectedResponseLength = Helpers._getExpectedResponseLength(responseData);
    }
    if (responseData.length === expectedResponseLength) {
      self._socket.removeAllListeners('data');
      var packetReader = new PacketReader();
      // Parse the accumulated response, not just the chunk that completed it
      packetReader.write(responseData);
      getEngineVersionPacket.parse(packetReader);
      var errorCode = getEngineVersionPacket.errorCode;
      var errorMsg = getEngineVersionPacket.errorMsg;
      if (errorCode !== 0) {
        err = new Error(errorCode + ':' + errorMsg);
      } else {
        self._DB_ENGINE_VER = getEngineVersionPacket.engineVersion;
      }
      if (typeof callback === 'function') {
        callback.call(self, err);
      }
    }
  });
};
265 
/**
 * Open the connection: obtain the broker port, log in to the database and read
 * the engine version, in that order. A connect attempt made while the connection
 * is already open or still pending is refused with an error.
 * @param callback optional; invoked as callback.call(connection, err)
 */
CUBRIDConnection.prototype.connect = function (callback) {
  var self = this;

  // Hand an outcome to the caller-supplied callback, if there is one
  function notifyCaller(err) {
    if (callback && typeof(callback) === 'function') {
      callback.call(self, err);
    }
  }

  // Refuse the attempt: raise the error event, then tell the caller
  function refuse(message) {
    var refusal = new Error(message);
    Helpers._emitEvent(self, refusal, self.EVENT_ERROR, null);
    notifyCaller(refusal);
  }

  if (self.connectionOpened == true) {
    refuse(ErrorMessages.ERROR_CONNECTION_ALREADY_OPENED);
    return;
  }

  if (self.connectionPending == true) {
    refuse(ErrorMessages.ERROR_CONNECTION_ALREADY_PENDING);
    return;
  }

  self.connectionPending = true;

  var connectSteps = [
    function (next) {
      self._doGetBrokerPort(self, next);
    },

    function (next) {
      self._doDatabaseLogin(self, next);
    },

    function (next) {
      self._getEngineVersion(self, next);
    }
  ];

  ActionQueue.enqueue(connectSteps, function (err) {
    var failed = (typeof err != 'undefined' && err != null);
    self.queryPending = false; // a fresh connection has no query in flight
    self.connectionPending = false;
    self.connectionOpened = !failed;
    Helpers._emitEvent(self, err, self.EVENT_ERROR, self.EVENT_CONNECTED);
    notifyCaller(err);
  });
};
319 
/**
 * Report the engine version stored on this connection (this._DB_ENGINE_VER),
 * through the 'engine version' event and through the optional callback.
 * No request is sent to the server by this method.
 * @param callback optional; invoked as callback.call(connection, null, engineVersion)
 */
CUBRIDConnection.prototype.getEngineVersion = function (callback) {
  var connection = this;

  Helpers._emitEvent(
    connection,
    null,
    connection.EVENT_ERROR,
    connection.EVENT_ENGINE_VERSION_AVAILABLE,
    connection._DB_ENGINE_VER
  );

  if (!callback || typeof(callback) !== 'function') {
    return;
  }
  callback.call(connection, null, connection._DB_ENGINE_VER);
};
330 
/**
 * Execute one or more SQL statements in batch mode (no result set is returned).
 * The connection is opened first when it is not open yet.
 * @param sqls a single SQL string or an array of SQL strings; an empty array completes immediately
 * @param callback optional; invoked as callback.call(connection, err)
 */
CUBRIDConnection.prototype.batchExecuteNoQuery = function (sqls, callback) {
  var self = this;
  var sqlsArr = null;
  var err = null;
  var hasCallback = (typeof callback === 'function');

  if (Array.isArray(sqls)) {
    if (sqls.length == 0) {
      // Nothing to execute: report completion straight away
      Helpers._emitEvent(self, null, null, self.EVENT_BATCH_COMMANDS_COMPLETED);
      if (hasCallback) {
        callback.call(self, null);
      }
      return;
    }
    sqlsArr = sqls;
  } else {
    // An array literal always yields a one-element array; `new Array(sqls)` would
    // build an empty array of that length when handed a number.
    sqlsArr = [sqls];
  }

  for (var i = 0; i < sqlsArr.length; i++) {
    if (!Helpers._validateInputSQLString(sqlsArr[i])) {
      err = new Error(ErrorMessages.ERROR_INPUT_VALIDATION);
      Helpers._emitEvent(self, err, self.EVENT_ERROR, null);
      if (hasCallback) {
        callback.call(self, err);
      }
      return;
    }
  }

  var responseData = new Buffer(0);
  var expectedResponseLength = self._INVALID_RESPONSE_LENGTH;

  ActionQueue.enqueue(
    [
      function (cb) {
        if (self.connectionOpened === false) {
          self.connect(cb);
        } else {
          cb();
        }
      },

      function (cb) {
        var packetWriter = new PacketWriter();
        var batchExecuteNoQueryPacket = new BatchExecuteNoQueryPacket(
          {
            SQLs           : sqlsArr,
            casInfo        : self._CASInfo,
            autoCommitMode : self.autoCommitMode
          }
        );
        batchExecuteNoQueryPacket.write(packetWriter);
        self._socket.write(packetWriter._buffer);

        self._socket.on('data', function (data) {
          // The response may be split over several 'data' events; accumulate until complete
          responseData = Helpers._combineData(responseData, data);
          if (expectedResponseLength === self._INVALID_RESPONSE_LENGTH
            && responseData.length >= DATA_TYPES.DATA_LENGTH_SIZEOF) {
            expectedResponseLength = Helpers._getExpectedResponseLength(responseData);
          }
          if (responseData.length === expectedResponseLength) {
            self._socket.removeAllListeners('data');
            var packetReader = new PacketReader();
            // Parse the accumulated response, not just the chunk that completed it
            packetReader.write(responseData);
            batchExecuteNoQueryPacket.parse(packetReader);
            var errorCode = batchExecuteNoQueryPacket.errorCode;
            var errorMsg = batchExecuteNoQueryPacket.errorMsg;
            if (errorCode !== 0) {
              err = new Error(errorCode + ':' + errorMsg);
            }
            if (typeof cb === 'function') {
              cb.call(self, err);
            }
          }
        });
      }
    ],

    function (err) {
      Helpers._emitEvent(self, err, self.EVENT_ERROR, self.EVENT_BATCH_COMMANDS_COMPLETED);
      if (hasCallback) {
        callback.call(self, err);
      }
    }
  );
};
423 
/**
 * Run a single SQL statement that returns no records, by delegating to
 * batchExecuteNoQuery with a one-element batch.
 * @param sql the SQL statement to execute
 * @param callback optional; invoked as callback.call(connection, err)
 * @return null when the statement fails validation, otherwise whatever batchExecuteNoQuery returns
 */
CUBRIDConnection.prototype.execute = function (sql, callback) {
  var connection = this;

  if (Helpers._validateInputSQLString(sql)) {
    return connection.batchExecuteNoQuery([sql], callback);
  }

  // Invalid statement: report through the error event and the callback
  var validationError = new Error(ErrorMessages.ERROR_INPUT_VALIDATION);
  Helpers._emitEvent(connection, validationError, connection.EVENT_ERROR, null);
  if (callback && typeof(callback) === 'function') {
    callback.call(connection, validationError);
  }
  return null;
};
447 
/**
 * Format a SQL statement with the given parameter values via Helpers._sqlFormat,
 * then run it through execute().
 * @param sql SQL text handed to Helpers._sqlFormat
 * @param arrParamsValues parameter values handed to Helpers._sqlFormat
 * @param arrDelimiters delimiters handed to Helpers._sqlFormat
 * @param callback passed on to execute()
 * @return {*} whatever execute() returns
 */
CUBRIDConnection.prototype.executeWithParams = function (sql, arrParamsValues, arrDelimiters, callback) {
  return this.execute(Helpers._sqlFormat(sql, arrParamsValues, arrDelimiters), callback);
};
461 
/**
 * Execute a query and retrieve the first batch of result rows.
 * The connection is opened first when it is not open yet. When the query cache is
 * enabled and already holds a result for this SQL text, that result is returned
 * with a null query handle and no request is sent.
 * @param sql the SQL query text
 * @param callback optional; invoked as callback.call(connection, err, result, queryHandle)
 */
CUBRIDConnection.prototype.query = function (sql, callback) {
  var err = null;
  var self = this;
  var hasCallback = (typeof callback === 'function');

  if (self.queryPending == true && self._PREVENT_CONCURRENT_REQUESTS) {
    err = new Error(ErrorMessages.ERROR_QUERY_ALREADY_PENDING);
    Helpers._emitEvent(self, err, self.EVENT_ERROR, null);
    if (hasCallback) {
      callback.call(self, err);
    }
    return;
  }

  if (!Helpers._validateInputSQLString(sql)) {
    // The same error object goes to the event and to the callback, so that
    // callback-only consumers also learn that the query was rejected.
    err = new Error(ErrorMessages.ERROR_INPUT_VALIDATION);
    Helpers._emitEvent(self, err, self.EVENT_ERROR, null);
    if (hasCallback) {
      callback.call(self, err);
    }
    return;
  }

  var responseData = new Buffer(0);
  var expectedResponseLength = self._INVALID_RESPONSE_LENGTH;

  self.queryPending = true;

  ActionQueue.enqueue(
    [
      function (cb) {
        if (self.connectionOpened === false) {
          self.connect(cb);
        } else {
          cb();
        }
      },

      function (cb) {
        // Serve the result from the cache when possible
        if (self._queryCache != null && self._queryCache.contains(sql)) {
          self.queryPending = false;
          var cachedResult = self._queryCache.get(sql);
          // The query handle is reported as null to prevent a later fetch
          // (the cache is intended only for small data)
          Helpers._emitEvent(self, null, null, self.EVENT_QUERY_DATA_AVAILABLE, cachedResult, null);
          if (hasCallback) {
            callback.call(self, null, cachedResult, null);
          }
          return;
        }

        var packetWriter = new PacketWriter();
        var executeQueryPacket = new ExecuteQueryPacket(
          {
            sql            : sql,
            casInfo        : self._CASInfo,
            autoCommitMode : self.autoCommitMode
          }
        );
        executeQueryPacket.write(packetWriter);
        self._socket.write(packetWriter._buffer);

        self._socket.on('data', function (data) {
          // The response may be split over several 'data' events; accumulate until complete
          responseData = Helpers._combineData(responseData, data);
          if (expectedResponseLength === self._INVALID_RESPONSE_LENGTH
            && responseData.length >= DATA_TYPES.DATA_LENGTH_SIZEOF) {
            expectedResponseLength = Helpers._getExpectedResponseLength(responseData);
          }
          if (responseData.length === expectedResponseLength) {
            self._socket.removeAllListeners('data');
            var packetReader = new PacketReader();
            packetReader.write(responseData);
            var result = executeQueryPacket.parse(packetReader);
            var errorCode = executeQueryPacket.errorCode;
            var errorMsg = executeQueryPacket.errorMsg;
            if (errorCode !== 0) {
              err = new Error(errorCode + ':' + errorMsg);
            } else {
              // Keep the packet: fetch() and closeQuery() look the query up by its handle
              self._queriesPacketList.push(executeQueryPacket);
            }
            if (typeof cb === 'function') {
              if (err === null && self._queryCache != null) {
                self._queryCache.getSet(sql, result);
              }
              cb.call(self, err, result, executeQueryPacket.handle);
            }
          }
        });
      }
    ],

    function (err, result, handle) {
      self.queryPending = false;
      Helpers._emitEvent(self, err, self.EVENT_ERROR, self.EVENT_QUERY_DATA_AVAILABLE, result, handle);
      if (hasCallback) {
        callback.call(self, err, result, handle);
      }
    }
  );
};
573 
/**
 * Format a SQL query with the given parameter values via Helpers._sqlFormat,
 * then run it through query().
 * @param sql SQL text handed to Helpers._sqlFormat
 * @param arrParamsValues parameter values handed to Helpers._sqlFormat
 * @param arrDelimiters delimiters handed to Helpers._sqlFormat
 * @param callback passed on to query()
 * @return {*} whatever query() returns
 */
CUBRIDConnection.prototype.queryWithParams = function (sql, arrParamsValues, arrDelimiters, callback) {
  return this.query(Helpers._sqlFormat(sql, arrParamsValues, arrDelimiters), callback);
};
587 
/**
 * Fetch the next batch of rows for a query previously started with query().
 * When the query packet reports that every tuple has already been read
 * (currentTupleCount equals totalTupleCount), the 'fetch done' event is raised
 * and no request is sent.
 * @param queryHandle handle of an active query
 * @param callback optional; invoked as callback.call(connection, err, result, queryHandle)
 */
CUBRIDConnection.prototype.fetch = function (queryHandle, callback) {
  var err = null;
  var self = this;
  var hasCallback = (typeof callback === 'function');

  // Locate the packet of the query this handle belongs to
  var queryPacket = null;
  for (var i = 0; i < self._queriesPacketList.length; i++) {
    if (self._queriesPacketList[i].handle === queryHandle) {
      queryPacket = self._queriesPacketList[i];
      break;
    }
  }

  if (queryPacket === null) {
    err = new Error(ErrorMessages.ERROR_NO_ACTIVE_QUERY);
    // Kept from the previous implementation: no 'data' listener survives a failed fetch
    self._socket.removeAllListeners('data');
    Helpers._emitEvent(self, err, self.EVENT_ERROR, null);
    if (hasCallback) {
      callback.call(self, err, null, null);
    }
    return;
  }

  if (queryPacket.currentTupleCount === queryPacket.totalTupleCount) {
    // Every tuple has been delivered already; nothing to ask the server for
    self._socket.removeAllListeners('data');
    Helpers._emitEvent(self, null, null, self.EVENT_FETCH_NO_MORE_DATA_AVAILABLE, queryHandle);
    if (hasCallback) {
      callback.call(self, err, null, queryHandle);
    }
    return;
  }

  var responseData = new Buffer(0);
  var expectedResponseLength = self._INVALID_RESPONSE_LENGTH;
  var packetWriter = new PacketWriter();
  var fetchPacket = new FetchPacket(
    {
      casInfo : self._CASInfo
    }
  );
  fetchPacket.write(packetWriter, queryPacket);

  self._socket.on('data', function (data) {
    // The response may be split over several 'data' events; accumulate until complete
    responseData = Helpers._combineData(responseData, data);
    if (expectedResponseLength === self._INVALID_RESPONSE_LENGTH
      && responseData.length >= DATA_TYPES.DATA_LENGTH_SIZEOF) {
      expectedResponseLength = Helpers._getExpectedResponseLength(responseData);
    }
    if (responseData.length === expectedResponseLength) {
      self._socket.removeAllListeners('data');
      var packetReader = new PacketReader();
      packetReader.write(responseData);
      // Use the packet captured when the request was sent; looking it up again by
      // index would pick the wrong entry if the list changed in the meantime.
      var result = fetchPacket.parse(packetReader, queryPacket);
      var errorCode = fetchPacket.errorCode;
      var errorMsg = fetchPacket.errorMsg;
      if (errorCode !== 0) {
        err = new Error(errorCode + ':' + errorMsg);
      }
      Helpers._emitEvent(self, err, self.EVENT_ERROR, self.EVENT_FETCH_DATA_AVAILABLE, result, queryHandle);
      if (hasCallback) {
        callback.call(self, err, result, queryHandle);
      }
    }
  });

  self._socket.write(packetWriter._buffer);
};
656 
/**
 * Close an active query on the server and drop its packet from the list of open queries.
 * @param queryHandle handle of the query to close; must pass Helpers._validateInputPositive
 * @param callback optional; invoked as callback.call(connection, err, queryHandle)
 *                 (with err only when the handle fails validation)
 */
CUBRIDConnection.prototype.closeQuery = function (queryHandle, callback) {
  var err = null;
  var self = this;
  var hasCallback = (typeof callback === 'function');

  if (!Helpers._validateInputPositive(queryHandle)) {
    // The same error object goes to the event and to the callback
    err = new Error(ErrorMessages.ERROR_INPUT_VALIDATION);
    Helpers._emitEvent(self, err, self.EVENT_ERROR, null);
    if (hasCallback) {
      callback.call(self, err);
    }
    return;
  }

  self.queryPending = false;

  var foundQueryHandle = false;
  for (var i = 0; i < self._queriesPacketList.length; i++) {
    if (self._queriesPacketList[i].handle === queryHandle) {
      foundQueryHandle = true;
      break;
    }
  }

  if (!foundQueryHandle) {
    err = new Error(ErrorMessages.ERROR_NO_ACTIVE_QUERY + ": " + queryHandle);
    self._socket.removeAllListeners('data');
    Helpers._emitEvent(self, err, self.EVENT_ERROR, null);
    if (hasCallback) {
      callback.call(self, err, null);
    }
    // Stop here: no request was sent, so no response listener may be registered.
    // A leftover listener would swallow the response of the next request.
    return;
  }

  var responseData = new Buffer(0);
  var expectedResponseLength = self._INVALID_RESPONSE_LENGTH;
  var packetWriter = new PacketWriter();
  var closeQueryPacket = new CloseQueryPacket(
    {
      casInfo   : self._CASInfo,
      reqHandle : queryHandle
    }
  );
  closeQueryPacket.write(packetWriter);
  self._socket.write(packetWriter._buffer);

  self._socket.on('data', function (data) {
    // The response may be split over several 'data' events; accumulate until complete
    responseData = Helpers._combineData(responseData, data);
    if (expectedResponseLength === self._INVALID_RESPONSE_LENGTH
      && responseData.length >= DATA_TYPES.DATA_LENGTH_SIZEOF) {
      expectedResponseLength = Helpers._getExpectedResponseLength(responseData);
    }
    if (responseData.length === expectedResponseLength) {
      self._socket.removeAllListeners('data');
      var packetReader = new PacketReader();
      // Parse the accumulated response, not just the chunk that completed it
      packetReader.write(responseData);
      closeQueryPacket.parse(packetReader);
      var errorCode = closeQueryPacket.errorCode;
      var errorMsg = closeQueryPacket.errorMsg;
      if (errorCode !== 0) {
        err = new Error(errorCode + ':' + errorMsg);
      } else {
        // Forget the query; the list may have changed since the request was sent,
        // so search for the handle again.
        for (var j = 0; j < self._queriesPacketList.length; j++) {
          if (self._queriesPacketList[j].handle === queryHandle) {
            self._queriesPacketList.splice(j, 1);
            break;
          }
        }
      }
      Helpers._emitEvent(self, err, self.EVENT_ERROR, self.EVENT_QUERY_CLOSED, queryHandle);
      if (hasCallback) {
        callback.call(self, err, queryHandle);
      }
    }
  });
};
734 
/**
 * Close the connection: close every query still open, send the close-database
 * request and destroy the socket once the response has arrived.
 * The connection status flags are reset before the requests are sent.
 * @param callback optional; invoked as callback.call(connection, err)
 */
CUBRIDConnection.prototype.close = function (callback) {
  var err = null;
  var self = this;
  var hasCallback = (typeof callback === 'function');
  var responseData = new Buffer(0);
  var expectedResponseLength = self._INVALID_RESPONSE_LENGTH;

  if (self.connectionOpened == false) {
    err = new Error(ErrorMessages.ERROR_CONNECTION_ALREADY_CLOSED);
    Helpers._emitEvent(self, err, self.EVENT_ERROR, null);
    if (hasCallback) {
      callback.call(self, err);
    }
    return;
  }

  // Reset connection status
  self.queryPending = false;
  self.connectionPending = false;
  self.connectionOpened = false;

  ActionQueue.enqueue(
    [
      function (cb) {
        // Close the open queries one at a time, always taking the head of the list
        ActionQueue.while(
          function () {
            return (self._queriesPacketList[0] !== null && self._queriesPacketList[0] !== undefined);
          },

          function (callb) {
            self.closeQuery(self._queriesPacketList[0].handle, callb);
          },

          function (err) {
            // A failure to close a query handle is logged but does not stop the close
            if (typeof err != 'undefined' && err != null) {
              Helpers.logError(ErrorMessages.ERROR_ON_CLOSE_QUERY_HANDLE + err);
            }
            cb.call(null);
          }
        );
      },

      function (cb) {
        var packetWriter = new PacketWriter();
        var closeDatabasePacket = new CloseDatabasePacket(
          {
            casInfo : self._CASInfo
          }
        );
        closeDatabasePacket.write(packetWriter);
        self._socket.write(packetWriter._buffer);

        self._socket.on('data', function (data) {
          // The response may be split over several 'data' events; accumulate until complete
          responseData = Helpers._combineData(responseData, data);
          if (expectedResponseLength === self._INVALID_RESPONSE_LENGTH
            && responseData.length >= DATA_TYPES.DATA_LENGTH_SIZEOF) {
            expectedResponseLength = Helpers._getExpectedResponseLength(responseData);
          }
          if (responseData.length === expectedResponseLength) {
            self._socket.removeAllListeners('data');
            var packetReader = new PacketReader();
            // Parse the accumulated response, not just the chunk that completed it
            packetReader.write(responseData);
            closeDatabasePacket.parse(packetReader);
            // Close internal socket connection
            self._socket.destroy();
            var errorCode = closeDatabasePacket.errorCode;
            var errorMsg = closeDatabasePacket.errorMsg;
            if (errorCode !== 0) {
              err = new Error(errorCode + ':' + errorMsg);
            }
            if (typeof cb === 'function') {
              cb.call(self, err);
            }
          }
        });
      }
    ],

    function (err) {
      Helpers._emitEvent(self, err, self.EVENT_ERROR, self.EVENT_CONNECTION_CLOSED);
      if (hasCallback) {
        callback.call(self, err);
      }
    }
  );
};
825 
/**
 * Begin a transaction by asking _toggleAutoCommitMode to switch auto-commit off.
 * @param callback optional; invoked as callback.call(connection, err)
 */
CUBRIDConnection.prototype.beginTransaction = function (callback) {
  var connection = this;

  // Runs once _toggleAutoCommitMode reports its outcome
  function onAutoCommitToggled(err) {
    Helpers._emitEvent(connection, err, connection.EVENT_ERROR, connection.EVENT_BEGIN_TRANSACTION);
    if (!callback || typeof(callback) !== 'function') {
      return;
    }
    callback.call(connection, err);
  }

  _toggleAutoCommitMode(connection, connection.AUTOCOMMIT_OFF, onAutoCommitToggled);
};
839 
/**
 * Change the session auto-commit mode by delegating to _toggleAutoCommitMode.
 * @param autoCommitMode the mode handed to _toggleAutoCommitMode
 *                       (the connection exposes AUTOCOMMIT_ON / AUTOCOMMIT_OFF for this)
 * @param callback optional; invoked as callback.call(connection, err)
 */
CUBRIDConnection.prototype.setAutoCommitMode = function (autoCommitMode, callback) {
  var connection = this;

  // Runs once _toggleAutoCommitMode reports its outcome
  function onModeChanged(err) {
    Helpers._emitEvent(connection, err, connection.EVENT_ERROR, connection.EVENT_SET_AUTOCOMMIT_MODE_COMPLETED);
    if (!callback || typeof(callback) !== 'function') {
      return;
    }
    callback.call(connection, err);
  }

  _toggleAutoCommitMode(connection, autoCommitMode, onModeChanged);
};
854 
/**
 * Roll back the current transaction.
 * A rollback request is only sent when auto-commit mode is off (autoCommitMode === false);
 * otherwise nothing is sent and the callback is invoked right away with a null error.
 * @param callback optional; invoked as callback.call(connection, err)
 */
CUBRIDConnection.prototype.rollback = function (callback) {
  var err = null;
  var self = this;
  var hasCallback = (typeof callback === 'function');

  if (self.autoCommitMode !== false) {
    self._socket.removeAllListeners('data');
    Helpers._emitEvent(self, err, self.EVENT_ERROR, null);
    if (hasCallback) {
      callback.call(self, err);
    }
    // Stop here: no request was sent, so no response listener may be registered.
    // A leftover listener would swallow the response of the next request.
    return;
  }

  var responseData = new Buffer(0);
  var expectedResponseLength = self._INVALID_RESPONSE_LENGTH;
  var packetWriter = new PacketWriter();
  var rollbackPacket = new RollbackPacket(
    {
      casInfo : self._CASInfo
    }
  );
  rollbackPacket.write(packetWriter);
  self._socket.write(packetWriter._buffer);

  self._socket.on('data', function (data) {
    // The response may be split over several 'data' events; accumulate until complete
    responseData = Helpers._combineData(responseData, data);
    if (expectedResponseLength === self._INVALID_RESPONSE_LENGTH
      && responseData.length >= DATA_TYPES.DATA_LENGTH_SIZEOF) {
      expectedResponseLength = Helpers._getExpectedResponseLength(responseData);
    }
    if (responseData.length === expectedResponseLength) {
      self._socket.removeAllListeners('data');
      var packetReader = new PacketReader();
      // Parse the accumulated response, not just the chunk that completed it
      packetReader.write(responseData);
      rollbackPacket.parse(packetReader);
      var errorCode = rollbackPacket.errorCode;
      var errorMsg = rollbackPacket.errorMsg;
      if (errorCode !== 0) {
        err = new Error(errorCode + ':' + errorMsg);
      }
      Helpers._emitEvent(self, err, self.EVENT_ERROR, self.EVENT_ROLLBACK_COMPLETED);
      if (hasCallback) {
        callback.call(self, err);
      }
    }
  });
};
905 
/**
 * Commit the current transaction.
 *
 * A commit request is sent only while auto-commit mode is off
 * (`autoCommitMode === false`). In any other mode no request is sent: pending
 * 'data' listeners are removed, Helpers._emitEvent is invoked with a null
 * error and a null success event, and the callback is called immediately
 * with a null error.
 *
 * When a request is sent, the response is accumulated across 'data' events
 * until its announced length has been received; it is then parsed and the
 * result is reported through Helpers._emitEvent (EVENT_ERROR /
 * EVENT_COMMIT_COMPLETED) and the callback.
 *
 * @param callback optional function (err), invoked with the connection as `this`
 */
CUBRIDConnection.prototype.commit = function (callback) {
  var self = this;
  var err = null;
  var hasCallback = typeof callback === 'function';

  if (self.autoCommitMode !== false) {
    self._socket.removeAllListeners('data');
    Helpers._emitEvent(self, err, self.EVENT_ERROR, null);
    if (hasCallback) {
      callback.call(self, err);
    }
    // No request was sent, so no response handler must be installed; a
    // listener registered here would stay attached and hijack (or crash on)
    // the response of the next operation.
    return;
  }

  var responseData = new Buffer(0);
  var expectedResponseLength = self._INVALID_RESPONSE_LENGTH;
  var packetWriter = new PacketWriter();
  var commitPacket = new CommitPacket(
    {
      casInfo : self._CASInfo
    }
  );
  commitPacket.write(packetWriter);

  self._socket.on('data', function (data) {
    responseData = Helpers._combineData(responseData, data);
    if (expectedResponseLength === self._INVALID_RESPONSE_LENGTH
      && responseData.length >= DATA_TYPES.DATA_LENGTH_SIZEOF) {
      expectedResponseLength = Helpers._getExpectedResponseLength(responseData);
    }
    if (responseData.length !== expectedResponseLength) {
      return; // response still incomplete, wait for further chunks
    }

    self._socket.removeAllListeners('data');
    var packetReader = new PacketReader();
    // Parse the whole accumulated response; `data` alone is only the last
    // chunk when the response arrives fragmented.
    packetReader.write(responseData);
    commitPacket.parse(packetReader);
    var errorCode = commitPacket.errorCode;
    var errorMsg = commitPacket.errorMsg;
    if (errorCode !== 0) {
      err = new Error(errorCode + ':' + errorMsg);
    }
    Helpers._emitEvent(self, err, self.EVENT_ERROR, self.EVENT_COMMIT_COMPLETED);
    if (hasCallback) {
      callback.call(self, err);
    }
  });

  self._socket.write(packetWriter._buffer);
};
956 
/**
 * Auto-commit mode helper: asks the server to switch the session's
 * auto-commit mode and records the new mode on the connection on success.
 *
 * Outcomes:
 *  - `autoCommitMode` fails Helpers._validateInputBoolean: no request is sent
 *    and an Error(ErrorMessages.ERROR_INPUT_VALIDATION) is produced.
 *  - `self.autoCommitMode` already equals the requested mode: no request is
 *    sent and the callback (if any) receives a null error.
 *  - otherwise a SetAutoCommitModePacket is sent; the response is accumulated
 *    across 'data' events until its announced length has been received, then
 *    parsed. A non-zero error code yields an Error("<code>:<message>");
 *    a zero code updates `self.autoCommitMode`.
 *
 * Error reporting: when a callback is supplied, every error is delivered to
 * it and the callback owner is expected to emit the matching event (the
 * public wrappers do so through Helpers._emitEvent). Only when no callback is
 * supplied is a validation error emitted directly, so it is never lost and
 * never reported twice.
 *
 * @param self the connection the mode is changed on
 * @param autoCommitMode the requested auto-commit mode
 * @param callback optional function (err), invoked with `self` as `this`
 * @private
 */
function _toggleAutoCommitMode(self, autoCommitMode, callback) {
  var err = null;
  var hasCallback = typeof callback === 'function';

  if (!Helpers._validateInputBoolean(autoCommitMode)) {
    err = new Error(ErrorMessages.ERROR_INPUT_VALIDATION);
    if (hasCallback) {
      // Hand the failure to the caller instead of signalling success.
      callback.call(self, err);
    } else {
      Helpers._emitEvent(self, err, self.EVENT_ERROR, null);
    }
    return;
  }

  if (self.autoCommitMode === autoCommitMode) {
    if (hasCallback) {
      callback.call(self, err);
    }
    // Already in the requested mode: never send a request, callback or not.
    return;
  }

  var responseData = new Buffer(0);
  var expectedResponseLength = self._INVALID_RESPONSE_LENGTH;
  var packetWriter = new PacketWriter();
  var setAutoCommitModePacket = new SetAutoCommitModePacket(
    {
      casInfo        : self._CASInfo,
      autoCommitMode : autoCommitMode
    }
  );
  setAutoCommitModePacket.write(packetWriter);

  self._socket.on('data', function (data) {
    responseData = Helpers._combineData(responseData, data);
    if (expectedResponseLength === self._INVALID_RESPONSE_LENGTH
      && responseData.length >= DATA_TYPES.DATA_LENGTH_SIZEOF) {
      expectedResponseLength = Helpers._getExpectedResponseLength(responseData);
    }
    if (responseData.length !== expectedResponseLength) {
      return; // response still incomplete, wait for further chunks
    }

    self._socket.removeAllListeners('data');
    var packetReader = new PacketReader();
    // Parse the whole accumulated response; `data` alone is only the last
    // chunk when the response arrives fragmented.
    packetReader.write(responseData);
    setAutoCommitModePacket.parse(packetReader);
    var errorCode = setAutoCommitModePacket.errorCode;
    var errorMsg = setAutoCommitModePacket.errorMsg;
    if (errorCode !== 0) {
      err = new Error(errorCode + ':' + errorMsg);
    } else {
      self.autoCommitMode = autoCommitMode;
    }

    if (hasCallback) {
      callback.call(self, err);
    }
  });

  self._socket.write(packetWriter._buffer);
}
1018 
/**
 * Retrieve schema information from the database.
 *
 * Three steps are queued through ActionQueue.enqueue:
 *  1. open the connection when `connectionOpened` is false;
 *  2. send the schema request built by GetSchemaPacket and parse its response;
 *  3. send the follow-up fetch request of the same packet and parse the
 *     schema data out of its response.
 * Every response is accumulated across 'data' events until its announced
 * length has been received. The final outcome is reported through
 * Helpers._emitEvent (EVENT_ERROR / EVENT_SCHEMA_DATA_AVAILABLE) and then
 * handed to the optional callback.
 *
 * @param schemaType the schema type forwarded to GetSchemaPacket
 * @param callback optional function (err, result), invoked with the connection as `this`
 */
CUBRIDConnection.prototype.getSchema = function (schemaType, callback) {
  var self = this;
  var err = null;

  // State of the response to the schema request (step 2).
  var requestReply = new Buffer(0);
  var pendingLength = this._INVALID_RESPONSE_LENGTH;

  var steps = [
    // Step 1: make sure the connection is open.
    function (next) {
      if (self.connectionOpened !== false) {
        next();
        return;
      }
      self.connect(next);
    },

    // Step 2: request the schema and hand the packet over to the fetch step.
    function (next) {
      var writer = new PacketWriter();
      var schemaPacket = new GetSchemaPacket(
        {
          casInfo    : self._CASInfo,
          schemaType : schemaType
        }
      );
      schemaPacket.writeRequestSchema(writer);
      self._socket.write(writer._buffer);

      self._socket.on('data', function (chunk) {
        requestReply = Helpers._combineData(requestReply, chunk);
        if (pendingLength === self._INVALID_RESPONSE_LENGTH
          && requestReply.length >= DATA_TYPES.DATA_LENGTH_SIZEOF) {
          pendingLength = Helpers._getExpectedResponseLength(requestReply);
        }
        if (requestReply.length !== pendingLength) {
          return; // response still incomplete
        }

        self._socket.removeAllListeners('data');
        var reader = new PacketReader();
        reader.write(requestReply);
        schemaPacket.parseRequestSchema(reader);
        var code = schemaPacket.errorCode;
        var message = schemaPacket.errorMsg;
        if (code !== 0) {
          err = new Error(code + ':' + message);
        }
        if (typeof next === 'function') {
          next.call(self, err, schemaPacket);
        }
      });
    },

    // Step 3: fetch the schema rows announced by the previous step.
    function (schemaPacket, next) {
      pendingLength = -1;
      var fetchReply = new Buffer(0);
      var writer = new PacketWriter();
      schemaPacket.writeFetchSchema(writer);
      self._socket.write(writer._buffer);

      self._socket.on('data', function (chunk) {
        fetchReply = Helpers._combineData(fetchReply, chunk);
        if (pendingLength === self._INVALID_RESPONSE_LENGTH
          && fetchReply.length >= DATA_TYPES.DATA_LENGTH_SIZEOF) {
          pendingLength = Helpers._getExpectedResponseLength(fetchReply);
        }
        if (fetchReply.length !== pendingLength) {
          return; // response still incomplete
        }

        self._socket.removeAllListeners('data');
        var reader = new PacketReader();
        reader.write(fetchReply);
        var schemaData = schemaPacket.parseFetchSchema(reader);
        var code = schemaPacket.errorCode;
        var message = schemaPacket.errorMsg;
        if (code !== 0) {
          err = new Error(code + ':' + message);
        }
        if (typeof next === 'function') {
          next.call(self, err, schemaData);
        }
      });
    }
  ];

  // Completion: publish the event first, then notify the caller.
  function onFinished(err, result) {
    Helpers._emitEvent(self, err, self.EVENT_ERROR, self.EVENT_SCHEMA_DATA_AVAILABLE, result);
    if (typeof callback === 'function') {
      callback.call(self, err, result);
    }
  }

  ActionQueue.enqueue(steps, onFinished);
};
1114 
/**
 * Read data from a LOB object.
 *
 * The requested range is read in pieces of at most `_LOB_MAX_IO_LENGTH`
 * bytes, one LOBReadPacket request per piece, until `length` bytes have been
 * read or the server returns a zero-length piece. Each response is
 * accumulated across 'data' events until its announced length has been
 * received. The final outcome is reported through Helpers._emitEvent
 * (EVENT_ERROR / EVENT_LOB_READ_COMPLETED) and then handed to the callback.
 *
 * When the requested range exceeds `lobObject.lobLength`, nothing is sent:
 * the error is logged, emitted and passed to the callback.
 *
 * @param lobObject the LOB to read from; its `lobLength` bounds the read
 * @param pos 1-based position of the first byte to read
 * @param length number of bytes to read
 * @param callback optional function (err, buffer, totalReadLength), invoked
 *                 with the connection as `this`
 */
CUBRIDConnection.prototype.lobRead = function (lobObject, pos, length, callback) {
  var self = this;
  var hasCallback = typeof callback === 'function';
  var buffer;
  var total_read_len = 0;

  // Joins the pieces of a multi-request read. Buffers and strings are
  // concatenated; any other piece type falls back to keeping the latest piece.
  // NOTE(review): lobBuffer is presumably a Buffer (BLOB) or string (CLOB) - confirm against LOBReadPacket.
  function appendPiece(collected, piece) {
    if (Buffer.isBuffer(collected) && Buffer.isBuffer(piece)) {
      return Buffer.concat([collected, piece]);
    }
    if (typeof collected === 'string' && typeof piece === 'string') {
      return collected + piece;
    }
    return piece;
  }

  pos--; // callers pass a 1-based position; requests use a 0-based offset
  if (lobObject.lobLength < pos + length) {
    var rangeError = new Error(ErrorMessages.ERROR_INVALID_LOB_POSITION);
    Helpers.logError(rangeError.message);
    Helpers._emitEvent(self, rangeError, self.EVENT_ERROR, null);
    if (hasCallback) {
      callback.call(self, rangeError);
    }
    // Stop here: the range is invalid, so no read request must be issued.
    return;
  }

  ActionQueue.while(
    function () {
      return length > 0;
    },

    function (cb) {
      // Response state is per request; reusing it across iterations would
      // make every request after the first one wait forever.
      var responseData = new Buffer(0);
      var expectedResponseLength = self._INVALID_RESPONSE_LENGTH;
      var read_len = Math.min(length, self._LOB_MAX_IO_LENGTH);
      var packetWriter = new PacketWriter();
      var lobReadPacket = new LOBReadPacket(
        {
          casInfo : self._CASInfo
        }
      );
      lobReadPacket.write(packetWriter, lobObject, pos, read_len);

      self._socket.on('data', function (data) {
        responseData = Helpers._combineData(responseData, data);
        if (expectedResponseLength === self._INVALID_RESPONSE_LENGTH
          && responseData.length >= DATA_TYPES.DATA_LENGTH_SIZEOF) {
          expectedResponseLength = Helpers._getExpectedResponseLength(responseData);
        }
        if (responseData.length !== expectedResponseLength) {
          return; // response still incomplete, wait for further chunks
        }

        self._socket.removeAllListeners('data');
        var packetReader = new PacketReader();
        // Parse the whole accumulated response; `data` alone is only the
        // last chunk when the response arrives fragmented.
        packetReader.write(responseData);
        var real_read_len = lobReadPacket.parse(packetReader);
        var stepError = null;
        var errorCode = lobReadPacket.errorCode;
        var errorMsg = lobReadPacket.errorMsg;
        if (errorCode !== 0) {
          stepError = new Error(errorCode + ':' + errorMsg);
          length = 0; // make sure the loop condition cannot keep spinning
        } else {
          pos += real_read_len;
          length -= real_read_len;
          total_read_len += real_read_len;
          if (real_read_len === 0) {
            length = 0; // the server has no more data for this LOB
          }
          buffer = appendPiece(buffer, lobReadPacket.lobBuffer);
        }
        if (typeof cb === 'function') {
          cb.call(self, stepError);
        }
      });

      self._socket.write(packetWriter._buffer);
    },

    function (err) {
      Helpers._emitEvent(self, err, self.EVENT_ERROR, self.EVENT_LOB_READ_COMPLETED, buffer, total_read_len);
      if (hasCallback) {
        callback.call(self, err, buffer, total_read_len);
      }
    });
};
1195 
1196 
1197