Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
// Round-trips a packet through Redis: beamcoder packet -> JSON -> Redis hash
// fields -> stored hash -> read back -> beamcoder packet. The elapsed time of
// each stage is logged with process.hrtime so the cost of each step is visible.
test('Roundtrip a packet', async t => {
  // 64 KiB payload filled with a repeating 0..127 byte pattern.
  const data = Buffer.alloc(65536);
  for ( let x = 0 ; x < data.length ; x++ ) data[x] = x % 128;
  const pkt = beamcoder.packet({
    pts: 9876543210,
    dts: null,
    data: data,
    flags: { KEY: true },
    side_data: { replaygain: Buffer.from('wibble') }
  });
  const key = `beam:packet:${pkt.pts}`;
  const redis = new Redis();
  try {
    // Start from a clean slate so a previous run cannot influence this one.
    await redis.del(key);

    let start = process.hrtime();
    const pktj = pkt.toJSON();
    console.log('toJSON', process.hrtime(start));

    // The conversion is performed (and timed) exactly once; declaring the
    // result a second time with `let` in this scope is a SyntaxError.
    start = process.hrtime();
    const pktr = packetToRedis(pktj);
    console.log('packetToRedis', process.hrtime(start));
    console.log(pktr);

    start = process.hrtime();
    t.equal(await redis.hmset(key, pktr), 'OK', 'redis says set OK.');
    console.log('Set took', process.hrtime(start));

    start = process.hrtime();
    const rdp = await redis.hgetallBuffer(key);
    console.log('Get took', process.hrtime(start));

    start = process.hrtime();
    const rdpr = packetFromRedis(rdp);
    console.log('packetFromRedis', process.hrtime(start));

    // NOTE(review): only truthiness of the rebuilt packet is asserted; the
    // payload bytes are not compared against `data` in this test.
    const rp = beamcoder.packet(rdpr);
    t.ok(rp, 'roundtrip packet is truthy.');
    console.log(rp);
  } finally {
    // Close the connection even when a step above throws, so a failing run
    // does not leave an open socket behind.
    await redis.quit();
  }
  t.end();
});
// Exercises GET requests against /beams/test_url/stream_3/<pts or range>:
// a single stored packet, a missing packet (404 with a JSON error body), and a
// range query that should yield the stored packet as the first array element.
// NOTE(review): the `try {` opened below is not closed within the visible
// lines of this block — confirm against the complete file.
test('GET a packet', async t => {
try {
t.ok(await flushdb(), 'database flushed OK.');
t.comment('### Retrieve a packet');
t.deepEqual(await redisio.storeMedia('test_url', testUtil.pkt), ['OK-crt','OK-crt'],
'test packet stored OK.');
let response = await request(server).get('/beams/test_url/stream_3/42')
.expect(200);
t.ok(response.ok, 'response claims OK.');
t.equal(response.type, 'application/json', 'response is JSON.');
t.ok(Array.isArray(response.body), 'result is an array.');
// Rebuild a packet from the first array element and compare it with the stored one.
let pkt = beamcoder.packet(response.body[0]);
t.ok(pkt, 'roundtrip packet is truthy.');
t.deepEqual(pkt.toJSON(), testUtil.pkt.toJSON(),
'retrieved packet as expected.');
t.equal(pkt.size, 16383, 'has expected size parameter.');
t.comment('### Packet not found');
// No packet was stored at timestamp 41, so the request is expected to 404.
response = await request(server).get('/beams/test_url/stream_3/41')
.expect(404);
t.notOk(response.ok, 'response is not OK.');
t.equal(response.type, 'application/json', 'response is JSON.');
t.deepEqual(response.body, { statusCode: 404,
error: 'Not Found',
message: `Media with name 'test_url:stream_3:41' was not found: Unable to find requested media elements.` }, // eslint-disable-line
'error message structure as expected.');
// NOTE(review): the next section repeats the "Packet not found" request and
// assertions verbatim under a "range" heading, and the same heading appears
// again right after it — looks like a paste artifact; confirm and dedupe.
t.comment('### Retrieve one packet from a range');
response = await request(server).get('/beams/test_url/stream_3/41')
.expect(404);
t.notOk(response.ok, 'response is not OK.');
t.equal(response.type, 'application/json', 'response is JSON.');
t.deepEqual(response.body, { statusCode: 404,
error: 'Not Found',
message: `Media with name 'test_url:stream_3:41' was not found: Unable to find requested media elements.` }, // eslint-disable-line
'error message structure as expected.');
t.comment('### Retrieve one packet from a range');
// The range 40-45 includes timestamp 42, where the test packet was stored.
response = await request(server).get('/beams/test_url/stream_3/40-45')
.expect(200);
t.ok(response.ok, 'response claims OK.');
t.equal(response.type, 'application/json', 'response is JSON.');
t.ok(Array.isArray(response.body), 'result is an array.');
pkt = beamcoder.packet(response.body[0]);
t.ok(pkt, 'roundtrip packet is truthy.');
t.deepEqual(pkt.toJSON(), testUtil.pkt.toJSON(),
'retrieved packet as expected.');
t.equal(pkt.size, 16383, 'has expected size parameter.');
t.comment('### Store ten packets');
t.ok(await flushdb(), 'database flushed OK.');
// Stores packets with pts -40, -30, ..., 50.
// NOTE(review): `tpkt.pts = ...` writes through to whatever `testUtil.pkt`
// returns; if that is a shared object rather than a fresh copy per access,
// later tests see the mutated pts — verify against testUtil.
for ( let x = 0 ; x < 10 ; x++) {
let tpkt = testUtil.pkt;
tpkt.pts = (x * 10) - 40;
t.deepEqual(await redisio.storeMedia('test_url', tpkt), ['OK-crt','OK-crt'],
`test packet ${tpkt.pts} stored OK.`);
}
t.comment('### Retrieve three by range');
// Builds a packet and a frame, stores the packet under 'test_url', then (in
// the visible remainder) PUTs packets over HTTP and checks both the response
// body and the copy read back with redisio.retrievePacket.
// NOTE(review): the visible lines appear to be spliced — the `t.deepEqual(`
// call that stores the packet is never closed, and `rpkt` / `response` are
// assigned without a visible declaration. Confirm against the complete file.
test('Retrieve media', async t => {
t.ok(await beforeTest(), 'test database flushed OK.');
let pkt = beamcoder.packet({
pts: 42,
dts: 43,
data: Buffer.alloc(16383),
stream_index: 3,
flags: { KEY: true, TRUSTED: true},
side_data: { replaygain: Buffer.from('Zen time?') },
duration: 44,
pos: 45
});
// NOTE(review): `frm` is not referenced again in the visible lines.
let frm = beamcoder.frame({
pts: 43,
width: 1920,
height: 1080,
format: 'yuv422p'
}).alloc();
t.deepEqual(await redisio.storeMedia('test_url', pkt), [ 'OK-crt', 'OK-crt' ],
rpkt = beamcoder.packet(response.body);
t.deepEqual(rpkt.toJSON(), pkt.toJSON(), 'returned packet as expected.');
rpkt = await redisio.retrievePacket('test_url', 0, 42);
t.deepEqual(rpkt.toJSON(), pkt.toJSON(), 'stored packet as expected.');
t.comment('### Put in a packet for the format with "video"');
// NOTE(review): the assignments below write to whatever `testUtil.pkt`
// returns; if that is a shared object, the changes leak into other tests.
pkt = testUtil.pkt;
pkt.stream_index = 0;
pkt.duration = 144;
// Addresses the stream as 'video' and the element as 'packet_42'.
response = await request(server)
.put('/beams/test_url/video/packet_42')
.send(pkt.toJSON())
.expect(200);
t.ok(response.ok, 'response is truthy.');
t.equal(response.type, 'application/json', 'response is JSON.');
rpkt = beamcoder.packet(response.body);
t.deepEqual(rpkt.toJSON(), pkt.toJSON(), 'returned packet as expected.');
rpkt = await redisio.retrievePacket('test_url', 0, 42);
t.deepEqual(rpkt.toJSON(), pkt.toJSON(), 'stored packet as expected.');
t.comment('### Put in a packet that does not identify');
pkt = testUtil.pkt;
pkt.stream_index = 0;
pkt.duration = 145;
// Here the final URL segment is the bare timestamp '42' with no
// 'packet_' prefix.
response = await request(server)
.put('/beams/test_url/stream_0/42')
.send(pkt.toJSON())
.expect(200);
t.ok(response.ok, 'response is truthy.');
t.equal(response.type, 'application/json', 'response is JSON.');
rpkt = beamcoder.packet(response.body);
t.deepEqual(rpkt.toJSON(), pkt.toJSON(), 'returned packet as expected.');
/**
 * Read a packet's stored fields from Redis and rebuild them as a beamcoder
 * packet. Unlike retrievePacket, no separate `:data` payload is fetched.
 *
 * @param {string} fmtOrKey When the string contains 'packet' it is used
 *   directly as the Redis hash key; otherwise it is passed to
 *   getWithStreamAlias together with the stream index and `packet_<pts>`.
 * @param {number|string} [stream_index=0] Stream identifier used for the lookup.
 * @param {number} [pts=0] Presentation timestamp of the packet.
 * @returns {Promise<Object>} Packet created by beamcoder.packet.
 * @throws {Error} When nothing (or an empty hash) is stored for the packet,
 *   or when the underlying Redis request rejects.
 */
async function retrievePacketMetadata(fmtOrKey, stream_index = 0, pts = 0) {
  const redis = await redisPool.use();
  let pktb;
  try {
    pktb = fmtOrKey.indexOf('packet') < 0 ?
      await getWithStreamAlias(redis, fmtOrKey, stream_index, `packet_${pts}`) :
      await redis.hgetallBuffer(fmtOrKey);
  } finally {
    // Hand the connection back even when the lookup rejects; otherwise every
    // failed request would permanently take one connection out of the pool.
    redisPool.recycle(redis);
  }
  if ((!pktb) || (Object.keys(pktb).length === 0)) {
    throw new Error(`Packet in stream '${stream_index}' with timestamp '${pts}' is not found.`);
  }
  return beamcoder.packet(mappings.packetFromRedis(pktb));
}
/**
 * Read a packet's stored fields and its payload from Redis and rebuild them
 * as a beamcoder packet. The two reads are issued concurrently, each on its
 * own pooled connection.
 *
 * @param {string} fmtOrKey When the string contains 'packet_' it is used
 *   directly as the Redis hash key (payload at `<key>:data`); otherwise the
 *   fields are looked up through getWithStreamAlias and the payload key is
 *   built from makeStreamBase(fmtOrKey, stream_index).
 * @param {number|string} [stream_index=0] Stream identifier used for the lookup.
 * @param {number} [pts=0] Presentation timestamp of the packet.
 * @returns {Promise<Object>} Packet created by beamcoder.packet, with `data`
 *   set when a payload buffer was stored.
 * @throws {Error} When nothing (or an empty hash) is stored for the packet,
 *   or when either Redis request rejects.
 */
async function retrievePacket(fmtOrKey, stream_index = 0, pts = 0) {
  const [ metaConn, dataConn ] = await Promise.all([ redisPool.use(), redisPool.use() ]);
  let dbreq;
  try {
    dbreq = fmtOrKey.indexOf('packet_') < 0 ?
      await Promise.all([
        getWithStreamAlias(metaConn, fmtOrKey, stream_index, `packet_${pts}`),
        dataConn.getBuffer(`${makeStreamBase(fmtOrKey, stream_index)}:packet_${pts}:data`) ]) :
      await Promise.all([
        metaConn.hgetallBuffer(fmtOrKey),
        dataConn.getBuffer(`${fmtOrKey}:data`) ]);
  } finally {
    // Return both connections even when a read rejects; otherwise each failed
    // request would permanently take two connections out of the pool.
    redisPool.recycle(metaConn);
    redisPool.recycle(dataConn);
  }
  const [ fields, payload ] = dbreq;
  if ((!fields) || (Object.keys(fields).length === 0)) {
    throw new Error(`Packet in stream '${stream_index}' with timestamp '${pts}' not found.`);
  }
  const pkt = beamcoder.packet(mappings.packetFromRedis(fields));
  // The payload is stored separately from the hash and may be absent.
  if (payload) {
    pkt.data = payload;
  }
  return pkt;
}
// NOTE(review): this is the tail of a request handler whose opening lines are
// not visible here; `ctx`, `streamIdx`, `isPacket`, `isFrame` and
// `extractFinalDigits` are defined above this excerpt.
// Reject a body whose stream index disagrees with the one taken from the URL.
throw Boom.badRequest(`Packet stream index '${ctx.request.body.stream_index}' does not match URL stream index '${streamIdx}'.`);
}
// Pull the timestamp out of the media spec in the URL; capture group 1 is
// compared numerically with the body's pts below.
let ptsMatch = extractFinalDigits.exec(ctx.params.mediaSpec);
if (!ptsMatch) {
// NOTE(review): the message reads "extact" (sic); left unchanged here because
// clients or tests may match on the exact text.
throw Boom.badRequest('Cannot extact PTS from URL.');
}
// Unary plus converts the captured digits to a number for a strict comparison.
if (ctx.request.body.pts !== +ptsMatch[1]) {
throw Boom.badRequest(`URL PTS of '${ptsMatch[1]}' does not match media element PTS of '${ctx.request.body.pts}'.`);
}
try {
if (isPacket) {
// Move the body's `size` value to `buf_size` before building the packet.
ctx.request.body.buf_size = ctx.request.body.size;
delete ctx.request.body.size;
let pkt = beamcoder.packet(ctx.request.body);
let result = await redisio.storePacket(ctx.params.fmtSpec, pkt);
// Every element of the store result must be a string starting with 'OK'.
if (!result.every(x => x && x.startsWith('OK'))) {
throw new Error(`Failed to store packet. Result is [${result}].`);
}
ctx.body = pkt;
// A first result ending in 'ovw' yields 200; anything else yields 201.
ctx.status = result[0].endsWith('ovw') ? 200 : 201;
}
if (isFrame) {
let frm = beamcoder.frame(ctx.request.body);
let result = await redisio.storeFrame(ctx.params.fmtSpec, frm, streamIdx);
// Same success criteria and status mapping as for packets above.
if (!result.every(x => x && x.startsWith('OK'))) {
throw new Error(`Failed to store frame. Result is [${result}].`);
}
ctx.body = frm;
ctx.status = result[0].endsWith('ovw') ? 200 : 201;
}