'use strict';
const DBDriver = require('mariadb');
const Stream = require('stream');
const EventEmitter = require('events');
const typedefs = require('sqler/typedefs');
const MAX_MDB_NAME_LGTH = 64;
/**
* MariaDB + MySQL {@link Dialect} implementation for [`sqler`](https://ugate.github.io/sqler/)
*/
class MDBDialect {
/**
* Constructor
* @constructs MDBDialect
* @param {typedefs.SQLERPrivateOptions} priv The private configuration options
* @param {MDBConnectionOptions} connConf The individual SQL __connection__ configuration for the given dialect that was passed into the originating {@link Manager}
* @param {typedefs.SQLERTrack} track Container for sharing data between {@link Dialect} instances.
* @param {Function} [errorLogger] A function that takes one or more arguments and logs the results as an error (similar to `console.error`)
* @param {Function} [logger] A function that takes one or more arguments and logs the results (similar to `console.log`)
* @param {Boolean} [debug] A flag that indicates the dialect should be run in debug mode (if supported)
*/
constructor(priv, connConf, track, errorLogger, logger, debug) {
if (!connConf.driverOptions) throw new Error('Connection configuration is missing required driverOptions');
const dlt = internal(this);
dlt.at.track = track;
dlt.at.driver = DBDriver;
dlt.at.transactions = new Map();
dlt.at.stmtFuncs = new Map();
dlt.at.stmts = new Map();
dlt.at.opts = {
autoCommit: true, // default autoCommit = true to conform to sqler
id: `sqlerMDBGen${Math.floor(Math.random() * 10000)}`,
connection: connConf.driverOptions.connection ? dlt.at.track.interpolate({}, connConf.driverOptions.connection, dlt.at.driver) : {}
};
// merge connection options into pool options
dlt.at.opts.pool = connConf.driverOptions.pool ?
dlt.at.track.interpolate(dlt.at.opts.connection, connConf.driverOptions.pool, dlt.at.driver) :
dlt.at.opts.connection;
// for universal sqler compatibility, named place holders are the default
if (!dlt.at.opts.pool.hasOwnProperty('namedPlaceholders')) {
dlt.at.opts.pool.namedPlaceholders = true;
}
// sqler compatible state
dlt.at.state = {
pending: 0,
connection: { count: 0, inUse: 0 }
};
dlt.at.errorLogger = errorLogger;
dlt.at.logger = logger;
dlt.at.debug = debug;
if (priv.host) dlt.at.opts.pool.host = priv.host;
if (priv.hasOwnProperty('port')) dlt.at.opts.pool.port = priv.port;
dlt.at.opts.pool.user = priv.username;
dlt.at.opts.pool.password = priv.password;
if (connConf.pool) {
if (connConf.pool.hasOwnProperty('min')) dlt.at.opts.pool.minimumIdle = connConf.pool.min;
if (connConf.pool.hasOwnProperty('max')) dlt.at.opts.pool.connectionLimit = connConf.pool.max;
if (connConf.pool.hasOwnProperty('idle')) dlt.at.opts.pool.idleTimeout = connConf.pool.idle;
// if (connConf.pool.hasOwnProperty('increment')) dlt.at.opts.pool.incrementSize = connConf.pool.increment; // not supported
if (connConf.pool.hasOwnProperty('timeout')) {
dlt.at.opts.pool.acquireTimeout = connConf.pool.timeout;
dlt.at.opts.pool.connectTimeout = connConf.pool.timeout;
}
}
}
/**
* Initializes {@link MDBDialect} by creating the connection pool
* @param {typedefs.SQLERInitOptions} opts The options described by the `sqler` module
* @returns {Object} The MariaDB/MySQL connection pool
*/
async init(opts) {
const dlt = internal(this), numSql = opts.numOfPreparedFuncs;
/** @type {InternalFlightRecorder} */
let recorder;
let conn, error;
try {
dlt.at.pool = dlt.at.driver.createPool(dlt.at.opts.pool);
if (dlt.at.logger) {
dlt.at.logger(`sqler-mdb: Connection pool "${dlt.at.opts.id}" created with (${numSql} SQL files) ` +
`acquireTimeout=${dlt.at.opts.pool.acquireTimeout} minimumIdle=${dlt.at.opts.pool.minimumIdle} ` +
`connectionLimit=${dlt.at.opts.pool.connectionLimit} idleTimeout=${dlt.at.opts.pool.idleTimeout}`);
}
conn = await dlt.at.pool.getConnection();
await conn.ping();
return dlt.at.pool;
} catch (err) {
recorder = errored(`sqler-mdb: connection pool "${dlt.at.opts.id}" could not be created`, dlt, null, err);
error = err;
const msg = `sqler-mdb: connection pool "${dlt.at.opts.id}" could not be created`;
if (dlt.at.errorLogger) dlt.at.errorLogger(`${msg} (passwords are omitted from error) ${JSON.stringify(err, null, ' ')}`);
const pconf = Object.assign({}, dlt.at.opts.pool);
delete pconf.password;
err.message = `${err.message}\n${msg} for ${JSON.stringify(pconf, null, ' ')}`;
err.sqlerMDB = pconf;
throw err;
} finally {
if (conn) {
await finalize(recorder, dlt, operation(dlt, 'release', false, conn, opts));
}
}
}
/**
* Begins a transaction by opening a connection from the pool
* @param {String} txId The internally generated transaction identifier
* @param {typedefs.SQLERTransactionOptions} opts The transaction options passed in via the public API
* @returns {typedefs.SQLERTransaction} The transaction that was started
*/
async beginTransaction(txId) {
const dlt = internal(this);
if (dlt.at.logger) {
dlt.at.logger(`sqler-mdb: Beginning transaction "${txId}" on connection pool "${dlt.at.opts.id}"`);
}
/** @type {typedefs.SQLERTransaction} */
const tx = {
id: txId,
state: Object.seal({
committed: 0,
rolledback: 0,
pending: 0,
isReleased: false
})
};
/** @type {MDBTransactionObject} */
const txo = { tx, conn: await dlt.at.pool.getConnection() };
const opts = { transactionId: tx.id };
await txo.conn.beginTransaction();
const commit = operation(dlt, 'commit', false, txo, opts, 'unprepare');
tx.commit = async (isRelease) => {
await commit();
if (isRelease) await operation(dlt, 'release', true, txo, opts)();
};
const rollback = operation(dlt, 'rollback', false, txo, opts, 'unprepare');
tx.rollback = async (isRelease) => {
await rollback();
if (isRelease) await operation(dlt, 'release', true, txo, opts)();
};
Object.freeze(tx);
dlt.at.transactions.set(txId, txo);
return tx;
}
/**
* Executes a SQL statement
* @param {String} sql the SQL to execute
* @param {MDBExecOptions} opts The execution options
* @param {String[]} frags the frament keys within the SQL that will be retained
* @param {typedefs.SQLERExecMeta} meta The SQL execution metadata
* @param {(typedefs.SQLERExecErrorOptions | Boolean)} [errorOpts] The error options to use
* @returns {typedefs.SQLERExecResults} The execution results
*/
async exec(sql, opts, frags, meta, errorOpts) {
/** @type {InternalFlightRecorder} */
let recorder;
const dlt = internal(this);
const txo = opts.transactionId ? dlt.at.transactions.get(opts.transactionId) : null;
/** @type {DBDriver.Connection} */
let conn;
let rslts;
/** @type {InternalExecMeta} */
let execMeta;
/** @type {InternalPreparedStatement} */
let pso;
try {
/** @type {typedefs.SQLERExecResults} */
const rtn = {};
if (!txo && !opts.prepareStatement && opts.stream < 0 && opts.type === 'READ') {
// pool.query will auto-release the connection
execMeta = createExecMeta(dlt, sql, opts);
rslts = await dlt.at.pool.query(execMeta.sql, execMeta.binds);
} else {
if (opts.stream >= 0) { // streams handle prepared statements when streaming starts
rslts = [ opts.type === 'READ' ? await createReadStream(dlt, sql, opts, meta, txo, rtn) : createWriteStream(dlt, sql, opts, meta, txo, rtn) ];
} else if (opts.prepareStatement) {
pso = await prepared(dlt, sql, opts, meta, txo, rtn);
rslts = await pso.exec(opts.binds);
} else {
execMeta = createExecMeta(dlt, sql, opts);
conn = txo ? null : await dlt.at.pool.getConnection();
rslts = await (txo ? txo.conn : conn).query(execMeta.sql, execMeta.binds);
}
if (txo) {
if (rtn.unprepare) {
txo.unprepares = txo.unprepares || new Map();
txo.unprepares.set(pso.name, rtn.unprepare); // keep track of the prepared statements that have transaction scope
}
if (opts.autoCommit) {
// MariaDB/MySQL has no option to autocommit during SQL execution
await operation(dlt, 'commit', false, txo, opts, opts.prepareStatement ? 'unprepare' : null)();
} else {
txo.tx.state.pending++;
dlt.at.state.pending++;
}
}
}
rtn.rows = rslts;
rtn.raw = rslts;
return rtn;
} catch (err) {
recorder = errored(`sqler-mdb: Failed to execute the following SQL:\n${sql}`, dlt, meta, err);
throw err;
} finally {
if (conn) {
await finalize(recorder, dlt, operation(dlt, 'release', false, conn, opts));
}
}
}
/**
* Closes the MariaDB/MySQL connection pool
* @returns {Integer} The number of connections closed
*/
async close() {
const dlt = internal(this);
try {
if (dlt.at.logger) {
dlt.at.logger(`sqler-mdb: Closing connection pool "${dlt.at.opts.id}"${statusLabel(dlt)}`);
}
if (dlt.at.pool) {
await dlt.at.pool.end();
dlt.at.transactions.clear();
dlt.at.stmts.clear();
}
if (dlt.at.logger) {
dlt.at.logger(`sqler-mdb: Closed connection pool "${dlt.at.opts.id}"${statusLabel(dlt)}`);
}
return dlt.at.state.pending;
} catch (err) {
errored(`sqler-mdb: Failed to close connection pool "${dlt.at.opts.id}"${statusLabel(dlt)}`, dlt, null, err);
throw err;
}
}
/**
* @returns {typedefs.SQLERState} The state
*/
get state() {
return JSON.parse(JSON.stringify(internal(this).at.state));
}
/**
* @protected
* @returns {DBDriver} The MariaDB/MySQL driver module
*/
get driver() {
return internal(this).at.driver;
}
}
module.exports = MDBDialect;
/**
* Creates bind parameters suitable for SQL execution in MySQL/MariaDB
* @private
* @param {MDBInternal} dlt The internal MariaDB/MySQL object instance
* @param {String} sql the SQL to execute
* @param {MDBExecOptions} opts The execution options
* @param {Object} [bindsAlt] An alternative to `opts.binds` that will be used
* @returns {InternalExecMeta} The binds metadata
*/
function createExecMeta(dlt, sql, opts, bindsAlt) {
/** @type {InternalExecMeta} */
const rtn = {};
// interpolate and remove unused binds since
// MariaDB/MySQL only accepts the exact number of bind parameters (also, cuts down on payload bloat)
rtn.bndp = dlt.at.track.interpolate({}, bindsAlt || opts.binds, dlt.at.driver, props => sql.includes(`:${props[0]}`));
// execution formatted version of `rtn.bndp` that is an array of values format to support MySQL/MariaDB use of `?` parameter markers
// (empty when the bind meta is for a prepared statement)
let ebndp;
// formatted/bound execution SQL statement
let esql;
// driver options exec override the
rtn.dopts = opts.driverOptions || {};
const named = rtn.dopts.exec && rtn.dopts.exec.hasOwnProperty('namedPlaceholders') ? rtn.dopts.exec.namedPlaceholders :
dlt.at.opts.pool.namedPlaceholders;
if (opts.prepareStatement) { // prepared statements always use named parameter markers
if (!rtn.dopts.preparedStatementDatabase) {
throw new Error('A valid database name must be provided using "execOpts.driverOptions.preparedStatementDatabase" ' +
'when "execOpts.prepareStatement = true"');
}
esql = dlt.at.track.positionalBinds(sql, rtn.bndp, [], pname => `@${pname}`);
} else { // use "?" parameter markers
esql = named ? sql : dlt.at.track.positionalBinds(sql, rtn.bndp, ebndp = []);
}
if (rtn.dopts.exec) rtn.dopts.exec.sql = esql;
rtn.sql = rtn.dopts.exec || esql;
rtn.binds = ebndp || rtn.bndp;
return rtn;
}
/**
* Executes a function by name that resides on the MariaDB/MySQL connection
* @private
* @param {MDBInternal} dlt The internal MariaDB/MySQL object instance
* @param {String} name The name of the function that will be called on the connection
* @param {Boolean} reset Truthy to reset the pending connection and transaction count when the operation completes successfully
* @param {(MDBTransactionObject | Object)} txoOrConn Either the transaction object or the connection itself
* @param {typedefs.SQLERExecOptions} [opts] The {@link SQLERExecOptions}
* @param {String} [preop] An operation name that will be performed before the actual operation. The following values are valid:
* 1. __`unprepare`__ - Any un-prepare functions that are associated with the passed {@link MDBTransactionObject} will be executed.
* @returns {Function} A no-arguement `async` function that returns the number or pending transactions
*/
function operation(dlt, name, reset, txoOrConn, opts, preop) {
return async () => {
/** @type {InternalFlightRecorder} */
let recorder = {};
/** @type {MDBTransactionObject} */
const txo = opts.transactionId && txoOrConn.tx ? txoOrConn : null;
/** @type {DBDriver.Connection} */
const conn = txo ? txo.conn : txoOrConn;
if (preop === 'unprepare') {
if (txo.unprepares) {
for (let unprepare of txo.unprepares.values()) {
await unprepare();
}
txo.unprepares.clear();
}
}
try {
if (txo && txo.tx.state.isReleased && (name === 'commit' || name === 'rollback')) {
return Promise.reject(new Error(`"${name}" already called on transaction "${txo.tx.id}"`));
}
if (dlt.at.logger) {
dlt.at.logger(`sqler-mdb: Performing ${name} on connection pool "${dlt.at.opts.id}"${statusLabel(dlt, opts)}`);
}
await conn[name]();
if (txo) {
if (name === 'commit') {
txo.tx.state.committed++;
} else if (name === 'rollback') {
txo.tx.state.rolledback++;
} else if (name === 'release') {
txo.tx.state.isReleased = true;
}
}
if (reset) { // not to be confused with mariadb connection.reset();
if (txo) dlt.at.transactions.delete(txo.tx.id);
dlt.at.state.pending = 0;
}
if (dlt.at.logger) {
dlt.at.logger(`sqler-mdb: Performed ${name} on connection pool "${dlt.at.opts.id}" ${statusLabel(dlt, opts)}`);
}
} catch (err) {
recorder = errored(`sqler-mdb: Failed to ${name} ${dlt.at.state.pending} transaction(s) with options: ${
opts ? JSON.stringify(Object.keys(opts)) : 'N/A'}`, dlt, null, err);
throw err;
} finally {
if ((recorder && recorder.error) || (!txo && conn && name !== 'end' && name !== 'release')) {
await finalize(recorder, dlt, () => Promise.resolve(conn.release()));
}
}
return dlt.at.state.pending;
};
}
/**
* Either generates a prepared statement when it doesn't currently exist, or returns an existing prepared statement that waits for the original prepared statement
* creation/connectivity/setup to complete before performing any executions.
* @private
* @param {MDBInternal} dlt The internal MariaDB/MySQL object instance
* @param {String} sql The raw SQL to execute for the prepared statement
* @param {MDBExecOptions} opts The execution options
* @param {typedefs.SQLERExecMeta} meta The SQL execution metadata
* @param {MDBTransactionObject} [txo] The transaction object to use. When not specified, a connection will be established.
* @param {typedefs.SQLERExecResults} rtn The execution results used by the prepared statement where `unprepare` will be set
* @returns {InternalPreparedStatement} The prepared statement
*/
async function prepared(dlt, sql, opts, meta, txo, rtn) {
let isPrepare;
/** @type {InternalPreparedStatement} */
let pso;
if (dlt.at.stmts.has(meta.name)) {
pso = dlt.at.stmts.get(meta.name);
} else {
isPrepare = true;
pso = new EventEmitter();
pso.unprepare = async () => {
const hasTX = opts.transactionId && dlt.at.transactions.has(opts.transactionId);
if (dlt.at.stmts.has(meta.name)) {
const pso = dlt.at.stmts.get(meta.name);
try {
await pso.conn.query(`DEALLOCATE PREPARE ${pso.name}`);
dlt.at.stmts.delete(meta.name);
// need to drop separately since drop procedure cannot be done from within a procedure
await pso.conn.query(`DROP PROCEDURE ${pso.procedure}`);
} finally {
if (!hasTX) {
await operation(dlt, 'release', false, pso.conn, opts)();
if (opts.stream >= 0) pso.emit(typedefs.EVENT_STREAM_RELEASE);
}
}
}
};
pso.name = meta.name;
pso.sql = sql;
// set before async in case concurrent PS invocations
dlt.at.stmts.set(pso.name, pso);
pso.connProm = txo ? Promise.resolve(txo.conn) : dlt.at.pool.getConnection(); // other PS exec need access to promise in order to wait for connection access
pso.conn = await pso.connProm; // wait for the initial PS to establish a connection (other PS exec need access to promise)
pso.connProm = null; // reset promise once it completes
}
rtn.unprepare = pso.unprepare;
if (isPrepare) {
pso.name = meta.name;
pso.shortName = pso.name.length > MAX_MDB_NAME_LGTH ? `sqler_mdb_prep_stmt${Math.floor(Math.random() * 10000)}` : pso.name;
pso.exec = async (binds) => {
if (!dlt.at.stmts.has(meta.name)) {
throw new Error(`sqler-mdb: Prepared statement not found for "${meta.name}" during writable stream batch`);
}
/** @type {InternalPreparedStatement} */
const pso = dlt.at.stmts.get(meta.name);
/** @type {InternalExecMeta} */
let execMeta;
/** @type {InternalFlightRecorder} */
let recorder;
try {
if (!pso.execMeta) {
pso.execMeta = createExecMeta(dlt, sql, opts, binds);
pso.procedure = `\`${pso.execMeta.dopts.preparedStatementDatabase}\`.\`${pso.shortName}\``;
pso.escapedSQL = pso.conn.escape(pso.execMeta.sql); // execMeta.sql.replace(/([^'\\]*(?:\\.[^'\\]*)*)'/g, "$1\\'");
pso.bnames = Object.getOwnPropertyNames(pso.execMeta.bndp);
pso.psql = preparedStmtProc(pso);
pso.procProm = pso.conn.query(pso.psql); // prepare/exec PS (other exec need access to wait for proc to be created)
await pso.procProm; // wait for the PS stored proc to be created
pso.prepareExecProm = preparedStmtProcExec(pso, pso.conn, pso.execMeta.bndp, false, true); // wait for the initial PS stored proc to be created/executed
pso.procProm = pso.prepareExecProm = null; // reset promises once they completed
execMeta = pso.execMeta;
} else {
execMeta = createExecMeta(dlt, sql, opts, binds);
if (pso.connProm) await pso.connProm; // wait for the initial PS to establish a connection
if (pso.procProm) await pso.procProm; // wait for the initial PS stored proc to be created
if (pso.prepareExecProm) await pso.prepareExecProm; // wait for the initial PS to be prepared
}
const isStream = opts.stream >= 0 && opts.type === 'READ';
let rslts = preparedStmtProcExec(pso, pso.conn, execMeta.bndp, isStream);
if (!isStream) rslts = await rslts;
if (dlt.at.logger) {
dlt.at.logger(`sqler-mdb: ${isStream ? 'Created readable stream' : 'Completed execution'} of prepared statement "${pso.name}"${
dlt.at.debug ? ` for binds ${Object.keys(execMeta.bndp)} on SQL\n${execMeta.sql}` : ''}`);
}
return rslts;
} catch (err) {
recorder = errored(`sqler-mdb: Failed to execute prepared statement with binds ${execMeta ? Object.keys(execMeta.bndp) : 'N/A'} on SQL:\n${
sql}`, dlt, meta, err);
throw err;
} finally {
await finalize(recorder, dlt);
}
};
pso.batch = async (batch) => {
/** @type {InternalPreparedStatement} */
const pso = dlt.at.stmts.get(meta.name);
const rslts = new Array(batch.length);
let bi = 0;
for (let binds of batch) {
rslts[bi] = await pso.exec(binds);
bi++;
}
if (dlt.at.logger) {
dlt.at.logger(`sqler-mdb: Completed execution of ${batch.length} batched prepared statement(s) for "${pso.name}"`);
}
return rslts;
};
} else {
pso = dlt.at.stmts.get(meta.name);
if (pso.connProm) await pso.connProm; // wait for the initial PS to establish a connection
if (pso.procProm) await pso.procProm; // wait for the initial PS stored proc to be created
if (pso.prepareExecProm) await pso.prepareExecProm; // wait for the initial PS to be prepared
}
return pso;
}
/**
* Generates a stored procedure that accepts an _operation name_ and a JSON data type.
* The operation name can be one of the following:
* - `prepare` - Prepares the statement
* - `execute` - Executes the statement using the passed JSON
* - `prepare_execute` - Prepares and executes the statement using the passed JSON
* @private
* @param {InternalPreparedStatement} pso The prepared statement object that will be used to generate the stored procedure
* @returns {String} The stored procedure for the prepared statement
*/
function preparedStmtProc(pso) {
return `CREATE PROCEDURE ${pso.procedure}(IN oper VARCHAR(15), IN vars JSON)
BEGIN
IF (oper = 'prepare' OR oper = 'prepare_execute') THEN
PREPARE ${pso.shortName} FROM ${pso.escapedSQL};
END IF;
IF (oper = 'execute' OR oper = 'prepare_execute') THEN
${ pso.bnames.length ? pso.bnames.map(nm => `
SET @${nm} := JSON_UNQUOTE(JSON_EXTRACT(vars, '$.${nm}'));`).join('') : ''
}
EXECUTE ${pso.shortName};
END IF;
END;
`;
}
/**
* Calls a prepared statement stored procedure execution
* @private
* @param {InternalPreparedStatement} pso The prepared statement object that will be used to generate the stored procedure
* @param {DBDriver.Connection} conn The connection
* @param {Object} binds The binds
* @param {Boolean} [useQueryStream]
* @param {Boolean} [prepare] Truthy to prepare the satement before execution
* @returns {(Stream.Readable | Promise)} The prepared statement procedure call result
*/
function preparedStmtProcExec(pso, conn, binds, useQueryStream, prepare) {
const sql = `CALL ${pso.procedure}('${prepare ? 'prepare_' : ''}execute', JSON_OBJECT(${
pso.bnames.map(name => `${conn.escape(name)},${conn.escape(binds[name])}`).join(',')
}))`;
return useQueryStream ? conn.queryStream(sql, binds) : conn.query(sql, binds);
}
/**
* Returns a label that contains connection details, transaction counts, etc.
* @private
* @param {MDBInternal} dlt The internal MariaDB/MySQL object instance
* @param {MDBExecOptions} [opts] Execution options that will be included in the staus label
* @param {MDBTransactionObject} [txo] An optional transactiopn to add to the status label
* @returns {String} The status label
*/
function statusLabel(dlt, opts, txo) {
try {
return `(( ${opts ? `[ ${opts.name ? `name: ${opts.name}, ` : ''}type: ${opts.type} ]` : ''}[ uncommitted transactions: ${
dlt.at.state.pending}${dlt.at.pool ? `, total connections: ${dlt.at.pool.totalConnections()}, active connections: ${
dlt.at.pool.activeConnections()}, idle connections: ${dlt.at.pool.idleConnections()}, queue size: ${dlt.at.pool.taskQueueSize()}` : ''}${
txo ? ` - Transaction state: ${JSON.stringify(txo.tx.state)}` : ''} ))`;
} catch (err) {
if (dlt.at.errorLogger) {
dlt.at.errorLogger('sqler-mdb: Failed to create status label', err);
}
}
}
/**
* Creates a read stream that batches the read SQL executions
* @private
* @param {MDBInternal} dlt The internal MariaDB/MySQL object instance
* @param {String} sql The SQL to execute.
* @param {MDBExecOptions} opts The execution options
* @param {typedefs.SQLERExecMeta} meta The SQL execution metadata
* @param {MDBTransactionObject} [txo] The transaction object to use. When not specified, a connection will be established on the first write to the stream.
* @param {typedefs.SQLERExecResults} rtn Where the _public_ prepared statement functions will be set (ignored when the read stream is not for a prepared
* statement).
* @returns {Stream.Readable} The created read stream
*/
async function createReadStream(dlt, sql, opts, meta, txo, rtn) {
/** @type {Promise<DBDriver.Connection>} */
let connProm;
/** @type {InternalFlightRecorder[]} */
const recorders = [];
/** @type {Stream.Readable} */
let readable;
if (opts.prepareStatement) {
const pso = await prepared(dlt, sql, opts, meta, txo, rtn);
pso.on(typedefs.EVENT_STREAM_RELEASE, () => readable && readable.emit(typedefs.EVENT_STREAM_RELEASE));
readable = await pso.exec();
} else {
/** @type {InternalExecMeta} */
const execMeta = createExecMeta(dlt, sql, opts);
/** @type {DBDriver.Connection} */
const conn = txo ? txo.conn : connProm ? await connProm : await (connProm = dlt.at.pool.getConnection());
readable = conn.queryStream(execMeta.sql, execMeta.binds);
}
// dlt.at.track.readable(opts, readable);
readable.on('error', async (err) => {
if (err.sqlerMDB) return;
recorders.push(errored(`sqler-mdb: An error occurred during ${Stream.Readable.name} streaming for SQL:\n${sql}`, dlt, meta, err));
});
readable.on('close', closeStreamHandler(dlt, sql, opts, meta, txo, () => connProm, readable, recorders));
return readable;
}
/**
* Creates a write stream that batches the write SQL executions
* @private
* @param {MDBInternal} dlt The internal MariaDB/MySQL object instance
* @param {String} sql The SQL to execute
* @param {MDBExecOptions} opts The execution options
* @param {typedefs.SQLERExecMeta} meta The SQL execution metadata
* @param {MDBTransactionObject} [txo] The transaction object to use. When not specified, a connection will be established on the first write to the stream.
* @param {typedefs.SQLERExecResults} rtn Where the _public_ prepared statement functions will be set (ignored when the write stream is not for a prepared
* statement).
* @returns {Stream.Writable} The created write stream
*/
function createWriteStream(dlt, sql, opts, meta, txo, rtn) {
/** @type {Promise<DBDriver.Connection>} */
let connProm;
/** @type {InternalFlightRecorder[]} */
const recorders = [];
const writable = dlt.at.track.writable(opts, async (batch) => {
try {
if (dlt.at.logger) {
dlt.at.logger(`sqler-mdb: Started ${Stream.Writable.name} stream execution for ${batch.length} batches ${statusLabel(dlt, opts, txo)}`);
}
if (opts.prepareStatement) {
const pso = await prepared(dlt, sql, opts, meta, txo, rtn);
return await pso.batch(batch);
}
/** @type {DBDriver.Connection} */
const conn = txo ? txo.conn : connProm ? await connProm : await (connProm = dlt.at.pool.getConnection());
// batch all the binds into a single exectuion for a performance gain
// https://mariadb.com/kb/en/connector-nodejs-promise-api/#connectionbatchsql-values-promise
/** @type {InternalExecMeta} */
let execMeta;
let bi = 0;
const bindsArray = new Array(batch.length);
for (let binds of batch) {
execMeta = createExecMeta(dlt, sql, opts, binds);
bindsArray[bi] = execMeta.binds;
bi++;
}
let rslts = await conn.batch(execMeta.sql, bindsArray);
if (dlt.at.logger) {
dlt.at.logger(`sqler-mdb: Completed ${Stream.Writable.name} stream execution for ${batch.length} batches ${statusLabel(dlt, opts, txo)}`);
}
return rslts;
} catch (err) {
err.batchSize = batch ? batch.length : 0;
writable.emit('error', err);
}
});
writable.on('error', async (err) => {
if (err.sqlerMDB) return;
recorders.push(errored(`sqler-mdb: An error occurred during ${Stream.Writable.name} streaming for SQL:\n${sql}`, dlt, meta, err));
});
writable.on('close' /* 'finish' */, closeStreamHandler(dlt, sql, opts, meta, txo, () => connProm, writable, recorders));
return writable;
}
/**
* Handles a `close` event on a stream by closing a connection (when passed), emitting the {@link typedefs.EVENT_STREAM_RELEASE} event and handling a transaction
* `commit` (when a {@link MDBTransactionObject} is passed).
* @private
* @param {MDBInternal} dlt The internal dialect object instance
* @param {String} sql The SQL to execute
* @param {MDBExecOptions} opts The execution options
* @param {typedefs.SQLERExecMeta} meta The SQL execution metadata
* @param {MDBTransactionObject} [txo] The transaction object to use. When not specified, a connection will be established on the first write to the stream.
* @param {Function} [getConn] An `async function()` to get the {@link DBDriver.Connection} that will be closed (ignored when a transaction is specified).
* @param {(Stream.Readable | Stream.Writable)} stream The stream where the `close` event will be emitted.
* @param {InternalFlightRecorder[]} recorders The flight recorders where the any errors will be recorded.
* @returns {Function} An `async function()` that handles the `close` event on the specified stream
*/
function closeStreamHandler(dlt, sql, opts, meta, txo, getConn, stream, recorders) {
const type = stream instanceof Stream.Readable ? Stream.Readable.name : stream instanceof Stream.Writable ? Stream.Writable.name : 'N/A';
let isCommitted;
return async () => {
try {
/** @type {DBDriver.Connection} */
const conn = typeof getConn === 'function' ? await getConn() : null;
if (conn) {
await operation(dlt, 'release', false, conn, opts)();
stream.emit(typedefs.EVENT_STREAM_RELEASE);
}
if (txo && opts.autoCommit && !recorders.length) {
await operation(dlt, 'commit', false, txo, opts, opts.prepareStatement ? 'unprepare' : null)();
isCommitted = true;
stream.emit(typedefs.EVENT_STREAM_COMMIT, txo.tx.id);
} else if (txo) {
txo.tx.state.pending++;
dlt.at.state.pending++;
}
} catch (err) {
recorders.push(errored(`sqler-mdb: Failed to handle ${type} stream close event for SQL:\n${sql}`, dlt, meta, err));
stream.emit('error', recorders[recorders.length - 1].error);
} finally {
if (!isCommitted && txo && opts.autoCommit && recorders.length) {
await finalize(recorders, dlt, async () => {
await operation(dlt, 'rollback', false, txo, opts, opts.prepareStatement ? 'unprepare' : null)();
stream.emit(typedefs.EVENT_STREAM_ROLLBACK, txo.tx.id);
});
}
}
};
}
/**
* Error handler
* @private
* @param {String} label A label to use to describe the error
* @param {MDBInternal} dlt The internal dialect object instance
* @param {typedefs.SQLERExecMeta} [meta] The SQL execution metadata
* @param {Error} error An error that has occurred
* @returns {InternalFlightRecorder} The flight recorder
*/
function errored(label, dlt, meta, error) {
if (dlt.at.errorLogger) {
dlt.at.errorLogger(label, error);
}
error.sqlerMDB = {};
if (meta && dlt.at.stmts.has(meta.name)) {
const pso = dlt.at.stmts.get(meta.name);
error.sqlerMDB.preparedStmtName = pso.name;
error.sqlerMDB.preparedStmtProc = pso.psql;
}
return { error };
}
/**
* Finally block handler
* @private
* @param {(InternalFlightRecorder | InternalFlightRecorder[])} [recorder] The flight recorder
* @param {MDBInternal} dlt The internal dialect object instance
* @param {Function} [func] An `async function()` that will be invoked in a catch wrapper that will be consumed and recorded when a flight recorder is
* provided
* @param {String} [funcErrorProperty=releaseError] A property name on the flight recorder error that will be set when the `func` itself errors
* @returns {InternalFlightRecorder} The recorded error
*/
async function finalize(recorder, dlt, func, funcErrorProperty = 'releaseError') {
// transactions/prepared statements need the connection to remain open until commit/rollback/unprepare
if (typeof func === 'function') {
try {
await func();
} catch (err) {
if (recorder) {
for (let rec of Array.isArray(recorder) ? recorder : [recorder]) {
if (rec.error) recorder.error[funcErrorProperty] = err;
}
}
}
}
}
// private mapping
let map = new WeakMap();
/**
* Internal state generator
* @private
* @param {MDBDialect} dialect The dialect instance
* @returns {MDBInternal} The internal dialect state
*/
let internal = function(dialect) {
if (!map.has(dialect)) {
map.set(dialect, {});
}
return {
at: map.get(dialect),
this: dialect
};
};
/**
* The `mariadb` (w/MySQL support) module specific options. __Both `connection` and `pool` will be merged when generating the connection pool.__
* @typedef {Object} MDBConnectionDriverOptions
* @property {Object} [connection] An object that will contain properties/values that will be used to construct the MariaDB + MySQL connection
* (e.g. `{ database: 'mydb', timezone: '-0700' }`). See the `mariadb` module documentation for a full listing of available connection options.
* When a property value is a string surrounded by `${}`, it will be assumed to be a property that resides on either the {@link SQLERPrivateOptions}
* passed into the {@link Manager} constructor or a property on the {@link MDBConnectionOptions} itself (in that order of precedence). For example,
* `connOpts.host = '127.0.0.1'` and `driverOptions.connection.host = '${host}'` would be interpolated into `driverOptions.connection.host = '127.0.0.1'`.
* In contrast to `privOpts.username = 'someUsername' and `driverOptions.connection.user = '${username}'` would be interpolated into
* `driverOptions.connection.user = 'someUsername'`.
* Interpoaltions can also contain more than one reference. For example, `driverOptions.connection.someProp = '${protocol}:${host}'` for
* `privOpts = { protocol: 'TCP', host: 'example.com' }` would become `someProp='TCP:example.com'`.
* @property {Object} [pool] The pool `conf` options that will be passed into `mariadb.createPool(conf)`. See the `mariadb` module for a full
* listing of avialable connection pooling options.
* __Using any of the generic `pool.someOption` will override the `conf` options set on `driverOptions.pool`__ (e.g. `pool.max = 10` would override
* `driverOptions.pool.connectionLimit = 20`).
* When a value is a string surrounded by `${}`, it will be assumed to be a _constant_ property that resides on the `mariadb` module and will be interpolated
* accordingly.
* For example `driverOptions.pool.someProp = '${SOME_MARIADB_CONSTANT}'` will be interpolated as `pool.someProp = mariadb.SOME_MARIADB_CONSTANT`.
*/
/**
* MariaDB + MySQL specific extension of the {@link SQLERConnectionOptions} from the [`sqler`](https://ugate.github.io/sqler/) module.
* @typedef {Object} MDBConnectionOptionsType
* @property {MDBConnectionDriverOptions} driverOptions The `mariadb` (w/MySQL support) module specific options. __Both `connection` and `pool` will be merged
* when generating the connection pool.__
* @typedef {typedefs.SQLERConnectionOptions & MDBConnectionOptionsType} MDBConnectionOptions
*/
/**
* MariaDB + MySQL specific extension of the {@link SQLERExecOptions} from the [`sqler`](https://ugate.github.io/sqler/) module. When a property of `binds`
* contains an object it will be _interpolated_ for property values on the `mariadb` module.
* For example, `binds.name = '${SOME_MARIADB_CONSTANT}'` will be interpolated as
* `binds.name = mariadb.SOME_MARIADB_CONSTANT`.
* @typedef {Object} MDBExecDriverOptions
* @property {String} [preparedStatementDatabase] The database name to use when generating prepared statements for the given execution. Since prepared
* statements are scoped only for a given connection and a temporary stored procedure is used to execute prepared statements, __`preparedStatementDatabase` is
* required when `execOpts.prepareStatement = true`.__
* @property {Object} [exec] The options passed into execution/query functions provided by the `mariadb` module performed during {@link Manager.exec}.
* When a value is a string surrounded by `${}`, it will be assumed to be a _constant_ property that resides on the `mariadb` module and will be interpolated
* accordingly.
* For example `driverOptions.exec.someDriverProp = '${SOME_MARIADB_CONSTANT}'` will be interpolated as
* `driverOptions.exec.someDriverProp = mariadb.SOME_MARIADB_CONSTANT`.
* @property {Boolean} [exec.namedPlaceholders=true] Truthy to use named parameters in MariaDB/MySQL or falsy to convert the named parameters into a
* positional array of bind values.
*/
/**
* MariaDB + MySQL specific extension of the {@link SQLERExecOptions} from the [`sqler`](https://ugate.github.io/sqler/) module. When a property of `binds`
* contains an object it will be _interpolated_ for property values on the `mariadb` module.
* For example, `binds.name = '${SOME_MARIADB_CONSTANT}'` will be interpolated as
* `binds.name = mariadb.SOME_MARIADB_CONSTANT`.
* @typedef {Object} MDBExecOptionsType
* @property {MDBExecDriverOptions} [driverOptions] The `mariadb` module specific options.
* @typedef {typedefs.SQLERExecOptions & MDBExecOptionsType} MDBExecOptions
*/
/**
* Transactions are wrapped in a parent transaction object so private properties can be added (e.g. prepared statements)
* @typedef {Object} MDBTransactionObject
* @property {typedefs.SQLERTransaction} tx The transaction
* @property {DBDriver.Connection} conn The connection
* @property {Map<String, Function>} unprepares Map of prepared statement names (key) and no-argument _async_ functions that will be called as a pre-operation call prior to
* `commit` or `rollback` (value)
* @private
*/
// ========================================== Internal Use ==========================================
/**
* Internal database use
* @typedef {Object} MDBInternal
* @property {MDBDialect} this The dialect instance
* @property {Object} at The internal dialect state
* @property {typedefs.SQLERTrack} at.track The track
* @property {DBDriver} at.driver The dialect driver
* @property {Map<String, MDBTransactionObject>} at.transactions The transactions map
* @property {Map<String, InternalPreparedStatement>} at.stmts The prepared statement map
* @property {MDBExecOptions} at.opts The __global__ execution options
* @property {Object} at.pool The connection pool
* @property {typedefs.SQLERState} at.state The __global__ dialect state
* @property {Function} [at.errorLogger] A function that takes one or more arguments and logs the results as an error (similar to `console.error`)
* @property {Function} [at.logger] A function that takes one or more arguments and logs the results (similar to `console.log`)
* @property {Boolean} [at.debug] A flag that indicates the dialect should be run in debug mode (if supported)
* @private
*/
/**
* Metadata used inpreparation for execution.
* @typedef {Object} InternalExecMeta
* @property {MDBExecDriverOptions} dopts The formatted execution driver options.
* @property {String} sql The formatted/bound execution SQL statement. Will also be set on `dopts.exec.sql` (when present).
* @property {(Object | Array)} [binds] Either an object that contains the bind parameters as property names and property values as the bound values that can be
* bound to an SQL statement or an `Array` of values format to support MySQL/MariaDB use of `?` parameter markers (non-prepared statements).
* @property {Object} [bndp] The object version of `binds`.
* @private
*/
/**
* Executes a prepared statement.
* @callback InternalPreparedStatementExec
* @param {Object} [binds] An object that contains the bind parameters as property names and property values as the bound values (omit to use the original binds
* from {@link typedefs.SQLERExecOptions}).
* @param {Number} [size] The number of returned records limit that the prepared statement should be limited to in the results (will result in warnings when there
* it not a `size` bind parameter on the PS SQL).
* @returns {Object} The prepared statement execution results
* @async
* @private
*/
/**
* Executes a batched prepared statement.
* @callback InternalPreparedStatementBatch
* @param {Object[]} [binds] An array that contains objects that is assigned the bind parameters as property names and property values as the bound values.
* @returns {Object[]} An array of prepared statement execution results
* @async
* @private
*/
/**
* Prepared statement
* @typedef {Object} InternalPreparedStatementType
* @property {String} name The prepared statement name
* @property {String} shortName The short name
* @property {String} procedure The procedure name that the prepared statement will be stored as
* @property {String} sql The originating SQL statement
* @property {InternalExecMeta} execMeta The execution metadata
* @property {String} escapedSQL The driver escaped SQL for the PS
* @property {String[]} bnames The bind property names for the PS
* @property {String} psql The PS SQL that stores the PS
* @property {Promise} [procProm] The current PS promise for storing the PS
* @property {Promise} [prepareExecProm] The promise for the initial PS stored proc to be created/executed
* @property {Promise<DBDriver.Connection>} [connProm] The promise for the PS connection
* @property {DBDriver.Connection} [conn] The PS connection
* @property {InternalPreparedStatementExec} exec The function that executes the prepared statement SQL and returns the results
* @property {InternalPreparedStatementBatch} batch The function that executes a batch of prepared statement SQL and returns the results
* @property {Function} unprepare A no-argument _async_ function that unprepares the outstanding prepared statement
* @typedef {EventEmitter & InternalPreparedStatementType} InternalPreparedStatement
* @private
*/
/**
* @typedef {Object} InternalFlightRecorder
* @property {Error} [error] An errored that occurred
* @property {DBDriver.Connection} [conn] A connection that will be `released` when an error exists
* @private
*/