Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
const streamable = async (model, sql, transform) => {
const conn = await model.sequelize.connectionManager.getConnection({
type: 'SELECT'
})
// a not so fun hack to tie our sequelize types into this raw cursor
const query = conn.query(new QueryStream(sql, undefined, {
batchSize,
types: {
getTypeParser: conn.getTypeParser.bind(conn)
}
}))
const modifier = transform ? through2.obj((obj, _, cb) =>
cb(null, transform(obj))
) : through2.obj()
const end = (err) => {
query.close(() => {
model.sequelize.connectionManager.releaseConnection(conn)
.then(() => null)
.catch(() => null)
})
export const backupTransactionsToJSON = async (snapFileName, query, database) => {
const transactionBackupPath = utils.getFilePath(snapFileName, "rollbackTransactions");
await fs.ensureFile(transactionBackupPath);
const snapshotWriteStream = fs.createWriteStream(transactionBackupPath);
const qs = new QueryStream(query);
try {
const data = await database.db.stream(qs, s => s.pipe(JSONStream.stringify()).pipe(snapshotWriteStream));
logger.info(
`${pluralize(
"transaction",
data.processed,
true,
)} from rollbacked blocks safely exported to file ${snapFileName}`,
);
return data;
} catch (error) {
app.forceExit("Error while exporting data via query stream", error);
}
};
export const exportTable = async (table, options) => {
const snapFileName = utils.getFilePath(table, options.meta.folder);
const gzip = zlib.createGzip();
await fs.ensureFile(snapFileName);
logger.info(
`Starting to export table ${table} to folder ${
options.meta.folder
}, append:${!!options.blocks}, skipCompression: ${options.meta.skipCompression}`,
);
try {
const snapshotWriteStream = fs.createWriteStream(snapFileName, options.blocks ? { flags: "a" } : {});
const encodeStream = msgpack.createEncodeStream({ codec: Codec[table] });
const qs = new QueryStream(options.queries[table]);
const data = await options.database.db.stream(qs, s => {
if (options.meta.skipCompression) {
return s.pipe(encodeStream).pipe(snapshotWriteStream);
}
return s
.pipe(encodeStream)
.pipe(gzip)
.pipe(snapshotWriteStream);
});
logger.info(
`Snapshot: ${table} done. ==> Total rows processed: ${data.processed}, duration: ${data.duration} ms`,
);
return {