Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
// TODO: This will not work for larger streams.
// The full inner stream is kept in memory.
joinedStream.on('end', () => nextLeft());
joinedStream.on('data', async ({ joinedBindings, result }) => {
if (result) {
bindingsStream._push(joinedBindings);
}
});
};
const transform = leftJoinOuter;
const bindingsStream = left.bindingsStream
.transform({ optional: true, transform });
const variables = ActorRdfJoin.joinVariables({ entries: [left, right] });
const metadata = () => Promise.all([left, right].map((entry) => entry.metadata()))
.then((metadatas) => metadatas.reduce((acc, val) => acc * val.totalItems, 1))
.catch(() => Infinity)
.then((totalItems) => ({ totalItems }));
return { type: 'bindings', bindingsStream, metadata, variables };
}
}
/**
 * Produces the join output for the first two entries of `action` by wiring
 * their bindings streams into a `HashJoin`.
 *
 * Each entry handed to the hash callback is hashed with `ActorRdfJoinHash.hash`
 * over the variables returned by `ActorRdfJoin.overlappingVariables(action)`;
 * `ActorRdfJoin.join` is passed through as the final constructor argument.
 *
 * NOTE(review): only `entries[0]` and `entries[1]` are read here — presumably
 * the caller guarantees exactly two entries; confirm.
 * NOTE(review): the return annotation carries no type argument in this view;
 * the intended `Promise<...>` payload type should be confirmed.
 *
 * @param action The join action whose first two entries are combined.
 * @returns An object tagged `type: 'bindings'` holding the join stream and the
 *          result of `ActorRdfJoin.joinVariables(action)`.
 */
public async getOutput(action: IActionRdfJoin): Promise {
  // Variables shared by the entries drive the hash key.
  const sharedVariables = ActorRdfJoin.overlappingVariables(action);
  const leftStream = action.entries[0].bindingsStream;
  const rightStream = action.entries[1].bindingsStream;
  const bindingsStream = new HashJoin(
    leftStream,
    rightStream,
    (bindings) => ActorRdfJoinHash.hash(bindings, sharedVariables),
    ActorRdfJoin.join,
  );
  return { type: 'bindings', bindingsStream, variables: ActorRdfJoin.joinVariables(action) };
}
/**
 * Produces the join output for the first two entries of `action` by wiring
 * their bindings streams into a `SymmetricHashJoin`.
 *
 * Each entry handed to the hash callback is hashed with
 * `ActorRdfJoinSymmetricHash.hash` over the variables returned by
 * `ActorRdfJoin.overlappingVariables(action)`; `ActorRdfJoin.join` is passed
 * through as the final constructor argument.
 *
 * NOTE(review): only `entries[0]` and `entries[1]` are read here — presumably
 * the caller guarantees exactly two entries; confirm.
 * NOTE(review): the return annotation carries no type argument in this view;
 * the intended `Promise<...>` payload type should be confirmed.
 *
 * @param action The join action whose first two entries are combined.
 * @returns An object tagged `type: 'bindings'` holding the join stream and the
 *          result of `ActorRdfJoin.joinVariables(action)`.
 */
public async getOutput(action: IActionRdfJoin): Promise {
  // Variables shared by the entries drive the hash key.
  const sharedVariables = ActorRdfJoin.overlappingVariables(action);
  const leftStream = action.entries[0].bindingsStream;
  const rightStream = action.entries[1].bindingsStream;
  const bindingsStream = new SymmetricHashJoin(
    leftStream,
    rightStream,
    (bindings) => ActorRdfJoinSymmetricHash.hash(bindings, sharedVariables),
    ActorRdfJoin.join,
  );
  return { type: 'bindings', bindingsStream, variables: ActorRdfJoin.joinVariables(action) };
}
/**
 * Produces the join output for the first two entries of `action` by wiring
 * their bindings streams into a `NestedLoopJoin`, with `ActorRdfJoin.join`
 * passed through as the final constructor argument.
 *
 * NOTE(review): only `entries[0]` and `entries[1]` are read here — presumably
 * the caller guarantees exactly two entries; confirm.
 * NOTE(review): the return annotation carries no type argument in this view;
 * the intended `Promise<...>` payload type should be confirmed.
 *
 * @param action The join action whose first two entries are combined.
 * @returns An object tagged `type: 'bindings'` holding the join stream and the
 *          result of `ActorRdfJoin.joinVariables(action)`.
 */
protected async getOutput(action: IActionRdfJoin): Promise {
  const leftStream = action.entries[0].bindingsStream;
  const rightStream = action.entries[1].bindingsStream;
  const bindingsStream = new NestedLoopJoin(leftStream, rightStream, ActorRdfJoin.join);
  return { type: 'bindings', bindingsStream, variables: ActorRdfJoin.joinVariables(action) };
}