Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
// TODO: This will not work for larger streams.
// The full inner stream is kept in memory.
joinedStream.on('end', () => nextLeft());
joinedStream.on('data', async ({ joinedBindings, result }) => {
if (result) {
bindingsStream._push(joinedBindings);
}
});
};
const transform = leftJoinOuter;
const bindingsStream = left.bindingsStream
.transform({ optional: true, transform });
const variables = ActorRdfJoin.joinVariables({ entries: [left, right] });
const metadata = () => Promise.all([left, right].map((entry) => entry.metadata()))
.then((metadatas) => metadatas.reduce((acc, val) => acc * val.totalItems, 1))
.catch(() => Infinity)
.then((totalItems) => ({ totalItems }));
return { type: 'bindings', bindingsStream, metadata, variables };
}
}
/**
 * Processes one inner bindings item against the current outer item.
 *
 * The two items are merged with `ActorRdfJoin.join`. A falsy merge result
 * is skipped. When the pattern has no expression, the merged bindings are
 * pushed with `result: true`; otherwise the expression is evaluated via
 * `evaluator.evaluateAsEBV` and its outcome is pushed alongside the bindings.
 *
 * @param innerItem Bindings taken from the inner stream.
 * @param nextInner Callback invoked once this item has been handled.
 */
transform: async (innerItem: Bindings, nextInner: any) => {
  const merged = ActorRdfJoin.join(outerItem, innerItem);
  if (merged) {
    if (pattern.expression) {
      try {
        const ebv = await evaluator.evaluateAsEBV(merged);
        joinedStream._push({ joinedBindings: merged, result: ebv });
      } catch (err) {
        // Errors recognised by isExpressionError are dropped without pushing
        // anything; every other error is surfaced on the bindings stream.
        if (!isExpressionError(err)) {
          bindingsStream.emit('error', err);
        }
      }
    } else {
      // No filter expression: every successful merge counts as a match.
      joinedStream._push({ joinedBindings: merged, result: true });
    }
  }
  nextInner();
},
/**
 * Builds the join output by feeding the bindings streams of the first two
 * entries into a `HashJoin`.
 *
 * Each entry is hashed with `ActorRdfJoinHash.hash` over the variables
 * returned by `ActorRdfJoin.overlappingVariables(action)`, and matching
 * entries are merged with `ActorRdfJoin.join`.
 *
 * @param action The join action; `entries[0]` and `entries[1]` supply the inputs.
 * @returns A `'bindings'` output holding the joined stream and the result of
 *          `ActorRdfJoin.joinVariables(action)`.
 */
public async getOutput(action: IActionRdfJoin): Promise {
  const sharedVariables = ActorRdfJoin.overlappingVariables(action);
  const leftStream = action.entries[0].bindingsStream;
  const rightStream = action.entries[1].bindingsStream;
  const bindingsStream = new HashJoin(
    leftStream,
    rightStream,
    (bindings) => ActorRdfJoinHash.hash(bindings, sharedVariables),
    ActorRdfJoin.join);
  return {
    type: 'bindings',
    bindingsStream,
    variables: ActorRdfJoin.joinVariables(action),
  };
}
/**
 * Builds the join output by feeding the bindings streams of the first two
 * entries into a `SymmetricHashJoin`.
 *
 * Each entry is hashed with `ActorRdfJoinSymmetricHash.hash` over the
 * variables returned by `ActorRdfJoin.overlappingVariables(action)`, and
 * matching entries are merged with `ActorRdfJoin.join`.
 *
 * @param action The join action; `entries[0]` and `entries[1]` supply the inputs.
 * @returns A `'bindings'` output holding the joined stream and the result of
 *          `ActorRdfJoin.joinVariables(action)`.
 */
public async getOutput(action: IActionRdfJoin): Promise {
  const sharedVariables = ActorRdfJoin.overlappingVariables(action);
  const leftStream = action.entries[0].bindingsStream;
  const rightStream = action.entries[1].bindingsStream;
  const bindingsStream = new SymmetricHashJoin(
    leftStream,
    rightStream,
    (bindings) => ActorRdfJoinSymmetricHash.hash(bindings, sharedVariables),
    ActorRdfJoin.join);
  return {
    type: 'bindings',
    bindingsStream,
    variables: ActorRdfJoin.joinVariables(action),
  };
}
/**
 * Builds the join output by feeding the bindings streams of the first two
 * entries into a `NestedLoopJoin`, with `ActorRdfJoin.join` as the function
 * that merges a pair of entries.
 *
 * @param action The join action; `entries[0]` and `entries[1]` supply the inputs.
 * @returns A `'bindings'` output holding the joined stream and the result of
 *          `ActorRdfJoin.joinVariables(action)`.
 */
protected async getOutput(action: IActionRdfJoin): Promise {
  const leftStream = action.entries[0].bindingsStream;
  const rightStream = action.entries[1].bindingsStream;
  const bindingsStream = new NestedLoopJoin(leftStream, rightStream, ActorRdfJoin.join);
  return {
    type: 'bindings',
    bindingsStream,
    variables: ActorRdfJoin.joinVariables(action),
  };
}
/**
 * Produces a bindings output whose stream is a `HashJoin` over the bindings
 * streams of `action.entries[0]` and `action.entries[1]`.
 *
 * The hash of an entry is computed by `ActorRdfJoinHash.hash` using the
 * variables from `ActorRdfJoin.overlappingVariables(action)`; pairs are
 * merged by `ActorRdfJoin.join`.
 *
 * @param action The join action describing the entries to combine.
 * @returns An object with `type: 'bindings'`, the joined stream, and the
 *          variables from `ActorRdfJoin.joinVariables(action)`.
 */
public async getOutput(action: IActionRdfJoin): Promise {
  const commonVars = ActorRdfJoin.overlappingVariables(action);
  const firstStream = action.entries[0].bindingsStream;
  const secondStream = action.entries[1].bindingsStream;
  const joined = new HashJoin(
    firstStream,
    secondStream,
    (item) => ActorRdfJoinHash.hash(item, commonVars),
    ActorRdfJoin.join);
  const variables = ActorRdfJoin.joinVariables(action);
  return { type: 'bindings', bindingsStream: joined, variables };
}
/**
 * Produces a bindings output whose stream is a `SymmetricHashJoin` over the
 * bindings streams of `action.entries[0]` and `action.entries[1]`.
 *
 * The hash of an entry is computed by `ActorRdfJoinSymmetricHash.hash` using
 * the variables from `ActorRdfJoin.overlappingVariables(action)`; pairs are
 * merged by `ActorRdfJoin.join`.
 *
 * @param action The join action describing the entries to combine.
 * @returns An object with `type: 'bindings'`, the joined stream, and the
 *          variables from `ActorRdfJoin.joinVariables(action)`.
 */
public async getOutput(action: IActionRdfJoin): Promise {
  const commonVars = ActorRdfJoin.overlappingVariables(action);
  const firstStream = action.entries[0].bindingsStream;
  const secondStream = action.entries[1].bindingsStream;
  const joined = new SymmetricHashJoin(
    firstStream,
    secondStream,
    (item) => ActorRdfJoinSymmetricHash.hash(item, commonVars),
    ActorRdfJoin.join);
  const variables = ActorRdfJoin.joinVariables(action);
  return { type: 'bindings', bindingsStream: joined, variables };
}