Usage

💡 Oracle/Oracle XE (FREE Express Edition) Examples (see testing):

Examples:

The examples below use the following setup:

Private Options Configuration: (merged with the subsequent connection options)

{
  "univ": {
    "db": {
      "oracle": {
        "host": "sqler_oracle",
        "username":"SYSTEM",
        "password": "sqlerOracl3"
      }
    }
  }
}

Connection Options Configuration:

{
  "db": {
    "dialects": {
      "oracle": "sqler-oracle"
    },
    "connections": [
      {
        "id": "oracle",
        "name": "oracle",
        "dir": "db/oracle",
        "service": "XE",
        "dialect": "oracle",
        "pool": {
          "min": 2,
          "max": 2,
          "increment": 0
        },
        "driverOptions": {
          "global": {
            "maxRows": 0
          }
        }
      }
    ]
  }
}
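
For reference, a minimal sketch of how the two configurations above can be combined into the single conf object used by the examples (the JSON file names here are hypothetical):

'use strict';

const Fs = require('fs');

// read the private (credentials) and the public (connection) configurations
// (file names are illustrative - any source of the two JSON objects will do)
const univ = JSON.parse(Fs.readFileSync('./private-conf.json', 'utf8'));
const conn = JSON.parse(Fs.readFileSync('./conf.json', 'utf8'));

// merge into the single configuration consumed by the sqler Manager
const conf = { ...univ, ...conn };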

Test code that illustrates how to use the Oracle database with the various examples below:

// assuming "conf" contains combined "univ" and "db" objects from above

// create/initialize manager
const manager = new Manager(conf);
await manager.init();

// see the subsequent sections for the individual examples
const result = await runExample(manager, 'oracle');

console.log('Result:', result);

// after we're done using the manager we should close it
process.on('SIGINT', async function sigintDB() {
  await manager.close();
  console.log('Manager has been closed');
});

Create Database:

'use strict';

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {

  // create the database and/or schema
  return manager.db[connName].setup.create.database();
};

Create Table(s):

-- db/oracle/setup/create.table1.sql
CREATE TABLE TEST ("ID" INTEGER NOT NULL PRIMARY KEY, "NAME" VARCHAR2(512), "CREATED_AT" TIMESTAMP WITH TIME ZONE, "UPDATED_AT" TIMESTAMP WITH TIME ZONE)
-- db/oracle/setup/create.table2.sql
CREATE TABLE TEST2 ("ID" INTEGER NOT NULL PRIMARY KEY, "NAME" VARCHAR2(512), "REPORT" BLOB, "CREATED_AT" TIMESTAMP WITH TIME ZONE, "UPDATED_AT" TIMESTAMP WITH TIME ZONE)
'use strict';

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {

  // create the tables (in parallel)
  return Promise.all([
    manager.db[connName].setup.create.table1(),
    manager.db[connName].setup.create.table2()
  ]);
};
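
Note how the SQL file layout maps onto the generated sqler functions: the connection "dir" option above points at db/oracle, and nested directories plus dot-separated file names become nested properties (a sketch of the convention, based on the files used throughout these examples):

// db/oracle/setup/create.table1.sql  ->  manager.db[connName].setup.create.table1()
// db/oracle/create.table1.rows.sql   ->  manager.db[connName].create.table1.rows()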

Create Rows:

-- db/oracle/create.table1.rows.sql
INSERT INTO TEST (ID, "NAME", CREATED_AT, UPDATED_AT)
VALUES (:id, :name, :created, :updated)
-- db/oracle/create.table2.rows.sql
INSERT INTO TEST2 (ID, NAME, REPORT, CREATED_AT, UPDATED_AT)
VALUES (:id2, :name2, EMPTY_BLOB(), :created2, :updated2)
RETURNING REPORT INTO :report2
'use strict';

const typedefs = require('sqler/typedefs');
const Fs = require('fs');
const Stream = require('stream');
// node >= v16 :
// const { pipeline } = require('stream/promises');
// node < 16 :
const Util = require('util');
const pipeline = Util.promisify(Stream.pipeline);

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {

  const date = new Date();
  /** @type {typedefs.SQLERExecResults[]} */
  const rtn = new Array(2);

  /** @type {typedefs.SQLERTransaction} */
  let tx;
  try {
    // start a transaction
    tx = await manager.db[connName].beginTransaction();

    // Insert rows (within the explicit transaction)
    rtn[0] = await manager.db[connName].create.table1.rows({
      name: 'TX Table 1 (ROWS CREATE)', // name is optional
      autoCommit: false, // transaction needs to span the INSERT and LOB/stream
      transactionId: tx.id, // ensure execution takes place within transaction
      binds: {
        // illustrates the use of Oracle specific binds
        // (sqler will interpolate "${SOME_NAME}" into "oracledb.SOME_NAME")
        // alt would be "id: 1" and "name: 'TABLE: 1, ROW: 1'"
        id: {
          val: 1,
          type: '${NUMBER}',
          dir: '${BIND_IN}'
        },
        name: {
          val: 'TABLE: 1, ROW: 1, CREATE: "Initial creation"',
          dir: '${BIND_INOUT}',
          maxSize: 500
        },
        created: date,
        updated: date
      }
    });
    rtn[1] = await manager.db[connName].create.table2.rows({
      name: 'TX Table 2 (ROWS CREATE)', // name is optional
      autoCommit: false, // transaction needs to span the INSERT and pipe()
      transactionId: tx.id, // ensure execution takes place within transaction
      binds: {
        id2: 1,
        name2: 'TABLE: 2, ROW: 1, CREATE: "Initial creation"',
        // tell Oracle that a LOB is inbound - SQL using "RETURNING INTO"
        // (for small files, contents can be directly set on report2)
        report2: { type: '${BLOB}', dir: '${BIND_OUT}' },
        created2: date,
        updated2: date
      }
    });

    // wait until inbound streaming of report2 LOB has been completed
    await streamLobFromFile(rtn[1], 'report2', './test/files/audit-report.png');

    // commit the transaction
    await tx.commit(true); // true to release the connection back to the pool
  } catch (err) {
    if (tx) {
      // rollback the transaction
      await tx.rollback(true); // true to release the connection back to the pool
    }
    throw err;
  }

  return rtn;
};

/**
 * Streams a LOB from a file path into an `oracledb.Lob` instance
 * @param {typedefs.SQLERExecResults} rslt The `sqler` results that contains the Oracle
 * `rslt.raw.outBinds`
 * @param {String} name The inbound LOB parameter name that will be streamed
 * @param {String} pathToLOB The LOB file path to stream
 * @returns {typedefs.SQLERExecResults} The passed results
 */
async function streamLobFromFile(rslt, name, pathToLOB) {
  // raw Oracle "outBinds" should contain the bind parameter name
  if (!rslt.raw.outBinds || !rslt.raw.outBinds[name] || !rslt.raw.outBinds[name][0]) {
    throw new Error(`Missing RETURNING INTO statement for LOB streaming SQL for "${name}"?`);
  }
  // for "type: '${BLOB}', dir: '${BIND_OUT}'", Oracle returns a stream
  const lob = rslt.raw.outBinds[name][0];
  await pipeline(
    Fs.createReadStream(pathToLOB),
    lob
  );
  return rslt;
}

Read Rows:

-- db/oracle/read.table1.rows.sql
SELECT TST.ID AS "id", TST.NAME AS "name", -- EMPTY_CLOB() AS "report",
TST.CREATED_AT AS "created", TST.UPDATED_AT AS "updated"
FROM TEST TST
WHERE UPPER(TST.NAME) LIKE CONCAT(CONCAT('%', UPPER(:name)), '%') 
-- db/oracle/read.table2.rows.sql
SELECT TST2.ID AS "id", TST2.NAME AS "name", TST2.REPORT AS "report",
TST2.CREATED_AT AS "created", TST2.UPDATED_AT AS "updated"
FROM TEST2 TST2
WHERE UPPER(TST2.NAME) LIKE CONCAT(CONCAT('%', UPPER(:name)), '%')
'use strict';

const typedefs = require('sqler/typedefs');
const Os = require('os');
const Fs = require('fs');

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {
  /** @type {typedefs.SQLERExecResults[]} */
  const rtn = new Array(2);

  /** @type {typedefs.SQLERTransaction} */
  let tx;
  try {
    // start a transaction (needed to keep LOB stream open)
    tx = await manager.db[connName].beginTransaction();

    // read from multiple tables
    rtn[0] = manager.db[connName].read.table1.rows({ binds: { name: 'table' } });
    rtn[1] = manager.db[connName].read.table2.rows({
      autoCommit: false, // transaction needs to span the life of the LOB stream
      transactionId: tx.id, // ensure execution takes place within transaction
      binds: { name: 'table' }
    });
    rtn[0] = await rtn[0];
    rtn[1] = await rtn[1];

    // write report to file?
    const writeProms = [];
    for (let row of rtn[1].rows) {
      if (row.report) {
        // store the path to the report (illustrative purposes only)
        row.reportPath = `${Os.tmpdir()}/sqler-${connName}-read-${row.id}.png`;
        writeProms.push(streamLobToFile(row.report, row.reportPath));
      }
    }
    if (writeProms.length) {
      await Promise.all(writeProms);
    }

    // commit the transaction
    await tx.commit(true); // true to release the connection back to the pool
  } catch (err) {
    if (tx) {
      // rollback the transaction
      await tx.rollback(true); // true to release the connection back to the pool
    }
    throw err;
  }

  return { rows: [ ...rtn[0].rows, ...rtn[1].rows ] };
};

/**
 * Streams a LOB `oracledb.Lob` instance into a file
 * @param {Object} lob The outbound `oracledb.Lob` instance that will be streamed
 * @param {String} pathToLOB The LOB file path to stream
 * @returns {Promise} The LOB to file promise
 */
function streamLobToFile(lob, pathToLOB) {
  return new Promise((resolve, reject) => {
    const writeStream = Fs.createWriteStream(pathToLOB);
    writeStream.on('error', (err) => lob.destroy(err));
    lob.on('close', () => resolve());
    lob.on('end', () => lob.destroy());
    lob.on('error', (err) => reject(err));
    lob.pipe(writeStream);
  });
}
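
On Node.js >= 16 the manual Promise wrapper above could be replaced with the promise-based pipeline API (as noted in the require comments in the create example); a minimal sketch, assuming the oracledb.Lob is consumed as a standard readable stream:

'use strict';

const Fs = require('fs');
const { pipeline } = require('stream/promises'); // node >= v16

/**
 * Alternative sketch for the event-based streamLobToFile above
 * @param {Object} lob The outbound `oracledb.Lob` instance that will be streamed
 * @param {String} pathToLOB The LOB file path to stream
 * @returns {Promise} The LOB to file promise
 */
function streamLobToFile(lob, pathToLOB) {
  // pipeline destroys both streams once streaming completes or fails
  return pipeline(lob, Fs.createWriteStream(pathToLOB));
}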

Update Rows:

-- db/oracle/update.table1.rows.sql
UPDATE TEST
SET NAME = :name, UPDATED_AT = :updated
WHERE ID = :id
-- db/oracle/update.table2.rows.sql
UPDATE TEST2
SET NAME = :name2, UPDATED_AT = :updated2
WHERE ID = :id2
'use strict';

const typedefs = require('sqler/typedefs');

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {

  const date = new Date();

  // binds
  const table1BindsArray = [
    {
      id: 1, name: '', updated: date
    }
  ];
  const table2BindsArray = [
    {
      id2: 1, name2: '', updated2: date
    }
  ];
  const rtn = {};

  //-------------------------------------------------------
  // There are two different ways to perform a transaction
  // 1. Implicit (suitable for a single execution per tx)
  // 2. Explicit (suitable for multiple executions per tx)

  // using implicit transactions:
  await implicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);

  // Using an explicit transaction:
  await explicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);

  // Using a prepared statement:
  await preparedStatementUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);

  // Using a prepared statement within an explicit transaction
  await preparedStatementExplicitTxUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);

  return rtn;
};

async function implicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
  // returned results for both tables
  rtn.txImpRslts = new Array(table1BindsArray.length + table2BindsArray.length);

  // simple iterator over all the binds
  forEach('UPDATE', 'Implicit transaction', table1BindsArray, table2BindsArray, (idx, ti, ri, binds, nameProp) => {

    // Example concurrent execution using an implicit transaction for
    // each SQL execution (autoCommit = true is the default)
    rtn.txImpRslts[idx] = manager.db[connName].update[`table${ti + 1}`].rows({
      name: binds[nameProp], // execution name is optional
      binds
    });

  });

  // could have also run in series by awaiting each time the SQL function is called
  for (let i = 0; i < rtn.txImpRslts.length; i++) {
    rtn.txImpRslts[i] = await rtn.txImpRslts[i];
  }
}

async function explicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
  // returned results for both tables
  rtn.txExpRslts = new Array(table1BindsArray.length + table2BindsArray.length);
  /** @type {typedefs.SQLERTransaction} */
  let tx;
  try {
    // start a transaction
    tx = await manager.db[connName].beginTransaction();

    // simple iterator over all the binds
    forEach('UPDATE_TX', 'Explicit transaction', table1BindsArray, table2BindsArray, (idx, ti, ri, binds, nameProp) => {

      // Example concurrent execution (same transaction)
      rtn.txExpRslts[idx] = manager.db[connName].update[`table${ti + 1}`].rows({
        name: binds[nameProp], // execution name is optional
        binds,
        autoCommit: false,
        transactionId: tx.id, // ensure execution takes place within transaction
      });

    });
  
    // could have also run in series by awaiting each time the SQL function is called
    for (let i = 0; i < rtn.txExpRslts.length; i++) {
      rtn.txExpRslts[i] = await rtn.txExpRslts[i];
    }

    // commit the transaction
    await tx.commit(true); // true to release the connection back to the pool
  } catch (err) {
    if (tx) {
      // rollback the transaction
      await tx.rollback(true); // true to release the connection back to the pool
    }
    throw err;
  }
}

// NOTE: Prepared statements are a noop since Oracle implements the concept of statement caching instead
// See https://oracle.github.io/node-oracledb/doc/api.html#-313-statement-caching
// Example just for illustration of consistent sqler API functionality
async function preparedStatementUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
  // need to keep track of at least one result for each table so that unprepare can be called on each
  // (could call unprepare using any of the returned execution results for each table)
  let psRsltIndexTable1 = 0, psRsltIndexTable2;
  // returned results for both tables
  rtn.psRslts = new Array(table1BindsArray.length + table2BindsArray.length);
  try {

    // simple iterator over all the binds
    forEach('UPDATE_PS', 'Prepared statement', table1BindsArray, table2BindsArray, (idx, ti, ri, binds, nameProp) => {

      // Example concurrent execution (same transaction)
      rtn.psRslts[idx] = manager.db[connName].update[`table${ti + 1}`].rows({
        name: binds[nameProp], // execution name is optional
        // flag the SQL execution as a prepared statement
        prepareStatement: true,
        // include the bind parameters
        binds
      });

      // need to keep track of at least one result for each table so that unprepare can be called on each
      if (ti && !psRsltIndexTable2) psRsltIndexTable2 = idx;

    });

    // wait for concurrent executions to complete
    for (let i = 0; i < rtn.psRslts.length; i++) {
      rtn.psRslts[i] = await rtn.psRslts[i];
    }
  } finally {
    // since prepareStatement = true, we need to close the statement
    // and release the prepared statement connection back to the pool
    // (also drops the temporary stored procedure that executes the prepared statement)
    const proms = [];
    if (rtn.psRslts[psRsltIndexTable1]) proms.push(rtn.psRslts[psRsltIndexTable1].unprepare());
    if (rtn.psRslts[psRsltIndexTable2]) proms.push(rtn.psRslts[psRsltIndexTable2].unprepare());
    if (proms.length) await Promise.all(proms);
  }
}
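
Since Oracle relies on its client-side statement cache rather than true prepared statements, tuning is done through oracledb itself rather than through prepareStatement. A hedged sketch of raising the cache size from the connection configuration, assuming driverOptions.global sets properties on the oracledb module (as maxRows does in the connection options shown earlier):

"driverOptions": {
  "global": {
    "maxRows": 0,
    "stmtCacheSize": 40
  }
}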

// NOTE: Prepared statements are a noop since Oracle implements the concept of statement caching instead
// See https://oracle.github.io/node-oracledb/doc/api.html#-313-statement-caching
// Example just for illustration of consistent sqler API functionality
async function preparedStatementExplicitTxUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
  // returned results for both tables
  rtn.txExpPsRslts = new Array(table1BindsArray.length + table2BindsArray.length);
  /** @type {typedefs.SQLERTransaction} */
  let tx;
  try {
    // start a transaction
    tx = await manager.db[connName].beginTransaction();

    // simple iterator over all the binds
    forEach('UPDATE_PS_TX', `Prepared statement with txId ${tx.id}`, table1BindsArray, table2BindsArray, (idx, ti, ri, binds, nameProp) => {

      // Example concurrent execution (same transaction)
      rtn.txExpPsRslts[idx] = manager.db[connName].update[`table${ti + 1}`].rows({
        name: binds[nameProp], // execution name is optional
        autoCommit: false, // don't auto-commit after execution
        transactionId: tx.id, // ensure execution takes place within transaction
        prepareStatement: true, // ensure a prepared statement is used
        // include the bind parameters
        binds
      });

    });

    // wait for concurrent executions to complete
    for (let i = 0; i < rtn.txExpPsRslts.length; i++) {
      rtn.txExpPsRslts[i] = await rtn.txExpPsRslts[i];
    }

    // unprepare will be called when calling commit
    // (alt, could have called unprepare before commit)
    await tx.commit(true); // true to release the connection back to the pool
  } catch (err) {
    if (tx) {
      // unprepare will be called when calling rollback
      // (alt, could have called unprepare before rollback)
      await tx.rollback(true); // true to release the connection back to the pool
    }
    throw err;
  }
}

// just a utility function to iterate over multiple bind arrays and update bind names
function forEach(name, label, table1BindsArray, table2BindsArray, itemHandler) {
  const ln = table1BindsArray.length + table2BindsArray.length;
  for (let i = 0, ti, ri, barr, nameProp; i < ln; i++) {
    // select which table the binds are for
    if (i < table1BindsArray.length) {
      ti = 0;
      ri = i;
      barr = table1BindsArray;
    } else {
      ti = 1;
      ri = i - table1BindsArray.length;
      barr = table2BindsArray;
    }
    nameProp = `name${ti ? ti + 1 : ''}`;

    // update with expanded name
    barr[ri][nameProp] = `TABLE: ${ti + 1}, ROW: ${ri + 1}, ${name}: "${label} ${i + 1}"`;

    itemHandler(i, ti, ri, barr[ri], nameProp);
  }
}
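
For example, with the single-row bind arrays defined above, the iterator produces expanded names such as:

// table1BindsArray[0].name  -> 'TABLE: 1, ROW: 1, UPDATE: "Implicit transaction 1"'
// table2BindsArray[0].name2 -> 'TABLE: 2, ROW: 1, UPDATE: "Implicit transaction 2"'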

Delete Rows:

-- db/oracle/delete.table1.rows.sql
DELETE FROM TEST
WHERE ID = :id
-- db/oracle/delete.table2.rows.sql
DELETE FROM TEST2
WHERE ID = :id2
'use strict';

const typedefs = require('sqler/typedefs');

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {

  /** @type {typedefs.SQLERExecResults[]} */
  const rtn = new Array(2);

  // delete rows (implicit transactions)
  rtn[0] = await manager.db[connName].delete.table1.rows({
    binds: { id: 1 }
  });
  rtn[1] = await manager.db[connName].delete.table2.rows({
    binds: { id2: 1 }
  });

  return rtn;
};

Create Rows (streaming using the same SQL as the prior create rows example):

'use strict';

const typedefs = require('sqler/typedefs');
const Fs = require('fs');
const Stream = require('stream');
// node >= v16 :
// const { pipeline } = require('stream/promises');
// node < 16 :
const Util = require('util');
const pipeline = Util.promisify(Stream.pipeline);

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {

  const date = new Date();

  const bindsArray = [
    [
      {
        id: 100, name: 'TABLE: 1, ROW: 1, CREATE_STREAM: "Initial creation"', created: date, updated: date
      },
      {
        id: 200, name: 'TABLE: 1, ROW: 2, CREATE_STREAM: "Initial creation"', created: date, updated: date
      },
    ],
    [
      {
        id2: 100, name2: 'TABLE: 2, ROW: 1, CREATE_STREAM: "Initial creation"', created2: date, updated2: date,
        report2: null // see report2 in bindDefs
      },
      {
        id2: 200, name2: 'TABLE: 2, ROW: 2, CREATE_STREAM: "Initial creation"', created2: date, updated2: date,
        report2: null // see report2 in bindDefs
      },
    ]
  ];
  // since table2 is using oracledb.BIND_OUT for streaming column data,
  // there need to be bind definitions set on "driverOptions.exec"
  // https://oracle.github.io/node-oracledb/doc/api.html#executemanyoptbinddefs
  // https://oracle.github.io/node-oracledb/doc/api.html#oracledbconstantsbinddir
  const driverOptsArray = [
    {
      exec: {
        dmlRowCounts: true // output of the number of rows affected by each input data record
      }
    },
    {
      exec: {
        dmlRowCounts: true, // output of the number of rows affected by each input data record
        bindDefs: {
          id2: { type: '${NUMBER}' },
          name2: { type: '${STRING}', maxSize: 512 },
          created2: { type: '${DATE}' },
          updated2: { type: '${DATE}' },
          // tell Oracle that a LOB is inbound - SQL using "RETURNING INTO"
          // (for small files, contents can be directly set on report2 as STRING)
          report2: { type: '${BLOB}', dir: '${BIND_OUT}' }
        }
      }
    }
  ];
  // the streamed report content would typically come from different files
  const reportsArray = [
    './test/files/audit-report.png',
    './test/files/audit-report.png'
  ];
  /** @type {typedefs.SQLERExecResults[]} */
  const rslts = new Array(bindsArray.length);

  /** @type {typedefs.SQLERTransaction} */
  let tx;
  try {
    // start a transaction
    tx = await manager.db[connName].beginTransaction();

    for (let ti = 0; ti < bindsArray.length; ti++) {
      // Insert rows into multiple tables within a single execution
      rslts[ti] = await manager.db[connName].create[`table${ti + 1}`].rows({
        // create binds will be batched in groups of 2 before streaming them to the database since
        // execOpts.stream = 2, but we could have batched them individually (stream = 1) as well
        // https://oracle.github.io/node-oracledb/doc/api.html#executemany
        stream: 2,
        autoCommit: false, // transaction needs to span the INSERT and pipe()
        transactionId: tx.id, // ensure execution takes place within transaction
        driverOptions: driverOptsArray[ti]
        // no need to set execOpts.binds since they will be streamed from the create instead
      });

      let proms = ti ? [] : null;
      for (let writeStream of rslts[ti].rows) {
        // when the batched results come in, stream the lob into the report
        if (ti) {
          writeStream.on(typedefs.EVENT_STREAM_BATCH, async (batch) => {
            for (let rslt of batch) {
              proms.push(streamLobFromFile(rslt, 'report2', reportsArray));
            }
          });
        }

        await pipeline(
          // here we're just using some static values for illustration purposes, but they can come from
          // any readable stream source like a file, database, etc. as long as they are "transformed"
          // into JSON binds before the sqler writable stream receives them
          Stream.Readable.from(bindsArray[ti]),
          writeStream
        );

        if (proms && proms.length) {
          // wait until inbound streaming of report2 LOB has been completed
          await Promise.all(proms);
        }
      }
    }

    // commit the transaction
    await tx.commit(true); // true to release the connection back to the pool
  } catch (err) {
    if (tx) {
      // rollback the transaction
      await tx.rollback(true); // true to release the connection back to the pool
    }
    throw err;
  }

  return rslts;
};

/**
 * Streams a LOB from a file path into an `oracledb.Lob` instance
 * @param {Object} rslt The batched execution result that contains the Oracle
 * `outBinds`
 * @param {String} name The inbound LOB parameter name that will be streamed
 * @param {(String | String[])} pathsToLOB The LOB file path(s) to stream
 * @returns {Object} The passed result
 */
async function streamLobFromFile(rslt, name, pathsToLOB) {
  const outs = Array.isArray(rslt.outBinds) ? rslt.outBinds : [ rslt.outBinds ];
  const pths = Array.isArray(pathsToLOB) ? pathsToLOB : [ pathsToLOB ];
  let oi = 0, proms = new Array(outs.length);
  for (let out of outs) {
    // raw Oracle "outBinds" should contain the bind parameter name
    if (!out || !out[name] || !out[name][0]) {
      throw new Error(`Missing RETURNING INTO statement for LOB streaming SQL for "${name}"?`);
    }
    // for "type: '${BLOB}', dir: '${BIND_OUT}'", Oracle returns a stream
    const lob = out[name][0];
    proms[oi] = pipeline(
      Fs.createReadStream(pths[oi]),
      lob
    );
    oi++;
  }
  await Promise.all(proms);
  return rslt;
}

Read Rows (streaming using the same SQL as the prior read rows example):

'use strict';

const typedefs = require('sqler/typedefs');
const Os = require('os');
const Fs = require('fs');
const Stream = require('stream');
// node >= v16 :
// const { pipeline } = require('stream/promises');
// node < 16 :
const Util = require('util');
const pipeline = Util.promisify(Stream.pipeline);

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {

  /** @type {typedefs.SQLERExecResults[]} */
  const rtn = new Array(2);

  /** @type {typedefs.SQLERTransaction} */
  let tx;
  try {
    // start a transaction (needed to keep LOB stream open)
    tx = await manager.db[connName].beginTransaction();

    // stream all reads to a central JSON file (illustrative purposes only)
    rtn.jsonFile = `${Os.tmpdir()}/sqler-${connName}-read-stream-all.json`;

    let count = 0;
    for (let ti = 0; ti < rtn.length; ti++) {
      // read from multiple tables
      rtn[ti] = await manager.db[connName].read[`table${ti + 1}`].rows({
        stream: 1, // indicate reads will be streamed
        binds: { name: 'stream' }
      });

      // write binary report buffer to file?
      for (let readStream of rtn[ti].rows) {
        // read stream is Oracle implementation:
        // https://oracle.github.io/node-oracledb/doc/api.html#querystream
        await pipeline(
          readStream,
          new Stream.Transform({
            objectMode: true,
            transform: async function transformer(chunk, encoding, callback) {
              try {
                count++;
                if (chunk.report instanceof Stream.Readable) {
                  await streamLobToFile(connName, chunk.report, chunk);
                }
                callback(null, chunk);
              } catch (err) {
                callback(err, chunk);
              }
            }
          }),
          // add a transform that formats the JSON into an array string suitable for file write
          async function* transformStringify(chunksAsync) {
            yield `${ti ? ',' : '['}`;
            let cnt = -1;
            for await (const chunk of chunksAsync) {
              cnt++;
              yield `${cnt ? ',' : ''}${JSON.stringify(chunk)}`;
            }
            yield `${ti ? ']' : ''}`;
          },
          Fs.createWriteStream(rtn.jsonFile, { flags: ti ? 'a' : 'w' })
        );
      }
      
      // when nothing is written make sure the JSON file is empty (illustrative purposes only)
      if (!count) {
        await Fs.promises.writeFile(rtn.jsonFile, '[]');
      }
    }
  
    // commit the transaction
    await tx.commit(true); // true to release the connection back to the pool
  } catch (err) {
    if (tx) {
      // rollback the transaction
      await tx.rollback(true); // true to release the connection back to the pool
    }
    throw err;
  }

  return { rows: [ ...rtn[0].rows, ...rtn[1].rows ], jsonFile: rtn.jsonFile };
};

/**
 * Streams a LOB `oracledb.Lob` instance into a file
 * @param {String} connName The connection name that will be included in the written file name
 * @param {Object} lob The outbound `oracledb.Lob` instance that will be streamed
 * @param {Object} chunk The LOB owning object
 * @returns {Promise} The LOB to file promise
 */
function streamLobToFile(connName, lob, chunk) {
  return new Promise((resolve, reject) => {
    // don't include the report in the JSON since there should be a file
    delete chunk.report;
    // stream the report into a file (illustrative purposes only)
    chunk.reportPath = `${Os.tmpdir()}/sqler-${connName}-read-${chunk.id}.png`;
    const writeStream = Fs.createWriteStream(chunk.reportPath);
    writeStream.on('error', (err) => lob.destroy(err));
    lob.on('close', () => resolve());
    lob.on('end', () => lob.destroy());
    lob.on('error', (err) => reject(err));
    lob.pipe(writeStream);
  });
}
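
For illustration, with one matching row per table the streamed JSON file would contain a single array shaped roughly like the following (values abbreviated; reportPath only appears when a report LOB was streamed to a file):

[
  {"id":100,"name":"TABLE: 1, ROW: 1, ...","created":"...","updated":"..."},
  {"id":100,"name":"TABLE: 2, ROW: 1, ...","created":"...","updated":"...","reportPath":"/tmp/sqler-oracle-read-100.png"}
]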

Update Rows (streaming using the same SQL as the prior update rows example):

'use strict';

const typedefs = require('sqler/typedefs');
const Stream = require('stream');
// node >= v16 :
// const { pipeline } = require('stream/promises');
// node < 16 :
const Util = require('util');
const pipeline = Util.promisify(Stream.pipeline);

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {

  const date = new Date();

  // binds
  const table1BindsArray = [
    {
      id: 100, name: '', updated: date
    },
    {
      id: 200, name: '', updated: date
    },
  ];
  const table2BindsArray = [
    {
      id2: 100, name2: '', updated2: date
    },
    {
      id2: 200, name2: '', updated2: date
    }
  ];
  const rtn = {};

  //-------------------------------------------------------
  // There are two different ways to perform a transaction
  // 1. Implicit (suitable for a single execution per tx)
  // 2. Explicit (suitable for multiple executions per tx)

  // using implicit transactions:
  await implicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);

  // Using an explicit transaction:
  await explicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);

  // Using a prepared statement:
  await preparedStatementUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);

  // Using a prepared statement within an explicit transaction
  await preparedStatementExplicitTxUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);

  return rtn;
};

async function implicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
  // simply rename all the bind names to reflect the action being performed
  nameAll('UPDATE_STREAM', 'Implicit transaction', table1BindsArray, table2BindsArray);

  // loop through and perform the updates via the writable stream
  let ni = 0;
  const bindsArrays = [ table1BindsArray, table2BindsArray ];
  rtn.txImpRslts = new Array(bindsArrays.length);
  for (let bindsArray of bindsArrays) {
    // Example using an implicit transaction for each streamed (autoCommit = true is the default)
    rtn.txImpRslts[ni] = await manager.db[connName].update[`table${ni + 1}`].rows({
      // update binds will be batched in groups of 1 before streaming them to the database since
      // execOpts.stream = 1, but we could have batched them in groups (stream = 2) as well
      // https://oracle.github.io/node-oracledb/doc/api.html#executemany
      stream: 1
      // no need to set execOpts.binds since they will be streamed from the update instead
    });
    
    // now that the write streams are ready and the read binds have been renamed,
    // we can cycle through the bind arrays and write them to the appropriate tables
    for (let writeStream of rtn.txImpRslts[ni].rows) {
      await pipeline(
        // here we're just using some static values for illustration purposes, but they can come from
        // any readable stream source like a file, database, etc. as long as they are "transformed"
        // into JSON binds before the sqler writable stream receives them
        Stream.Readable.from(bindsArray),
        writeStream
      );
    }
    ni++;
  }
}

async function explicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
  // start a transaction
  /** @type {typedefs.SQLERTransaction} */
  let tx;
  try {
    tx = await manager.db[connName].beginTransaction();

    // simply rename all the bind names to reflect the action being performed
    nameAll('UPDATE_STREAM_TX', 'Explicit transaction', table1BindsArray, table2BindsArray);

    // loop through and perform the updates via the writable stream
    let ni = 0;
    const bindsArrays = [ table1BindsArray, table2BindsArray ];
    rtn.txExpRslts = new Array(bindsArrays.length);
    for (let bindsArray of bindsArrays) {
      // Example execution within the explicit transaction for each streamed update
      rtn.txExpRslts[ni] = await manager.db[connName].update[`table${ni + 1}`].rows({
        autoCommit: false, // don't auto-commit after execution
        transactionId: tx.id, // ensure execution takes place within transaction
        // update binds will be batched in groups of 1 before streaming them to the database since
        // execOpts.stream = 1, but we could have batched them in groups (stream = 2) as well
        // https://oracle.github.io/node-oracledb/doc/api.html#executemany
        stream: 1
        // no need to set execOpts.binds since they will be streamed from the update instead
      });
      
      // now that the write streams are ready and the read binds have been renamed,
      // we can cycle through the bind arrays and write them to the appropriate tables
      for (let writeStream of rtn.txExpRslts[ni].rows) {
        await pipeline(
          // here we're just using some static values for illustration purposes, but they can come from
          // any readable stream source like a file, database, etc. as long as they are "transformed"
          // into JSON binds before the sqler writable stream receives them
          Stream.Readable.from(bindsArray),
          writeStream
        );
      }
      ni++;
    }

    await tx.commit(true); // true to release the connection back to the pool
  } catch (err) {
    if (tx) {
      await tx.rollback(true); // true to release the connection back to the pool
    }
    throw err;
  }
}

async function preparedStatementUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
  // need to keep track of at least one result for each table so that unprepare can be called on each
  // (could call unprepare using any of the returned stream results for each table)
  let psRsltTable1, psRsltTable2;
  try {
    // simply rename all the bind names to reflect the action being performed
    nameAll('UPDATE_STREAM_PS', 'Prepared statement', table1BindsArray, table2BindsArray);

    // loop through and perform the updates via the writable stream
    let ni = 0;
    const bindsArrays = [ table1BindsArray, table2BindsArray ];
    rtn.psRslts = new Array(bindsArrays.length);
    for (let bindsArray of bindsArrays) {
      // Example using an implicit transaction for each streamed (autoCommit = true is the default)
      rtn.psRslts[ni] = await manager.db[connName].update[`table${ni + 1}`].rows({
        // the prepared statement flag does not really do anything for Oracle, but is set to show universal sqler API usage
        // https://oracle.github.io/node-oracledb/doc/api.html#stmtcache
        prepareStatement: true,
        // update binds will be batched in groups of 1 before streaming them to the database since
        // execOpts.stream = 1, but we could have batched them in groups (stream = 2) as well
        // https://oracle.github.io/node-oracledb/doc/api.html#executemany
        stream: 1
        // no need to set execOpts.binds since they will be streamed from the update instead
      });

      // need to keep track of at least one result for each table so that unprepare can be called on each
      if (ni === 0 && !psRsltTable1) {
        psRsltTable1 = rtn.psRslts[ni];
      } else if (ni && !psRsltTable2) {
        psRsltTable2 = rtn.psRslts[ni];
      }

      // now that the write streams are ready and the read binds have been renamed,
      // we can cycle through the bind arrays and write them to the appropriate tables
      for (let writeStream of rtn.psRslts[ni].rows) {
        await pipeline(
          // here we're just using some static values for illustration purposes, but they can come from
          // any readable stream source like a file, database, etc. as long as they are "transformed"
          // into JSON binds before the sqler writable stream receives them
          Stream.Readable.from(bindsArray),
          writeStream
        );
      }
      ni++;
    }
    
  } finally {
    // since prepareStatement = true, we need to close the statement
    // and release the prepared statement connection back to the pool
    // (also drops the temporary stored procedure that executes the prepared statement)
    const proms = [];
    if (psRsltTable1) proms.push(psRsltTable1.unprepare());
    if (psRsltTable2) proms.push(psRsltTable2.unprepare());
    if (proms.length) await Promise.all(proms);
  }
}

async function preparedStatementExplicitTxUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
  /** @type {typedefs.SQLERTransaction} */
  let tx;
  try {
    // start a transaction
    tx = await manager.db[connName].beginTransaction();

    // simply rename all the bind names to reflect the action being performed
    nameAll('UPDATE_STREAM_PS_TX', `Prepared statement with txId ${tx.id}`, table1BindsArray, table2BindsArray);

    // loop through and perform the updates via the writable stream
    let ni = 0;
    const bindsArrays = [ table1BindsArray, table2BindsArray ];
    rtn.txExpPsRslts = new Array(bindsArrays.length);
    for (let bindsArray of bindsArrays) {
      // Example execution within the explicit transaction for each streamed update
      rtn.txExpPsRslts[ni] = await manager.db[connName].update[`table${ni + 1}`].rows({
        autoCommit: false, // don't auto-commit after execution
        transactionId: tx.id, // ensure execution takes place within transaction
        // the prepared statement flag does not really do anything for Oracle, but is set to show universal sqler API usage
        // https://oracle.github.io/node-oracledb/doc/api.html#stmtcache
        prepareStatement: true,
        // update binds will be batched in groups of 1 before streaming them to the database since
        // execOpts.stream = 1, but we could have batched them in groups (stream = 2) as well
        // https://oracle.github.io/node-oracledb/doc/api.html#executemany
        stream: 1
        // no need to set execOpts.binds since they will be streamed from the update instead
      });
  
      // now that the write streams are ready and the read binds have been renamed,
      // we can cycle through the bind arrays and write them to the appropriate tables
      for (let writeStream of rtn.txExpPsRslts[ni].rows) {
        await pipeline(
          // here we're just using some static values for illustration purposes, but they can come from
          // any readable stream source like a file, database, etc. as long as they are "transformed"
          // into JSON binds before the sqler writable stream receives them
          Stream.Readable.from(bindsArray),
          writeStream
        );
      }
      ni++;
    }

    // unprepare will be called on all prepared statements associated with the transaction when calling
    // commit (alt, could have called unprepare before commit)
    await tx.commit(true); // true to release the connection back to the pool
  } catch (err) {
    if (tx) {
      // unprepare will be called on all prepared statements associated with the transaction when calling
      // rollback (alt, could have called unprepare before rollback)
      await tx.rollback(true); // true to release the connection back to the pool
    }
    throw err;
  }
}

// just a utility function to iterate over multiple bind arrays and rename them
function nameAll(name, label, table1BindsArray, table2BindsArray) {
  const ln = table1BindsArray.length + (table2BindsArray ? table2BindsArray.length : 0);
  for (let i = 0, ti, ri, barr; i < ln; i++) {
    // select which table the binds are for
    if (i < table1BindsArray.length) {
      ti = 0;
      ri = i;
      barr = table1BindsArray;
    } else {
      ti = 1;
      ri = i - table1BindsArray.length;
      barr = table2BindsArray;
    }
    // update with expanded name
    barr[ri][`name${ti ? ti + 1 : ''}`] = `TABLE: ${ti + 1}, ROW: ${ri + 1}, ${name}: "${label} ${i + 1}"`;
  }
  return [ 'name', 'name2' ];
}

Delete Rows (streaming using the same SQL as the prior delete rows example):

'use strict';

const typedefs = require('sqler/typedefs');
const Stream = require('stream');
// node >= v16 :
// const { pipeline } = require('stream/promises');
// node < 16 :
const Util = require('util');
const pipeline = Util.promisify(Stream.pipeline);

// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {

  /** @type {typedefs.SQLERExecResults[]} */
  const rtn = new Array(2);

  for (let ti = 0; ti < rtn.length; ti++) {
    // Delete rows from multiple tables within a single execution
    rtn[ti] = await manager.db[connName].delete[`table${ti + 1}`].rows({
      // delete binds will be batched in groups of 2 before streaming them to the database since
      // execOpts.stream = 2, but we could have batched them individually (stream = 1) as well
      // https://oracle.github.io/node-oracledb/doc/api.html#executemany
      stream: 2
      // no need to set execOpts.binds since they will be streamed from the delete instead
    });

    for (let writeStream of rtn[ti].rows) {
      await pipeline(
        // here we're just using some static values for illustration purposes, but they can come from
        // any readable stream source like a file, database, etc. as long as they are "transformed"
        // into JSON binds before the sqler writable stream receives them
        Stream.Readable.from([
          {
            [`id${ti ? 2 : ''}`]: 100
          },
          {
            [`id${ti ? 2 : ''}`]: 200
          }
        ]),
        writeStream
      );
    }
  }

  return rtn;
};
