Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
// NOTE(review): this fragment is the interior of a connect callback whose
// opening lines are outside this view; the `});` at the bottom closes it.
if (err) throw err;
// setup copy from
// Build a COPY ... FROM STDIN statement for streaming tab-separated CSV in.
var command = 'COPY ' + options.table + ' FROM STDIN ';
command = command + '( ';
command = command + 'FORMAT CSV, ';
command = command + "DELIMITER '\t', ";
command = command + "QUOTE '\b', "; // defaults to '"' which can give problems
// misval is interpolated unquoted -- presumably a pre-quoted NULL marker
// string; verify it never carries untrusted input (SQL injection risk).
command = command + 'NULL ' + misval + ' ';
command = command + ') ';
console.log(command.toString());
// create table & sink
// options.table is concatenated into SQL unescaped -- assumed to be a
// trusted, pre-validated identifier; TODO confirm at the call site.
client.query('DROP TABLE IF EXISTS ' + options.table);
client.query(q.toString());
// The copy stream is a writable sink that feeds rows to the server.
var sink = client.query(pgStream.from(command));
// create transform: serialize parsed rows as tab-separated lines with
// quoting disabled to match the DELIMITER/QUOTE choices above.
var transform = csvStringify({
columns: columns,
quote: false,
quotedEmpty: false,
delimiter: '\t',
rowDelimiter: 'unix'
});
streamify(parsed).pipe(transform).pipe(sink);
// var testSink = fs.createWriteStream('file_to_import.csv');
// source.pipe(testSink);
});
// Stream a previously-written temp file into PostgreSQL via COPY FROM STDIN,
// then delete the temp file. Outcome (success or first error) is reported
// through self.endHandler after the pooled client is released.
pg.connect(this.dbConnString, function (err, client, done) {
  // Release the pooled client, then surface the outcome to the caller.
  function doneFn (err) {
    done()
    self.endHandler(err)
  }
  if (err) {
    doneFn(err)
    // Bail out: without a client, the client.query(...) below would throw.
    return
  }
  const stream = client.query(copyFrom(self.getCopyQueryText()))
  const fileStream = fs.createReadStream(self.deferred.tempDeferredFilename)
  fileStream.on('error', doneFn)
  // A COPY failure surfaces on the copy stream itself; without this handler
  // it would become an unhandled 'error' event.
  stream.on('error', doneFn)
  fileStream.pipe(stream).on('finish', function () {
    // delete temp file
    fs.unlink(self.deferred.tempDeferredFilename, doneFn)
  })
})
} else {
// NOTE(review): truncated fragment -- the `} else {` above and the trailing
// `);` belong to an enclosing construct outside this view.
// Handler: pipe payloadBuffer into PostgreSQL via COPY FROM STDIN and
// resolve when the copy stream completes.
(finalConnection, finalSql) => {
const copyFromBinaryStream = finalConnection.query(from(finalSql));
bufferToStream(payloadBuffer).pipe(copyFromBinaryStream);
return new Promise((resolve, reject) => {
copyFromBinaryStream.on('error', (error) => {
reject(error);
});
// NOTE(review): copy-from streams are writable; pg-copy-streams v2+ emits
// 'finish' rather than 'end' -- confirm against the pinned library version.
copyFromBinaryStream.on('end', () => {
// $FlowFixMe
resolve({});
});
});
},
);
// Test helper: run query `c` (presumably creates the employee table --
// TODO confirm), then COPY five tab-separated rows in and shut the pool down.
client.query(c, function (err) {
if (err) throw err; // NOTE(review): a throw inside an async callback cannot be caught by the caller
var stream = client.query(copyFrom("COPY employee FROM STDIN"));
// NOTE(review): no 'error' handler on the copy stream -- a COPY failure
// would surface as an unhandled 'error' event.
stream.on('end', function () {
done();
helper.pg.end();
});
// Writes lines of the form "1\ttest<i>\tuser<i>\n" for i = 1..5
// (three tab-separated columns per row).
for (var i = 1; i <= 5; i++) {
var line = ['1\ttest', i, '\tuser', i, '\n'];
stream.write(line.join(''));
}
stream.end();
});
});
// Stream rows into the "session" table via COPY FROM STDIN.
// NOTE(review): this function is truncated in this view -- the `try` opened
// below never closes here, so the remainder of the body is not visible.
async function copySessions(db, input, connection) {
console.time('Copy sessions');
try {
var stream = db.query(copyFrom(`COPY "session" FROM STDIN`));
// Settles when the copy stream completes: rejects on 'error', resolves on 'end'.
var promise = new Promise(function(resolve, reject) {
stream.on('error', reject);
stream.on('end', resolve);
});
// Write one chunk to the copy stream, honoring backpressure: resolve
// immediately if the internal buffer has room, otherwise wait for 'drain'.
function write(values) {
return new Promise(function(resolve, reject) {
var ok = stream.write(values, 'utf8');
if (ok) {
return resolve();
}
stream.once('drain', resolve);
});
}
// Kick off a COPY of the buffered rows into PostgreSQL as a TSV stream.
// Records the start time in the shared req_start, then serializes every
// row in copy.rows through a tab-separated writer piped into the copy
// stream; _cb fires on stream 'end' or 'error'.
var _start_copy = function (_cb) {
  req_start = _now();

  // Serialize rows as tab-separated values with no header line.
  const rowWriter = csvwriter({
    sendHeaders: false,
    separator: '\t',
    headers: copy.columns
  });

  // Feed the serialized rows straight into COPY ... FROM STDIN.
  const copyStream = conn.query(pgcopy(stmt.text));
  rowWriter.pipe(copyStream);
  copyStream.on('end', _cb);
  copyStream.on('error', _cb);

  copy.rows.forEach(function (row) {
    rowWriter.write(row);
  });
  rowWriter.end();
}
// Copy one buffered chunk into the target table over a pooled connection.
// NOTE(review): this fragment is truncated -- the processDataError(...) call
// at the bottom is cut off mid-argument-list.
self._pg.connect((error, client, done) => {
if (error) {
generateError(self, '\t--[populateTableWorker] Cannot connect to PostgreSQL server...\n' + error, sql);
resolvePopulateTableWorker();
} else {
// Schema, table name, and delimiter are interpolated unescaped -- assumed
// to be trusted, pre-validated values; TODO confirm upstream validation.
const sqlCopy = 'COPY "' + self._schema + '"."' + tableName + '" FROM STDIN DELIMITER \'' + self._delimiter + '\' CSV;';
const copyStream = client.query(from(sqlCopy));
const bufferStream = new BufferStream(buffer);
copyStream.on('end', () => {
/*
* COPY FROM STDIN does not return the number of rows inserted.
* But the transactional behavior still applies (no records inserted if at least one failed).
* That is why in case of 'on end' the rowsInChunk value is actually the number of records inserted.
*/
process.send(new MessageToMaster(tableName, rowsInChunk, rowsCnt));
deleteChunk(self, dataPoolId, client, done).then(() => resolvePopulateTableWorker());
});
copyStream.on('error', copyStreamError => {
processDataError(
self,
copyStreamError,
// Stream a CSV file into PostgreSQL: parse -> transform -> stringify -> COPY.
// cb is invoked exactly once: with the first error encountered, or with no
// arguments on success; the pooled client is always released first.
pool.connect((error, client, done) => {
  if (error) {
    // Without a client, client.query(...) below would throw; fail fast.
    return cb(error);
  }
  // Guard against double settlement: 'error' on one stream followed by
  // 'end' (or a second error) must not release/notify twice.
  let settled = false;
  const finish = (err) => {
    if (settled) return;
    settled = true;
    done();
    cb(err);
  };
  const pgStream = client
    .query(copyFrom(query))
    .on("end", () => finish())
    .on("error", (err) => finish(err));
  fs.createReadStream(filepath, { encoding: "utf8" })
    // A missing/unreadable file previously surfaced as an unhandled 'error'.
    .on("error", (err) => finish(err))
    .pipe(csv.parse())
    .pipe(csv.transform(transform))
    .pipe(csv.stringify())
    .pipe(pgStream);
});
},