💡 Oracle/Oracle XE (FREE Express Edition) Examples (see testing):
Examples:
The examples below use the following setup:
Private Options Configuration: (appended to the subsequent connection options)
{
"univ": {
"db": {
"oracle": {
"host": "sqler_oracle",
"username":"SYSTEM",
"password": "sqlerOracl3"
}
}
}
}
Connection Options Configuration:
{
"db": {
"dialects": {
"oracle": "sqler-oracle"
},
"connections": [
{
"id": "oracle",
"name": "oracle",
"dir": "db/oracle",
"service": "XE",
"dialect": "oracle",
"pool": {
"min": 2,
"max": 2,
"increment": 0
},
"driverOptions": {
"global": {
"maxRows": 0
}
}
}
]
}
}
Test code that illustrates how to use the Oracle database with various examples
// assuming "conf" contains combined "univ" and "db" objects from above
// create/initialize manager
const manager = new Manager(conf);
await manager.init();
// see subsequent examples for different examples
const result = await runExample(manager, 'oracle');
console.log('Result:', result);
// after we're done using the manager we should close it
process.on('SIGINT', async function sigintDB() {
await manager.close();
console.log('Manager has been closed');
});
Create Table(s):
-- db/oracle/setup/create.table1.sql
'use strict';
// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {
// create the database and/or schema
return manager.db[connName].setup.create.database();
};
-- db/oracle/setup/create.table2.sql
CREATE TABLE TEST2 ("ID" INTEGER NOT NULL PRIMARY KEY, "NAME" VARCHAR2(512), "REPORT" BLOB, "CREATED_AT" TIMESTAMP WITH TIME ZONE, "UPDATED_AT" TIMESTAMP WITH TIME ZONE)
'use strict';
// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {
// create the tables (in parallel)
return Promise.all([
manager.db[connName].setup.create.table1(),
manager.db[connName].setup.create.table2()
]);
};
Create Rows:
-- db/oracle/create.table1.rows.sql
INSERT INTO TEST (ID, "NAME", CREATED_AT, UPDATED_AT)
VALUES (:id, :name, :created, :updated)
-- db/oracle/create.table2.rows.sql
INSERT INTO TEST2 (ID, NAME, REPORT, CREATED_AT, UPDATED_AT)
VALUES (:id2, :name2, EMPTY_BLOB(), :created2, :updated2)
RETURNING REPORT INTO :report2
'use strict';
const typedefs = require('sqler/typedefs');
const Fs = require('fs');
const Stream = require('stream');
// node >= v16 :
// const { pipeline } = require('stream/promises');
// node < 16 :
const Util = require('util');
const pipeline = Util.promisify(Stream.pipeline);
// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {
const date = new Date();
/** @type {typedefs.SQLERExecResults[]} */
const rtn = new Array(2);
/** @type {typedefs.SQLERTransaction} */
let tx;
try {
// start a transaction
tx = await manager.db[connName].beginTransaction();
// Insert rows (implicit transactions)
rtn[0] = await manager.db[connName].create.table1.rows({
name: 'TX Table 1 (ROWS CREATE)', // name is optional
autoCommit: false, // transaction needs to span the INSERT and LOB/stream
transactionId: tx.id, // ensure execution takes place within transaction
binds: {
// illustrates the use of Oracle specific binds
// (sqler will interpolate "${SOME_NAME}" into "oracledb.SOME_NAME")
// alt would be "id: 1" and "name: 'TABLE: 1, ROW: 1'"
id: {
val: 1,
type: '${NUMBER}',
dir: '${BIND_IN}'
},
name: {
val: 'TABLE: 1, ROW: 1, CREATE: "Initial creation"',
dir: '${BIND_INOUT}',
maxSize: 500
},
created: date,
updated: date
}
});
rtn[1] = await manager.db[connName].create.table2.rows({
name: 'TX Table 2 (ROWS CREATE)', // name is optional
autoCommit: false, // transaction needs to span the INSERT and pipe()
transactionId: tx.id, // ensure execution takes place within transaction
binds: {
id2: 1,
name2: 'TABLE: 2, ROW: 1, CREATE: "Initial creation"',
// tell Oracle that a LOB is inbound - SQL using "RETURNING INTO"
// (for small files, contents can be directly set on report2)
report2: { type: '${BLOB}', dir: '${BIND_OUT}' },
created2: date,
updated2: date
}
});
// wait until inbound streaming of report2 LOB has been completed
await streamLobFromFile(rtn[1], 'report2', './test/files/audit-report.png');
// commit the transaction
await tx.commit(true); // true to release the connection back to the pool
} catch (err) {
if (tx) {
// rollback the transaction
await tx.rollback(true); // true to release the connection back to the pool
}
throw err;
}
return rtn;
};
/**
* Streams a LOB from a file path into an `oracledb.Lob` instance
* @param {typedefs.SQLERExecResults} rslt The `sqler` results that contains the Oracle
* `rslt.raw.outBinds`
* @param {String} name The inbound LOB parameter name that will be streamed
* @param {String} pathToLOB The LOB file path to stream
* @returns {typedefs.SQLERExecResults} The passed results
*/
async function streamLobFromFile(rslt, name, pathToLOB) {
// raw Oracle "outBinds" should contain the bind parameter name
if (!rslt.raw.outBinds || !rslt.raw.outBinds[name] || !rslt.raw.outBinds[name][0]) {
throw new Error(`Missing RETURNING INTO statement for LOB streaming SQL for "${name}"?`);
}
// for "type: '${BLOB}', dir: '${BIND_OUT}'", Oracle returns a stream
const lob = rslt.raw.outBinds[name][0];
await pipeline(
Fs.createReadStream(pathToLOB),
lob
);
return rslt
}
Read Rows:
-- db/oracle/read.table1.rows.sql
SELECT TST.ID AS "id", TST.NAME AS "name", -- EMPTY_CLOB() AS "report",
TST.CREATED_AT AS "created", TST.UPDATED_AT AS "updated"
FROM TEST TST
WHERE UPPER(TST.NAME) LIKE CONCAT(CONCAT('%', UPPER(:name)), '%')
-- db/oracle/read.table2.rows.sql
SELECT TST2.ID AS "id", TST2.NAME AS "name", TST2.REPORT AS "report",
TST2.CREATED_AT AS "created", TST2.UPDATED_AT AS "updated"
FROM TEST2 TST2
WHERE UPPER(TST2.NAME) LIKE CONCAT(CONCAT('%', UPPER(:name)), '%')
'use strict';
const typedefs = require('sqler/typedefs');
const Os = require('os');
const Fs = require('fs');
// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {
/** @type {typedefs.SQLERExecResults[]} */
const rtn = new Array(2);
/** @type {typedefs.SQLERTransaction} */
let tx;
try {
// start a transaction (needed to keep LOB stream open)
tx = await manager.db[connName].beginTransaction();
// read from multiple tables
rtn[0] = manager.db[connName].read.table1.rows({ binds: { name: 'table' } });
rtn[1] = manager.db[connName].read.table2.rows({
autoCommit: false, // transaction needs to span the life of the LOB stream
transactionId: tx.id, // ensure execution takes place within transaction
binds: { name: 'table' }
});
rtn[0] = await rtn[0];
rtn[1] = await rtn[1];
// write report to file?
const writeProms = [];
for (let row of rtn[1].rows) {
if (row.report) {
// store the path to the report (illustrative purposes only)
row.reportPath = `${Os.tmpdir()}/sqler-${connName}-read-${row.id}.png`;
writeProms.push(streamLobToFile(row.report, row.reportPath));
}
}
if (writeProms.length) {
await Promise.all(writeProms);
}
// commit the transaction
await tx.commit(true); // true to release the connection back to the pool
} catch (err) {
if (tx) {
// rollback the transaction
await tx.rollback(true); // true to release the connection back to the pool
}
throw err;
}
return { rows: [ ...rtn[0].rows, ...rtn[1].rows ] };
};
/**
* Streams a LOB `oracledb.Lob` instance into a file
* @param {Object} lob The outbound LOB parameter name that will be streamed
* @param {String} pathToLOB The LOB file path to stream
* @returns {Promise} The LOB to file promise
*/
function streamLobToFile(lob, pathToLOB) {
return new Promise((resolve, reject) => {
const writeStream = Fs.createWriteStream(pathToLOB);
writeStream.on('error', (err) => lob.destroy(err));
lob.on('close', () => resolve());
lob.on('end', () => lob.destroy());
lob.on('error', (err) => reject(err));
lob.pipe(writeStream);
});
}
Update Rows:
-- db/oracle/update.table1.rows.sql
UPDATE TEST
SET NAME = :name, UPDATED_AT = :updated
WHERE ID = :id
-- db/oracle/update.table2.rows.sql
UPDATE TEST2
SET NAME = :name2, UPDATED_AT = :updated2
WHERE ID = :id2
'use strict';
const typedefs = require('sqler/typedefs');
// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {
const date = new Date();
// binds
const table1BindsArray = [
{
id: 1, name: '', updated: date
}
];
const table2BindsArray = [
{
id2: 1, name2: '', updated2: date
}
];
const rtn = {};
//-------------------------------------------------------
// There are two different ways to perform a transaction
// 1. Implicit (suitable for a single execution per tx)
// 2. Explicit (suitable for multiple executions per tx)
//-------------------------------------------------------
// There are two different ways to perform a transaction
// 1. Implicit (suitable for a single execution per tx)
// 2. Explicit (suitable for multiple executions per tx)
// using implicit transactions:
await implicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);
// Using an explicit transaction:
await explicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);
// Using a prepared statement:
await preparedStatementUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);
// Using a prepared statement within an explicit transaction
await preparedStatementExplicitTxUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);
return rtn;
};
async function implicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
// returned results for both tables
rtn.txImpRslts = new Array(table1BindsArray.length + table2BindsArray.length);
// simple iterator over all the binds
forEach('UPDATE', 'Implicit transaction', table1BindsArray, table2BindsArray, (idx, ti, ri, binds, nameProp) => {
// Example concurrent execution using an implicit transaction for
// each SQL execution (autoCommit = true is the default)
rtn.txImpRslts[idx] = manager.db[connName].update[`table${ti + 1}`].rows({
name: binds[nameProp], // execution name is optional
binds
});
});
// could have also ran is series by awaiting when the SQL function is called
for (let i = 0; i < rtn.txImpRslts.length; i++) {
rtn.txImpRslts[i] = await rtn.txImpRslts[i];
}
}
async function explicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
// returned results for both tables
rtn.txExpRslts = new Array(table1BindsArray.length + table2BindsArray.length);
/** @type {typedefs.SQLERTransaction} */
let tx;
try {
// start a transaction
tx = await manager.db[connName].beginTransaction();
// simple iterator over all the binds
forEach('UPDATE_TX', 'Explicit transaction', table1BindsArray, table2BindsArray, (idx, ti, ri, binds, nameProp) => {
// Example concurrent execution (same transacion)
rtn.txExpRslts[idx] = manager.db[connName].update[`table${ti + 1}`].rows({
name: binds[nameProp], // execution name is optional
binds,
autoCommit: false,
transactionId: tx.id, // ensure execution takes place within transaction
});
});
// could have also ran is series by awaiting when the SQL function is called
for (let i = 0; i < rtn.txExpRslts.length; i++) {
rtn.txExpRslts[i] = await rtn.txExpRslts[i];
}
// commit the transaction
await tx.commit(true); // true to release the connection back to the pool
} catch (err) {
if (tx) {
// rollback the transaction
await tx.rollback(true); // true to release the connection back to the pool
}
throw err;
}
}
// NOTE: Prepared statements are a noop since Oracle implements the concept of statement caching instead
// See https://oracle.github.io/node-oracledb/doc/api.html#-313-statement-caching
// Example just for illustration of consistent sqler API functionality
async function preparedStatementUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
// need to keep track of at least one result for each table so that unprepare can be called on each
// (could call unprepare using any of the returned execution results for each table)
let psRsltIndexTable1 = 0, psRsltIndexTable2;
// returned results for both tables
rtn.psRslts = new Array(table1BindsArray.length + table2BindsArray.length);
try {
// simple iterator over all the binds
forEach('UPDATE_PS', 'Prepred statement', table1BindsArray, table2BindsArray, (idx, ti, ri, binds, nameProp) => {
// Example concurrent execution (same transacion)
rtn.psRslts[idx] = manager.db[connName].update[`table${ti + 1}`].rows({
name: binds[nameProp], // execution name is optional
// flag the SQL execution as a prepared statement
prepareStatement: true,
// include the bind parameters
binds
});
// need to keep track of at least one result for each table so that unprepare can be called on each
if (ti && !psRsltIndexTable2) psRsltIndexTable2 = ti;
});
// wait for concurrent executions to complete
for (let i = 0; i < rtn.psRslts.length; i++) {
rtn.psRslts[i] = await rtn.psRslts[i];
}
} finally {
// since prepareStatement = true, we need to close the statement
// and release the prepared statement connection back to the pool
// (also drops the temporary stored procedure that executes the prepared statement)
const proms = [];
if (rtn.psRslts[psRsltIndexTable1]) proms.push(rtn.psRslts[psRsltIndexTable1].unprepare());
if (rtn.psRslts[psRsltIndexTable2]) proms.push(rtn.psRslts[psRsltIndexTable2].unprepare());
if (proms.length) await Promise.all(proms);
}
}
// NOTE: Prepared statements are a noop since Oracle implements the concept of statement caching instead
// See https://oracle.github.io/node-oracledb/doc/api.html#-313-statement-caching
// Example just for illustration of consistent sqler API functionality
async function preparedStatementExplicitTxUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
// returned results for both tables
rtn.txExpPsRslts = new Array(table1BindsArray.length + table2BindsArray.length);
/** @type {typedefs.SQLERTransaction} */
let tx;
try {
// start a transaction
tx = await manager.db[connName].beginTransaction();
// simple iterator over all the binds
forEach('UPDATE_PS_TX', `Prepred statement with txId ${tx.id}`, table1BindsArray, table2BindsArray, (idx, ti, ri, binds, nameProp) => {
// Example execution in concurrent (same transacion)
rtn.txExpPsRslts[idx] = manager.db[connName].update[`table${ti + 1}`].rows({
name: binds[nameProp], // execution name is optional
autoCommit: false, // don't auto-commit after execution
transactionId: tx.id, // ensure execution takes place within transaction
prepareStatement: true, // ensure a prepared statement is used
// include the bind parameters
binds
});
});
// wait for concurrent executions to complete
for (let i = 0; i < rtn.txExpPsRslts.length; i++) {
rtn.txExpPsRslts[i] = await rtn.txExpPsRslts[i];
}
// unprepare will be called when calling commit
// (alt, could have called unprepare before commit)
await tx.commit(true); // true to release the connection back to the pool
} catch (err) {
if (tx) {
// unprepare will be called when calling rollback
// (alt, could have called unprepare before rollback)
await tx.rollback(true); // true to release the connection back to the pool
}
throw err;
}
}
// just a utility function to iterate over muliple bind arrays and update bind names
function forEach(name, label, table1BindsArray, table2BindsArray, itemHandler) {
const ln = table1BindsArray.length + table2BindsArray.length;
for (let i = 0, ti, ri, barr, nameProp; i < ln; i++) {
// select which table the binds are for
if (i < table1BindsArray.length) {
ti = 0;
ri = i;
barr = table1BindsArray;
} else {
ti = 1;
ri = i - table1BindsArray.length;
barr = table2BindsArray;
}
nameProp = `name${ti ? ti + 1 : ''}`;
// update with expanded name
barr[ri][nameProp] = `TABLE: ${ti + 1}, ROW: ${ri + 1}, ${name}: "${label} ${i + 1}"`;
itemHandler(i, ti, ri, barr[ri], nameProp);
}
}
Delete Rows:
-- db/oracle/delete.table1.rows.sql
DELETE FROM TEST
WHERE ID = :id
-- db/oracle/delete.table2.rows.sql
DELETE FROM TEST2
WHERE ID = :id2
'use strict';
const typedefs = require('sqler/typedefs');
// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {
/** @type {typedefs.SQLERExecResults[]} */
const rtn = new Array(2);
// delete rows (implicit transactions)
rtn[0] = await manager.db[connName].delete.table1.rows({
binds: { id: 1 }
});
rtn[1] = await manager.db[connName].delete.table2.rows({
binds: { id2: 1 }
});
return rtn;
};
Create Rows (streaming using the same SQL as the prior create rows example):
'use strict';
const typedefs = require('sqler/typedefs');
const Fs = require('fs');
const Stream = require('stream');
// node >= v16 :
// const { pipeline } = require('stream/promises');
// node < 16 :
const Util = require('util');
const pipeline = Util.promisify(Stream.pipeline);
// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {
const date = new Date();
const bindsArray = [
[
{
id: 100, name: 'TABLE: 1, ROW: 1, CREATE_STREAM: "Initial creation"', created: date, updated: date
},
{
id: 200, name: 'TABLE: 1, ROW: 2, CREATE_STREAM: "Initial creation"', created: date, updated: date
},
],
[
{
id2: 100, name2: 'TABLE: 2, ROW: 1, CREATE_STREAM: "Initial creation"', created2: date, updated2: date,
report2: null // see report2 in bindDefs
},
{
id2: 200, name2: 'TABLE: 2, ROW: 2, CREATE_STREAM: "Initial creation"', created2: date, updated2: date,
report2: null // see report2 in bindDefs
},
]
];
// since table2 is using oracledb.BIND_OUT for streaming column data,
// there needs to be a bind definitions set on "driverOptions.exec"
// https://oracle.github.io/node-oracledb/doc/api.html#executemanyoptbinddefs
// https://oracle.github.io/node-oracledb/doc/api.html#oracledbconstantsbinddir
const driverOptsArray = [
{
exec: {
dmlRowCounts: true // output of the number of rows affected by each input data record
}
},
{
exec: {
dmlRowCounts: true, // output of the number of rows affected by each input data record
bindDefs: {
id2: { type: '${NUMBER}' },
name2: { type: '${STRING}', maxSize: 512 },
created2: { type: '${DATE}' },
updated2: { type: '${DATE}' },
// tell Oracle that a LOB is inbound - SQL using "RETURNING INTO"
// (for small files, contents can be directly set on report2 as STRING)
report2: { type: '${BLOB}', dir: '${BIND_OUT}' }
}
}
}
];
// write column stream file would typically be from different files
const reportsArray = [
'./test/files/audit-report.png',
'./test/files/audit-report.png'
];
/** @type {typedefs.SQLERExecResults[]} */
const rslts = new Array(bindsArray.length);
/** @type {typedefs.SQLERTransaction} */
let tx;
try {
// start a transaction
tx = await manager.db[connName].beginTransaction();
for (let ti = 0; ti < bindsArray.length; ti++) {
// Insert rows into multiple tables within a single execution
rslts[ti] = await manager.db[connName].create[`table${ti + 1}`].rows({
// create binds will be batched in groups of 2 before streaming them to the database since
// execOpts.stream = 2, but we could have batched them individually (stream = 1) as well
// https://oracle.github.io/node-oracledb/doc/api.html#executemany
stream: 2,
autoCommit: false, // transaction needs to span the INSERT and pipe()
transactionId: tx.id, // ensure execution takes place within transaction
driverOptions: driverOptsArray[ti]
// no need to set execOpts.binds since they will be streamed from the create instead
});
let proms = ti ? new Array(rslts[ti].rows.length) : null;
for (let writeStream of rslts[ti].rows) {
// when the batched results come in, stream the lob into the report
if (ti) {
writeStream.on(typedefs.EVENT_STREAM_BATCH, async (batch) => {
for (let rslt of batch) {
proms.push(streamLobFromFile(rslt, 'report2', reportsArray));
}
});
}
await pipeline(
// here we're just using some static values for illustration purposes, but they can come from a
// any readable stream source like a file, database, etc. as long as they are "transformed"
// into JSON binds before the sqler writable stream receives them
Stream.Readable.from(bindsArray[ti]),
writeStream
);
if (proms && proms.length) {
// wait until inbound streaming of report2 LOB has been completed
await Promise.all(proms);
}
}
}
// commit the transaction
await tx.commit(true); // true to release the connection back to the pool
} catch (err) {
if (tx) {
// rollback the transaction
await tx.rollback(true); // true to release the connection back to the pool
}
throw err;
}
return rslts;
};
/**
* Streams a LOB from a file path into an `oracledb.Lob` instance
* @param {typedefs.SQLERExecResults} rslt The `sqler` results that contains the Oracle
* `rslt.raw.outBinds`
* @param {String} name The inbound LOB parameter name that will be streamed
* @param {(String | String[])} pathsToLOB The LOB file path(s) to stream
* @returns {typedefs.SQLERExecResults} The passed results
*/
async function streamLobFromFile(rslt, name, pathsToLOB) {
const outs = Array.isArray(rslt.outBinds) ? rslt.outBinds : [ rslt.outBinds ];
const pths = Array.isArray(pathsToLOB) ? pathsToLOB : [ pathsToLOB ];
let oi = 0, proms = new Array(outs.length);
for (let out of outs) {
// raw Oracle "outBinds" should contain the bind parameter name
if (!out || !out[name] || !out[name][0]) {
throw new Error(`Missing RETURNING INTO statement for LOB streaming SQL for "${name}"?`);
}
// for "type: '${BLOB}', dir: '${BIND_OUT}'", Oracle returns a stream
const lob = out[name][0];
proms[oi] = pipeline(
Fs.createReadStream(pths[oi]),
lob
);
oi++;
}
await Promise.all(proms);
return rslt;
}
Read Rows (streaming using the same SQL as the prior read rows example):
'use strict';
const typedefs = require('sqler/typedefs');
const Os = require('os');
const Fs = require('fs');
const Stream = require('stream');
// node >= v16 :
// const { pipeline } = require('stream/promises');
// node < 16 :
const Util = require('util');
const pipeline = Util.promisify(Stream.pipeline);
// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {
/** @type {typedefs.SQLERExecResults[]} */
const rtn = new Array(2);
/** @type {typedefs.SQLERTransaction} */
let tx;
try {
// start a transaction (needed to keep LOB stream open)
tx = await manager.db[connName].beginTransaction();
// stream all reads to a central JSON file (illustrative purposes only)
rtn.jsonFile = `${Os.tmpdir()}/sqler-${connName}-read-stream-all.json`;
let count = 0;
for (let ti = 0; ti < rtn.length; ti++) {
// read from multiple tables
rtn[ti] = await manager.db[connName].read[`table${ti + 1}`].rows({
stream: 1, // indicate reads will be streamed
binds: { name: 'stream' }
});
// write binary report buffer to file?
for (let readStream of rtn[ti].rows) {
// read stream is Oracle implementation:
// https://oracle.github.io/node-oracledb/doc/api.html#querystream
await pipeline(
readStream,
new Stream.Transform({
objectMode: true,
transform: async function transformer(chunk, encoding, callback) {
try {
count++;
if (chunk.report instanceof Stream.Readable) {
await streamLobToFile(connName, chunk.report, chunk);
}
callback(null, chunk);
} catch (err) {
callback(err, chunk);
}
}
}),
// add a transform that formats the JSON into an array string suitable for file write
async function* transformStringify(chunksAsync) {
yield `${ti ? ',' : '['}`;
let cnt = -1;
for await (const chunk of chunksAsync) {
cnt++;
yield `${cnt ? ',' : ''}${JSON.stringify(chunk)}`;
}
yield `${ti && cnt ? ']' : ''}`;
},
Fs.createWriteStream(rtn.jsonFile, { flags: ti ? 'a' : 'w' })
);
}
// when nothing is written make sure the JSON file is empty (illustrative purposes only)
if (!count) {
Fs.promises.writeFile(rtn.jsonFile, '[]');
}
}
// commit the transaction
await tx.commit(true); // true to release the connection back to the pool
} catch (err) {
if (tx) {
// rollback the transaction
await tx.rollback(true); // true to release the connection back to the pool
}
throw err;
}
return { rows: [ ...rtn[0].rows, ...rtn[1].rows ], jsonFile: rtn.jsonFile };
};
/**
* Streams a LOB `oracledb.Lob` instance into a file
* @param {String} connName The connection name that will be included in the written file name
* @param {Object} lob The outbound LOB parameter name that will be streamed
* @param {Object} chunk The LOB owning object
* @returns {Promise} The LOB to file promise
*/
function streamLobToFile(connName, lob, chunk) {
return new Promise((resolve, reject) => {
// don't include the report in the JSON since there should be a file
delete chunk.report;
// stream the report into a file (illustrative purposes only)
chunk.reportPath = `${Os.tmpdir()}/sqler-${connName}-read-${chunk.id}.png`;
const writeStream = Fs.createWriteStream(chunk.reportPath);
writeStream.on('error', (err) => lob.destroy(err));
lob.on('close', () => resolve());
lob.on('end', () => lob.destroy());
lob.on('error', (err) => reject(err));
lob.pipe(writeStream);
});
}
Update Rows (streaming using the same SQL as the prior update rows example):
'use strict';
const typedefs = require('sqler/typedefs');
const Stream = require('stream');
// node >= v16 :
// const { pipeline } = require('stream/promises');
// node < 16 :
const Util = require('util');
const pipeline = Util.promisify(Stream.pipeline);
// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {
const date = new Date();
// binds
const table1BindsArray = [
{
id: 100, name: '', updated: date
},
{
id: 200, name: '', updated: date
},
];
const table2BindsArray = [
{
id2: 100, name2: '', updated2: date
},
{
id2: 200, name2: '', updated2: date
}
];
const rtn = {};
//-------------------------------------------------------
// There are two different ways to perform a transaction
// 1. Implicit (suitable for a single execution per tx)
// 2. Explicit (suitable for multiple executions per tx)
// using implicit transactions:
await implicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);
// Using an explicit transaction:
await explicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);
// Using a prepared statement:
await preparedStatementUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);
// Using a prepared statement within an explicit transaction
await preparedStatementExplicitTxUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray);
return rtn;
};
async function implicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
// simply rename all the bind names to reflect the action being performed
nameAll('UPDATE_STREAM', 'Implicit transaction', table1BindsArray, table2BindsArray);
// loop through and perform the updates via the writable stream
let ni = 0;
const bindsArrays = [ table1BindsArray, table2BindsArray ];
rtn.txImpRslts = new Array(bindsArrays.length);
for (let bindsArray of bindsArrays) {
// Example using an implicit transaction for each streamed (autoCommit = true is the default)
rtn.txImpRslts[ni] = await manager.db[connName].update[`table${ni + 1}`].rows({
// update binds will be batched in groups of 1 before streaming them to the database since
// execOpts.stream = 1, but we could have batched them in groups (stream = 2) as well
// https://oracle.github.io/node-oracledb/doc/api.html#executemany
stream: 1
// no need to set execOpts.binds since they will be streamed from the update instead
});
// now that the write streams are ready and the read binds have been renamed,
// we can cycle through the bind arrays and write them to the appropriate tables
for (let writeStream of rtn.txImpRslts[ni].rows) {
await pipeline(
// here we're just using some static values for illustration purposes, but they can come from a
// any readable stream source like a file, database, etc. as long as they are "transformed"
// into JSON binds before the sqler writable stream receives them
Stream.Readable.from(bindsArray),
writeStream
);
}
ni++;
}
}
async function explicitTransactionUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
// start a transaction
/** @type {typedefs.SQLERTransaction} */
let tx;
try {
tx = await manager.db[connName].beginTransaction();
// simply rename all the bind names to reflect the action being performed
nameAll('UPDATE_STREAM_TX', 'Explicit transaction', table1BindsArray, table2BindsArray);
// loop through and perform the updates via the writable stream
let ni = 0;
const bindsArrays = [ table1BindsArray, table2BindsArray ];
rtn.txExpRslts = new Array(bindsArrays.length);
for (let bindsArray of bindsArrays) {
// Example using an implicit transaction for each streamed (autoCommit = true is the default)
rtn.txExpRslts[ni] = await manager.db[connName].update[`table${ni + 1}`].rows({
autoCommit: false, // don't auto-commit after execution
transactionId: tx.id, // ensure execution takes place within transaction
// update binds will be batched in groups of 1 before streaming them to the database since
// execOpts.stream = 1, but we could have batched them in groups (stream = 2) as well
// https://oracle.github.io/node-oracledb/doc/api.html#executemany
stream: 1
// no need to set execOpts.binds since they will be streamed from the update instead
});
// now that the write streams are ready and the read binds have been renamed,
// we can cycle through the bind arrays and write them to the appropriate tables
for (let writeStream of rtn.txExpRslts[ni].rows) {
await pipeline(
// here we're just using some static values for illustration purposes, but they can come from a
// any readable stream source like a file, database, etc. as long as they are "transformed"
// into JSON binds before the sqler writable stream receives them
Stream.Readable.from(bindsArray),
writeStream
);
}
ni++;
}
await tx.commit(true); // true to release the connection back to the pool
} catch (err) {
if (tx) {
await tx.rollback(true); // true to release the connection back to the pool
}
throw err;
}
}
async function preparedStatementUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
// need to keep track of at least one result for each table so that unprepare can be called on each
// (could call unprepare using any of the returned stream results for each table)
let psRsltTable1, psRsltTable2;
try {
// simply rename all the bind names to reflect the action being performed
nameAll('UPDATE_STREAM_PS', 'Prepared statement', table1BindsArray, table2BindsArray);
// loop through and perform the updates via the writable stream
let ni = 0;
const bindsArrays = [ table1BindsArray, table2BindsArray ];
rtn.psRslts = new Array(bindsArrays.length);
for (let bindsArray of bindsArrays) {
// Example using an implicit transaction for each streamed (autoCommit = true is the default)
rtn.psRslts[ni] = await manager.db[connName].update[`table${ni + 1}`].rows({
// prepared statement flag does not really do anything, but to show universal sqler API use
// https://oracle.github.io/node-oracledb/doc/api.html#stmtcache
prepareStatement: true,
// update binds will be batched in groups of 1 before streaming them to the database since
// execOpts.stream = 1, but we could have batched them in groups (stream = 2) as well
// https://oracle.github.io/node-oracledb/doc/api.html#executemany
stream: 1
// no need to set execOpts.binds since they will be streamed from the update instead
});
// need to keep track of at least one result for each table so that unprepare can be called on each
if (ni === 0 && !psRsltTable1) {
psRsltTable1 = rtn.psRslts[ni];
} else if (ni && !psRsltTable2) {
psRsltTable2 = rtn.psRslts[ni];
}
// now that the write streams are ready and the read binds have been renamed,
// we can cycle through the bind arrays and write them to the appropriate tables
for (let writeStream of rtn.psRslts[ni].rows) {
await pipeline(
// here we're just using some static values for illustration purposes, but they can come from a
// any readable stream source like a file, database, etc. as long as they are "transformed"
// into JSON binds before the sqler writable stream receives them
Stream.Readable.from(bindsArray),
writeStream
);
}
ni++;
}
} finally {
// since prepareStatement = true, we need to close the statement
// and release the prepared statement connection back to the pool
// (also drops the temporary stored procedure that executes the prepared statement)
const proms = [];
if (psRsltTable1) proms.push(psRsltTable1.unprepare());
if (psRsltTable2) proms.push(psRsltTable2.unprepare());
if (proms.length) await Promise.all(proms);
}
}
async function preparedStatementExplicitTxUpdate(manager, connName, rtn, table1BindsArray, table2BindsArray) {
/** @type {typedefs.SQLERTransaction} */
let tx;
try {
// start a transaction
tx = await manager.db[connName].beginTransaction();
// simply rename all the bind names to reflect the action being performed
nameAll('UPDATE_STREAM_PS_TX', `Prepared statement with txId ${tx.id}`, table1BindsArray, table2BindsArray);
// loop through and perform the updates via the writable stream
let ni = 0;
const bindsArrays = [ table1BindsArray, table2BindsArray ];
rtn.txExpPsRslts = new Array(bindsArrays.length);
for (let bindsArray of bindsArrays) {
// Example using an implicit transaction for each streamed (autoCommit = true is the default)
rtn.txExpPsRslts[ni] = await manager.db[connName].update[`table${ni + 1}`].rows({
autoCommit: false, // don't auto-commit after execution
transactionId: tx.id, // ensure execution takes place within transaction
// prepared statement flag does not really do anything, but to show universal sqler API use
// https://oracle.github.io/node-oracledb/doc/api.html#stmtcache
prepareStatement: true,
// update binds will be batched in groups of 1 before streaming them to the database since
// execOpts.stream = 1, but we could have batched them in groups (stream = 2) as well
// https://oracle.github.io/node-oracledb/doc/api.html#executemany
stream: 1
// no need to set execOpts.binds since they will be streamed from the update instead
});
// now that the write streams are ready and the read binds have been renamed,
// we can cycle through the bind arrays and write them to the appropriate tables
for (let writeStream of rtn.txExpPsRslts[ni].rows) {
await pipeline(
// here we're just using some static values for illustration purposes, but they can come from a
// any readable stream source like a file, database, etc. as long as they are "transformed"
// into JSON binds before the sqler writable stream receives them
Stream.Readable.from(bindsArray),
writeStream
);
}
ni++;
}
// unprepare will be called on all prepared statements associated with the transaction when calling
// commit (alt, could have called unprepare before commit)
await tx.commit(true); // true to release the connection back to the pool
} catch (err) {
if (tx) {
// unprepare will be called on all prepared statements associated with the transaction when calling
// rollback (alt, could have called unprepare before rollback)
await tx.rollback(true); // true to release the connection back to the pool
}
throw err;
}
}
// just a utility function to iterate over muliple bind arrays and rename them
function nameAll(name, label, table1BindsArray, table2BindsArray) {
const ln = table1BindsArray.length + (table2BindsArray ? table2BindsArray.length : 0);
for (let i = 0, ti, ri, barr; i < ln; i++) {
// select which table the binds are for
if (i < table1BindsArray.length) {
ti = 0;
ri = i;
barr = table1BindsArray;
} else {
ti = 1;
ri = i - table1BindsArray.length;
barr = table2BindsArray;
}
// update with expanded name
barr[ri][`name${ti ? ti + 1 : ''}`] = `TABLE: ${ti + 1}, ROW: ${ri + 1}, ${name}: "${label} ${i + 1}"`;
}
return [ 'name', 'name2' ];
}
Delete Rows (streaming using the same SQL as the prior delete rows example):
'use strict';
const Stream = require('stream');
// node >= v16 :
// const { pipeline } = require('stream/promises');
// node < 16 :
const Util = require('util');
const pipeline = Util.promisify(Stream.pipeline);
// export just to illustrate module usage
module.exports = async function runExample(manager, connName) {
/** @type {typedefs.SQLERExecResults[]} */
const rtn = new Array(2);
for (let ti = 0; ti < rtn.length; ti++) {
// Delete rows from multiple tables within a single execution
rtn[ti] = await manager.db[connName].delete[`table${ti + 1}`].rows({
// delete binds will be batched in groups of 2 before streaming them to the database since
// execOpts.stream = 2, but we could have batched them individually (stream = 1) as well
// https://oracle.github.io/node-oracledb/doc/api.html#executemany
stream: 2
// no need to set execOpts.binds since they will be streamed from the delete instead
});
for (let writeStream of rtn[ti].rows) {
await pipeline(
// here we're just using some static values for illustration purposes, but they can come from a
// any readable stream source like a file, database, etc. as long as they are "transformed"
// into JSON binds before the sqler writable stream receives them
Stream.Readable.from([
{
[`id${ti ? 2 : ''}`]: 100
},
{
[`id${ti ? 2 : ''}`]: 200
}
]),
writeStream
)
}
}
return rtn;
};