Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
public async runOperation(pattern: Algebra.Construct, context: ActionContext)
: Promise {
// Apply a projection on our CONSTRUCT variables first, as the query may contain other variables as well.
const variables: RDF.Variable[] = ActorQueryOperationConstruct.getVariables(pattern.template);
const operation: Algebra.Operation = { type: 'project', input: pattern.input, variables };
// Evaluate the input query
const output: IActorQueryOperationOutputBindings = ActorQueryOperation.getSafeBindings(
await this.mediatorQueryOperation.mediate({ operation, context }));
// construct triples using the result based on the pattern.
const quadStream: AsyncIterator = new BindingsToQuadsIterator(pattern.template, output.bindingsStream);
// Let the final metadata contain the estimated number of triples
let metadata: () => Promise<{[id: string]: any}> = null;
if (output.metadata) {
metadata = () => output.metadata().then((m) => {
if (m) {
if (m.totalItems) {
return Object.assign({}, m, { totalItems: m.totalItems * pattern.template.length });
}
return m;
}
return null;
public async runOperation(pattern: Algebra.Slice, context: ActionContext)
: Promise {
// Resolve the input
const output: IActorQueryOperationOutputBindings = ActorQueryOperation.getSafeBindings(
await this.mediatorQueryOperation.mediate({ operation: pattern.input, context }));
// Slice the bindings stream
const hasLength: boolean = !!pattern.length || pattern.length === 0;
const bindingsStream: BindingsStream = output.bindingsStream.range(pattern.start,
hasLength ? pattern.start + pattern.length - 1 : Infinity);
// If we find metadata, apply slicing on the total number of items
const metadata: () => Promise<{[id: string]: any}> = !output.metadata ? null : () => output.metadata()
.then((subMetadata) => {
let totalItems: number = subMetadata.totalItems;
if (isFinite(totalItems)) {
totalItems = Math.max(0, totalItems - pattern.start);
if (hasLength) {
totalItems = Math.min(totalItems, pattern.length);
}
public async runOperation(pattern: Algebra.Group, context: ActionContext)
: Promise {
// Get result stream for the input query
const { input, aggregates } = pattern;
const outputRaw = await this.mediatorQueryOperation.mediate({ operation: input, context });
const output = ActorQueryOperation.getSafeBindings(outputRaw);
// The variables in scope are the variables on which we group, i.e. pattern.variables.
// For 'GROUP BY ?x, ?z', this is [?x, ?z], for 'GROUP by expr(?x) as ?e' this is [?e].
// But also in scope are the variables defined by the aggregations, since GROUP has to handle this.
const variables = pattern.variables
.map(termToString)
.concat(aggregates.map((agg) => termToString(agg.variable)));
const sparqleeConfig = { ...ActorQueryOperation.getExpressionContext(context) };
// Return a new promise that completes when the stream has ended or when
// an error occurs
return new Promise((resolve, reject) => {
const groups = new GroupsState(pattern, sparqleeConfig);
// Phase 2: Collect aggregator results
public async runOperation(pattern: Algebra.Extend, context: ActionContext)
: Promise {
const { expression, input, variable } = pattern;
const output: IActorQueryOperationOutputBindings = ActorQueryOperation.getSafeBindings(
await this.mediatorQueryOperation.mediate({ operation: input, context }));
const extendKey = termToString(variable);
const config = { ...ActorQueryOperation.getExpressionContext(context) };
const evaluator = new AsyncEvaluator(expression, config);
// Transform the stream by extending each Bindings with the expression result
const transform = async (bindings: Bindings, next: any) => {
try {
const result = await evaluator.evaluate(bindings);
// Extend operation is undefined when the key already exists
// We just override it here.
const extended = bindings.set(extendKey, result);
bindingsStream._push(extended);
} catch (err) {
if (isExpressionError(err)) {
public async runOperation(pattern: Algebra.Project, context: ActionContext)
: Promise {
// Resolve the input
const output: IActorQueryOperationOutputBindings = ActorQueryOperation.getSafeBindings(
await this.mediatorQueryOperation.mediate({ operation: pattern.input, context }));
// Find all variables that should be deleted from the input stream
// and all variables that are not bound in the input stream.
const variables: string[] = pattern.variables.map(termToString);
const deleteVariables = output.variables.filter((variable) => variables.indexOf(variable) < 0);
const missingVariables = variables.filter((variable) => output.variables.indexOf(variable) < 0);
// Make sure the project variables are the only variables that are present in the bindings.
const bindingsStream = !deleteVariables.length && !missingVariables.length
? output.bindingsStream : output.bindingsStream.map(
(binding: Bindings) => {
for (const deleteVariable of deleteVariables) {
binding = binding.delete(deleteVariable);
}
for (const missingVariable of missingVariables) {
public async runOperation(pattern: Algebra.Ask, context: ActionContext)
: Promise {
// Call other query operations like this:
const output: IActorQueryOperationOutput = await this.mediatorQueryOperation.mediate(
{ operation: pattern.input, context });
const bindings: IActorQueryOperationOutputBindings = ActorQueryOperation.getSafeBindings(output);
const booleanResult: Promise = new Promise((resolve, reject) => {
// Resolve to true if we find one element, and close immediately
bindings.bindingsStream.once('data', () => {
resolve(true);
bindings.bindingsStream.close();
});
// If we reach the end of the stream without finding anything, resolve to false
bindings.bindingsStream.on('end', () => resolve(false));
// Reject if an error occurs in the stream
bindings.bindingsStream.on('error', reject);
});
return { type: 'boolean', booleanResult };
}
public async runOperation(pattern: Algebra.Filter, context: ActionContext)
: Promise {
const output: IActorQueryOperationOutputBindings = ActorQueryOperation.getSafeBindings(
await this.mediatorQueryOperation.mediate({ operation: pattern.input, context }));
ActorQueryOperation.validateQueryOutput(output, 'bindings');
const exprFunc = SparqlExpressionEvaluator.createEvaluator(pattern.expression);
const filter = (bindings: Bindings) => {
try {
const term = exprFunc(bindings);
return term && term.value !== 'false' && term.value !== '0';
} catch (e) {
bindingsStream.emit('error', e);
return false;
}
};
const bindingsStream = output.bindingsStream.filter(filter);
return { type: 'bindings', bindingsStream, metadata: output.metadata, variables: output.variables };
throw new Error("Both patterns need to be provided.");
}
const leftInput: IActionQueryOperation = {
context: this.context ? JSON.parse(this.context) : undefined,
operation: JSON.parse(this.leftPattern),
};
const leftOutput = ActorQueryOperation.getSafeBindings(await this.operationMediator.mediate(leftInput));
const rightInput: IActionQueryOperation = {
context: this.context ? JSON.parse(this.context) : undefined,
operation: JSON.parse(this.rightPattern),
};
const rightOutput = ActorQueryOperation.getSafeBindings(await this.operationMediator.mediate(rightInput));
const joinInput: IActionRdfJoin = { entries: [leftOutput, rightOutput] };
const joinOutput = ActorQueryOperation.getSafeBindings(await this.joinMediator.mediate(joinInput));
const readable = new Readable();
readable._read = () => {
return;
};
joinOutput.bindingsStream.on('data', (binding: Bindings) => readable.push(JSON.stringify(binding) + '\n'));
joinOutput.bindingsStream.on('end', () => readable.push(null));
readable.push('Metadata: ' + JSON.stringify(joinOutput.metadata, null, ' ') + '\n');
readable.push('Variables: ' + JSON.stringify(joinOutput.variables, null, ' ') + '\n');
return { stdout: readable };
}
return async (expr, bindings) => {
const operation = this.substitute(expr.input, bindings);
const outputRaw = await this.mediatorQueryOperation.mediate({ operation, context });
const output = ActorQueryOperation.getSafeBindings(outputRaw);
return new Promise(
(resolve, reject) => {
output.bindingsStream.on('end', () => {
resolve(false);
});
output.bindingsStream.on('error', reject);
output.bindingsStream.on('data', () => {
output.bindingsStream.close();
resolve(true);
});
})
.then((exists: boolean) => expr.not ? !exists : exists);
};
public async run(action: IActionInit): Promise {
const operation: string = action.argv.length > 0 ? action.argv[0] : this.operation;
const context: string = action.argv.length > 1 ? action.argv[1] : this.context;
if (!operation) {
throw new Error('An operation must be defined in the config file or passed via the command line.');
}
const resolve: IActionQueryOperation = {
context: context ? this.parseJson(context) : null,
operation: this.parseJson(operation),
};
const result: IActorQueryOperationOutputBindings = ActorQueryOperation.getSafeBindings(
await this.mediatorQueryOperation.mediate(resolve));
result.bindingsStream.on('data', (binding) => readable.push(JSON.stringify(binding) + '\n'));
result.bindingsStream.on('end', () => readable.push(null));
const readable = new Readable();
readable._read = () => {
return;
};
readable.push('Metadata: ' + JSON.stringify(
result.metadata ? await result.metadata() : null, null, ' ') + '\n');
readable.push('Variables: ' + JSON.stringify(result.variables, null, ' ') + '\n');
return { stdout: readable };
}