index.js

'use strict';

/**
 * ODBC specific extension of the {@link Manager~ConnectionOptions} from the [`sqler`](https://ugate.github.io/sqler/) module.
 * @typedef {Manager~ConnectionOptions} OdbcConnectionOptions
 * @property {Object} driverOptions The `odbc` module specific options.
 * @property {Object} driverOptions.connection An object that will contain properties/values that will be used to construct the ODBC connection string.
 * For example, `{ UID: 'someUsername', PWD: 'somePassword' }` would generate `UID=someUsername;PWD=somePassword`.
 * When a property value is a string surrounded by `${}`, it will be assumed to be a property that resides on either the {@link Manager~PrivateOptions}
 * passed into the {@link Manager} constructor or a property on the {@link OdbcConnectionOptions} itself (in that order of precedence). For example, 
 * `connOpts.service = 'SomeDSN'` and `driverOptions.connection.DSN = '${service}'` would be interpolated into `driverOptions.connection.DSN = 'SomeDSN'`. In
 * contrast to `privOpts.username = 'someUsername' and `driverOptions.connection.UID = '${username}'` would be interpolated into
 * `driverOptions.connection.UID = 'someUsername'`.
 * Both of which would ultimately become the ODBC connection string `DSN=SomeDSN;UID=someUsername`.
 * Interpoaltions can also contain more than one reference. For example, `driverOptions.connection.Server = '${protocol}:${host},${port}'` for 
 * `privOpts = { protocol: 'TCP', host: 'example.com', port: 5400 }` would become `Server=TCP:example.com,5400` in the connection string
 * @property {Object} [driverOptions.pool] The pool `conf` options that will be passed into `odbc.pool(conf)`.
 * __Using any of the generic `pool.someOption` will override the `conf` options set on `driverOptions.pool`__ (e.g. `pool.max = 10` would override 
 * `driverOptions.pool.maxSize = 20`).
 * When a value is a string surrounded by `${}`, it will be assumed to be a _constant_ property that resides on the `odbc` module and will be interpolated
 * accordingly.
 * For example `driverOptions.pool.someProp = '${ODBC_CONSTANT}'` will be interpolated as `pool.someProp = odbc.ODBC_CONSTANT`.
 */

/**
 * ODBC specific extension of the {@link Manager~ExecOptions} from the [`sqler`](https://ugate.github.io/sqler/) module. When a property of `binds` contains
 * an object it will be _interpolated_ for property values on the `odbc` module.
 * For example, `binds.name = '${ODBC_CONSTANT}'` will be interpolated as
 * `binds.name = odbc.ODBC_CONSTANT`.
 * @typedef {Manager~ExecOptions} OdbcExecOptions
 * @property {Object} [driverOptions] The `odbc` module specific options.
 * @property {Object} [driverOptions.exec] The options passed into various `odbc` functions during {@link Manager.exec}.
 * When a value is a string surrounded by `${}`, it will be assumed to be a _constant_ property that resides on the `odbc` module and will be interpolated
 * accordingly.
 * For example `driverOptions.exec.isolationLevel = '${SQL_TXN_READ_UNCOMMITTED}'` will be interpolated as
 * `driverOptions.exec.isolationLevel = odbc.SQL_TXN_READ_UNCOMMITTED`.
 * @property {Integer} [driverOptions.exec.isolationLevel] The isolation level to set on the ODBC connection.
 * __Ignored when `opts.type === 'READ'` and  `opts.transactionId` is ommitted.__
 */

/**
 * ODBC {@link Dialect} implementation for [`sqler`](https://ugate.github.io/sqler/)
 */
module.exports = class OdbcDialect {

  /**
   * Constructor
   * @constructs OdbcDialect
   * @param {Manager~PrivateOptions} priv The private configuration options
   * @param {OdbcConnectionOptions} connConf The individual SQL __connection__ configuration for the given dialect that was passed into the originating {@link Manager}
   * @param {Manager~Track} track Container for sharing data between {@link Dialect} instances.
   * @param {Function} [errorLogger] A function that takes one or more arguments and logs the results as an error (similar to `console.error`)
   * @param {Function} [logger] A function that takes one or more arguments and logs the results (similar to `console.log`)
   * @param {Boolean} [debug] A flag that indicates the dialect should be run in debug mode (if supported)
   */
  constructor(priv, connConf, track, errorLogger, logger, debug) {
    if (!connConf.driverOptions) throw new Error('Connection configuration is missing required driverOptions');
    if (!connConf.driverOptions.connection) throw new Error('Connection configuration is missing required driverOptions.connection');
    const dlt = internal(this);
    dlt.at.track = track;
    dlt.at.odbc = require('odbc');
    dlt.at.connections = new Map();
    dlt.at.stmts = new Map();
    dlt.at.opts = {
      autoCommit: true, // default autoCommit = true to conform to sqler
      id: `sqlerOdbcGen${Math.floor(Math.random() * 10000)}`,
      connection: connConf.driverOptions.connection,
      pool: connConf.driverOptions.pool ? dlt.at.track.interpolate({}, connConf.driverOptions.pool, dlt.at.odbc) : {}
    };
    dlt.at.state = {
      pending: 0,
      connection: { count: 0, inUse: 0 }
    };

    dlt.at.errorLogger = errorLogger;
    dlt.at.logger = logger;
    dlt.at.debug = debug;

    dlt.at.opts.pool.initialSize = connConf.pool ? connConf.pool.min : null;
    dlt.at.opts.pool.maxSize = connConf.pool ? connConf.pool.max : null;
    dlt.at.opts.pool.connectionTimeout = connConf.pool ? connConf.pool.idle : null;
    dlt.at.opts.pool.incrementSize = connConf.pool ? connConf.pool.increment : null;
    dlt.at.opts.pool.loginTimeout = connConf.pool ? connConf.pool.timeout : null;

    dlt.at.opts.pool.connectionString = '';
    let cstr = '', val;
    for (let connProp in dlt.at.opts.connection) {
      val = dlt.at.opts.connection[connProp];
      if (typeof val === 'string') {
        // global shallow interpolation to allow multiple interpolated values
        // in a single value (e.g. connection.Server = '${protocol}:${service},${port}')
        // negates the need to use track.interpolate
        val = val.replace(/\${\s*([A-Z_]+)\s*}/ig, (match, name) => {
          if (connConf.hasOwnProperty(name)) {
            if (typeof connConf[name] === 'object') {
              throw new Error(`sqler-odbc: Interpolation "${match}" references a non-transposable Object on connection options:\n${
                JSON.stringify(connConf, null, ' ')
              }`);
            }
            return connConf[name];
          }
          if (priv.hasOwnProperty(name)) {
            if (typeof priv[name] === 'object') {
              throw new Error(`sqler-odbc: Interpolation "${match}" references a non-transposable Object on private options:\n${
                JSON.stringify(priv, (key, val) => key === 'password' ? '***' : val, ' ')
              }`);
            }
            return priv[name];
          }
          throw new Error(`sqler-odbc: Interpolation "${match}" references a non-existent property for both the connection options:\n${
            JSON.stringify(connConf, null, ' ')
          } and the private options:\n${
            JSON.stringify(priv, (key, val) => key === 'password' ? '***' : val, ' ')
          }`);
        });
      }
      cstr += `${cstr ? ';' : ''}${connProp}=${val}`;
    }
    dlt.at.opts.pool.connectionString = cstr;
  }

  /**
   * Initializes {@link OdbcDialect} by creating the connection pool
   * @param {Dialect~DialectInitOptions} opts The options described by the `sqler` module
   * @returns {Object} The ODBC connection pool
   */
  async init(opts) {
    const dlt = internal(this), numSql = opts.numOfPreparedFuncs;
    try {
      dlt.at.pool = await dlt.at.odbc.pool(dlt.at.opts.pool);
      if (dlt.at.logger) {
        dlt.at.logger(`sqler-odbc: Connection pool "${dlt.at.opts.id}" created with (${numSql} SQL files) ` +
          `loginTimeout=${dlt.at.opts.pool.loginTimeout} incrementSize=${dlt.at.opts.pool.incrementSize} ` +
          `initialSize=${dlt.at.opts.pool.initialSize} maxSize=${dlt.at.opts.pool.maxSize} shrink=${dlt.at.opts.pool.shrink}`);
      }
      return dlt.at.pool;
    } catch (err) {
      const msg = `sqler-odbc: connection pool "${dlt.at.opts.id}" could not be created`;
      if (dlt.at.errorLogger) dlt.at.errorLogger(`${msg} (passwords are omitted from error) ${JSON.stringify(err, null, ' ')}`);
      const pconf = Object.assign({}, dlt.at.opts.pool);
      delete pconf.PWD;
      delete pconf.password;
      pconf.connectionString = pconf.connectionString.replace(/(PWD|Password)\s*=\s*[^\s\r\n;]+/gi, '$1=***');
      err.message = `${err.message}\n${msg} for ${JSON.stringify(pconf, null, ' ')}`;
      err.sqlerOdbc = pconf;
      throw err;
    }
  }

  /**
   * Begins a transaction by opening a connection from the pool
   * @param {String} txId The transaction ID that will be started
   */
  async beginTransaction(txId) {
    const dlt = internal(this);
    if (dlt.at.connections.has(txId)) return;
    if (dlt.at.logger) {
      dlt.at.logger(`sqler-odbc: Beginning transaction "${txId}" on connection pool "${dlt.at.opts.id}"`);
    }
    const conn = await dlt.this.getConnection({ transactionId: txId }, true);
    await conn.beginTransaction();
    dlt.at.connections.set(txId, conn);
  }

  /**
   * Executes a SQL statement
   * @param {String} sql the SQL to execute
   * @param {OdbcExecOptions} opts The execution options
   * @param {String[]} frags the frament keys within the SQL that will be retained
   * @param {Manager~ExecMeta} meta The SQL execution metadata
   * @param {(Manager~ExecErrorOptions | Boolean)} [errorOpts] The error options to use
   * @returns {Dialect~ExecResults} The execution results
   */
  async exec(sql, opts, frags, meta, errorOpts) {
    const dlt = internal(this);
    let conn, bndp = {}, ebndp = [], esql, rslts, stmt;
    try {
      // interpolate and remove unused binds since
      // ODBC only accepts the exact number of bind parameters (also, cuts down on payload bloat)
      bndp = dlt.at.track.interpolate(bndp, opts.binds, dlt.at.odbc, props => sql.includes(`:${props[0]}`));

      // odbc expects binds to be in an array
      esql = dlt.at.track.positionalBinds(sql, bndp, ebndp);

      const dopts = opts.driverOptions ? dlt.at.track.interpolate({}, opts.driverOptions, dlt.at.odbc) : {};
      const hasIsoLvl = dopts.hasOwnProperty('isolationLevel');
      const rtn = {};

      if (!opts.transactionId && !opts.prepareStatement && !hasIsoLvl && opts.type === 'READ') {
        rslts = await dlt.at.pool.query(esql, ebndp);
      } else {
        if (opts.prepareStatement) {
          let pso;
          const psname = meta.name;
          if (dlt.at.stmts.has(psname)) {
            pso = dlt.at.stmts.get(psname);
            if (pso.connProm) await pso.connProm;
            if (hasIsoLvl) await pso.conn.setIsolationLevel(dopts.isolationLevel);
            if (pso.stmtProm) await pso.stmtProm;
            if (pso.prepProm) await pso.prepProm;
          } else {
            pso = { sql: esql };
            // set before async in case concurrent PS invocations
            dlt.at.stmts.set(psname, pso);
            pso.connProm = dlt.this.getConnection(opts);  // other PS exec need access to promise in order to wait for connection access
            pso.conn = conn = await pso.connProm; // wait for the initial PS to establish a connection (other PS exec need access to promise)
            if (hasIsoLvl) await pso.conn.setIsolationLevel(dopts.isolationLevel);
            pso.connProm = null; // reset promise once it completes
            pso.stmtProm = conn.createStatement();
            pso.stmt = await pso.stmtProm;
            pso.stmtProm = null;
            pso.prepProm = pso.stmt.prepare(esql);
            await pso.prepProm;
            pso.prepProm = null;
          }
          conn = pso.conn;
          stmt = pso.stmt;
          rtn.unprepare = async () => {
            if (dlt.at.stmts.has(psname)) {
              const pso = dlt.at.stmts.get(psname);
              try {
                await pso.stmt.close();
                dlt.at.stmts.delete(psname);
              } finally {
                if (!opts.transactionId) await pso.conn.close();
              }
            } else if (!opts.transactionId) await conn.close();
          };
          await stmt.bind(ebndp);
          rslts = await stmt.execute();
        } else {
          conn = await dlt.this.getConnection(opts);
          await operation(dlt, 'query', false, {
            query: async () => {
              if (hasIsoLvl) await conn.setIsolationLevel(dopts.isolationLevel);
              rslts = await conn.query(esql, ebndp);
            },
            // tx conn should be left open until commit/rollback
            close: async () => opts.transactionId ? null : conn.close()
          }, opts)();
        }

        if (opts.transactionId) {
          if (opts.autoCommit) {
            // ODBC has no option to autocommit during SQL execution
            await operation(dlt, 'commit', false, conn, opts, rtn.unprepare)();
          } else {
            dlt.at.state.pending++;
            rtn.commit = operation(dlt, 'commit', true, conn, opts, rtn.unprepare);
            rtn.rollback = operation(dlt, 'rollback', true, conn, opts, rtn.unprepare);
          }
        }
      }

      // odbc returns an array rather than rslts.rows array
      rtn.raw = rslts;
      rtn.rows = rslts;

      if (dlt.at.debug && rslts) {
        try {
          console.debug(`sqler-odbc (debug): Executed statement with parameters [${rslts.parameters}]...\n${rslts.statement}\n`);
        } catch (err) {
          // consume
        }
      }

      return rtn;
    } catch (err) {
      if (conn) {
        try {
          await conn.close();
        } catch (cerr) {
          err.closeError = cerr;
        }
      }
      if (dlt.at.errorLogger) {
        dlt.at.errorLogger(`Failed to execute the following SQL: ${sql}`, err);
      }
      err.sqler = { sqlODBC: esql };
      //err.sqler.bindsODBC = errorOpts && errorOpts.includeBindValues ? ebndp : Object.keys(bndp);
      throw err;
    }
  }

  /**
   * Gets the currently open connection or a new connection when no transaction is in progress
   * @protected
   * @param {OdbcExecOptions} opts The execution options
   * @param {Boolean} [begin] Truthy when the `opts.transactionId` is beng started
   * @returns {Object} The connection (when present)
   */
  async getConnection(opts, begin) {
    const dlt = internal(this);
    const txId = opts.transactionId;
    let conn = txId ? dlt.at.connections.get(txId) : null;
    if (!conn) {
      if (txId && !begin) throw new Error(`Invalid transactionId: ${txId}`);
      return dlt.at.pool.connect();
    }
    return conn;
  }

  /**
   * Closes the ODBC connection pool
   * @returns {Integer} The number of connections closed
   */
  async close() {
    const dlt = internal(this);
    try {
      if (dlt.at.logger) {
        dlt.at.logger(`sqler-odbc: Closing connection pool "${dlt.at.opts.id}" (uncommitted transactions: ${dlt.at.state.pending})`);
      }
      if (dlt.at.pool) await dlt.at.pool.close();
      dlt.at.connections.clear();
      dlt.at.stmts.clear();
      return dlt.at.state.pending;
    } catch (err) {
      if (dlt.at.errorLogger) {
        dlt.at.errorLogger(`sqler-odbc: Failed to close connection pool "${dlt.at.opts.id}" (uncommitted transactions: ${dlt.at.state.pending})`, err);
      }
      throw err;
    }
  }

  /**
   * @returns {Manager~State} The state
   */
  get state() {
    const dlt = internal(this);
    if (dlt.at.pool && dlt.at.pool.freeConnections) {
      dlt.at.state.connection.count = dlt.at.pool.freeConnections.length;
      dlt.at.state.connection.inUse = dlt.at.opts.pool.maxSize - dlt.at.state.connection.count;
    }
    return JSON.parse(JSON.stringify(dlt.at.state));
  }

  /**
   * @protected
   * @returns {Object} The ODBC driver module
   */
  get driver() {
    return internal(this).at.odbc;
  }
};

/**
 * Executes a function by name that resides on the ODBC connection
 * @private
 * @param {Object} dlt The internal ODBC object instance
 * @param {String} name The name of the function that will be called on the connection
 * @param {Boolean} reset Truthy to reset the pending connection and transaction count when the operation completes successfully
 * @param {Object} conn The connection
 * @param {Manager~ExecOptions} [opts] The {@link Manager~ExecOptions}
 * @param {Function} [preop] A no-argument async function that will be executed prior to the operation
 * @returns {Function} A no-arguement `async` function that returns the number or pending transactions
 */
function operation(dlt, name, reset, conn, opts, preop) {
  return async () => {
    let error;
    if (preop) await preop();
    try {
      //if (!conn) conn = await dlt.at.pool.connect(); // get connection from the pool
      if (dlt.at.logger) {
        dlt.at.logger(`sqler-odbc: Performing ${name} on connection pool "${dlt.at.opts.id}" (uncommitted transactions: ${dlt.at.state.pending})`);
      }
      await conn[name]();
      if (reset) {
        if (opts && opts.transactionId) dlt.at.connections.delete(opts.transactionId);
        dlt.at.state.pending = 0;
      }
    } catch (err) {
      error = err;
      if (dlt.at.errorLogger) {
        dlt.at.errorLogger(`sqler-odbc: Failed to ${name} ${dlt.at.state.pending} transaction(s) with options: ${
          opts ? JSON.stringify(opts) : 'N/A'}`, error);
      }
      throw error;
    } finally {
      if (name !== 'close') {
        try {
          await conn.close();
        } catch (cerr) {
          if (error) {
            error.sqlerOdbc = {
              closeError: cerr
            };
          }
        }
      }
    }
    return dlt.at.state.pending;
  };
}

// private mapping
let map = new WeakMap();
let internal = function(object) {
  if (!map.has(object)) {
    map.set(object, {});
  }
  return {
    at: map.get(object),
    this: object
  };
};

1.2.1 (2020-06-12)

Full Changelog

Fixes: