Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
/**
 * Runs an ORDER BY operation: resolves `pattern.input` through the query-operation
 * mediator, then walks `pattern.expressions` and, for each one, builds an evaluator
 * and a transform that pairs every bindings object with its evaluated result.
 *
 * @param pattern The OrderBy algebra operation; `input` and `expressions` are read here.
 * @param context The action context, forwarded to the mediator and used to build the expression context.
 *
 * NOTE(review): the return type reads `Promise` with no type argument, and this excerpt
 * stops inside the `catch` block below; the rest of the method is not visible here.
 */
public async runOperation(pattern: Algebra.OrderBy, context: ActionContext)
: Promise {
// Evaluate the input operation first; its bindings stream is what gets annotated below.
const outputRaw = await this.mediatorQueryOperation.mediate({ operation: pattern.input, context });
const output = ActorQueryOperation.getSafeBindings(outputRaw);
// Not used in the visible part of this method; presumably consumed further down (TODO confirm).
const options = { window: this.window };
// Shallow copy of the expression context, shared by every evaluator created in the loop.
const sparqleeConfig = { ...ActorQueryOperation.getExpressionContext(context) };
// Declared with `let`; presumably reassigned per expression in the part not shown (TODO confirm).
let bindingsStream = output.bindingsStream;
// `expr` is a `let` binding because it is replaced by the extracted sort expression below.
for (let expr of pattern.expressions) {
// Sort direction must be read before `expr` is overwritten on the next line.
const isAscending = this.isAscending(expr);
expr = this.extractSortExpression(expr);
// Transform the stream by annotating it with the expr result
const evaluator = new AsyncEvaluator(expr, sparqleeConfig);
// Shape of the items pushed by the transform: the original bindings plus the evaluated term.
interface IAnnotatedBinding { bindings: Bindings; result: Term; }
const transform = async (bindings: Bindings, next: any) => {
try {
const result = await evaluator.evaluate(bindings);
// `transformedStream` is declared outside the visible excerpt.
transformedStream._push({ bindings, result });
} catch (err) {
// Errors that are not expression errors are surfaced as an 'error' event on the stream.
if (!isExpressionError(err)) {
bindingsStream.emit('error', err);
/**
 * Runs an EXTEND operation: resolves `pattern.input` through the query-operation
 * mediator and defines a transform that sets the evaluated `expression` on each
 * bindings object under the key derived from `variable`.
 *
 * @param pattern The Extend algebra operation; `expression`, `input` and `variable` are read here.
 * @param context The action context, forwarded to the mediator, the expression context and the warning log.
 *
 * NOTE(review): the return type reads `Promise` with no type argument, and this excerpt
 * stops inside the `catch` block below; the rest of the method is not visible here.
 */
public async runOperation(pattern: Algebra.Extend, context: ActionContext)
: Promise {
const { expression, input, variable } = pattern;
// Evaluate the input operation; the awaited mediator result is passed straight to getSafeBindings.
const output: IActorQueryOperationOutputBindings = ActorQueryOperation.getSafeBindings(
await this.mediatorQueryOperation.mediate({ operation: input, context }));
// String form of the target variable, used as the key when setting the result on the bindings.
const extendKey = termToString(variable);
// Shallow copy of the expression context for this evaluator.
const config = { ...ActorQueryOperation.getExpressionContext(context) };
const evaluator = new AsyncEvaluator(expression, config);
// Transform the stream by extending each Bindings with the expression result
const transform = async (bindings: Bindings, next: any) => {
try {
const result = await evaluator.evaluate(bindings);
// Extend operation is undefined when the key already exists
// We just override it here.
const extended = bindings.set(extendKey, result);
// `bindingsStream` is declared outside the visible excerpt.
bindingsStream._push(extended);
} catch (err) {
if (isExpressionError(err)) {
// Errors silently don't actually extend according to the spec
// (the original, un-extended bindings are pushed through instead)
bindingsStream._push(bindings);
// But let's warn anyway
this.logWarn(context, `Expression error for extend operation with bindings '${JSON.stringify(bindings)}'`);
/**
 * Runs a GROUP operation: resolves `pattern.input` through the query-operation
 * mediator and returns a promise that resolves with a bindings output built from
 * the collected groups once the input bindings stream has ended.
 *
 * @param pattern The Group algebra operation; `input`, `aggregates` and `variables` are read here.
 * @param context The action context, forwarded to the mediator and used to build the expression context.
 *
 * NOTE(review): the return type reads `Promise` with no type argument. The code that feeds
 * bindings into `groups` is not visible in this excerpt, which stops at the `catch` of the
 * 'end' handler.
 */
public async runOperation(pattern: Algebra.Group, context: ActionContext)
: Promise {
// Get result stream for the input query
const { input, aggregates } = pattern;
const outputRaw = await this.mediatorQueryOperation.mediate({ operation: input, context });
const output = ActorQueryOperation.getSafeBindings(outputRaw);
// The variables in scope are the variables on which we group, i.e. pattern.variables.
// For 'GROUP BY ?x, ?z', this is [?x, ?z], for 'GROUP by expr(?x) as ?e' this is [?e].
// But also in scope are the variables defined by the aggregations, since GROUP has to handle this.
const variables = pattern.variables
.map(termToString)
.concat(aggregates.map((agg) => termToString(agg.variable)));
// Shallow copy of the expression context, handed to the group state below.
const sparqleeConfig = { ...ActorQueryOperation.getExpressionContext(context) };
// Return a new promise that completes when the stream has ended or when
// an error occurs
return new Promise((resolve, reject) => {
// State object constructed from the pattern and the expression config; its results are collected on 'end'.
const groups = new GroupsState(pattern, sparqleeConfig);
// Phase 2: Collect aggregator results
// We can only return when the binding stream ends, when that happens
// we return the identified groups. Which are nothing more than Bindings
// of the grouping variables merged with the aggregate variables
output.bindingsStream.on('end', () => {
try {
// Wrap the collected results in an iterator so they can be returned as a bindings stream.
const bindingsStream = new ArrayIterator(groups.collectResults());
// The input's metadata is passed through unchanged.
const metadata = output.metadata;
resolve({ type: 'bindings', bindingsStream, metadata, variables });
} catch (err) {
/**
 * Runs a FILTER operation: resolves `pattern.input` through the query-operation
 * mediator and defines a transform that pushes an item through only when
 * `pattern.expression` evaluates to a truthy effective boolean value for it.
 *
 * @param pattern The Filter algebra operation; `input` and `expression` are read here.
 * @param context The action context, forwarded to the mediator, the expression context and the existence resolver.
 *
 * NOTE(review): the return type reads `Promise` with no type argument, and this excerpt
 * stops right after the `catch` body; the rest of the method is not visible here.
 */
public async runOperation(pattern: Algebra.Filter, context: ActionContext)
: Promise {
const outputRaw = await this.mediatorQueryOperation.mediate({ operation: pattern.input, context });
const output = ActorQueryOperation.getSafeBindings(outputRaw);
// NOTE(review): validated as 'bindings' after getSafeBindings already ran; possibly redundant (TODO confirm).
ActorQueryOperation.validateQueryOutput(output, 'bindings');
// Variables and metadata of the input are extracted here; their use is outside the visible excerpt.
const { variables, metadata } = output;
const expressionContext = ActorQueryOperation.getExpressionContext(context);
// Expression context plus an `exists` entry created from this actor's existence resolver.
const config = {
...expressionContext,
exists: this.createExistenceResolver(context),
};
const evaluator = new AsyncEvaluator(pattern.expression, config);
const transform = async (item: Bindings, next: any) => {
try {
// Evaluate the filter expression as an effective boolean value for this item.
const result = await evaluator.evaluateAsEBV(item);
if (result) {
// `bindingsStream` is declared outside the visible excerpt.
bindingsStream._push(item);
}
} catch (err) {
// Errors that are not expression errors are surfaced as an 'error' event;
// nothing is done here for expression errors in the visible code.
if (!isExpressionError(err)) {
bindingsStream.emit('error', err);
}
/**
 * Runs a LEFT JOIN operation: resolves `pattern.left` and `pattern.right` through
 * the query-operation mediator and defines `leftJoinInner`, which joins one outer
 * item against a stream of inner items.
 *
 * @param pattern The LeftJoin algebra operation; `left`, `right` and the optional `expression` are read here.
 * @param context The action context, forwarded to the mediator and used to build the expression context.
 *
 * NOTE(review): the return type reads `Promise` with no type argument, and this excerpt
 * stops at the opening `try` of the expression-evaluation branch; the rest is not visible here.
 */
public async runOperation(pattern: Algebra.LeftJoin, context: ActionContext)
: Promise {
// The two sides are mediated one after the other (two sequential awaits).
const leftRaw = await this.mediatorQueryOperation.mediate({ operation: pattern.left, context });
const left = ActorQueryOperation.getSafeBindings(leftRaw);
const rightRaw = await this.mediatorQueryOperation.mediate({ operation: pattern.right, context });
const right = ActorQueryOperation.getSafeBindings(rightRaw);
const config = { ...ActorQueryOperation.getExpressionContext(context) };
// An evaluator is only created when the pattern carries an expression; otherwise it is null.
const evaluator = (pattern.expression)
? new AsyncEvaluator(pattern.expression, config)
: null;
// Joins a single outer item against every item of the given inner stream.
const leftJoinInner = (outerItem: Bindings, innerStream: ClonedIterator) => {
const joinedStream = innerStream
.transform({
transform: async (innerItem: Bindings, nextInner: any) => {
const joinedBindings = ActorRdfJoin.join(outerItem, innerItem);
// A falsy join result means there is nothing to emit for this pair; move on to the next inner item.
if (!joinedBindings) { nextInner(); return; }
// Without an expression every successful join is emitted with `result: true`.
if (!pattern.expression) {
joinedStream._push({ joinedBindings, result: true });
nextInner();
return;
}
try {