Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
function readPipeline (readStream, opts, nosplit = false) {
var beforeOffset = 0
var beforeLimit = 0
var offset = opts.offset || 0
var limit = opts.limit
var filter = massageFilters(opts.filter)
return pump([
readStream,
nosplit ? undefined : split2(),
through2.obj(function (row, enc, cb) {
if (!row || !row.trim()) {
// skip empty
return cb()
}
// offset filter
if (beforeOffset < offset) {
beforeOffset++
return cb()
}
// parse
row = JSON.parse(row)
// timestamp range filter
attachStdoutProcessors(stdout) {
stdout.pipe(split2()).on('data', line => {
this.handleErrors(line)
this.notifyOnWalletUnlockerActivation(line)
this.notifyLightningActivation(line)
// If the sync has already completed then we don't need to do any more log processing.
if (this.is(NEUTRINO_CHAIN_SYNC_COMPLETE)) {
return
}
this.notifyOnSyncComplete(line)
// Listen for things that will move us to waiting state.
if (this.is(NEUTRINO_CHAIN_SYNC_PENDING) || this.is(NEUTRINO_CHAIN_SYNC_IN_PROGRESS)) {
this.notifyOnSyncWaiting(line)
}
// Listen for things that will take us out of the waiting state.
return new Promise((resolve, reject) => {
let jobEnding = false
let jobEnded = false
let latestStats = null
let rstream = fs.createReadStream(logPath)
.pipe(split2())
.on('data', line => {
if (!jobEnding && !jobEnded) {
if (jobEndingRe.test(line)) {
jobEnding = true
} else {
if (jobStatusRe.test(line)) {
latestStats = line
}
}
} else {
if (!jobEnded) {
if (jobEndRe.test(line)) {
jobEnded = true
} else {
if (jobStatusRe.test(line)) {
latestStats = line
return new Promise((resolve, reject) => {
let rs = debugLogFile.createReadStream()
rs
.pipe(split())
.pipe(through({encoding: 'utf8', decodeStrings: false}, (data, _, cb) => {
if (data && data.startsWith(key)) {
return cb(null, data.slice(key.length) + '\n')
}
cb()
}))
.pipe(concat({encoding: 'string'}, resolve))
rs.on('error', reject)
})
}
const createLogFormatter = (configuration: LogFormatterConfigurationType) => {
const stream = split((line) => {
if (!isRoarrLine(line)) {
return configuration.excludeOrphans ? '' : line + '\n';
}
const message = JSON.parse(line);
let formattedMessage = '';
formattedMessage = '[' + new Date(message.time).toISOString() + ']';
if (message.context.logLevel && typeof message.context.logLevel === 'number') {
const logLevelName = getLogLevelName(message.context.logLevel);
const logLevelColorName = logLevelColorMap[logLevelName];
if (!logLevelColorName) {
this.logger = pino(
{
base: null,
safe: true,
level: "trace",
},
stream,
);
this.fileStream = this.getFileStream();
const consoleTransport = this.createPrettyTransport(this.options.levels.console, { colorize: true });
const fileTransport = this.createPrettyTransport(this.options.levels.file, { colorize: false });
pump(stream, split(), consoleTransport, process.stdout);
pump(stream, split(), fileTransport, this.fileStream);
return this;
}
.then(daemonStream =>
daemonStream.pipe(split2(line => JSON.parse(line))).pipe(getDaemonMessage)
);
attachStderrProcessors(stderr) {
stderr.pipe(split2()).on('data', line => {
lndLog.error(line)
if (line.startsWith('panic:')) {
this.lastError = line
}
})
}
const child = childProcess.spawn(command, args, spawnOpts);
let cbErr: Error = null;
if (onToken) {
child.stdout.pipe(split2(split)).on("data", (tok: string) => {
try {
onToken(tok);
} catch (err) {
cbErr = err;
}
});
}
if (onErrToken) {
child.stderr.pipe(split2()).on("data", (tok: string) => {
try {
onErrToken(tok);
} catch (err) {
cbErr = err;
}
});
}
let cancelled = false;
return await ctx.withStopper({
stop: async () => {
logger.debug(`Context aborting, killing ${command}`);
child.kill("SIGKILL");
cancelled = true;
},
work: () =>