create: function _CDPB_create(storageBackend) {
let node = createNode({
identifier: "interestDashboardDataProcessorBolt",
listenType: "chartData",
emitType: "interestDashboardData",
_daysPostEpochToDate: function(dayCount) {
// Despite the name, this returns epoch *milliseconds* for the given day
// count; callers presumably wrap the result in new Date(...).
return parseInt(dayCount, 10) * 24 * 60 * 60 * 1000;
},
ingest: function _HSB_ingest(message) {
DataProcessorHelper.initChartInStorage("interestDashboardData", this.storage);
/* Processing data for pie chart. */
let interestDashboardTypeNamespace = message["keywords"]["58-cat"];
let chartData = [];
for (let interestData of interestDashboardTypeNamespace.sortedInterests) {
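// The excerpt is truncated mid-loop. A hedged sketch of how the pie data
// might be accumulated; the name/weight fields are assumptions inferred
// from the sortedInterests usage above, not the original code.
chartData.push({ label: interestData.name, value: interestData.weight });
}
this.storage.chartData.interestDashboardData.pieData = chartData;
},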
create: function _SDPB_create(storageBackend) {
let node = createNode({
_spiderInput: {"children": {}, "weight": 100},
NUM_NODES_PER_LAYER: 4,
identifier: "spiderDataProcessorBolt",
listenType: "chartData", // Can also listen to other chart data processors
emitType: "spiderData",
_getRadius: function(isToplevel, parentRadius) {
if (!isToplevel) {
return parentRadius / 3.5; // A subcategory's radius is its parent's divided by 3.5.
}
// For demo purposes, generate a random recommendation count between 1 and 50.
let recommendationCount = Math.floor(Math.random() * 50) + 1;
let radius = 0;
if (recommendationCount <= 10) {
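// Truncated here. A hedged sketch of the remaining bucketing; the
// thresholds and radius values below are placeholders, not the originals.
radius = 20;
} else if (recommendationCount <= 30) {
radius = 35;
} else {
radius = 50;
}
return radius;
},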
create: function _TDPB_create(storageBackend) {
let node = createNode({
identifier: "areaDataProcessorBolt",
listenType: "chartData", // Can also listen to other chart data processors
emitType: "areaData",
_daysPostEpochToDate: function(dayCount) {
// Despite the name, this returns epoch *milliseconds* for the given day
// count; callers presumably wrap the result in new Date(...).
return parseInt(dayCount, 10) * 24 * 60 * 60 * 1000;
},
ingest: function _HSB_ingest(message) {
DataProcessorHelper.initChartInStorage("areaData", this.storage);
let areaTypeNamespace = message["keywords"]["58-cat"];
let chartJSON = [];
let top = areaTypeNamespace.sortedInterests.slice(0, 6);
let topInterests = top.map(interest => interest.name);
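// Truncated here. A hedged sketch of how the six top series might be
// assembled; the categories/days shape mirrors the timeline bolt below and
// is an assumption at this point in the excerpt.
for (let name of topInterests) {
let days = areaTypeNamespace.categories[name].days;
chartJSON.push({
key: name,
values: Object.keys(days).map(day => ({
x: this._daysPostEpochToDate(day),
y: days[day].weight // "weight" is an assumed field name
}))
});
}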
create: function _WIDPB_create(storageBackend) {
let node = createNode({
identifier: "weightIntensityDataProcessorBolt",
listenType: "chartData", // Can also listen to other chart data processors
emitType: "weightIntensityData",
ingest: function _HSB_ingest(message) {
DataProcessorHelper.initChartInStorage("weightIntensityData", this.storage);
DataProcessorHelper.iterateOverTypeNamespace(message, this.storage.chartData.weightIntensityData, (message, storedData) => {
// pointToInterestsMap works around an nvd3 bug: multiple points cannot be
// rendered at the same location, so interests sharing a point are merged.
let pointToInterestsMap = {};
let values = [];
storedData["xMin"] = message.xMin;
storedData["yMin"] = message.yMin;
storedData["xMax"] = message.xMax;
storedData["yMax"] = message.yMax;
create: function _TDPB_create(storageBackend) {
let node = createNode({
identifier: "timelineDataProcessorBolt",
listenType: "chartData", // Can also listen to other chart data processors
emitType: "timelineData",
ingest: function _HSB_ingest(message) {
DataProcessorHelper.initChartInStorage("timelineData", this.storage);
DataProcessorHelper.iterateOverTypeNamespace(message, this.storage.chartData.timelineData, (message, storedData) => {
let chartJSON = [];
let interestList = Object.keys(message.categories);
for (let i = 0; i < interestList.length; i++) {
let dataPoints = message.categories[interestList[i]].days;
chartJSON.push({
key: interestList[i],
values: Object.keys(dataPoints).map(key => {
dataPoints[key]["y"] = i;
return dataPoints[key];
})
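});
}
// Each series is pinned to y = i, giving one horizontal row per interest in
// the nvd3 timeline. Storing the result on storedData is a hedged guess at
// the truncated tail of this callback:
storedData["interestList"] = interestList;
storedData["chartJSON"] = chartJSON;
});
},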
create: function _IIDPB_create(storageBackend) {
let node = createNode({
identifier: "intentInterestDataProcessorBolt",
listenType: "chartData", // Can also listen to other chart data processors
emitType: "intentInterestData",
ingest: function _HSB_ingest(message) {
DataProcessorHelper.initChartInStorage("intentInterestData", this.storage);
DataProcessorHelper.iterateOverTypeNamespace(message, this.storage.chartData.intentInterestData, (message, storedData) => {
storedData["sortedInterests"] = [];
storedData["sortedIntents"] = [];
// Note: splice() mutates message.sortedIntents in place; only the top ten
// intents are kept for processing.
for (let intentData of message.sortedIntents.splice(0, 10)) {
let maxWeightDate = intentData.maxWeightDate;
let domainList = intentData.days[maxWeightDate]["domainList"];
let maxIntentDate = (new Date(intentData.days[maxWeightDate]["x"])).toLocaleDateString();
let title = intentData.name + " (" + maxIntentDate + ")";
let chartJSON = IntentInterestDataProcessorBolt._createChartData(domainList, storedData, "sortedIntents", title);
}
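},
});
return node;
},

// _createChartData is not shown in this excerpt. A hedged, hypothetical
// sketch of such a helper: fold a domain list into one chart series and
// append it under the given key on storedData. Every field name below is
// an assumption.
_createChartData: function(domainList, storedData, storageKey, title) {
storedData[storageKey].push({
key: title,
values: Object.keys(domainList).map(domain => ({
label: domain,
value: domainList[domain]
}))
});
},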
exports.SocketStream = function(socket) {
socket = util.makeEventTarget(socket);
function maybeCloseSocket() {
if (socket.readyState !== 'closing' && socket.readyState !== 'closed') {
socket.close();
}
}
var out;
this.readable = new streams.ReadableStream({
start: function(c) {
out = c;
socket.addEventListener('data', (evt) => {
c.enqueue(new Uint8Array(evt.data));
});
socket.addEventListener('close', () => {
try {
c.close();
} catch(e) {
// The stream has already been closed.
}
});
socket.addEventListener('error', (evt) => c.error(evt.data || evt));
},
cancel: function() {
maybeCloseSocket();
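}
});
// The writable half is not shown in this excerpt. A hedged sketch of the
// counterpart sink, assuming streams.WritableStream and socket.send mirror
// the readable wiring above:
this.writable = new streams.WritableStream({
write: function(chunk) {
socket.send(chunk);
},
close: function() {
maybeCloseSocket();
},
abort: function() {
maybeCloseSocket();
}
});
};

// The next fragment is from a different module: a pull-based stream that
// downloads message parts in chunks.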
return function messageChunkedPartStream({
pimap, folderInfo, uid, parts, downloadChunkSize, saveChunkSize }) {
// Pull the parts off as we go.
let remainingPartsToFetch = parts.slice();
// A pull stream: each pull() fetches a single part, and the generator
// enqueues once for each blob plus once to close out the part.
return new ReadableStream({
start() {
},
pull: co.wrap(function*(out) {
if (!remainingPartsToFetch.length) {
out.close();
return;
}
let blobIndex = 0;
let partInfo = remainingPartsToFetch.shift();
let mimeStream = chunkedDownloadMimeStream({
pimap,
folderInfo,
uid,
partInfo,
},
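// The call's tail and the rest of pull() are truncated in this excerpt. A
// hedged guess: the remaining options are passed, then mimeStream is
// drained, enqueuing one entry per blob and a close-out entry per part (see
// the comment above the stream). The pipeTo-based consumption is an
// assumption about chunkedDownloadMimeStream's contract.
{ downloadChunkSize, saveChunkSize });
yield mimeStream.pipeTo(new WritableStream({
write(blob) {
out.enqueue({ partInfo, blobIndex: blobIndex++, blob });
}
}));
out.enqueue({ partInfo, blobIndex, done: true });
})
});
}

// The next fragment is a gather stage; from the later references to
// gatherStream.readable/.writable it is presumably constructed as:
let gatherStream = new TransformStream({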
transform(change, enqueue, done) {
if (isDeletion(change)) {
enqueue({ change, gather: null });
} else {
// (avoid gathering data for already-removed items)
if (queuedSet.has(change.id)) {
logic(ctx, 'gathering', { id: change.id });
let gatherInto = inputToGatherInto(change);
enqueue({ change, gather: rootGatherer.gather(gatherInto) });
}
}
done();
},
writableStrategy: new CountQueuingStrategy({ highWaterMark: 1 }),
readableStrategy: new CountQueuingStrategy({ highWaterMark: 1 })
});
let filterStream = new TransformStream({
flush(enqueue, close) {
close();
},
transform({ change, gather }, enqueue, done) {
if (!gather) {
// This is a deletion. And we care about it or we wouldn't be here.
enqueue(change);
done();
} else {
logic(ctx, 'gatherWait', { id: change.id });
gather.then((gathered) => {
logic(ctx, 'gathered', { id: change.id });
// It's possible the item got removed after we kicked off the gather.
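// Truncated here. A hedged completion of the then-handler, based on the
// queuedSet bookkeeping visible in consider() below: drop the item if it
// was removed while gathering, otherwise forward the gathered result.
if (queuedSet.has(change.id)) {
queuedSet.delete(change.id);
enqueue(gathered);
}
done();
});
}
}
});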
//bufferingStream.readable.pipeTo(gatherStream.writable);
gatherStream.readable.pipeThrough(filterStream).pipeTo(new WritableStream({
start() {
},
write(change) {
onFilteredUpdate(change);
},
close() {
// I don't think anything actually cares? Unless we should be propagating
// through to the moot callback?
},
abort(ex) {
logic(ctx, 'filteringStreamAbortError', { ex, stack: ex.stack });
}
}, new CountQueuingStrategy({ highWaterMark: 1 })));
return {
/**
* This is how we are fed data/changes from the database.
*/
consider: (change) => {
if (!isDeletion(change)) {
// - add/change, process for filtering
queuedSet.add(change.id);
gatherStream.writable.write(change);
} else {
// - removal
// We don't need to check whether the value is present; performing the
// deletion is sufficient to ensure that, if the change is still in the
// pipeline, it does not get reported.
queuedSet.delete(change.id);
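// Truncated here. A hedged sketch of how the removal branch likely ends:
// the deletion itself still flows through the stream (the gather stage
// above explicitly handles isDeletion changes), so downstream consumers
// observe it.
gatherStream.writable.write(change);
}
}
};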