Skip to content

Examples

The examples below use the following setup:

Private Options Configuration (merged into the subsequent connection options at runtime):

json
{
  "univ": {
    "db": {
      "mdb": {
        "host": "sqler_mdb",
        "username":"root",
        "password": "my3qllocal"
      }
    }
  }
}

Connection Options Configuration:

js
'use strict';

const fs = require('node:fs');
const path = require('node:path');

module.exports = function buildConf() {
  const sslBase = process.env.SQLER_TEST_SSL_DIR || path.join(__dirname, '..', 'ssl');

  const useTLS = !process.env.SQLER_TEST_NO_TLS;

  const connection = {
    multipleStatements: true
  };

  if (useTLS) {
    connection.ssl = {
      // using mutual TLS (server/client, CA must be signed by same CA)
      ca: fs.readFileSync(path.join(sslBase, 'ca.pem'), 'utf8'),
      cert: fs.readFileSync(path.join(sslBase, 'client-cert.pem'), 'utf8'),
      key: fs.readFileSync(path.join(sslBase, 'client-key.pem'), 'utf8'),
      // rejectUnauthorized should normally be set to true,
      // but is set to false for testing/illustration purposes using a self-signed certificate 
      rejectUnauthorized: false,
      minVersion: 'TLSv1.2'
    };
  } else {
    // temporary fallback if you need non-TLS local testing while migrating
    connection.allowPublicKeyRetrieval = true;
  }

  return {
    db: {
      dialects: {
        mdb: 'sqler-mdb'
      },
      connections: [
        {
          id: 'mdb',
          name: 'mdb',
          dir: 'db/mdb',
          service: 'MySQL',
          dialect: 'mdb',
          pool: {},
          driverOptions: {
            connection,
            // prepared statements in MySQL/MariaDB use a temporary
            // stored procedure to execute prepared statements...
            // in order to do so, the stored procedure needs to have
            // a database scope defined where it will reside
            // (can also be overridden in the prepared function exec)
            preparedStatementDatabase: 'sqlermysql'
          }
        }
      ]
    }
  };
};

Test code that illustrates how to use MariaDB/MySQL with the various examples below:

js
// assuming "conf" contains combined "univ" and "db" objects from above

// create/initialize manager (connects pools per the connection configuration)
const manager = new Manager(conf);
await manager.init();

// see the subsequent sections for the different runExample implementations
const result = await runExample(manager, 'mdb');

console.log('Result:', result);

// close the manager (releasing pooled connections) when the process is interrupted;
// after we're done using the manager we should close it
process.on('SIGINT', async function sigintDB() {
  await manager.close();
  console.log('Manager has been closed');
});

Create Database

db/mdb/setup/create.database.sql

sql
-- Creates the example database used by all subsequent examples (idempotent)
CREATE DATABASE IF NOT EXISTS sqlermysql
js
'use strict';

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {

  // create test database
  const rslt = await manager.db[connName].setup.create.database();

  return rslt;
};

Create Table(s)

db/mdb/setup/create.table1.sql

sql
-- TEST table: id/name rows with millisecond-precision audit timestamps (idempotent)
CREATE TABLE IF NOT EXISTS sqlermysql.TEST (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR(512), CREATED_AT DATETIME(3), UPDATED_AT DATETIME(3))

db/mdb/setup/create.table2.sql

sql
-- TEST2 table: same shape as TEST plus a binary REPORT column (idempotent)
CREATE TABLE IF NOT EXISTS sqlermysql.TEST2 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR(512), REPORT BLOB, CREATED_AT DATETIME(3), UPDATED_AT DATETIME(3))

Create Rows

db/mdb/create.table.rows.sql

sql
-- temporary stored procedure: inserts one row into each test table per invocation
CREATE PROCEDURE sqlermysql.perform_test_inserts
(
  IN p_id INTEGER, IN p_name VARCHAR(255), IN p_created DATETIME(3), IN p_updated DATETIME(3),
  IN p_id2 INTEGER, IN p_name2 VARCHAR(255), IN p_report2 BLOB, IN p_created2 DATETIME(3), IN p_updated2 DATETIME(3)
)
BEGIN
  /*
  Stored procedure is not required when executing a single SQL statement
  Also, MySQL doesn't support anonymous stored procedure blocks
  So, a temporary stored procedure is used instead
  */
  INSERT INTO sqlermysql.TEST (`ID`, `NAME`, CREATED_AT, UPDATED_AT)
  VALUES (p_id, p_name, p_created, p_updated);
  INSERT INTO sqlermysql.TEST2 (`ID`, `NAME`, REPORT, CREATED_AT, UPDATED_AT)
  VALUES (p_id2, p_name2, p_report2, p_created2, p_updated2);
END;
-- invoke the procedure with the sqler named binds, then clean it up
CALL sqlermysql.perform_test_inserts(
  :id, :name, :created, :updated,
  :id2, :name2, :report2, :created2, :updated2
);
DROP PROCEDURE sqlermysql.perform_test_inserts;
js
'use strict';

const Fs = require('fs');

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {

  const date = new Date();

  // The driver module currently doesn't support streaming into a column
  // (e.g. Fs.createReadStream())
  const report = await Fs.promises.readFile('./test/files/audit-report.png');

  // Insert rows into multiple tables within a single execution
  const rslt = await manager.db[connName].create.table.rows({
    binds: {
      id: 1, name: 'TABLE: 1, ROW: 1, CREATE: "Initial creation"', created: date, updated: date,
      id2: 1, name2: 'TABLE: 2, ROW: 1, CREATE: "Initial creation"', report2: report, created2: date, updated2: date
    }
  });

  return rslt;
};

Read Rows

db/mdb/read.table.rows.sql

sql
-- NULL AS "report" keeps the TEST column list aligned with the TEST2 SELECT for the UNION
SELECT TST.ID AS "id", TST.NAME AS "name", NULL AS "report",
TST.CREATED_AT AS "created", TST.UPDATED_AT AS "updated"
FROM sqlermysql.TEST TST
WHERE UPPER(TST.NAME) LIKE CONCAT(CONCAT('%', UPPER(:name)), '%') 
UNION
SELECT TST2.ID AS "id", TST2.NAME AS "name", TST2.REPORT AS "report",
TST2.CREATED_AT AS "created", TST2.UPDATED_AT AS "updated"
FROM sqlermysql.TEST2 TST2
WHERE UPPER(TST2.NAME) LIKE CONCAT(CONCAT('%', UPPER(:name)), '%')
js
'use strict';

const Os = require('os');
const Fs = require('fs');

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {

  // stream read from multiple tables
  const rslt = await manager.db[connName].read.table.rows({ binds: { name: 'table' } });

  // write binary report buffer to file?
  const fileWriteProms = [];
  for (let row of rslt.rows) {
    if (row.report) {
      // store the path to the report (illustrative purposes only)
      row.reportPath = `${Os.tmpdir()}/sqler-${connName}-read-${row.id}.png`;
      fileWriteProms.push(Fs.promises.writeFile(row.reportPath, row.report));
    }
  }
  if (fileWriteProms.length) {
    await Promise.all(fileWriteProms);
  }

  return rslt;
};

Update Rows

Demo of multiple tables updated in a single SQL script:

db/mdb/update.table.rows.sql

sql
-- temporary stored procedure: updates one row in each test table per invocation
CREATE PROCEDURE sqlermysql.perform_test_updates
(
  IN p_id INTEGER, IN p_name VARCHAR(255), IN p_updated DATETIME(3),
  IN p_id2 INTEGER, IN p_name2 VARCHAR(255), IN p_updated2 DATETIME(3)
)
BEGIN
  /*
  Stored procedure is not required when executing a single SQL statement
  Also, MySQL doesn't support anonymous stored procedure blocks
  So, a temporary stored procedure is used instead
  */
  UPDATE sqlermysql.TEST
  SET NAME = p_name, UPDATED_AT = p_updated
  WHERE ID = p_id;
  UPDATE sqlermysql.TEST2
  SET NAME = p_name2, UPDATED_AT = p_updated2
  WHERE ID = p_id2;
END;
-- invoke the procedure with the sqler named binds, then clean it up
CALL sqlermysql.perform_test_updates(
  :id, :name, :updated,
  :id2, :name2, :updated2
);
DROP PROCEDURE sqlermysql.perform_test_updates;

Demo prepared statements:

db/mdb/update.table1.rows.sql

sql
  -- single-statement update suitable for prepared statements (binds: :id, :name, :updated)
  UPDATE sqlermysql.TEST
  SET NAME = :name, UPDATED_AT = :updated
  WHERE ID = :id

Demo transactions:

db/mdb/update.table2.rows.sql

sql
  -- single-statement update suitable for transactions (binds: :id2, :name2, :updated2)
  UPDATE sqlermysql.TEST2
  SET NAME = :name2, UPDATED_AT = :updated2
  WHERE ID = :id2
js
'use strict';

const typedefs = require('sqler/typedefs');

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {

  const date = new Date();

  // binds
  const table1BindsArray = [
    {
      id: 1, name: '', updated: date
    }
  ];
  const table2BindsArray = [
    {
      id2: 1, name2: '', updated2: date
    }
  ];
  const rtn = {};

  //-------------------------------------------------------
  // There are two different ways to perform a transaction
  // 1. Implicit (suitable for a single execution per tx)
  // 2. Explicit (suitable for multiple executions per tx)

  // using implicit transactions:
  await implicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);

  // Using an explicit transaction:
  await explicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);

  // Using a prepared statement:
  await preparedStatementUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);

  // Using a prepared statement within an explicit transaction
  await preparedStatementExplicitTxUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);

  return rtn;
};

/**
 * Performs the table updates using an implicit transaction per execution
 * (autoCommit = true is the sqler default).
 * @param {Manager} manager sqler manager
 * @param {String} connName The connection name to use
 * @param {Object} rtn Accumulates the execution results (sets rtn.txImpRslts)
 * @param {Object[]} table1BindsArray 1st table binds
 * @param {Object[]} table2BindsArray 2nd table binds
 */
async function implicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
  const total = table1BindsArray.length + table2BindsArray.length;
  rtn.txImpRslts = new Array(total);

  // start every update concurrently, capturing the pending promises
  forEach('UPDATE', 'Implicit transaction', table1BindsArray, table2BindsArray, (idx, ti, ri, binds, nameProp) => {
    rtn.txImpRslts[idx] = manager.db[connName].update[`table${ti + 1}`].rows({
      name: binds[nameProp], // execution name is optional
      binds
    });
  });

  // resolve in order (could also have run in series by awaiting each SQL call)
  for (let idx = 0; idx < rtn.txImpRslts.length; idx++) {
    rtn.txImpRslts[idx] = await rtn.txImpRslts[idx];
  }
}

/**
 * Performs the table updates within a single explicit transaction,
 * rolling back (and releasing the connection) when any execution fails.
 * @param {Manager} manager sqler manager
 * @param {String} connName The connection name to use
 * @param {Object} rtn Accumulates the execution results (sets rtn.txExpRslts)
 * @param {Object[]} table1BindsArray 1st table binds
 * @param {Object[]} table2BindsArray 2nd table binds
 */
async function explicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
  rtn.txExpRslts = new Array(table1BindsArray.length + table2BindsArray.length);
  /** @type {typedefs.SQLERTransaction} */
  let tx;
  try {
    // start a transaction
    tx = await manager.db[connName].beginTransaction();

    // queue every update concurrently on the same transaction
    forEach('UPDATE_TX', 'Explicit transaction', table1BindsArray, table2BindsArray, (idx, ti, ri, binds, nameProp) => {
      rtn.txExpRslts[idx] = manager.db[connName].update[`table${ti + 1}`].rows({
        name: binds[nameProp], // execution name is optional
        binds,
        autoCommit: false,
        transactionId: tx.id, // ensure execution takes place within transaction
      });
    });

    // resolve in order (could also have run in series by awaiting each SQL call)
    for (let idx = 0; idx < rtn.txExpRslts.length; idx++) {
      rtn.txExpRslts[idx] = await rtn.txExpRslts[idx];
    }

    // commit the transaction; true releases the connection back to the pool
    await tx.commit(true);
  } catch (err) {
    // rollback the transaction; true releases the connection back to the pool
    if (tx) await tx.rollback(true);
    throw err;
  }
}

/**
 * Performs the table updates using prepared statements (implicit transactions).
 * Each table gets its own prepared statement and dedicated connection, so one
 * execution result per table must be retained in order to unprepare both.
 * @param {Manager} manager sqler manager
 * @param {String} connName The connection name to use
 * @param {Object} rtn Accumulates the execution results (sets rtn.psRslts)
 * @param {Object[]} table1BindsArray 1st table binds
 * @param {Object[]} table2BindsArray 2nd table binds
 */
async function preparedStatementUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
  // need to keep track of at least one result for each table so that unprepare can be called on each
  // (could call unprepare using any of the returned execution results for each table)
  let psRsltIndexTable1 = 0, psRsltIndexTable2;
  // returned results for both tables
  rtn.psRslts = new Array(table1BindsArray.length + table2BindsArray.length);
  try {

    // simple iterator over all the binds
    forEach('UPDATE_PS', 'Prepared statement', table1BindsArray, table2BindsArray, (idx, ti, ri, binds, nameProp) => {

      // Example concurrent execution
      rtn.psRslts[idx] = manager.db[connName].update[`table${ti + 1}`].rows({
        name: binds[nameProp], // execution name is optional
        // flag the SQL execution as a prepared statement
        // this will cause the statement to be prepared
        // and a dedicated connection to be allocated from
        // the pool just before the first SQL executes
        prepareStatement: true,
        driverOptions: {
          // override any driver options here
        },
        // include the bind parameters
        binds
      });

      // FIX: record the results index (idx) of the first table-2 execution;
      // the original stored the table index (ti, always 1), which only
      // coincidentally matched when table1BindsArray had exactly one entry —
      // otherwise the table-2 statement was never unprepared (connection leak)
      if (ti && psRsltIndexTable2 === undefined) psRsltIndexTable2 = idx;

    });

    // wait for concurrent executions to complete
    for (let i = 0; i < rtn.psRslts.length; i++) {
      rtn.psRslts[i] = await rtn.psRslts[i];
    }
  } finally {
    // since prepareStatement = true, we need to close the statement
    // and release the prepared statement connection back to the pool
    // (also drops the temporary stored procedure that executes the prepared statement)
    const proms = [];
    if (rtn.psRslts[psRsltIndexTable1]) proms.push(rtn.psRslts[psRsltIndexTable1].unprepare());
    if (psRsltIndexTable2 !== undefined && rtn.psRslts[psRsltIndexTable2]) {
      proms.push(rtn.psRslts[psRsltIndexTable2].unprepare());
    }
    if (proms.length) await Promise.all(proms);
  }
}

/**
 * Performs the table updates using prepared statements within a single
 * explicit transaction; unprepare is invoked implicitly by commit/rollback.
 * @param {Manager} manager sqler manager
 * @param {String} connName The connection name to use
 * @param {Object} rtn Accumulates the execution results (sets rtn.txExpPsRslts)
 * @param {Object[]} table1BindsArray 1st table binds
 * @param {Object[]} table2BindsArray 2nd table binds
 */
async function preparedStatementExplicitTxUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
  // returned results for both tables
  rtn.txExpPsRslts = new Array(table1BindsArray.length + table2BindsArray.length);
  /** @type {typedefs.SQLERTransaction} */
  let tx;
  try {
    // start a transaction
    tx = await manager.db[connName].beginTransaction();

    // simple iterator over all the binds (FIX: label typo "Prepred" -> "Prepared")
    forEach('UPDATE_PS_TX', `Prepared statement with txId ${tx.id}`, table1BindsArray, table2BindsArray, (idx, ti, ri, binds, nameProp) => {

      // Example concurrent execution (same transaction)
      rtn.txExpPsRslts[idx] = manager.db[connName].update[`table${ti + 1}`].rows({
        name: binds[nameProp], // execution name is optional
        autoCommit: false, // don't auto-commit after execution
        transactionId: tx.id, // ensure execution takes place within transaction
        prepareStatement: true, // ensure a prepared statement is used
        driverOptions: {
          // override any driver options here
        },
        // include the bind parameters
        binds
      });

    });

    // wait for concurrent executions to complete
    for (let i = 0; i < rtn.txExpPsRslts.length; i++) {
      rtn.txExpPsRslts[i] = await rtn.txExpPsRslts[i];
    }

    // unprepare will be called when calling commit
    // (alternatively, could have called unprepare before commit)
    await tx.commit(true); // true to release the connection back to the pool
  } catch (err) {
    if (tx) {
      // unprepare will be called when calling rollback
      // (alternatively, could have called unprepare before rollback)
      await tx.rollback(true); // true to release the connection back to the pool
    }
    throw err;
  }
}

/**
 * Utility that walks both bind arrays in sequence (table 1 first, then table 2),
 * rewrites each row's name bind to an expanded label and invokes the handler.
 * @param {String} name action name embedded in the generated bind value
 * @param {String} label descriptive label embedded in the generated bind value
 * @param {Object[]} table1BindsArray binds destined for table 1 (name prop: "name")
 * @param {Object[]} table2BindsArray binds destined for table 2 (name prop: "name2")
 * @param {Function} itemHandler invoked as (overallIdx, tableIdx, rowIdx, binds, nameProp)
 */
function forEach(name, label, table1BindsArray, table2BindsArray, itemHandler) {
  const total = table1BindsArray.length + table2BindsArray.length;
  for (let i = 0; i < total; i++) {
    // resolve which table/row this overall index refers to
    const isTable2 = i >= table1BindsArray.length;
    const ti = isTable2 ? 1 : 0;
    const ri = isTable2 ? i - table1BindsArray.length : i;
    const binds = isTable2 ? table2BindsArray[ri] : table1BindsArray[ri];
    const nameProp = ti ? `name${ti + 1}` : 'name';

    // update with expanded name
    binds[nameProp] = `TABLE: ${ti + 1}, ROW: ${ri + 1}, ${name}: "${label} ${i + 1}"`;

    itemHandler(i, ti, ri, binds, nameProp);
  }
}

Delete Rows

db/mdb/delete.table.rows.sql

sql
-- temporary stored procedure: deletes one row from each test table per invocation
CREATE PROCEDURE sqlermysql.perform_test_deletes
(
  IN p_id INTEGER, IN p_id2 INTEGER
)
BEGIN
  /*
  Stored procedure is not required when executing a single SQL statement
  Also, MySQL doesn't support anonymous stored procedure blocks
  So, a temporary stored procedure is used instead
  FIX: use the declared procedure parameters (p_id/p_id2) in the body instead of
  re-substituting the raw :id/:id2 binds, which left the parameters unused and
  was inconsistent with the insert/update procedure examples
  */
  DELETE FROM sqlermysql.TEST
  WHERE ID = p_id;
  DELETE FROM sqlermysql.TEST2
  WHERE ID = p_id2;
END;
CALL sqlermysql.perform_test_deletes(
  :id, :id2
);
DROP PROCEDURE sqlermysql.perform_test_deletes;
js
'use strict';

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {

  // Delete rows from multiple tables within a single execution
  const rslt = await manager.db[connName].delete.table.rows({
    binds: { id: 1, id2: 1 }
  });

  return rslt;
};

Create Rows (streaming)

db/mdb/create.stream.table.rows.sql

sql
[[? createStoredProcedure]]
-- fragment: creates the temporary stored procedure used by the streamed CALL
CREATE PROCEDURE sqlermysql.perform_test_inserts
(
  IN p_id INTEGER, IN p_name VARCHAR(255), IN p_created DATETIME(3), IN p_updated DATETIME(3),
  IN p_id2 INTEGER, IN p_name2 VARCHAR(255), IN p_report2 BLOB, IN p_created2 DATETIME(3), IN p_updated2 DATETIME(3)
)
BEGIN
  /*
  Stored procedure is not required when executing a single SQL statement
  Also, MySQL doesn't support anonymous stored procedure blocks
  So, a temporary stored procedure is used instead
  */
  INSERT INTO sqlermysql.TEST (`ID`, `NAME`, CREATED_AT, UPDATED_AT)
  VALUES (p_id, p_name, p_created, p_updated);
  INSERT INTO sqlermysql.TEST2 (`ID`, `NAME`, REPORT, CREATED_AT, UPDATED_AT)
  VALUES (p_id2, p_name2, p_report2, p_created2, p_updated2);
END;
[[?]]
[[? callStoredProcedure]]
-- fragment: executed with streamed binds (one CALL per batched bind set)
CALL sqlermysql.perform_test_inserts(
  :id, :name, :created, :updated,
  :id2, :name2, :report2, :created2, :updated2
);
[[?]]
[[? dropStoredProcedure]]
-- fragment: cleanup once streaming completes
DROP PROCEDURE sqlermysql.perform_test_inserts;
[[?]]
js
'use strict';

const Fs = require('fs');
const Stream = require('stream');
const { pipeline } = require('stream/promises');

/**
 * Example of creating rows in multiple tables by streaming binds through a
 * temporary stored procedure within a single explicit transaction.
 * export just to illustrate module usage
 * @param {Manager} manager sqler manager
 * @param {String} connName The connection name to use 
 * @returns {Promise<typedefs.SQLERExecResults>} the results of the streamed CALL execution
 */
module.exports = async function runExample(manager, connName) {

  const date = new Date();

  // The driver module currently doesn't support streaming into a column
  // (e.g. Fs.createReadStream())
  const report = await Fs.promises.readFile('./test/files/audit-report.png');

  /** @type {typedefs.SQLERTransaction} */
  let tx;
  /** @type {typedefs.SQLERExecResults} */
  let rslts;
  try {
    tx = await manager.db[connName].beginTransaction();

    // We'll need to split create, call (stream), drop stored procedure into multiple
    // executions since we cannot stream the results with create/drop in the execution.
    // We can do this using sqler fragments defined in the SQL file.
    // step 1: create the temporary stored procedure
    await manager.db[connName].create.stream.table.rows({
      autoCommit: false,
      transactionId: tx.id,
    }, ['createStoredProcedure']);

    // step 2: stream binds into the stored procedure CALL
    // (inserts rows into multiple tables within a single execution)
    rslts = await manager.db[connName].create.stream.table.rows({
      // create binds will be batched in groups of 2 before streaming them to the database since
      // execOpts.stream = 2, but we could have batched them individually (stream = 1) as well
      // https://mariadb.com/kb/en/connector-nodejs-promise-api/#connectionbatchsql-values-promise
      stream: 2,
      // no need to set execOpts.binds since they will be streamed from the create instead
      autoCommit: false,
      transactionId: tx.id,
    }, ['callStoredProcedure']);

    // rslts.rows contains writable stream(s) that accept JSON bind objects
    for (let writeStream of rslts.rows) {
      await pipeline(
        // here we're just using some static values for illustration purposes, but they can come from a
        // any readable stream source like a file, database, etc. as long as they are "transformed"
        // into JSON binds before the sqler writable stream receives them
        Stream.Readable.from([
          {
            id: 100, name: 'TABLE: 1, ROW: 1, CREATE_STREAM: "Initial creation"', created: date, updated: date,
            id2: 100, name2: 'TABLE: 2, ROW: 1, CREATE_STREAM: "Initial creation"', report2: report, created2: date, updated2: date
          },
          {
            id: 200, name: 'TABLE: 1, ROW: 2, CREATE_STREAM: "Initial creation"', created: date, updated: date,
            id2: 200, name2: 'TABLE: 2, ROW: 2, CREATE_STREAM: "Initial creation"', report2: report, created2: date, updated2: date
          }
        ]),
        writeStream
      )
    }

    // step 3: drop the stored procedure (done here for brevity, in reality
    // we'd want to ensure it's dropped if creation was successful)
    await manager.db[connName].create.stream.table.rows({
      autoCommit: false,
      transactionId: tx.id
    }, ['dropStoredProcedure']);

    await tx.commit(true);
  } catch (err) {
    if (tx) await tx.rollback(true);
    throw err;
  }

  return rslts;
};

/**
 * @import { Manager, typedefs } from 'sqler'
 */

Read Rows (streaming)

db/mdb/read.table.rows.sql

sql
-- NULL AS "report" keeps the TEST column list aligned with the TEST2 SELECT for the UNION
SELECT TST.ID AS "id", TST.NAME AS "name", NULL AS "report",
TST.CREATED_AT AS "created", TST.UPDATED_AT AS "updated"
FROM sqlermysql.TEST TST
WHERE UPPER(TST.NAME) LIKE CONCAT(CONCAT('%', UPPER(:name)), '%') 
UNION
SELECT TST2.ID AS "id", TST2.NAME AS "name", TST2.REPORT AS "report",
TST2.CREATED_AT AS "created", TST2.UPDATED_AT AS "updated"
FROM sqlermysql.TEST2 TST2
WHERE UPPER(TST2.NAME) LIKE CONCAT(CONCAT('%', UPPER(:name)), '%')
js
'use strict';

const Os = require('os');
const Fs = require('fs');
const Stream = require('stream');
const { pipeline } = require('stream/promises');

/**
 * Example of reading rows from multiple tables as object streams, extracting
 * binary report buffers to PNG files and serializing the rows to a JSON file.
 * export just to illustrate module usage
 * @param {Manager} manager sqler manager
 * @param {String} connName The connection name to use 
 * @returns {Promise<typedefs.SQLERExecResults>} the streamed read results
 * (rslt.jsonFile is set to the generated JSON file path)
 */
module.exports = async function runExample(manager, connName) {

  // read from multiple tables
  const rslt = await manager.db[connName].read.table.rows({
    stream: 1, // indicate reads will be streamed
    binds: { name: 'stream' }
  });

  // stream all reads to a central JSON file (illustrative purposes only)
  rslt.jsonFile = `${Os.tmpdir()}/sqler-${connName}-read-stream-all.json`;

  // report file writes queued by the transform stage below
  const fileWriteProms = [];
  for (let readStream of rslt.rows) {
    // read stream is MDB implementation:
    // https://mariadb.com/kb/en/connector-nodejs-promise-api/#connectionquerystreamsql-values-emitter
    await pipeline(
      readStream,
      // stage 1: extract any report Buffer to a PNG file and strip it from the row
      new Stream.Transform({
        objectMode: true,
        transform: function transformer(chunk, encoding, callback) {
          if (Buffer.isBuffer(chunk.report)) {
            // transform and store the path to the report (illustrative purposes only)
            chunk.reportPath = `${Os.tmpdir()}/sqler-${connName}-read-${chunk.id}.png`;
            fileWriteProms.push(Fs.promises.writeFile(chunk.reportPath, chunk.report));
            // don't include the report Buffer in the JSON since there should be a file
            delete chunk.report;
          }
          callback(null, chunk);
        }
      }),
      // stage 2: format the rows into a JSON array string suitable for file write
      async function* transformStringify(chunksAsync) {
        yield '[';
        let cnt = -1;
        for await (const chunk of chunksAsync) {
          cnt++;
          yield `${cnt ? ',' : ''}${JSON.stringify(chunk)}`;
        }
        yield ']';
      },
      Fs.createWriteStream(rslt.jsonFile)
    );
  }
  // wait for all queued report file writes to land
  if (fileWriteProms.length) {
    await Promise.all(fileWriteProms);
  }

  return rslt;
};

/**
 * @import { Manager, typedefs } from 'sqler'
 */

Update Rows (streaming)

db/mdb/update.table1.rows.sql

sql
  -- single-statement update used with streamed binds (binds: :id, :name, :updated)
  UPDATE sqlermysql.TEST
  SET NAME = :name, UPDATED_AT = :updated
  WHERE ID = :id

db/mdb/update.table2.rows.sql

sql
  -- single-statement update used with streamed binds (binds: :id2, :name2, :updated2)
  UPDATE sqlermysql.TEST2
  SET NAME = :name2, UPDATED_AT = :updated2
  WHERE ID = :id2
js
'use strict';

const Stream = require('stream');
const { pipeline } = require('stream/promises');

/**
 * Example using streams
 * export just to illustrate module usage
 * @param {Manager} manager sqler manager
 * @param {String} connName The connection name to use 
 * @returns {Promise<typedefs.SQLERExecResults>}
 */
module.exports = async function runExample(manager, connName) {

  const date = new Date();

  // binds
  const table1BindsArray = [
    {
      id: 100, name: '', updated: date
    },
    {
      id: 200, name: '', updated: date
    },
  ];
  const table2BindsArray = [
    {
      id2: 100, name2: '', updated2: date
    },
    {
      id2: 200, name2: '', updated2: date
    }
  ];
  /** @type {typedefs.SQLERExecResults} */
  const rtn = {};

  //-------------------------------------------------------
  // There are two different ways to perform a transaction
  // 1. Implicit (suitable for a single execution per tx)
  // 2. Explicit (suitable for multiple executions per tx)

  // using implicit transactions:
  await implicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);

  // Using an explicit transaction:
  await explicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);

  // Using a prepared statement:
  await preparedStatementUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);

  // Using a prepared statement within an explicit transaction
  await preparedStatementExplicitTxUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);

  return rtn;
};

/**
 * Streams the table updates using an implicit transaction per execution
 * (autoCommit = true is the sqler default).
 * @param {Manager} manager sqler manager
 * @param {String} connName The connection name to use 
 * @param {typedefs.SQLERExecResults} rtn Returned results
 * @param {Object[]} table1BindsArray 1st table binds
 * @param {Object[]} table2BindsArray 2nd table binds
 */
async function implicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
  // rename all the bind names to reflect the action being performed
  nameAll('UPDATE_STREAM', 'Implicit transaction', table1BindsArray, table2BindsArray);

  const bindsArrays = [ table1BindsArray, table2BindsArray ];
  rtn.txImpRslts = new Array(bindsArrays.length);
  for (let ni = 0; ni < bindsArrays.length; ni++) {
    // update binds are batched in groups of 1 before streaming them to the database
    // (stream = 1); larger batch sizes (e.g. stream = 2) are also supported
    // https://mariadb.com/kb/en/connector-nodejs-promise-api/#connectionbatchsql-values-promise
    rtn.txImpRslts[ni] = await manager.db[connName].update[`table${ni + 1}`].rows({
      stream: 1
      // no need to set execOpts.binds since they will be streamed from the update instead
    });

    // pipe the bind objects into each writable stream returned by the execution
    for (let writeStream of rtn.txImpRslts[ni].rows) {
      await pipeline(
        Stream.Readable.from(bindsArrays[ni]),
        writeStream
      );
    }
  }
}

/**
 * Streams the table updates within a single explicit transaction,
 * rolling back (and releasing the connection) when any step fails.
 * @param {Manager} manager sqler manager
 * @param {String} connName The connection name to use 
 * @param {typedefs.SQLERExecResults} rtn Returned results
 * @param {Object[]} table1BindsArray 1st table binds
 * @param {Object[]} table2BindsArray 2nd table binds
 */
async function explicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
  /** @type {typedefs.SQLERTransaction} */
  let tx;
  try {
    // start a transaction
    tx = await manager.db[connName].beginTransaction();

    // rename all the bind names to reflect the action being performed
    nameAll('UPDATE_STREAM_TX', 'Explicit transaction', table1BindsArray, table2BindsArray);

    const bindsArrays = [ table1BindsArray, table2BindsArray ];
    rtn.txExpRslts = new Array(bindsArrays.length);
    for (let ni = 0; ni < bindsArrays.length; ni++) {
      // update binds are batched in groups of 1 before streaming them to the database
      // (stream = 1); larger batch sizes (e.g. stream = 2) are also supported
      // https://mariadb.com/kb/en/connector-nodejs-promise-api/#connectionbatchsql-values-promise
      rtn.txExpRslts[ni] = await manager.db[connName].update[`table${ni + 1}`].rows({
        autoCommit: false, // don't auto-commit after execution
        transactionId: tx.id, // ensure execution takes place within transaction
        stream: 1
        // no need to set execOpts.binds since they will be streamed from the update instead
      });

      // pipe the bind objects into each writable stream returned by the execution
      for (let writeStream of rtn.txExpRslts[ni].rows) {
        await pipeline(
          Stream.Readable.from(bindsArrays[ni]),
          writeStream
        );
      }
    }

    await tx.commit(true); // true to release the connection back to the pool
  } catch (err) {
    if (tx) {
      await tx.rollback(true); // true to release the connection back to the pool
    }
    throw err;
  }
}

/**
 * Streams the table updates using prepared statements (implicit transactions).
 * One execution result per table is retained so that unprepare can release
 * each prepared statement's dedicated connection in the finally block.
 * @param {Manager} manager sqler manager
 * @param {String} connName The connection name to use 
 * @param {typedefs.SQLERExecResults} rtn Returned results
 * @param {Object[]} table1BindsArray 1st table binds
 * @param {Object[]} table2BindsArray 2nd table binds
 */
async function preparedStatementUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
  // need to keep track of at least one result for each table so that unprepare can be called on each
  // (could call unprepare using any of the returned stream results for each table)
  let psRsltTable1, psRsltTable2;
  try {
    // simply rename all the bind names to reflect the action being performed
    nameAll('UPDATE_STREAM_PS', 'Prepared statement', table1BindsArray, table2BindsArray);

    // loop through and perform the updates via the writable stream
    let ni = 0;
    const bindsArrays = [ table1BindsArray, table2BindsArray ];
    rtn.psRslts = new Array(bindsArrays.length);
    for (let bindsArray of bindsArrays) {
      // prepared statement execution with an implicit transaction (autoCommit = true is the default)
      rtn.psRslts[ni] = await manager.db[connName].update[`table${ni + 1}`].rows({
        // flag the SQL execution as a prepared statement
        // this will cause the statement to be prepared
        // and a dedicated connection to be allocated from
        // the pool just before the first SQL executes
        prepareStatement: true,
        driverOptions: {
          // override any driver options here
        },
        // update binds will be batched in groups of 1 before streaming them to the database since
        // execOpts.stream = 1, but we could have batched them in groups (stream = 2) as well
        // https://mariadb.com/kb/en/connector-nodejs-promise-api/#connectionbatchsql-values-promise
        stream: 1
        // no need to set execOpts.binds since they will be streamed from the update instead
      });

      // retain the first execution result per table for the finally-block unprepare
      if (ni === 0 && !psRsltTable1) {
        psRsltTable1 = rtn.psRslts[ni];
      } else if (ni && !psRsltTable2) {
        psRsltTable2 = rtn.psRslts[ni];
      }

      // now that the write streams are ready and the read binds have been renamed,
      // we can cycle through the bind arrays and write them to the appropriate tables
      for (let writeStream of rtn.psRslts[ni].rows) {
        await pipeline(
          // here we're just using some static values for illustration purposes, but they can come from a
          // any readable stream source like a file, database, etc. as long as they are "transformed"
          // into JSON binds before the sqler writable stream receives them
          Stream.Readable.from(bindsArray),
          writeStream
        );
      }
      ni++;
    }
    
  } finally {
    // since prepareStatement = true, we need to close the statement
    // and release the prepared statement connection back to the pool
    // (also drops the temporary stored procedure that executes the prepared statement)
    const proms = [];
    if (psRsltTable1) proms.push(psRsltTable1.unprepare());
    if (psRsltTable2) proms.push(psRsltTable2.unprepare());
    if (proms.length) await Promise.all(proms);
  }
}

/**
 * Updates rows in two tables using prepared statements that are all bound to a single
 * explicit transaction, so both tables are committed (or rolled back) as a unit
 * @param {Manager} manager sqler manager
 * @param {String} connName The connection name to use 
 * @param {typedefs.SQLERExecResults} rtn Returned results (populates `rtn.txExpPsRslts`)
 * @param {Object[]} table1BindsArray 1st table binds
 * @param {Object[]} table2BindsArray 2nd table binds
 * @returns {Promise<void>} Resolves once both updates are streamed and the transaction committed
 */
async function preparedStatementExplicitTxUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
  /** @type {typedefs.SQLERTransaction} */
  let tx;
  try {
    // start a transaction
    tx = await manager.db[connName].beginTransaction();

    // simply rename all the bind names to reflect the action being performed
    nameAll('UPDATE_STREAM_PS_TX', `Prepared statement with txId ${tx.id}`, table1BindsArray, table2BindsArray);

    // loop through and perform the updates via the writable stream
    let ni = 0;
    const bindsArrays = [ table1BindsArray, table2BindsArray ];
    rtn.txExpPsRslts = new Array(bindsArrays.length);
    for (let bindsArray of bindsArrays) {
      // execute within the explicit transaction started above
      // (autoCommit = false and transactionId tie the execution to that transaction)
      rtn.txExpPsRslts[ni] = await manager.db[connName].update[`table${ni + 1}`].rows({
        autoCommit: false, // don't auto-commit after execution
        transactionId: tx.id, // ensure execution takes place within transaction
        prepareStatement: true, // ensure a prepared statement is used
        driverOptions: {
          // override any driver options here
        },
        // update binds will be batched in groups of 1 before streaming them to the database since
        // execOpts.stream = 1, but we could have batched them in groups (stream = 2) as well
        // https://mariadb.com/kb/en/connector-nodejs-promise-api/#connectionbatchsql-values-promise
        stream: 1
        // no need to set execOpts.binds since they will be streamed from the update instead
      });

      // now that the write streams are ready and the read binds have been renamed,
      // we can cycle through the bind arrays and write them to the appropriate tables
      for (let writeStream of rtn.txExpPsRslts[ni].rows) {
        await pipeline(
          // here we're just using some static values for illustration purposes, but they can come
          // from any readable stream source like a file, database, etc. as long as they are "transformed"
          // into JSON binds before the sqler writable stream receives them
          Stream.Readable.from(bindsArray),
          writeStream
        );
      }
      ni++;
    }

    // unprepare will be called on all prepared statements associated with the transaction when calling
    // commit (alt, could have called unprepare before commit)
    await tx.commit(true); // true to release the connection back to the pool
  } catch (err) {
    if (tx) {
      // unprepare will be called on all prepared statements associated with the transaction when calling
      // rollback (alt, could have called unprepare before rollback)
      await tx.rollback(true); // true to release the connection back to the pool
    }
    throw err;
  }
}

/**
 * Utility that iterates over multiple bind arrays and rewrites each row's bind name
 * with an expanded, human-readable label (mutates the bind objects in place)
 * @param {String} name Name of the bind that will be renamed
 * @param {String} label Label to be used on the value
 * @param {Object[]} table1BindsArray 1st table binds
 * @param {Object[]} [table2BindsArray] 2nd table binds (optional)
 * @returns {String[]} The bind property names that may have been written
 */
function nameAll(name, label, table1BindsArray, table2BindsArray) {
  // process the 2nd table only when it was supplied
  const tables = table2BindsArray ? [ table1BindsArray, table2BindsArray ] : [ table1BindsArray ];
  let seq = 0; // running count across all tables (1-based in the label)
  tables.forEach((binds, tableIndex) => {
    // 1st table writes to "name", subsequent tables to "name2", "name3", ...
    const prop = tableIndex === 0 ? 'name' : `name${tableIndex + 1}`;
    binds.forEach((bind, rowIndex) => {
      seq++;
      // update with expanded name
      bind[prop] = `TABLE: ${tableIndex + 1}, ROW: ${rowIndex + 1}, ${name}: "${label} ${seq}"`;
    });
  });
  return [ 'name', 'name2' ];
}

/**
 * @import { Manager, typedefs } from 'sqler'
 */

Delete Rows (streaming)

db/mdb/delete.stream.table.rows.sql

sql
[[? createStoredProcedure]]
CREATE PROCEDURE sqlermysql.perform_test_deletes
(
  IN p_id INTEGER, IN p_id2 INTEGER
)
BEGIN
  /*
  Stored procedure is not required when executing a single SQL statement
  Also, MySQL doesn't support anonymous stored procedure blocks
  So, a temporary stored procedure is used instead
  */
  DELETE FROM sqlermysql.TEST
  WHERE ID = :id;
  DELETE FROM sqlermysql.TEST2
  WHERE ID = :id2;
END;
[[?]]
[[? callStoredProcedure]]
CALL sqlermysql.perform_test_deletes(
  :id, :id2
);
[[?]]
[[? dropStoredProcedure]]
DROP PROCEDURE sqlermysql.perform_test_deletes;
[[?]]
js
'use strict';

const Stream = require('stream');
const { pipeline } = require('stream/promises');

/**
 * Example using streams
 * export just to illustrate module usage
 * @param {Manager} manager sqler manager
 * @param {String} connName The connection name to use 
 * @returns {Promise<typedefs.SQLERExecResults>}
 */
module.exports = async function runExample(manager, connName) {

  /** @type {typedefs.SQLERTransaction} */
  let tx;
  /** @type {typedefs.SQLERExecResults} */
  let rslts;
  try {
    tx = await manager.db[connName].beginTransaction();

    // We'll need to split create, call (stream), drop stored procedure into multiple
    // executions since we cannot stream the results with create/drop in the execution.
    // We can do this using sqler fragments defined in the SQL file
    await manager.db[connName].delete.stream.table.rows({
      autoCommit: false,
      transactionId: tx.id
    }, ['createStoredProcedure']);

    // Delete rows into multiple tables within a single execution
    rslts = await manager.db[connName].delete.stream.table.rows({
      // delete binds will be batched in groups of 2 before streaming them to the database since
      // execOpts.stream = 2, but we could have batched them individually (stream = 1) as well
      // https://mariadb.com/kb/en/connector-nodejs-promise-api/#connectionbatchsql-values-promise
      stream: 2,
      // no need to set execOpts.binds since they will be streamed from the create instead
      autoCommit: false,
      transactionId: tx.id,
    }, ['callStoredProcedure']);

    for (let writeStream of rslts.rows) {
      await pipeline(
        // here we're just using some static values for illustration purposes, but they can come from a
        // any readable stream source like a file, database, etc. as long as they are "transformed"
        // into JSON binds before the sqler writable stream receives them
        Stream.Readable.from([
          {
            id: 100, id2: 100
          },
          {
            id: 200, id2: 200
          }
        ]),
        writeStream
      )
    }

    // drop the stored procedure (done here for brevity, in reality we'd want to ensure it's dropped if creation was successful)
    await manager.db[connName].delete.stream.table.rows({
      autoCommit: false,
      transactionId: tx.id
    }, ['dropStoredProcedure']);

    await tx.commit(true);
  } catch (err) {
    if (tx) await tx.rollback(true);
    throw err;
  }

  return rslts;
};

/**
 * @import { Manager, typedefs } from 'sqler'
 */

Delete Database

db/mdb/setup/delete.database.sql

sql
DROP DATABASE sqlermysql
js
'use strict';

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {

  // delete the test database
  const rslt = await manager.db[connName].setup.delete.database();

  return rslt;
};

Released under the MIT License.