index.js

'use strict';

// PostgreSQL has a limit of 63 characters for prepared statement names. Generated names that are
// longer than this are trimmed down to their trailing 63 characters before use (see PGDialect.exec).
const PREPARED_STMT_NAME_MAX = 63;

/**
 * PostgreSQL {@link Dialect} implementation for [`sqler`](https://ugate.github.io/sqler/).
 * Typically, an application will not have to directly interact with the dialect. All API interactions will take place using the {@link Manager}
 * interface that resides within the [`sqler`](https://ugate.github.io/sqler/) module.
 */
module.exports = class PGDialect {

  /**
   * Constructor
   * @constructs PGDialect
   * @param {SQLERPrivateOptions} priv The private configuration options
   * @param {PGConnectionOptions} connConf The individual SQL __connection__ configuration for the given dialect that was passed into the originating {@link Manager}
   * @param {SQLERTrack} track Container for sharing data between {@link Dialect} instances.
   * @param {Function} [errorLogger] A function that takes one or more arguments and logs the results as an error (similar to `console.error`)
   * @param {Function} [logger] A function that takes one or more arguments and logs the results (similar to `console.log`)
   * @param {Boolean} [debug] A flag that indicates the dialect should be run in debug mode (if supported)
   */
  constructor(priv, connConf, track, errorLogger, logger, debug) {
    if (!connConf.driverOptions) throw new Error('Connection configuration is missing required driverOptions');
    const dlt = internal(this);
    dlt.at.track = track;
    dlt.at.driver = require('pg');
    dlt.at.transactions = new Map();
    dlt.at.opts = {
      autoCommit: true, // default autoCommit = true to conform to sqler
      id: `sqlerPGGen${Math.floor(Math.random() * 10000)}`,
      client: connConf.driverOptions.client ? dlt.at.track.interpolate({}, connConf.driverOptions.client, dlt.at.driver) : {}
    };
    // merge client options into pool options
    dlt.at.opts.pool = connConf.driverOptions.pool ? 
      dlt.at.track.interpolate(dlt.at.opts.client, connConf.driverOptions.pool, dlt.at.driver) : 
      dlt.at.opts.client;
    // sqler compatible state
    dlt.at.state = {
      pending: 0
    };

    dlt.at.errorLogger = errorLogger;
    dlt.at.logger = logger;
    dlt.at.debug = debug;

    if (priv.host) dlt.at.opts.pool.host = priv.host;
    if (priv.hasOwnProperty('port')) dlt.at.opts.pool.port = priv.port;
    dlt.at.opts.pool.user = priv.username;
    dlt.at.opts.pool.password = priv.password;

    if (connConf.pool) {
      // if (connConf.pool.hasOwnProperty('min')) dlt.at.opts.pool.minimumIdle = connConf.pool.min; // not supported
      if (connConf.pool.hasOwnProperty('max')) dlt.at.opts.pool.max = connConf.pool.max;
      if (connConf.pool.hasOwnProperty('idle')) dlt.at.opts.pool.idleTimeoutMillis = connConf.pool.idle;
      // if (connConf.pool.hasOwnProperty('increment')) dlt.at.opts.pool.incrementSize = connConf.pool.increment; // not supported
      if (connConf.pool.hasOwnProperty('timeout')) dlt.at.opts.pool.connectionTimeoutMillis = connConf.pool.timeout;
    }
  }

  /**
   * Initializes {@link PGDialect} by creating the connection pool
   * @param {Dialect~DialectInitOptions} opts The options described by the `sqler` module
   * @returns {Object} The PostgreSQL connection pool
   */
  async init(opts) {
    const dlt = internal(this), numSql = opts.numOfPreparedFuncs;
    let conn, error;
    try {
      dlt.at.pool = new dlt.at.driver.Pool(dlt.at.opts.pool);
      if (dlt.at.logger) {
        dlt.at.logger(`sqler-postgres: Connection pool "${dlt.at.opts.id}" created with (${numSql} SQL files) ` +
          `max=${dlt.at.opts.pool.max} idleTimeoutMillis=${dlt.at.opts.pool.idleTimeoutMillis} ` +
          `connectionTimeoutMillis=${dlt.at.opts.pool.connectionTimeoutMillis}`);
      }
      conn = await dlt.at.pool.connect();
      return dlt.at.pool;
    } catch (err) {
      error = err;
      const msg = `sqler-postgres: connection pool "${dlt.at.opts.id}" could not be created`;
      if (dlt.at.errorLogger) {
        dlt.at.errorLogger(`${msg} (passwords are omitted from error) ${JSON.stringify(err, null, ' ')}`);
      }
      const pconf = Object.assign({}, dlt.at.opts.pool);
      delete pconf.password;
      err.message = `${err.message}\n${msg} for ${JSON.stringify(pconf, null, ' ')}`;
      err.sqlerPG = pconf;
      throw err;
    } finally {
      if (conn) {
        await operation(dlt, 'release', false, conn, opts, null, error)();
      }
    }
  }

  /**
   * Begins a transaction by opening a connection from the pool
   * @param {String} txId The transaction ID that will be started
   * @returns {SQLERTransaction} The transaction
   */
  async beginTransaction(txId) {
    const dlt = internal(this);
    if (dlt.at.logger) {
      dlt.at.logger(`sqler-postgres: Beginning transaction "${txId}" on connection pool "${dlt.at.opts.id}"`);
    }
    /** @type {SQLERTransaction} */
    const tx = {
      id: txId,
      state: Object.seal({
        isCommitted: false,
        isRolledback: false,
        pending: 0
      })
    };
    /** @type {PGTransactionObject} */
    const txo = { tx, conn: await dlt.at.pool.connect() };
    const opts = { transactionId: tx.id };
    await txo.conn.query('BEGIN');
    tx.commit = operation(dlt, 'commit', true, txo, opts, 'unprepare');
    tx.rollback = operation(dlt, 'rollback', true, txo, opts, 'unprepare');
    Object.freeze(tx);
    dlt.at.transactions.set(txId, txo);
    return tx;
  }

  /**
   * Executes a SQL statement
   * @param {String} sql the SQL to execute
   * @param {PGExecOptions} opts The execution options
   * @param {String[]} frags The frament keys within the SQL that will be retained
   * @param {SQLERExecMeta} meta The SQL execution metadata
   * @param {(SQLERExecErrorOptions | Boolean)} [errorOpts] The error options to use
   * @returns {Dialect~ExecResults} The execution results
   */
  async exec(sql, opts, frags, meta, errorOpts) {
    const dlt = internal(this);
    /** @type {PGTransactionObject} */
    const txo = opts.transactionId ? dlt.at.transactions.get(opts.transactionId) : null;
    let conn, bndp = {}, dopts, rslts, error;
    try {
      // interpolate and remove unused binds since
      // PostgreSQL only accepts the exact number of bind parameters (also, cuts down on payload bloat)
      bndp = dlt.at.track.interpolate(bndp, opts.binds, dlt.at.driver, props => sql.includes(`:${props[0]}`));

      // driver options query override
      dopts = opts.driverOptions || {};
      if (!dopts.query) dopts.query = {};
      dopts.query.values = [];
      dopts.query.text = dlt.at.track.positionalBinds(sql, bndp, dopts.query.values, (name, index) => `$${index + 1}`);

      const rtn = {};

      if (!txo && opts.type === 'READ') {
        rslts = await dlt.at.pool.query(dopts.query);
        rtn.rows = rslts.rows;
        rtn.raw = rslts;
      } else {
        // name will cause pg to use a prepared statement
        let psname;
        if (opts.prepareStatement) {
          psname = meta.name.length > PREPARED_STMT_NAME_MAX ? meta.name.substring(meta.name.length - PREPARED_STMT_NAME_MAX) : meta.name;
          if (dopts.query.name) {
            throw new Error(`Prepared statements use internally generated names based upon SQL file meta. Attempted to use "${psname
              }", but found driverOptions.query.name = "${dopts.query.name}"`);
          }
        } else if (dopts.query.name) {
          throw new Error('Prepared statements use internally generated names based upon SQL file meta, but found ' +
            `execOpts.driverOptions.query.name = "${dopts.query.name}"`);
        }
        dopts.query.name = psname;
        conn = txo ? null : await dlt.at.pool.connect();
        rslts = await (txo ? txo.conn : conn).query(dopts.query);
        rtn.rows = rslts.rows;
        rtn.raw = rslts;

        if (opts.prepareStatement) {
          rtn.unprepare = unprepared(dlt, opts, psname);
        }
        if (txo) {
          if (rtn.unprepare) {
            txo.unprepares = txo.unprepares || new Map();
            txo.unprepares.set(psname, rtn.unprepare); // keep track of the prepared statements that have transaction scope
          }
          if (opts.autoCommit) {
            // PostgreSQL has no option to autocommit during SQL execution
            await operation(dlt, 'commit', false, txo, opts, 'unprepare')();
          } else {
            dlt.at.state.pending++;
          }
        }
      }

      return rtn;
    } catch (err) {
      error = err;
      const msg = ` (BINDS: [${Object.keys(bndp)}], FRAGS: ${Array.isArray(frags) ? frags.join(', ') : frags})`;
      if (dlt.at.errorLogger) {
        dlt.at.errorLogger(`Failed to execute the following SQL: ${sql}`, err);
      }
      err.message += msg;
      err.sqlerPG = dopts;
      throw err;
    } finally {
      if (conn) {
        try {
          await operation(dlt, 'release', false, conn, opts)();
        } catch (cerr) {
          if (error) error.closeError = cerr;
        }
      }
    }
  }

  /**
   * Closes the PostgreSQL connection pool
   * @returns {Integer} The number of connections closed
   */
  async close() {
    const dlt = internal(this);
    let error;
    try {
      if (dlt.at.logger) {
        dlt.at.logger(`sqler-postgres: Closing connection pool "${dlt.at.opts.id}" (uncommitted transactions: ${dlt.at.state.pending})`);
      }
      const cproms = [];
      for (let txo of dlt.at.transactions.values()) {
        cproms.push(txo.conn.end());
      }
      if (cproms.length) {
        await Promise.all(cproms);
        dlt.at.transactions.clear();
      }
      if (dlt.at.pool) {
        // pg module contains bug on some occasions calling end w/o a callback
        // may result in unreported errors 
        await dlt.at.pool.end(err => {
          error = err;
        });
      }
    } catch (err) {
      error = err;
      if (dlt.at.errorLogger) {
        dlt.at.errorLogger(`sqler-postgres: Failed to close connection pool "${dlt.at.opts.id}" (uncommitted transactions: ${dlt.at.state.pending})`, err);
      }
    }
    if (error) throw error;
    return dlt.at.state.pending;
  }

  /**
   * @returns {SQLERState} The state
   */
  get state() {
    const dlt = internal(this);
    return {
      connection: {
        count: (dlt.at.pool && dlt.at.pool.totalCount) || 0,
        inUse: (dlt.at.pool && dlt.at.pool.waitingCount) || 0
      },
      pending: dlt.at.state.pending || (dlt.at.pool && dlt.at.pool.waitingCount) || 0
    };
  }

  /**
   * @protected
   * @returns {Object} The PostgreSQL driver module
   */
  get driver() {
    return internal(this).at.driver;
  }
};

/**
 * Generates a function that executes an operation by name against a PostgreSQL connection. `commit` and `rollback` are
 * issued as SQL statements on the connection. Any other name is invoked as a no-argument function on the connection
 * (e.g. `release`).
 * @private
 * @param {Object} dlt The internal PostgreSQL object instance
 * @param {String} name The name of the function that will be called on the connection
 * @param {Boolean} reset Truthy to reset the pending connection and transaction count when the operation completes successfully
 * @param {(PGTransactionObject | Object)} [txoOrConn] Either the transaction object or the connection itself
 * @param {SQLERExecOptions} [opts] The {@link SQLERExecOptions}
 * @param {String} [preop] An operation name that will be performed before the actual operation. The following values are valid:
 * 1. __`unprepare`__ - Any un-prepare functions that are associated with the passed {@link PGTransactionObject} will be executed.
 * Ignored when a transaction object is not being used.
 * @param {Error} [error] An originating error where any operational errors will be set as a property of the passed error
 * (e.g. `name = 'close'` would result in `error.closeError = someInternalError`). __Internal Errors will not be thrown.__
 * @returns {Function} A no-argument `async` function that returns the number of pending transactions
 */
function operation(dlt, name, reset, txoOrConn, opts, preop, error) {
  return async () => {
    // opts and txoOrConn are optional, so both are guarded before being dereferenced
    /** @type {PGTransactionObject} */
    const txo = opts && opts.transactionId && txoOrConn && txoOrConn.tx ? txoOrConn : null;
    const conn = txo ? txo.conn : txoOrConn;
    let ierr;
    // un-prepare functions only exist on transaction objects
    if (preop === 'unprepare' && txo && txo.unprepares) {
      for (let unprepare of txo.unprepares.values()) {
        await unprepare();
      }
      txo.unprepares.clear();
    }
    try {
      if (dlt.at.logger) {
        dlt.at.logger(`sqler-postgres: Performing ${name} on connection pool "${dlt.at.opts.id}" (uncommitted transactions: ${dlt.at.state.pending})`);
      }
      if (name === 'commit' || name === 'rollback') {
        await conn.query(name.toUpperCase());
      } else {
        await conn[name]();
      }
      if (reset) { // not to be confused with pg connection.reset();
        if (txo) dlt.at.transactions.delete(txo.tx.id);
        dlt.at.state.pending = 0;
      }
    } catch (err) {
      ierr = err;
      if (dlt.at.errorLogger) {
        dlt.at.errorLogger(`sqler-postgres: Failed to ${name} ${dlt.at.state.pending} transaction(s) with options: ${
          opts ? JSON.stringify(opts) : 'N/A'}`, ierr);
      }
      if (error) {
        // report on the originating error rather than masking it
        error[`${name}Error`] = err;
      } else {
        throw err;
      }
    } finally {
      // anything other than an explicit end/release still has to hand the connection back to the pool
      if (name !== 'end' && name !== 'release') {
        try {
          await conn.release();
        } catch (endErr) {
          if (ierr) {
            ierr.releaseError = endErr;
          }
        }
      }
    }
    return dlt.at.state.pending;
  };
}

/**
 * Returns a function that deallocates a previously prepared statement
 * @private
 * @param {Object} dlt The internal PostgreSQL object instance
 * @param {SQLERExecOptions} [opts] The {@link SQLERExecOptions}
 * @param {String} name The prepared statement name that will be deallocated
 * @returns {Function} A no-argument `async` function that returns `undefined` that will deallocate the prepared statement
 */
function unprepared(dlt, opts, name) {
  return async () => {
    // The name is quoted as an identifier (embedded double quotes doubled). Unquoted, PostgreSQL folds the name
    // to lower case and rejects special characters, so it would not match a prepared statement that was
    // registered with its exact name.
    const stmt = `DEALLOCATE "${String(name).replace(/"/g, '""')}"`;
    let conn, error;
    try {
      conn = await dlt.at.pool.connect();
      await conn.query({ text: stmt });
      if (conn.connection && conn.connection.parsedStatements) {
        // TODO : not public API facing
        // drop the driver-side record of the statement so it is no longer treated as prepared
        if (conn.connection.parsedStatements.hasOwnProperty(name)) {
          delete conn.connection.parsedStatements[name];
        }
      } else {
        const purgeMsg = `sqler-postgres: Failed to purge prepared statement "${name}". Unable to find "client.connection.parsedStatements". Options:\n${
          opts ? JSON.stringify(opts) : 'N/A'}`;
        if (dlt.at.errorLogger) {
          dlt.at.errorLogger(purgeMsg);
        } else {
          console.warn(purgeMsg);
        }
      }
    } catch (err) {
      error = err;
      error.message += ` FAILED to deallocate/unprepare prepared statement ${name}`;
      throw error;
    } finally {
      if (conn) {
        try {
          await operation(dlt, 'release', false, conn, opts)();
        } catch (cerr) {
          // a release failure is logged and attached to the originating error (if any), but never thrown
          if (dlt.at.errorLogger) {
            dlt.at.errorLogger(`sqler-postgres: Failed to release connection for "${stmt}" with options:\n${
              opts ? JSON.stringify(opts) : 'N/A'}`, cerr);
          }
          if (error) {
            error.closeError = cerr;
          }
        }
      }
    }
  };
}

// private state registry: holds the non-public data of each instance, keyed by the instance itself
const map = new WeakMap();

/**
 * Looks up the private state that belongs to an object, creating an empty state the first time the object is seen
 * @private
 * @param {Object} object The object that owns the private state
 * @returns {Object} A wrapper where `at` is the private state and `this` is the object that was passed
 */
const internal = (object) => {
  const isKnown = map.has(object);
  if (!isKnown) map.set(object, {});
  const at = map.get(object);
  return { at, this: object };
};

/**
 * PostgreSQL specific extension of the {@link SQLERConnectionOptions} from the [`sqler`](https://ugate.github.io/sqler/) module.
 * @typedef {SQLERConnectionOptions} PGConnectionOptions
 * @property {Object} driverOptions The `pg` module specific options. __Both `client` and `pool` will be merged when generating the connection pool.__
 * @property {Object} [driverOptions.client] An object that will contain properties/values that will be used to construct the PostgreSQL Client
 * (e.g. `{ database: 'mydb', statement_timeout: 10000 }`). See the `pg` module documentation for a full listing of available Client options.
 * When a property value is a string surrounded by `${}`, it will be assumed to be a property that resides on either the {@link SQLERPrivateOptions}
 * passed into the {@link Manager} constructor or a property on the {@link PGConnectionOptions} itself (in that order of precedence). For example, 
 * `clientOpts.host = '127.0.0.1'` and `driverOptions.client.host = '${host}'` would be interpolated into `driverOptions.client.host = '127.0.0.1'`.
 * In contrast, `privOpts.username = 'someUsername'` and `driverOptions.client.user = '${username}'` would be interpolated into
 * `driverOptions.client.user = 'someUsername'`.
 * @property {Object} [driverOptions.pool] The pool `conf` options that will be passed into `new pg.Pool(conf)`. See the `pg` module for a full
 * listing of available connection pooling options.
 * __Using any of the generic `pool.someOption` will override the `conf` options set on `driverOptions.pool`__ (e.g. `pool.max = 10` would override 
 * `driverOptions.pool.max = 20`).
 * When a value is a string surrounded by `${}`, it will be assumed to be a _constant_ property that resides on the `pg` module and will be interpolated
 * accordingly.
 * For example `driverOptions.pool.someProp = '${SOME_PG_CONSTANT}'` will be interpolated as `pool.someProp = pg.SOME_PG_CONSTANT`.
 */

/**
 * PostgreSQL specific extension of the {@link SQLERExecOptions} from the [`sqler`](https://ugate.github.io/sqler/) module. When a property of `binds`
 * contains an object it will be _interpolated_ for property values on the `pg` module.
 * For example, `binds.name = '${SOME_PG_CONSTANT}'` will be interpolated as `binds.name = pg.SOME_PG_CONSTANT`.
 * @typedef {SQLERExecOptions} PGExecOptions
 * @property {Object} [driverOptions] The `pg` module specific options.
 * @property {Object} [driverOptions.query] The options passed into `pg.Client.query` during {@link Manager.exec}. See the `pg` module documentation
 * for a full listing of available query options.
 * When a value is a string surrounded by `${}`, it will be assumed to be a _constant_ property that resides on the `pg` module and will be interpolated
 * accordingly.
 * For example `driverOptions.query.someDriverProp = '${SOME_PG_CONSTANT}'` will be interpolated as
 * `driverOptions.query.someDriverProp = pg.SOME_PG_CONSTANT`.
 * @property {(String | Boolean)} [driverOptions.query.name] As stated in the `pg` documentation, the `name` option will cause a prepared statement to be
 * used. `sqler-postgres` allows a `true` value to be set to utilize the internally generated SQL file name to be used instead of explicitly defining a
 * name (which is of course, also supported).
 */

/**
 * Transactions are wrapped in a parent transaction object so private properties can be added (e.g. prepared statements)
 * @typedef {Object} PGTransactionObject
 * @property {SQLERTransaction} tx The transaction
 * @property {Object} conn The connection
 * @property {Map} unprepares Map of prepared statement names (key) and no-argument _async_ functions that will be called as a pre-operation call prior to
 * `commit` or `rollback` (value)
 */

2.0.0 (2021-02-25)

Full Changelog

Features: