Examples
The examples below use the following setup:
Private Options Configuration: (appended to the subsequent connection options)
{
"univ": {
"db": {
"mdb": {
"host": "sqler_mdb",
"username":"root",
"password": "my3qllocal"
}
}
}
}Connection Options Configuration:
'use strict';
const fs = require('node:fs');
const path = require('node:path');
module.exports = function buildConf() {
const sslBase = process.env.SQLER_TEST_SSL_DIR || path.join(__dirname, '..', 'ssl');
const useTLS = !process.env.SQLER_TEST_NO_TLS;
const connection = {
multipleStatements: true
};
if (useTLS) {
connection.ssl = {
// using mutual TLS (server/client, CA must be signed by same CA)
ca: fs.readFileSync(path.join(sslBase, 'ca.pem'), 'utf8'),
cert: fs.readFileSync(path.join(sslBase, 'client-cert.pem'), 'utf8'),
key: fs.readFileSync(path.join(sslBase, 'client-key.pem'), 'utf8'),
// rejectUnauthorized should normally be set to true,
// but is set to false for testing/illustration purposes using a self-signed certificate
rejectUnauthorized: false,
minVersion: 'TLSv1.2'
};
} else {
// temporary fallback if you need non-TLS local testing while migrating
connection.allowPublicKeyRetrieval = true;
}
return {
db: {
dialects: {
mdb: 'sqler-mdb'
},
connections: [
{
id: 'mdb',
name: 'mdb',
dir: 'db/mdb',
service: 'MySQL',
dialect: 'mdb',
pool: {},
driverOptions: {
connection,
// prepared statements in MySQL/MariaDB use a temporary
// stored procedure to execute prepared statements...
// in order to do so, the stored procedure needs to have
// a database scope defined where it will reside
// (can also be overridden in the prepared function exec)
preparedStatementDatabase: 'sqlermysql'
}
}
]
}
};
};Test code that illustrates how to use MariaDB/MySQL with various examples
// assuming "conf" contains combined "univ" and "db" objects from above
// create/initialize manager
const manager = new Manager(conf);
await manager.init();
// see subsequent examples for different examples
const result = await runExample(manager, 'mdb');
console.log('Result:', result);
// after we're done using the manager we should close it
process.on('SIGINT', async function sigintDB() {
await manager.close();
console.log('Manager has been closed');
});Create Database
db/mdb/setup/create.database.sql
CREATE DATABASE IF NOT EXISTS sqlermysql'use strict';
// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {
// create test database
const rslt = await manager.db[connName].setup.create.database();
return rslt;
};Create Table(s)
db/mdb/setup/create.table1.sql
CREATE TABLE IF NOT EXISTS sqlermysql.TEST (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR(512), CREATED_AT DATETIME(3), UPDATED_AT DATETIME(3))db/mdb/setup/create.table2.sql
CREATE TABLE IF NOT EXISTS sqlermysql.TEST2 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR(512), REPORT BLOB, CREATED_AT DATETIME(3), UPDATED_AT DATETIME(3))Create Rows
db/mdb/create.table.rows.sql
CREATE PROCEDURE sqlermysql.perform_test_inserts
(
IN p_id INTEGER, IN p_name VARCHAR(255), IN p_created DATETIME(3), IN p_updated DATETIME(3),
IN p_id2 INTEGER, IN p_name2 VARCHAR(255), IN p_report2 BLOB, IN p_created2 DATETIME(3), IN p_updated2 DATETIME(3)
)
BEGIN
/*
Stored procedure is not required when executing a single SQL statement
Also, MySQL doesn't support anonymous stored procedure blocks
So, a temporary stored procedure is used instead
*/
INSERT INTO sqlermysql.TEST (`ID`, `NAME`, CREATED_AT, UPDATED_AT)
VALUES (p_id, p_name, p_created, p_updated);
INSERT INTO sqlermysql.TEST2 (`ID`, `NAME`, REPORT, CREATED_AT, UPDATED_AT)
VALUES (p_id2, p_name2, p_report2, p_created2, p_updated2);
END;
CALL sqlermysql.perform_test_inserts(
:id, :name, :created, :updated,
:id2, :name2, :report2, :created2, :updated2
);
DROP PROCEDURE sqlermysql.perform_test_inserts;'use strict';
const Fs = require('fs');
// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {
const date = new Date();
// The driver module currently doesn't support streaming into a column
// (e.g. Fs.createReadStream())
const report = await Fs.promises.readFile('./test/files/audit-report.png');
// Insert rows into multiple tables within a single execution
const rslt = await manager.db[connName].create.table.rows({
binds: {
id: 1, name: 'TABLE: 1, ROW: 1, CREATE: "Initial creation"', created: date, updated: date,
id2: 1, name2: 'TABLE: 2, ROW: 1, CREATE: "Initial creation"', report2: report, created2: date, updated2: date
}
});
return rslt;
};Read Rows
db/mdb/read.table.rows.sql
SELECT TST.ID AS "id", TST.NAME AS "name", NULL AS "report",
TST.CREATED_AT AS "created", TST.UPDATED_AT AS "updated"
FROM sqlermysql.TEST TST
WHERE UPPER(TST.NAME) LIKE CONCAT(CONCAT('%', UPPER(:name)), '%')
UNION
SELECT TST2.ID AS "id", TST2.NAME AS "name", TST2.REPORT AS "report",
TST2.CREATED_AT AS "created", TST2.UPDATED_AT AS "updated"
FROM sqlermysql.TEST2 TST2
WHERE UPPER(TST2.NAME) LIKE CONCAT(CONCAT('%', UPPER(:name)), '%')'use strict';
const Os = require('os');
const Fs = require('fs');
// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {
// stream read from multiple tables
const rslt = await manager.db[connName].read.table.rows({ binds: { name: 'table' } });
// write binary report buffer to file?
const fileWriteProms = [];
for (let row of rslt.rows) {
if (row.report) {
// store the path to the report (illustrative purposes only)
row.reportPath = `${Os.tmpdir()}/sqler-${connName}-read-${row.id}.png`;
fileWriteProms.push(Fs.promises.writeFile(row.reportPath, row.report));
}
}
if (fileWriteProms.length) {
await Promise.all(fileWriteProms);
}
return rslt;
};Update Rows
Demo multiple table updated in a single SQL script:
db/mdb/update.table.rows.sql
CREATE PROCEDURE sqlermysql.perform_test_updates
(
IN p_id INTEGER, IN p_name VARCHAR(255), IN p_updated DATETIME(3),
IN p_id2 INTEGER, IN p_name2 VARCHAR(255), IN p_updated2 DATETIME(3)
)
BEGIN
/*
Stored procedure is not required when executing a single SQL statement
Also, MySQL doesn't support anonymous stored procedure blocks
So, a temporary stored procedure is used instead
*/
UPDATE sqlermysql.TEST
SET NAME = p_name, UPDATED_AT = p_updated
WHERE ID = p_id;
UPDATE sqlermysql.TEST2
SET NAME = p_name2, UPDATED_AT = p_updated2
WHERE ID = p_id2;
END;
CALL sqlermysql.perform_test_updates(
:id, :name, :updated,
:id2, :name2, :updated2
);
DROP PROCEDURE sqlermysql.perform_test_updates;Demo prepared statements:
db/mdb/update.table1.rows.sql
UPDATE sqlermysql.TEST
SET NAME = :name, UPDATED_AT = :updated
WHERE ID = :idDemo transactions:
db/mdb/update.table2.rows.sql
UPDATE sqlermysql.TEST2
SET NAME = :name2, UPDATED_AT = :updated2
WHERE ID = :id2'use strict';
const typedefs = require('sqler/typedefs');
// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {
const date = new Date();
// binds
const table1BindsArray = [
{
id: 1, name: '', updated: date
}
];
const table2BindsArray = [
{
id2: 1, name2: '', updated2: date
}
];
const rtn = {};
//-------------------------------------------------------
// There are two different ways to perform a transaction
// 1. Implicit (suitable for a single execution per tx)
// 2. Explicit (suitable for multiple executions per tx)
// using implicit transactions:
await implicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);
// Using an explicit transaction:
await explicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);
// Using a prepared statement:
await preparedStatementUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);
// Using a prepared statement within an explicit transaction
await preparedStatementExplicitTxUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);
return rtn;
};
async function implicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
// returned results for both tables
rtn.txImpRslts = new Array(table1BindsArray.length + table2BindsArray.length);
// simple iterator over all the binds
forEach('UPDATE', 'Implicit transaction', table1BindsArray, table2BindsArray, (idx, ti, ri, binds, nameProp) => {
// Example concurrent execution using an implicit transaction for
// each SQL execution (autoCommit = true is the default)
rtn.txImpRslts[idx] = manager.db[connName].update[`table${ti + 1}`].rows({
name: binds[nameProp], // execution name is optional
binds
});
});
// could have also ran is series by awaiting when the SQL function is called
for (let i = 0; i < rtn.txImpRslts.length; i++) {
rtn.txImpRslts[i] = await rtn.txImpRslts[i];
}
}
async function explicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
// returned results for both tables
rtn.txExpRslts = new Array(table1BindsArray.length + table2BindsArray.length);
/** @type {typedefs.SQLERTransaction} */
let tx;
try {
// start a transaction
tx = await manager.db[connName].beginTransaction();
// simple iterator over all the binds
forEach('UPDATE_TX', 'Explicit transaction', table1BindsArray, table2BindsArray, (idx, ti, ri, binds, nameProp) => {
// Example concurrent execution (same transacion)
rtn.txExpRslts[idx] = manager.db[connName].update[`table${ti + 1}`].rows({
name: binds[nameProp], // execution name is optional
binds,
autoCommit: false,
transactionId: tx.id, // ensure execution takes place within transaction
});
});
// could have also ran is series by awaiting when the SQL function is called
for (let i = 0; i < rtn.txExpRslts.length; i++) {
rtn.txExpRslts[i] = await rtn.txExpRslts[i];
}
// commit the transaction
await tx.commit(true); // true to release the connection back to the pool
} catch (err) {
if (tx) {
// rollback the transaction
await tx.rollback(true); // true to release the connection back to the pool
}
throw err;
}
}
async function preparedStatementUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
// need to keep track of at least one result for each table so that unprepare can be called on each
// (could call unprepare using any of the returned execution results for each table)
let psRsltIndexTable1 = 0, psRsltIndexTable2;
// returned results for both tables
rtn.psRslts = new Array(table1BindsArray.length + table2BindsArray.length);
try {
// simple iterator over all the binds
forEach('UPDATE_PS', 'Prepred statement', table1BindsArray, table2BindsArray, (idx, ti, ri, binds, nameProp) => {
// Example concurrent execution (same transacion)
rtn.psRslts[idx] = manager.db[connName].update[`table${ti + 1}`].rows({
name: binds[nameProp], // execution name is optional
// flag the SQL execution as a prepared statement
// this will cause the statement to be prepared
// and a dedicated connection to be allocated from
// the pool just before the first SQL executes
prepareStatement: true,
driverOptions: {
// override any driver options here
},
// include the bind parameters
binds
});
// need to keep track of at least one result for each table so that unprepare can be called on each
if (ti && !psRsltIndexTable2) psRsltIndexTable2 = ti;
});
// wait for concurrent executions to complete
for (let i = 0; i < rtn.psRslts.length; i++) {
rtn.psRslts[i] = await rtn.psRslts[i];
}
} finally {
// since prepareStatement = true, we need to close the statement
// and release the prepared statement connection back to the pool
// (also drops the temporary stored procedure that executes the prepared statement)
const proms = [];
if (rtn.psRslts[psRsltIndexTable1]) proms.push(rtn.psRslts[psRsltIndexTable1].unprepare());
if (rtn.psRslts[psRsltIndexTable2]) proms.push(rtn.psRslts[psRsltIndexTable2].unprepare());
if (proms.length) await Promise.all(proms);
}
}
async function preparedStatementExplicitTxUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
// returned results for both tables
rtn.txExpPsRslts = new Array(table1BindsArray.length + table2BindsArray.length);
/** @type {typedefs.SQLERTransaction} */
let tx;
try {
// start a transaction
tx = await manager.db[connName].beginTransaction();
// simple iterator over all the binds
forEach('UPDATE_PS_TX', `Prepred statement with txId ${tx.id}`, table1BindsArray, table2BindsArray, (idx, ti, ri, binds, nameProp) => {
// Example execution in concurrent (same transacion)
rtn.txExpPsRslts[idx] = manager.db[connName].update[`table${ti + 1}`].rows({
name: binds[nameProp], // execution name is optional
autoCommit: false, // don't auto-commit after execution
transactionId: tx.id, // ensure execution takes place within transaction
prepareStatement: true, // ensure a prepared statement is used
driverOptions: {
// override any driver options here
},
// include the bind parameters
binds
});
});
// wait for concurrent executions to complete
for (let i = 0; i < rtn.txExpPsRslts.length; i++) {
rtn.txExpPsRslts[i] = await rtn.txExpPsRslts[i];
}
// unprepare will be called when calling commit
// (alt, could have called unprepare before commit)
await tx.commit(true); // true to release the connection back to the pool
} catch (err) {
if (tx) {
// unprepare will be called when calling rollback
// (alt, could have called unprepare before rollback)
await tx.rollback(true); // true to release the connection back to the pool
}
throw err;
}
}
// just a utility function to iterate over muliple bind arrays and update bind names
function forEach(name, label, table1BindsArray, table2BindsArray, itemHandler) {
const ln = table1BindsArray.length + table2BindsArray.length;
for (let i = 0, ti, ri, barr, nameProp; i < ln; i++) {
// select which table the binds are for
if (i < table1BindsArray.length) {
ti = 0;
ri = i;
barr = table1BindsArray;
} else {
ti = 1;
ri = i - table1BindsArray.length;
barr = table2BindsArray;
}
nameProp = `name${ti ? ti + 1 : ''}`;
// update with expanded name
barr[ri][nameProp] = `TABLE: ${ti + 1}, ROW: ${ri + 1}, ${name}: "${label} ${i + 1}"`;
itemHandler(i, ti, ri, barr[ri], nameProp);
}
}Delete Rows
db/mdb/delete.table.rows.sql
CREATE PROCEDURE sqlermysql.perform_test_deletes
(
IN p_id INTEGER, IN p_id2 INTEGER
)
BEGIN
/*
Stored procedure is not required when executing a single SQL statement
Also, MySQL doesn't support anonymous stored procedure blocks
So, a temporary stored procedure is used instead
*/
DELETE FROM sqlermysql.TEST
WHERE ID = :id;
DELETE FROM sqlermysql.TEST2
WHERE ID = :id2;
END;
CALL sqlermysql.perform_test_deletes(
:id, :id2
);
DROP PROCEDURE sqlermysql.perform_test_deletes;'use strict';
// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {
// Delete rows from multiple tables within a single execution
const rslt = await manager.db[connName].delete.table.rows({
binds: { id: 1, id2: 1 }
});
return rslt;
};Create Rows (streaming)
db/mdb/create.stream.table.rows.sql
[[? createStoredProcedure]]
CREATE PROCEDURE sqlermysql.perform_test_inserts
(
IN p_id INTEGER, IN p_name VARCHAR(255), IN p_created DATETIME(3), IN p_updated DATETIME(3),
IN p_id2 INTEGER, IN p_name2 VARCHAR(255), IN p_report2 BLOB, IN p_created2 DATETIME(3), IN p_updated2 DATETIME(3)
)
BEGIN
/*
Stored procedure is not required when executing a single SQL statement
Also, MySQL doesn't support anonymous stored procedure blocks
So, a temporary stored procedure is used instead
*/
INSERT INTO sqlermysql.TEST (`ID`, `NAME`, CREATED_AT, UPDATED_AT)
VALUES (p_id, p_name, p_created, p_updated);
INSERT INTO sqlermysql.TEST2 (`ID`, `NAME`, REPORT, CREATED_AT, UPDATED_AT)
VALUES (p_id2, p_name2, p_report2, p_created2, p_updated2);
END;
[[?]]
[[? callStoredProcedure]]
CALL sqlermysql.perform_test_inserts(
:id, :name, :created, :updated,
:id2, :name2, :report2, :created2, :updated2
);
[[?]]
[[? dropStoredProcedure]]
DROP PROCEDURE sqlermysql.perform_test_inserts;
[[?]]'use strict';
const Fs = require('fs');
const Stream = require('stream');
const { pipeline } = require('stream/promises');
/**
* Example using streams
* export just to illustrate module usage
* @param {Manager} manager sqler manager
* @param {String} connName The connection name to use
* @returns {Promise<typedefs.SQLERExecResults>}
*/
module.exports = async function runExample(manager, connName) {
const date = new Date();
// The driver module currently doesn't support streaming into a column
// (e.g. Fs.createReadStream())
const report = await Fs.promises.readFile('./test/files/audit-report.png');
/** @type {typedefs.SQLERTransaction} */
let tx;
/** @type {typedefs.SQLERExecResults} */
let rslts;
try {
tx = await manager.db[connName].beginTransaction();
// We'll need to split create, call (stream), drop stored procedure into multiple
// executions since we cannot stream the results with create/drop in the execution.
// We can do this using sqler fragments defined in the SQL file
await manager.db[connName].create.stream.table.rows({
autoCommit: false,
transactionId: tx.id,
}, ['createStoredProcedure']);
// Insert rows into multiple tables within a single execution
rslts = await manager.db[connName].create.stream.table.rows({
// create binds will be batched in groups of 2 before streaming them to the database since
// execOpts.stream = 2, but we could have batched them individually (stream = 1) as well
// https://mariadb.com/kb/en/connector-nodejs-promise-api/#connectionbatchsql-values-promise
stream: 2,
// no need to set execOpts.binds since they will be streamed from the create instead
autoCommit: false,
transactionId: tx.id,
}, ['callStoredProcedure']);
for (let writeStream of rslts.rows) {
await pipeline(
// here we're just using some static values for illustration purposes, but they can come from a
// any readable stream source like a file, database, etc. as long as they are "transformed"
// into JSON binds before the sqler writable stream receives them
Stream.Readable.from([
{
id: 100, name: 'TABLE: 1, ROW: 1, CREATE_STREAM: "Initial creation"', created: date, updated: date,
id2: 100, name2: 'TABLE: 2, ROW: 1, CREATE_STREAM: "Initial creation"', report2: report, created2: date, updated2: date
},
{
id: 200, name: 'TABLE: 1, ROW: 2, CREATE_STREAM: "Initial creation"', created: date, updated: date,
id2: 200, name2: 'TABLE: 2, ROW: 2, CREATE_STREAM: "Initial creation"', report2: report, created2: date, updated2: date
}
]),
writeStream
)
}
// drop the stored procedure (done here for brevity, in reality we'd want to ensure it's dropped if creation was successful)
await manager.db[connName].create.stream.table.rows({
autoCommit: false,
transactionId: tx.id
}, ['dropStoredProcedure']);
await tx.commit(true);
} catch (err) {
if (tx) await tx.rollback(true);
throw err;
}
return rslts;
};
/**
* @import { Manager, typedefs } from 'sqler'
*/Read Rows (streaming)
db/mdb/read.table.rows.sql
SELECT TST.ID AS "id", TST.NAME AS "name", NULL AS "report",
TST.CREATED_AT AS "created", TST.UPDATED_AT AS "updated"
FROM sqlermysql.TEST TST
WHERE UPPER(TST.NAME) LIKE CONCAT(CONCAT('%', UPPER(:name)), '%')
UNION
SELECT TST2.ID AS "id", TST2.NAME AS "name", TST2.REPORT AS "report",
TST2.CREATED_AT AS "created", TST2.UPDATED_AT AS "updated"
FROM sqlermysql.TEST2 TST2
WHERE UPPER(TST2.NAME) LIKE CONCAT(CONCAT('%', UPPER(:name)), '%')'use strict';
const Os = require('os');
const Fs = require('fs');
const Stream = require('stream');
const { pipeline } = require('stream/promises');
/**
* Example using streams
* export just to illustrate module usage
* @param {Manager} manager sqler manager
* @param {String} connName The connection name to use
* @returns {Promise<typedefs.SQLERExecResults>}
*/
module.exports = async function runExample(manager, connName) {
// read from multiple tables
const rslt = await manager.db[connName].read.table.rows({
stream: 1, // indicate reads will be streamed
binds: { name: 'stream' }
});
// stream all reads to a central JSON file (illustrative purposes only)
rslt.jsonFile = `${Os.tmpdir()}/sqler-${connName}-read-stream-all.json`;
// write binary report buffer to file?
const fileWriteProms = [];
for (let readStream of rslt.rows) {
// read stream is MDB implementation:
// https://mariadb.com/kb/en/connector-nodejs-promise-api/#connectionquerystreamsql-values-emitter
await pipeline(
readStream,
new Stream.Transform({
objectMode: true,
transform: function transformer(chunk, encoding, callback) {
if (Buffer.isBuffer(chunk.report)) {
// transform and store the path to the report (illustrative purposes only)
chunk.reportPath = `${Os.tmpdir()}/sqler-${connName}-read-${chunk.id}.png`;
fileWriteProms.push(Fs.promises.writeFile(chunk.reportPath, chunk.report));
// don't include the report Buffer in the JSON since there should be a file
delete chunk.report;
}
callback(null, chunk);
}
}),
// add a transform that formats the JSON into an array string suitable for file write
async function* transformStringify(chunksAsync) {
yield '[';
let cnt = -1;
for await (const chunk of chunksAsync) {
cnt++;
yield `${cnt ? ',' : ''}${JSON.stringify(chunk)}`;
}
yield ']';
},
Fs.createWriteStream(rslt.jsonFile)
);
}
if (fileWriteProms.length) {
await Promise.all(fileWriteProms);
}
return rslt;
};
/**
* @import { Manager, typedefs } from 'sqler'
*/Update Rows (streaming)
db/mdb/update.table1.rows.sql
UPDATE sqlermysql.TEST
SET NAME = :name, UPDATED_AT = :updated
WHERE ID = :iddb/mdb/update.table2.rows.sql
UPDATE sqlermysql.TEST2
SET NAME = :name2, UPDATED_AT = :updated2
WHERE ID = :id2'use strict';
const Stream = require('stream');
const { pipeline } = require('stream/promises');
/**
* Example using streams
* export just to illustrate module usage
* @param {Manager} manager sqler manager
* @param {String} connName The connection name to use
* @returns {Promise<typedefs.SQLERExecResults>}
*/
module.exports = async function runExample(manager, connName) {
const date = new Date();
// binds
const table1BindsArray = [
{
id: 100, name: '', updated: date
},
{
id: 200, name: '', updated: date
},
];
const table2BindsArray = [
{
id2: 100, name2: '', updated2: date
},
{
id2: 200, name2: '', updated2: date
}
];
/** @type {typedefs.SQLERExecResults} */
const rtn = {};
//-------------------------------------------------------
// There are two different ways to perform a transaction
// 1. Implicit (suitable for a single execution per tx)
// 2. Explicit (suitable for multiple executions per tx)
// using implicit transactions:
await implicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);
// Using an explicit transaction:
await explicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);
// Using a prepared statement:
await preparedStatementUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);
// Using a prepared statement within an explicit transaction
await preparedStatementExplicitTxUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);
return rtn;
};
/**
* implicitTransactionUpdate
* @param {Manager} manager sqler manager
* @param {String} connName The connection name to use
* @param {typedefs.SQLERExecResults} rtn Returned results
* @param {Object[]} table1BindsArray 1st table binds
* @param {Object[]} table2BindsArray 2nd table binds
*/
async function implicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
// simply rename all the bind names to reflect the action being performed
nameAll('UPDATE_STREAM', 'Implicit transaction', table1BindsArray, table2BindsArray);
// loop through and perform the updates via the writable stream
let ni = 0;
const bindsArrays = [ table1BindsArray, table2BindsArray ];
rtn.txImpRslts = new Array(bindsArrays.length);
for (let bindsArray of bindsArrays) {
// Example using an implicit transaction for each streamed (autoCommit = true is the default)
rtn.txImpRslts[ni] = await manager.db[connName].update[`table${ni + 1}`].rows({
// update binds will be batched in groups of 1 before streaming them to the database since
// execOpts.stream = 1, but we could have batched them in groups (stream = 2) as well
// https://mariadb.com/kb/en/connector-nodejs-promise-api/#connectionbatchsql-values-promise
stream: 1
// no need to set execOpts.binds since they will be streamed from the update instead
});
// now that the write streams are ready and the read binds have been renamed,
// we can cycle through the bind arrays and write them to the appropriate tables
for (let writeStream of rtn.txImpRslts[ni].rows) {
await pipeline(
// here we're just using some static values for illustration purposes, but they can come from a
// any readable stream source like a file, database, etc. as long as they are "transformed"
// into JSON binds before the sqler writable stream receives them
Stream.Readable.from(bindsArray),
writeStream
);
}
ni++;
}
}
/**
* explicitTransactionUpdate
* @param {Manager} manager sqler manager
* @param {String} connName The connection name to use
* @param {typedefs.SQLERExecResults} rtn Returned results
* @param {Object[]} table1BindsArray 1st table binds
* @param {Object[]} table2BindsArray 2nd table binds
*/
async function explicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
// start a transaction
/** @type {typedefs.SQLERTransaction} */
let tx;
try {
tx = await manager.db[connName].beginTransaction();
// simply rename all the bind names to reflect the action being performed
nameAll('UPDATE_STREAM_TX', 'Explicit transaction', table1BindsArray, table2BindsArray);
// loop through and perform the updates via the writable stream
let ni = 0;
const bindsArrays = [ table1BindsArray, table2BindsArray ];
rtn.txExpRslts = new Array(bindsArrays.length);
for (let bindsArray of bindsArrays) {
// Example using an implicit transaction for each streamed (autoCommit = true is the default)
rtn.txExpRslts[ni] = await manager.db[connName].update[`table${ni + 1}`].rows({
autoCommit: false, // don't auto-commit after execution
transactionId: tx.id, // ensure execution takes place within transaction
// update binds will be batched in groups of 1 before streaming them to the database since
// execOpts.stream = 1, but we could have batched them in groups (stream = 2) as well
// https://mariadb.com/kb/en/connector-nodejs-promise-api/#connectionbatchsql-values-promise
stream: 1
// no need to set execOpts.binds since they will be streamed from the update instead
});
// now that the write streams are ready and the read binds have been renamed,
// we can cycle through the bind arrays and write them to the appropriate tables
for (let writeStream of rtn.txExpRslts[ni].rows) {
await pipeline(
// here we're just using some static values for illustration purposes, but they can come from a
// any readable stream source like a file, database, etc. as long as they are "transformed"
// into JSON binds before the sqler writable stream receives them
Stream.Readable.from(bindsArray),
writeStream
);
}
ni++;
}
await tx.commit(true); // true to release the connection back to the pool
} catch (err) {
if (tx) {
await tx.rollback(true); // true to release the connection back to the pool
}
throw err;
}
}
/**
* preparedStatementUpdate
* @param {Manager} manager sqler manager
* @param {String} connName The connection name to use
* @param {typedefs.SQLERExecResults} rtn Returned results
* @param {Object[]} table1BindsArray 1st table binds
* @param {Object[]} table2BindsArray 2nd table binds
*/
async function preparedStatementUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
// need to keep track of at least one result for each table so that unprepare can be called on each
// (could call unprepare using any of the returned stream results for each table)
let psRsltTable1, psRsltTable2;
try {
// simply rename all the bind names to reflect the action being performed
nameAll('UPDATE_STREAM_PS', 'Prepared statement', table1BindsArray, table2BindsArray);
// loop through and perform the updates via the writable stream
let ni = 0;
const bindsArrays = [ table1BindsArray, table2BindsArray ];
rtn.psRslts = new Array(bindsArrays.length);
for (let bindsArray of bindsArrays) {
// Example using an implicit transaction for each streamed (autoCommit = true is the default)
rtn.psRslts[ni] = await manager.db[connName].update[`table${ni + 1}`].rows({
// flag the SQL execution as a prepared statement
// this will cause the statement to be prepared
// and a dedicated connection to be allocated from
// the pool just before the first SQL executes
prepareStatement: true,
driverOptions: {
// override any driver options here
},
// update binds will be batched in groups of 1 before streaming them to the database since
// execOpts.stream = 1, but we could have batched them in groups (stream = 2) as well
// https://mariadb.com/kb/en/connector-nodejs-promise-api/#connectionbatchsql-values-promise
stream: 1
// no need to set execOpts.binds since they will be streamed from the update instead
});
// need to keep track of at least one result for each table so that unprepare can be called on each
if (ni === 0 && !psRsltTable1) {
psRsltTable1 = rtn.psRslts[ni];
} else if (ni && !psRsltTable2) {
psRsltTable2 = rtn.psRslts[ni];
}
// now that the write streams are ready and the read binds have been renamed,
// we can cycle through the bind arrays and write them to the appropriate tables
for (let writeStream of rtn.psRslts[ni].rows) {
await pipeline(
// here we're just using some static values for illustration purposes, but they can come from a
// any readable stream source like a file, database, etc. as long as they are "transformed"
// into JSON binds before the sqler writable stream receives them
Stream.Readable.from(bindsArray),
writeStream
);
}
ni++;
}
} finally {
// since prepareStatement = true, we need to close the statement
// and release the prepared statement connection back to the pool
// (also drops the temporary stored procedure that executes the prepared statement)
const proms = [];
if (psRsltTable1) proms.push(psRsltTable1.unprepare());
if (psRsltTable2) proms.push(psRsltTable2.unprepare());
if (proms.length) await Promise.all(proms);
}
}
/**
* preparedStatementExplicitTxUpdate
* @param {Manager} manager sqler manager
* @param {String} connName The connection name to use
* @param {typedefs.SQLERExecResults} rtn Returned results
* @param {Object[]} table1BindsArray 1st table binds
* @param {Object[]} table2BindsArray 2nd table binds
*/
async function preparedStatementExplicitTxUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
/** @type {typedefs.SQLERTransaction} */
let tx;
try {
// start a transaction
tx = await manager.db[connName].beginTransaction();
// simply rename all the bind names to reflect the action being performed
nameAll('UPDATE_STREAM_PS_TX', `Prepared statement with txId ${tx.id}`, table1BindsArray, table2BindsArray);
// loop through and perform the updates via the writable stream
let ni = 0;
const bindsArrays = [ table1BindsArray, table2BindsArray ];
rtn.txExpPsRslts = new Array(bindsArrays.length);
for (let bindsArray of bindsArrays) {
// Example using an implicit transaction for each streamed (autoCommit = true is the default)
rtn.txExpPsRslts[ni] = await manager.db[connName].update[`table${ni + 1}`].rows({
autoCommit: false, // don't auto-commit after execution
transactionId: tx.id, // ensure execution takes place within transaction
prepareStatement: true, // ensure a prepared statement is used
driverOptions: {
// override any driver options here
},
// update binds will be batched in groups of 1 before streaming them to the database since
// execOpts.stream = 1, but we could have batched them in groups (stream = 2) as well
// https://mariadb.com/kb/en/connector-nodejs-promise-api/#connectionbatchsql-values-promise
stream: 1
// no need to set execOpts.binds since they will be streamed from the update instead
});
// now that the write streams are ready and the read binds have been renamed,
// we can cycle through the bind arrays and write them to the appropriate tables
for (let writeStream of rtn.txExpPsRslts[ni].rows) {
await pipeline(
// here we're just using some static values for illustration purposes, but they can come from a
// any readable stream source like a file, database, etc. as long as they are "transformed"
// into JSON binds before the sqler writable stream receives them
Stream.Readable.from(bindsArray),
writeStream
);
}
ni++;
}
// unprepare will be called on all prepared statements associated with the transaction when calling
// commit (alt, could have called unprepare before commit)
await tx.commit(true); // true to release the connection back to the pool
} catch (err) {
if (tx) {
// unprepare will be called on all prepared statements associated with the transaction when calling
// rollback (alt, could have called unprepare before rollback)
await tx.rollback(true); // true to release the connection back to the pool
}
throw err;
}
}
/**
* Just a utility function to iterate over muliple bind name arrays to rename them
* @param {String} name Name of the bind that will be renamed
* @param {String} label Label to be used on the value
* @param {Object[]} table1BindsArray 1st table binds
* @param {Object[]} table2BindsArray 2nd table binds
* @returns {String[]} Names
*/
function nameAll(name, label, table1BindsArray, table2BindsArray) {
const ln = table1BindsArray.length + (table2BindsArray ? table2BindsArray.length : 0);
for (let i = 0, ti, ri, barr; i < ln; i++) {
// select which table the binds are for
if (i < table1BindsArray.length) {
ti = 0;
ri = i;
barr = table1BindsArray;
} else {
ti = 1;
ri = i - table1BindsArray.length;
barr = table2BindsArray;
}
// update with expanded name
barr[ri][`name${ti ? ti + 1 : ''}`] = `TABLE: ${ti + 1}, ROW: ${ri + 1}, ${name}: "${label} ${i + 1}"`;
}
return [ 'name', 'name2' ];
}
/**
* @import { Manager, typedefs } from 'sqler'
*/Delete Rows (streaming)
db/mdb/delete.stream.table.rows.sql
[[? createStoredProcedure]]
CREATE PROCEDURE sqlermysql.perform_test_deletes
(
IN p_id INTEGER, IN p_id2 INTEGER
)
BEGIN
/*
Stored procedure is not required when executing a single SQL statement
Also, MySQL doesn't support anonymous stored procedure blocks
So, a temporary stored procedure is used instead
*/
DELETE FROM sqlermysql.TEST
WHERE ID = :id;
DELETE FROM sqlermysql.TEST2
WHERE ID = :id2;
END;
[[?]]
[[? callStoredProcedure]]
CALL sqlermysql.perform_test_deletes(
:id, :id2
);
[[?]]
[[? dropStoredProcedure]]
DROP PROCEDURE sqlermysql.perform_test_deletes;
[[?]]'use strict';
const Stream = require('stream');
const { pipeline } = require('stream/promises');
/**
* Example using streams
* export just to illustrate module usage
* @param {Manager} manager sqler manager
* @param {String} connName The connection name to use
* @returns {Promise<typedefs.SQLERExecResults>}
*/
module.exports = async function runExample(manager, connName) {
/** @type {typedefs.SQLERTransaction} */
let tx;
/** @type {typedefs.SQLERExecResults} */
let rslts;
try {
tx = await manager.db[connName].beginTransaction();
// We'll need to split create, call (stream), drop stored procedure into multiple
// executions since we cannot stream the results with create/drop in the execution.
// We can do this using sqler fragments defined in the SQL file
await manager.db[connName].delete.stream.table.rows({
autoCommit: false,
transactionId: tx.id
}, ['createStoredProcedure']);
// Delete rows into multiple tables within a single execution
rslts = await manager.db[connName].delete.stream.table.rows({
// delete binds will be batched in groups of 2 before streaming them to the database since
// execOpts.stream = 2, but we could have batched them individually (stream = 1) as well
// https://mariadb.com/kb/en/connector-nodejs-promise-api/#connectionbatchsql-values-promise
stream: 2,
// no need to set execOpts.binds since they will be streamed from the create instead
autoCommit: false,
transactionId: tx.id,
}, ['callStoredProcedure']);
for (let writeStream of rslts.rows) {
await pipeline(
// here we're just using some static values for illustration purposes, but they can come from a
// any readable stream source like a file, database, etc. as long as they are "transformed"
// into JSON binds before the sqler writable stream receives them
Stream.Readable.from([
{
id: 100, id2: 100
},
{
id: 200, id2: 200
}
]),
writeStream
)
}
// drop the stored procedure (done here for brevity, in reality we'd want to ensure it's dropped if creation was successful)
await manager.db[connName].delete.stream.table.rows({
autoCommit: false,
transactionId: tx.id
}, ['dropStoredProcedure']);
await tx.commit(true);
} catch (err) {
if (tx) await tx.rollback(true);
throw err;
}
return rslts;
};
/**
* @import { Manager, typedefs } from 'sqler'
*/Delete Database
db/mdb/setup/delete.database.sql
DROP DATABASE sqlermysql'use strict';
// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {
// delete the test database
const rslt = await manager.db[connName].setup.delete.database();
return rslt;
};