Usage

💡 MariaDB and/or MySQL Examples:

There are a few additional execution driverOptions that are MySQL/MariaDB specific.

Prepared Statements:

In order to facilitate a prepared statement, SQL calls are made using PREPARE, EXECUTE and DEALLOCATE PREPARE statements and are executed within a temporary stored procedure that is generated on the first execution of a SQL script that has execOpts.prepareStatement = true. The use of temporary stored procedures helps to ensure that the minimum number of round trip executions are made to the server and that the user defined variables used for prepared statements are managed appropriately. Since a stored procedure is used, a valid/accessible database name must be defined on execOpts.driverOptions.preparedStatementDatabase that will be used to explicitly associate the routine with a given database. It's important to note that the generated stored procedure only exists until the resulting execution's unprepare is called (or when using a transaction, commit or rollback is called). Any calls to the same SQL script before unprepare/commit/rollback that have execOpts.prepareStatement = true will use the same connection and prepared statement. Once unprepare/commit/rollback is called the connection used for the prepared statement will be released back to the pool. See the Update Rows examples for usage, including an example of wrapping a prepared statement within a transaction.

Examples:

The examples below use the following setup:

Private Options Configuration: (appended to the subsequent connection options)

{
  "univ": {
    "db": {
      "mdb": {
        "host": "sqler_mdb",
        "username":"root",
        "password": "my3qllocal"
      }
    }
  }
}

Connection Options Configuration:

{
  "db": {
    "dialects": {
      "mdb": "sqler-mdb"
    },
    "connections": [
      {
        "id": "mdb",
        "name": "mdb",
        "dir": "db/mdb",
        "service": "MySQL",
        "dialect": "mdb",
        "pool": {},
        "driverOptions": {
          "connection": {
            "multipleStatements": true
          }
        }
      }
    ]
  }
}

Test code that illustrates how to use MariaDB/MySQL with various examples

// "conf" is assumed to hold the merged "univ" and "db" objects shown above

// construct the manager and initialize it before any SQL is executed
const manager = new Manager(conf);
await manager.init();

// run one of the examples that follow against the "mdb" connection
const result = await runExample(manager, 'mdb');
console.log('Result:', result);

// close the manager once the process is interrupted and it is no longer needed
process.on('SIGINT', async () => {
  await manager.close();
  console.log('Manager has been closed');
});

Create Database:

-- db/mdb/setup/create.database.sql
-- Creates the database that every other example script references.
-- IF NOT EXISTS allows the script to be executed more than once without failing.
CREATE DATABASE IF NOT EXISTS sqlermysql
'use strict';

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {

  // create test database
  const rslt = await manager.db[connName].setup.create.database();

  return rslt;
};

Create Table(s):

-- db/mdb/setup/create.table1.sql
-- First example table: a numeric key, a name and millisecond precision timestamps.
CREATE TABLE IF NOT EXISTS sqlermysql.TEST (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR(512), CREATED_AT DATETIME(3), UPDATED_AT DATETIME(3))
-- db/mdb/setup/create.table2.sql
-- Second example table: same columns as TEST plus a REPORT BLOB for binary content.
CREATE TABLE IF NOT EXISTS sqlermysql.TEST2 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR(512), REPORT BLOB, CREATED_AT DATETIME(3), UPDATED_AT DATETIME(3))
'use strict';

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {

  // create test table records
  const rslt = await manager.db[connName].setup.create.table1();

  return rslt;
};
'use strict';

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {

  // create test table two records
  const rslt = await manager.db[connName].setup.create.table2();

  return rslt;
};

Create Rows:

-- db/mdb/create.table.rows.sql
-- (demo multiple table inserts in a single SQL script)
-- The script creates a procedure, calls it once with the bind values and then drops it again.
-- Parameters without a suffix target sqlermysql.TEST, parameters ending in "2" target sqlermysql.TEST2.
CREATE PROCEDURE sqlermysql.perform_test_inserts
(
  IN p_id INTEGER, IN p_name VARCHAR(255), IN p_created DATETIME(3), IN p_updated DATETIME(3),
  IN p_id2 INTEGER, IN p_name2 VARCHAR(255), IN p_report2 BLOB, IN p_created2 DATETIME(3), IN p_updated2 DATETIME(3)
)
BEGIN
  /*
  Stored procedure is not required when executing a single SQL statement
  Also, MySQL doesn't support anonymous stored procedure blocks
  So, a temporary stored procedure is used instead
  */
  INSERT INTO sqlermysql.TEST (`ID`, `NAME`, CREATED_AT, UPDATED_AT)
  VALUES (p_id, p_name, p_created, p_updated);
  INSERT INTO sqlermysql.TEST2 (`ID`, `NAME`, REPORT, CREATED_AT, UPDATED_AT)
  VALUES (p_id2, p_name2, p_report2, p_created2, p_updated2);
END;
-- the named binds are only referenced here; the procedure body works on its own parameters
CALL sqlermysql.perform_test_inserts(
  :id, :name, :created, :updated,
  :id2, :name2, :report2, :created2, :updated2
);
-- the procedure is only needed for this single execution
DROP PROCEDURE sqlermysql.perform_test_inserts;
'use strict';

const Fs = require('fs');

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {

  const date = new Date();

  // The driver module currently doesn't support streaming into a column
  // (e.g. Fs.createReadStream())
  const report = await Fs.promises.readFile('./test/files/audit-report.png');

  // Insert rows into multiple tables within a single execution
  const rslt = await manager.db[connName].create.table.rows({
    binds: {
      id: 1, name: 'TABLE: 1, ROW: 1, CREATE: "Initial creation"', created: date, updated: date,
      id2: 1, name2: 'TABLE: 2, ROW: 1, CREATE: "Initial creation"', report2: report, created2: date, updated2: date
    }
  });

  return rslt;
};

Read Rows:

-- db/mdb/read.table.rows.sql
-- Reads rows from both tables whose NAME contains :name (compared in upper case, so case-insensitive).
-- TEST rows select NULL as "report" so both halves of the UNION expose the same columns.
SELECT TST.ID AS "id", TST.NAME AS "name", NULL AS "report",
TST.CREATED_AT AS "created", TST.UPDATED_AT AS "updated"
FROM sqlermysql.TEST TST
WHERE UPPER(TST.NAME) LIKE CONCAT(CONCAT('%', UPPER(:name)), '%') 
UNION
SELECT TST2.ID AS "id", TST2.NAME AS "name", TST2.REPORT AS "report",
TST2.CREATED_AT AS "created", TST2.UPDATED_AT AS "updated"
FROM sqlermysql.TEST2 TST2
WHERE UPPER(TST2.NAME) LIKE CONCAT(CONCAT('%', UPPER(:name)), '%')
'use strict';

const Os = require('os');
const Fs = require('fs');

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {

  // stream read from multiple tables
  const rslt = await manager.db[connName].read.table.rows({ binds: { name: 'table' } });

  // write binary report buffer to file?
  const fileWriteProms = [];
  for (let row of rslt.rows) {
    if (row.report) {
      // store the path to the report (illustrative purposes only)
      row.reportPath = `${Os.tmpdir()}/sqler-${connName}-read-${row.id}.png`;
      fileWriteProms.push(Fs.promises.writeFile(row.reportPath, row.report));
    }
  }
  if (fileWriteProms.length) {
    await Promise.all(fileWriteProms);
  }

  return rslt;
};

Update Rows:

-- db/mdb/update.table.rows.sql
-- (demo multiple table updated in a single SQL script)
-- The script creates a procedure, calls it once with the bind values and then drops it again.
-- Parameters without a suffix target sqlermysql.TEST, parameters ending in "2" target sqlermysql.TEST2.
CREATE PROCEDURE sqlermysql.perform_test_updates
(
  IN p_id INTEGER, IN p_name VARCHAR(255), IN p_updated DATETIME(3),
  IN p_id2 INTEGER, IN p_name2 VARCHAR(255), IN p_updated2 DATETIME(3)
)
BEGIN
  /*
  Stored procedure is not required when executing a single SQL statement
  Also, MySQL doesn't support anonymous stored procedure blocks
  So, a temporary stored procedure is used instead
  */
  UPDATE sqlermysql.TEST
  SET NAME = p_name, UPDATED_AT = p_updated
  WHERE ID = p_id;
  UPDATE sqlermysql.TEST2
  SET NAME = p_name2, UPDATED_AT = p_updated2
  WHERE ID = p_id2;
END;
-- the named binds are only referenced here; the procedure body works on its own parameters
CALL sqlermysql.perform_test_updates(
  :id, :name, :updated,
  :id2, :name2, :updated2
);
-- the procedure is only needed for this single execution
DROP PROCEDURE sqlermysql.perform_test_updates;
-- db/mdb/update.table1.rows.sql
-- (demo prepared statements)
-- Single statement update of one TEST row identified by :id.
-- The update example code runs this script for the transaction as well as the prepared statement variations.
  UPDATE sqlermysql.TEST
  SET NAME = :name, UPDATED_AT = :updated
  WHERE ID = :id
-- db/mdb/update.table2.rows.sql
-- (demo transactions)
-- Single statement update of one TEST2 row identified by :id2.
-- The update example code runs this script for the transaction as well as the prepared statement variations.
  UPDATE sqlermysql.TEST2
  SET NAME = :name2, UPDATED_AT = :updated2
  WHERE ID = :id2
'use strict';

const typedefs = require('sqler/typedefs');

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {

  const date = new Date();

  // binds
  const table1BindsArray = [
    {
      id: 1, name: '', updated: date
    }
  ];
  const table2BindsArray = [
    {
      id2: 1, name2: '', updated2: date
    }
  ];
  const rtn = {};

  //-------------------------------------------------------
  // There are two different ways to perform a transaction
  // 1. Implicit (suitable for a single execution per tx)
  // 2. Explicit (suitable for multiple executions per tx)

  // using implicit transactions:
  await implicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);

  // Using an explicit transaction:
  await explicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);

  // Using a prepared statement:
  await preparedStatementUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);

  // Using a prepared statement within an explicit transaction
  await preparedStatementExplicitTxUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);

  return rtn;
};

/**
 * Updates rows in both tables concurrently, each execution using its own implicit transaction.
 * The results are stored on `rtn.txImpRslts` (table 1 results followed by table 2 results).
 * @param {Object} manager the initialized manager that exposes the SQL functions
 * @param {String} connName the name of the connection to execute against
 * @param {Object} rtn the object that the results are stored on
 * @param {Object[]} table1BindsArray the binds for the first table
 * @param {Object[]} table2BindsArray the binds for the second table
 */
async function implicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
  // one concurrent execution per bind set (keep the total within the connection pool count)
  const total = table1BindsArray.length + table2BindsArray.length;
  rtn.txImpRslts = new Array(total);

  // kick off every execution without awaiting so they run concurrently
  forEach('UPDATE', 'Implicit transaction', table1BindsArray, table2BindsArray, (idx, ti, ri, binds, nameProp) => {
    const sqlName = `table${ti + 1}`;
    // autoCommit = true is the default, so each execution commits on its own
    rtn.txImpRslts[idx] = manager.db[connName].update[sqlName].rows({
      name: binds[nameProp], // execution name is optional
      binds
    });
  });

  // replace each pending promise with its result
  // (awaiting each SQL function call directly would have run them in series instead)
  for (const [i, pending] of rtn.txImpRslts.entries()) {
    rtn.txImpRslts[i] = await pending;
  }
}

/**
 * Updates rows in both tables concurrently within a single explicit transaction.
 * The transaction is committed once all executions complete and rolled back when anything fails.
 * The results are stored on `rtn.txExpRslts` (table 1 results followed by table 2 results).
 * @param {Object} manager the initialized manager that exposes the SQL functions
 * @param {String} connName the name of the connection to execute against
 * @param {Object} rtn the object that the results are stored on
 * @param {Object[]} table1BindsArray the binds for the first table
 * @param {Object[]} table2BindsArray the binds for the second table
 */
async function explicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
  // one concurrent execution per bind set (keep the total within the connection pool count)
  const total = table1BindsArray.length + table2BindsArray.length;
  rtn.txExpRslts = new Array(total);
  /** @type {typedefs.SQLERTransaction} */
  let tx;
  try {
    // the transaction has to exist before any execution can join it
    tx = await manager.db[connName].beginTransaction();

    // options that tie an execution to the transaction instead of auto-committing
    const txOpts = {
      autoCommit: false,
      transactionId: tx.id
    };

    // kick off every execution without awaiting so they run concurrently (same transaction)
    forEach('UPDATE_TX', 'Explicit transaction', table1BindsArray, table2BindsArray, (idx, ti, ri, binds, nameProp) => {
      const sqlName = `table${ti + 1}`;
      rtn.txExpRslts[idx] = manager.db[connName].update[sqlName].rows({
        name: binds[nameProp], // execution name is optional
        binds,
        ...txOpts
      });
    });

    // replace each pending promise with its result
    // (awaiting each SQL function call directly would have run them in series instead)
    for (const [i, pending] of rtn.txExpRslts.entries()) {
      rtn.txExpRslts[i] = await pending;
    }

    // passing true releases the connection back to the pool after the commit
    await tx.commit(true);
  } catch (err) {
    // tx is undefined when the transaction itself could not be started
    if (tx) {
      // passing true releases the connection back to the pool after the rollback
      await tx.rollback(true);
    }
    throw err;
  }
}

/**
 * Updates rows in both tables concurrently using prepared statements (no explicit transaction).
 * The results are stored on `rtn.psRslts` (table 1 results followed by table 2 results).
 * Once all executions have settled, `unprepare` is called once for each table that produced
 * a result, regardless of whether or not any of the executions failed.
 * @param {Object} manager the initialized manager that exposes the SQL functions
 * @param {String} connName the name of the connection to execute against
 * @param {Object} rtn the object that the results are stored on
 * @param {Object[]} table1BindsArray the binds for the first table
 * @param {Object[]} table2BindsArray the binds for the second table
 * @throws the first execution error (in result order) after cleanup has been attempted
 */
async function preparedStatementUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
  const total = table1BindsArray.length + table2BindsArray.length;
  // tableIndexes[idx] is the table index that rtn.psRslts[idx] was executed for; unprepare needs
  // to be called once per table and any execution result for that table can be used to do so
  const tableIndexes = new Array(total);
  // don't exceed connection pool count
  rtn.psRslts = new Array(total);
  try {

    // simple iterator over all the binds
    forEach('UPDATE_PS', 'Prepred statement', table1BindsArray, table2BindsArray, (idx, ti, ri, binds, nameProp) => {

      // track the table by result index (not by table index) so that the
      // correct result is used for unprepare when a table has multiple binds
      tableIndexes[idx] = ti;

      // Example concurrent execution
      rtn.psRslts[idx] = manager.db[connName].update[`table${ti + 1}`].rows({
        name: binds[nameProp], // execution name is optional
        // flag the SQL execution as a prepared statement
        // this will cause the statement to be prepared
        // and a dedicated connection to be allocated from
        // the pool just before the first SQL executes
        prepareStatement: true,
        driverOptions: {
          // prepared statements in MySQL/MariaDB use a temporary
          // stored procedure to execute prepared statements...
          // in order to do so, the stored procedure needs to have
          // a database scope defined where it will reside
          preparedStatementDatabase: 'sqlermysql'
        },
        // include the bind parameters
        binds
      });

    });

    // wait for every concurrent execution to settle before cleaning up so that
    // successful executions can still be used to unprepare when another one fails
    let hasFailed = false;
    let failure;
    for (let i = 0; i < rtn.psRslts.length; i++) {
      try {
        rtn.psRslts[i] = await rtn.psRslts[i];
      } catch (err) {
        if (!hasFailed) {
          hasFailed = true;
          failure = err;
        }
      }
    }
    if (hasFailed) throw failure;
  } finally {
    // since prepareStatement = true, we need to close the statement
    // and release the prepared statement connection back to the pool
    // (also drops the temporary stored procedure that executes the prepared statement)
    const unpreparedTables = new Set();
    const proms = [];
    for (let idx = 0; idx < rtn.psRslts.length; idx++) {
      const rslt = rtn.psRslts[idx];
      // failed/pending executions are still promises and cannot be used to unprepare
      if (!rslt || typeof rslt.unprepare !== 'function') continue;
      if (unpreparedTables.has(tableIndexes[idx])) continue;
      unpreparedTables.add(tableIndexes[idx]);
      proms.push(rslt.unprepare());
    }
    if (proms.length) await Promise.all(proms);
  }
}

/**
 * Updates rows in both tables concurrently using prepared statements within a single explicit
 * transaction. The transaction is committed once all executions complete and rolled back when
 * anything fails. The results are stored on `rtn.txExpPsRslts`.
 * @param {Object} manager the initialized manager that exposes the SQL functions
 * @param {String} connName the name of the connection to execute against
 * @param {Object} rtn the object that the results are stored on
 * @param {Object[]} table1BindsArray the binds for the first table
 * @param {Object[]} table2BindsArray the binds for the second table
 */
async function preparedStatementExplicitTxUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
  // one concurrent execution per bind set (keep the total within the connection pool count)
  const total = table1BindsArray.length + table2BindsArray.length;
  rtn.txExpPsRslts = new Array(total);
  /** @type {typedefs.SQLERTransaction} */
  let tx;
  try {
    // the transaction has to exist before any execution can join it
    tx = await manager.db[connName].beginTransaction();

    // options shared by every execution
    const sharedOpts = {
      autoCommit: false, // don't auto-commit after execution
      transactionId: tx.id, // ensure execution takes place within transaction
      prepareStatement: true, // ensure a prepared statement is used
      driverOptions: {
        // prepared statements in MySQL/MariaDB are executed by a temporary
        // stored procedure that needs a database scope where it will reside
        preparedStatementDatabase: 'sqlermysql'
      }
    };
    const label = `Prepred statement with txId ${tx.id}`;

    // kick off every execution without awaiting so they run concurrently (same transaction)
    forEach('UPDATE_PS_TX', label, table1BindsArray, table2BindsArray, (idx, ti, ri, binds, nameProp) => {
      const sqlName = `table${ti + 1}`;
      rtn.txExpPsRslts[idx] = manager.db[connName].update[sqlName].rows({
        name: binds[nameProp], // execution name is optional
        autoCommit: sharedOpts.autoCommit,
        transactionId: sharedOpts.transactionId,
        prepareStatement: sharedOpts.prepareStatement,
        // each execution gets its own driver options object
        driverOptions: { ...sharedOpts.driverOptions },
        // include the bind parameters
        binds
      });
    });

    // replace each pending promise with its result
    for (const [i, pending] of rtn.txExpPsRslts.entries()) {
      rtn.txExpPsRslts[i] = await pending;
    }

    // unprepare will be called when calling commit
    // (alt, could have called unprepare before commit)
    await tx.commit(true); // true to release the connection back to the pool
  } catch (err) {
    // tx is undefined when the transaction itself could not be started
    if (tx) {
      // unprepare will be called when calling rollback
      // (alt, could have called unprepare before rollback)
      await tx.rollback(true); // true to release the connection back to the pool
    }
    throw err;
  }
}

/**
 * Utility that walks the binds of both tables in order (all of table 1, then all of table 2),
 * writes a descriptive name onto each bind object and hands the bind object to a handler.
 * @param {String} name the action name that is included in the generated bind name
 * @param {String} label the label that is included in the generated bind name
 * @param {Object[]} table1BindsArray the binds for the first table (their `name` is overwritten)
 * @param {Object[]} table2BindsArray the binds for the second table (their `name2` is overwritten)
 * @param {Function} itemHandler called as `itemHandler(index, tableIndex, rowIndex, binds, nameProp)`
 * for each bind object
 */
function forEach(name, label, table1BindsArray, table2BindsArray, itemHandler) {
  const total = table1BindsArray.length + table2BindsArray.length;
  for (let i = 0; i < total; i++) {
    // the first table's binds come first, anything past them belongs to the second table
    const isTable1 = i < table1BindsArray.length;
    const ti = isTable1 ? 0 : 1;
    const ri = isTable1 ? i : i - table1BindsArray.length;
    const binds = (isTable1 ? table1BindsArray : table2BindsArray)[ri];

    // the first table uses "name", the second table uses "name2"
    const nameProp = isTable1 ? 'name' : `name${ti + 1}`;

    // update with expanded name
    binds[nameProp] = `TABLE: ${ti + 1}, ROW: ${ri + 1}, ${name}: "${label} ${i + 1}"`;

    itemHandler(i, ti, ri, binds, nameProp);
  }
}

Delete Rows:

-- db/mdb/delete.table.rows.sql
-- (demo multiple table deletes in a single SQL script)
-- The script creates a procedure, calls it once with the bind values and then drops it again.
-- p_id identifies the sqlermysql.TEST row and p_id2 identifies the sqlermysql.TEST2 row to delete.
CREATE PROCEDURE sqlermysql.perform_test_deletes
(
  IN p_id INTEGER, IN p_id2 INTEGER
)
BEGIN
  /*
  Stored procedure is not required when executing a single SQL statement
  Also, MySQL doesn't support anonymous stored procedure blocks
  So, a temporary stored procedure is used instead
  */
  -- use the procedure parameters (rather than the named binds) within the
  -- procedure body, consistent with the insert and update procedures
  DELETE FROM sqlermysql.TEST
  WHERE ID = p_id;
  DELETE FROM sqlermysql.TEST2
  WHERE ID = p_id2;
END;
-- the named binds are only referenced here
CALL sqlermysql.perform_test_deletes(
  :id, :id2
);
-- the procedure is only needed for this single execution
DROP PROCEDURE sqlermysql.perform_test_deletes;
'use strict';

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {

  // Delete rows from multiple tables within a single execution
  const rslt = await manager.db[connName].delete.table.rows({
    binds: { id: 1, id2: 1 }
  });

  return rslt;
};

Create Rows (streaming using the same SQL as the prior create rows example):

'use strict';

const Fs = require('fs');
const Stream = require('stream');
// node >= v16 :
// const { pipeline } = require('stream/promises');
// node < 16 :
const Util = require('util');
const pipeline = Util.promisify(Stream.pipeline);

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {

  const date = new Date();

  // The driver module currently doesn't support streaming into a column
  // (e.g. Fs.createReadStream())
  const report = await Fs.promises.readFile('./test/files/audit-report.png');

  // Insert rows into multiple tables within a single execution
  const rslts = await manager.db[connName].create.table.rows({
    // create binds will be batched in groups of 2 before streaming them to the database since
    // execOpts.stream = 2, but we could have batched them individually (stream = 1) as well
    // https://mariadb.com/kb/en/connector-nodejs-promise-api/#connectionbatchsql-values-promise
    stream: 2
    // no need to set execOpts.binds since they will be streamed from the create instead
  });

  for (let writeStream of rslts.rows) {
    await pipeline(
      // here we're just using some static values for illustration purposes, but they can come from a
      // any readable stream source like a file, database, etc. as long as they are "transformed"
      // into JSON binds before the sqler writable stream receives them
      Stream.Readable.from([
        {
          id: 100, name: 'TABLE: 1, ROW: 1, CREATE_STREAM: "Initial creation"', created: date, updated: date,
          id2: 100, name2: 'TABLE: 2, ROW: 1, CREATE_STREAM: "Initial creation"', report2: report, created2: date, updated2: date
        },
        {
          id: 200, name: 'TABLE: 1, ROW: 2, CREATE_STREAM: "Initial creation"', created: date, updated: date,
          id2: 200, name2: 'TABLE: 2, ROW: 2, CREATE_STREAM: "Initial creation"', report2: report, created2: date, updated2: date
        }
      ]),
      writeStream
    )
  }

  return rslts;
};

Read Rows (streaming using the same SQL as the prior read rows example):

'use strict';

const typedefs = require('sqler/typedefs');
const Os = require('os');
const Fs = require('fs');
const Stream = require('stream');
// node >= v16 :
// const { pipeline } = require('stream/promises');
// node < 16 :
const Util = require('util');
const pipeline = Util.promisify(Stream.pipeline);

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {

  // read from multiple tables
  const rslt = await manager.db[connName].read.table.rows({
    stream: 1, // indicate reads will be streamed
    binds: { name: 'stream' }
  });

  // stream all reads to a central JSON file (illustrative purposes only)
  rslt.jsonFile = `${Os.tmpdir()}/sqler-${connName}-read-stream-all.json`;

  // write binary report buffer to file?
  const fileWriteProms = [];
  for (let readStream of rslt.rows) {
    // read stream is MDB implementation:
    // https://mariadb.com/kb/en/connector-nodejs-promise-api/#connectionquerystreamsql-values-emitter
    await pipeline(
      readStream,
      new Stream.Transform({
        objectMode: true,
        transform: function transformer(chunk, encoding, callback) {
          if (Buffer.isBuffer(chunk.report)) {
            // transform and store the path to the report (illustrative purposes only)
            chunk.reportPath = `${Os.tmpdir()}/sqler-${connName}-read-${chunk.id}.png`;
            fileWriteProms.push(Fs.promises.writeFile(chunk.reportPath, chunk.report));
            // don't include the report Buffer in the JSON since there should be a file
            delete chunk.report;
          }
          callback(null, chunk);
        }
      }),
      // add a transform that formats the JSON into an array string suitable for file write
      async function* transformStringify(chunksAsync) {
        yield '[';
        let cnt = -1;
        for await (const chunk of chunksAsync) {
          cnt++;
          yield `${cnt ? ',' : ''}${JSON.stringify(chunk)}`;
        }
        yield ']';
      },
      Fs.createWriteStream(rslt.jsonFile)
    );
  }
  if (fileWriteProms.length) {
    await Promise.all(fileWriteProms);
  }

  return rslt;
};

Update Rows (streaming using the same SQL as the prior update rows example):

'use strict';

const typedefs = require('sqler/typedefs');
const Stream = require('stream');
// node >= v16 :
// const { pipeline } = require('stream/promises');
// node < 16 :
const Util = require('util');
const pipeline = Util.promisify(Stream.pipeline);

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {

  const date = new Date();

  // binds
  const table1BindsArray = [
    {
      id: 100, name: '', updated: date
    },
    {
      id: 200, name: '', updated: date
    },
  ];
  const table2BindsArray = [
    {
      id2: 100, name2: '', updated2: date
    },
    {
      id2: 200, name2: '', updated2: date
    }
  ];
  const rtn = {};

  //-------------------------------------------------------
  // There are two different ways to perform a transaction
  // 1. Implicit (suitable for a single execution per tx)
  // 2. Explicit (suitable for multiple executions per tx)

  // using implicit transactions:
  await implicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);

  // Using an explicit transaction:
  await explicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);

  // Using a prepared statement:
  await preparedStatementUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);

  // Using a prepared statement within an explicit transaction
  await preparedStatementExplicitTxUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);

  return rtn;
};

/**
 * Streams the updates for each table using implicit transactions.
 * The results are stored on `rtn.txImpRslts` (one entry per table).
 * @param {Object} manager the initialized manager that exposes the SQL functions
 * @param {String} connName the name of the connection to execute against
 * @param {Object} rtn the object that the results are stored on
 * @param {Object[]} table1BindsArray the binds for the first table
 * @param {Object[]} table2BindsArray the binds for the second table
 */
async function implicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
  // simply rename all the bind names to reflect the action being performed
  nameAll('UPDATE_STREAM', 'Implicit transaction', table1BindsArray, table2BindsArray);

  // one streamed execution per table, performed one table at a time
  const bindsArrays = [ table1BindsArray, table2BindsArray ];
  rtn.txImpRslts = new Array(bindsArrays.length);
  for (const [ni, bindsArray] of bindsArrays.entries()) {
    // autoCommit = true is the default, so an implicit transaction is used
    const rslt = await manager.db[connName].update[`table${ni + 1}`].rows({
      // binds are sent to the database one at a time since execOpts.stream = 1
      // (stream = 2 would have batched them in groups of 2)
      // https://mariadb.com/kb/en/connector-nodejs-promise-api/#connectionbatchsql-values-promise
      stream: 1
      // execOpts.binds is omitted since the binds are written to the returned stream(s) instead
    });
    rtn.txImpRslts[ni] = rslt;

    // the binds could come from any readable stream source (file, database, etc.) as long as
    // they are "transformed" into JSON binds before the sqler writable stream receives them
    for (const writeStream of rslt.rows) {
      await pipeline(Stream.Readable.from(bindsArray), writeStream);
    }
  }
}

/**
 * Streams the updates for each table within a single explicit transaction. The transaction is
 * committed once all of the binds have been written and rolled back when anything fails.
 * The results are stored on `rtn.txExpRslts` (one entry per table).
 * @param {Object} manager the initialized manager that exposes the SQL functions
 * @param {String} connName the name of the connection to execute against
 * @param {Object} rtn the object that the results are stored on
 * @param {Object[]} table1BindsArray the binds for the first table
 * @param {Object[]} table2BindsArray the binds for the second table
 */
async function explicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
  /** @type {typedefs.SQLERTransaction} */
  let tx;
  try {
    // the transaction has to exist before any execution can join it
    tx = await manager.db[connName].beginTransaction();

    // simply rename all the bind names to reflect the action being performed
    nameAll('UPDATE_STREAM_TX', 'Explicit transaction', table1BindsArray, table2BindsArray);

    // one streamed execution per table, performed one table at a time
    const bindsArrays = [ table1BindsArray, table2BindsArray ];
    rtn.txExpRslts = new Array(bindsArrays.length);
    for (const [ni, bindsArray] of bindsArrays.entries()) {
      // every streamed execution takes part in the explicit transaction
      const rslt = await manager.db[connName].update[`table${ni + 1}`].rows({
        autoCommit: false, // don't auto-commit after execution
        transactionId: tx.id, // ensure execution takes place within transaction
        // binds are sent to the database one at a time since execOpts.stream = 1
        // (stream = 2 would have batched them in groups of 2)
        // https://mariadb.com/kb/en/connector-nodejs-promise-api/#connectionbatchsql-values-promise
        stream: 1
        // execOpts.binds is omitted since the binds are written to the returned stream(s) instead
      });
      rtn.txExpRslts[ni] = rslt;

      // the binds could come from any readable stream source (file, database, etc.) as long as
      // they are "transformed" into JSON binds before the sqler writable stream receives them
      for (const writeStream of rslt.rows) {
        await pipeline(Stream.Readable.from(bindsArray), writeStream);
      }
    }

    // passing true releases the connection back to the pool after the commit
    await tx.commit(true);
  } catch (err) {
    // tx is undefined when the transaction itself could not be started
    if (tx) {
      // passing true releases the connection back to the pool after the rollback
      await tx.rollback(true);
    }
    throw err;
  }
}

/**
 * Streams the updates for each table using prepared statements (no explicit transaction).
 * The results are stored on `rtn.psRslts` (one entry per table) and `unprepare` is called
 * on each result that was obtained, regardless of whether or not the streaming failed.
 * @param {Object} manager the initialized manager that exposes the SQL functions
 * @param {String} connName the name of the connection to execute against
 * @param {Object} rtn the object that the results are stored on
 * @param {Object[]} table1BindsArray the binds for the first table
 * @param {Object[]} table2BindsArray the binds for the second table
 */
async function preparedStatementUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
  // one result per table is tracked so that unprepare can be called for each table
  // (any of the returned stream results for a table could be used to call unprepare)
  const preparedRslts = [];
  try {
    // simply rename all the bind names to reflect the action being performed
    nameAll('UPDATE_STREAM_PS', 'Prepared statement', table1BindsArray, table2BindsArray);

    // one streamed execution per table, performed one table at a time
    const bindsArrays = [ table1BindsArray, table2BindsArray ];
    rtn.psRslts = new Array(bindsArrays.length);
    for (const [ni, bindsArray] of bindsArrays.entries()) {
      const rslt = await manager.db[connName].update[`table${ni + 1}`].rows({
        // flag the SQL execution as a prepared statement, which causes the statement
        // to be prepared and a dedicated connection to be allocated from the pool
        // just before the first SQL executes
        prepareStatement: true,
        driverOptions: {
          // prepared statements in MySQL/MariaDB are executed by a temporary
          // stored procedure that needs a database scope where it will reside
          preparedStatementDatabase: 'sqlermysql'
        },
        // binds are sent to the database one at a time since execOpts.stream = 1
        // (stream = 2 would have batched them in groups of 2)
        // https://mariadb.com/kb/en/connector-nodejs-promise-api/#connectionbatchsql-values-promise
        stream: 1
        // execOpts.binds is omitted since the binds are written to the returned stream(s) instead
      });
      rtn.psRslts[ni] = rslt;

      // remember the result before streaming so that it can be unprepared if streaming fails
      preparedRslts.push(rslt);

      // the binds could come from any readable stream source (file, database, etc.) as long as
      // they are "transformed" into JSON binds before the sqler writable stream receives them
      for (const writeStream of rslt.rows) {
        await pipeline(Stream.Readable.from(bindsArray), writeStream);
      }
    }

  } finally {
    // since prepareStatement = true, we need to close the statement
    // and release the prepared statement connection back to the pool
    // (also drops the temporary stored procedure that executes the prepared statement)
    const proms = preparedRslts.filter(Boolean).map((rslt) => rslt.unprepare());
    if (proms.length > 0) {
      await Promise.all(proms);
    }
  }
}

/**
 * Example that streams updates to two tables using prepared statements that all execute within a
 * single explicit transaction. The transaction is committed once both tables have been streamed
 * and is rolled back when any step fails.
 * @param {Object} manager the manager whose `db[connName]` connection is used for the executions
 * @param {String} connName the name of the connection on `manager.db`
 * @param {Object} rtn the results holder- `rtn.txExpPsRslts` is set to the execution results (one per table)
 * @param {Object[]} table1BindsArray the binds that will be streamed to the `table1` update
 * @param {Object[]} table2BindsArray the binds that will be streamed to the `table2` update
 * @returns {Promise<void>} resolves after the transaction has been committed
 * @throws the error that caused the rollback (a failed rollback is exposed as `rollbackError` on it)
 */
async function preparedStatementExplicitTxUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
  /** @type {typedefs.SQLERTransaction} */
  let tx;
  try {
    // start a transaction
    tx = await manager.db[connName].beginTransaction();

    // simply rename all the bind names to reflect the action being performed
    nameAll('UPDATE_STREAM_PS_TX', `Prepared statement with txId ${tx.id}`, table1BindsArray, table2BindsArray);

    // loop through and perform the updates via the writable stream
    const bindsArrays = [ table1BindsArray, table2BindsArray ];
    rtn.txExpPsRslts = new Array(bindsArrays.length);
    for (const [ni, bindsArray] of bindsArrays.entries()) {
      // Example using a single explicit transaction that is shared by every streamed update
      // (autoCommit = false so that nothing is committed until tx.commit is called)
      rtn.txExpPsRslts[ni] = await manager.db[connName].update[`table${ni + 1}`].rows({
        autoCommit: false, // don't auto-commit after execution
        transactionId: tx.id, // ensure execution takes place within transaction
        prepareStatement: true, // ensure a prepared statement is used
        driverOptions: {
          // prepared statements in MySQL/MariaDB use a temporary
          // stored procedure to execute prepared statements...
          // in order to do so, the stored procedure needs to have
          // a database scope defined where it will reside
          preparedStatementDatabase: 'sqlermysql'
        },
        // update binds will be batched in groups of 1 before streaming them to the database since
        // execOpts.stream = 1, but we could have batched them in groups (stream = 2) as well
        // https://mariadb.com/kb/en/connector-nodejs-promise-api/#connectionbatchsql-values-promise
        stream: 1
        // no need to set execOpts.binds since they will be streamed from the update instead
      });

      // now that the write streams are ready and the read binds have been renamed,
      // we can cycle through the bind arrays and write them to the appropriate tables
      for (const writeStream of rtn.txExpPsRslts[ni].rows) {
        await pipeline(
          // here we're just using some static values for illustration purposes, but they can come from
          // any readable stream source like a file, database, etc. as long as they are "transformed"
          // into JSON binds before the sqler writable stream receives them
          Stream.Readable.from(bindsArray),
          writeStream
        );
      }
    }

    // unprepare will be called on all prepared statements associated with the transaction when calling
    // commit (alt, could have called unprepare before commit)
    await tx.commit(true); // true to release the connection back to the pool
  } catch (err) {
    if (tx) {
      try {
        // unprepare will be called on all prepared statements associated with the transaction when calling
        // rollback (alt, could have called unprepare before rollback)
        await tx.rollback(true); // true to release the connection back to the pool
      } catch (rollbackErr) {
        // a failed rollback should not hide the error that triggered it, so the original error
        // is still the one thrown and the rollback failure is attached to it for diagnostics
        if (err instanceof Error) err.rollbackError = rollbackErr;
      }
    }
    throw err;
  }
}

/**
 * Utility that walks over one or two bind arrays and stamps a descriptive name onto every bind
 * object. Binds from the first array receive a `name` property and binds from the second array
 * receive a `name2` property. The bind objects are updated in place.
 * @param {String} name the action name that is written into each generated value
 * @param {String} label the label that is written (followed by a running number) into each generated value
 * @param {Object[]} table1BindsArray the binds for the first table
 * @param {Object[]} [table2BindsArray] the binds for the second table
 * @returns {String[]} the property names that are used for the two tables
 */
function nameAll(name, label, table1BindsArray, table2BindsArray) {
  const firstCount = table1BindsArray.length;
  const total = firstCount + (table2BindsArray ? table2BindsArray.length : 0);
  let seq = 0;
  while (seq < total) {
    // the running sequence spans both tables, the first table's rows come first
    const inFirst = seq < firstCount;
    const binds = inFirst ? table1BindsArray : table2BindsArray;
    const rowIdx = inFirst ? seq : seq - firstCount;
    const tableNo = inFirst ? 1 : 2;
    const prop = inFirst ? 'name' : 'name2';
    seq++;
    // update with expanded name
    binds[rowIdx][prop] = `TABLE: ${tableNo}, ROW: ${rowIdx + 1}, ${name}: "${label} ${seq}"`;
  }
  return [ 'name', 'name2' ];
}

Delete Rows (streaming using the same SQL as the prior delete rows example):

'use strict';

const Stream = require('stream');
// node >= v16 :
// const { pipeline } = require('stream/promises');
// node < 16 :
const Util = require('util');
const pipeline = Util.promisify(Stream.pipeline);

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {

  // Delete rows from multiple tables within a single execution
  const rslts = await manager.db[connName].delete.table.rows({
    // delete binds will be batched in groups of 2 before streaming them to the database since
    // execOpts.stream = 2, but we could have batched them individually (stream = 1) as well
    // https://mariadb.com/kb/en/connector-nodejs-promise-api/#connectionbatchsql-values-promise
    stream: 2
    // no need to set execOpts.binds since they will be streamed from the delete instead
  });

  for (let writeStream of rslts.rows) {
    await pipeline(
      // here we're just using some static values for illustration purposes, but they can come from a
      // any readable stream source like a file, database, etc. as long as they are "transformed"
      // into JSON binds before the sqler writable stream receives them
      Stream.Readable.from([
        {
          id: 100, id2: 100
        },
        {
          id: 200, id2: 200
        }
      ]),
      writeStream
    )
  }

  return rslts;
};

Delete Database:

-- db/mdb/setup/delete.database.sql
-- removes the sqlermysql test database
DROP DATABASE sqlermysql
'use strict';

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {

  // delete the test database
  const rslt = await manager.db[connName].setup.delete.database();

  return rslt;
};

3.1.1 (2021-08-20)

Full Changelog

Features: