Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
[name]: value,
};
}, {});
const C = isContextFn(this.sofa.context)
? await this.sofa.context({ req, res })
: this.sofa.context;
const execution = await subscribe({
schema: this.sofa.schema,
document,
operationName,
variableValues,
contextValue: C,
});
if (isAsyncIterable(execution)) {
// successful
// add execution to clients
this.clients.set(id, {
name,
url,
iterator: execution as any,
});
// success
forAwaitEach(execution, async result => {
await this.sendData({
id,
result,
});
}).then(
return sourcePromise.then(function (resultOrStream) {
return (
// Note: Flow can't refine isAsyncIterable, so explicit casts are used.
isAsyncIterable(resultOrStream) ? mapAsyncIterator(resultOrStream, mapSourceToResponse, reportGraphQLError) : resultOrStream
);
}, reportGraphQLError);
}
);
const result = await execute({
...options,
connection,
connectionManager: this.connectionManager,
event,
lambdaContext,
onOperation,
operation,
pubSub,
// tell execute to register subscriptions
registerSubscriptions: true,
subscriptionManager: this.subscriptionManager,
});
if (!isAsyncIterable(result)) {
// send response to client so it can finish operation in case of query or mutation
if (onOperationComplete) {
onOperationComplete(
connection,
(operation as IdentifiedOperationRequest).operationId,
);
}
const response = formatMessage({
id: (operation as IdentifiedOperationRequest).operationId,
payload: result as ExecutionResult,
type: SERVER_EVENT_TYPES.GQL_DATA,
});
await this.connectionManager.sendToConnection(
connection,
response,
);
if (isAsyncIterable(result) && useLegacyProtocol) {
// if result is async iterator, then it means that subscriptions was registered
// legacy protocol requires that GQL_SUBSCRIBED should be sent back
const response = formatMessage({
id: (operation as IdentifiedOperationRequest).operationId,
payload: {},
type: SERVER_EVENT_TYPES.GQL_SUBSCRIBED,
});
await connectionManager.sendToConnection(connection, response);
return {
body: response,
statusCode: 200,
};
}
if (!isAsyncIterable(result)) {
// send response to client so it can finish operation in case of query or mutation
if (onOperationComplete) {
onOperationComplete(
connection,
(operation as IdentifiedOperationRequest).operationId,
);
}
const response = formatMessage({
id: (operation as IdentifiedOperationRequest).operationId,
payload: result as ExecutionResult,
type: SERVER_EVENT_TYPES.GQL_DATA,
});
await connectionManager.sendToConnection(connection, response);
return {
body: response,
statusCode: 200,
return queryExecutor2.subscribe(payload).then((result) => {
expect(isAsyncIterable(result)).toBe(true)
})
})
subscriberResolver: SubscriberResolver,
opts: RequestOptions,
): Promise | ExecutionResult> {
try {
const subscribeResult = await subscribe(
this._schema,
ast,
this._rootValue,
opts.contextValue,
undefined,
opts.operationName,
this._fieldResolver,
this._subscribeFieldResolver,
);
if (isAsyncIterable(subscribeResult)) {
forAwaitEach(subscribeResult, async (result) => {
const resolvedResult = await subscriberResolver(result);
this._eventEmitter.emit(hash, resolvedResult);
});
const eventAsyncIterator = new EventAsyncIterator(this._eventEmitter, hash);
return eventAsyncIterator.getIterator();
} else {
return subscribeResult as ExecutionResult;
}
} catch (error) {
return Promise.reject(error);
}
}
}
let executor = subscribe
const promiseOrIterable = executor(
schema,
document,
{},
params.context,
params.variables,
params.operationName
)
if (
!isAsyncIterable(promiseOrIterable) &&
promiseOrIterable instanceof Promise
) {
executionIterable = promiseOrIterable
} else if (isAsyncIterable(promiseOrIterable)) {
executionIterable = Promise.resolve(promiseOrIterable)
} else {
throw new Error(
'Invalid `execute` return type! Only Promise or AsyncIterable are valid values!'
)
}
}
return executionIterable.then(ei => ({
executionIterable: isAsyncIterable(ei)
? ei
: createAsyncIterator([ei]),
params
}))
})
.then(({ executionIterable, params }) => {
return promise.then((value: T | AsyncIterator) => {
if ( isAsyncIterable(value) ) {
return value as AsyncIterator;
}
return {
next() {
if (!isResolved) {
isResolved = true;
return Promise.resolve({ value, done: false });
}
return Promise.resolve({ value: undefined, done: true });
},
return() {
isResolved = true;
return executionIterable.then(ei => ({
executionIterable: isAsyncIterable(ei)
? ei
: createAsyncIterator([ei]),
params
}))
})
/**
 * Handles one incoming socket message describing a subscription request.
 *
 * The message is parsed as JSON and must carry `context`, `subscriptionID`
 * and `subscription`. The subscription is passed to `this._client.request`
 * together with the remaining request options. When the request yields an
 * async iterable, every result it produces is forwarded over `ws` as a JSON
 * string of `{ result, subscriptionID }`, for as long as the socket is open.
 * A result that is not an async iterable produces no outgoing message.
 *
 * Failures (invalid JSON, a rejected request, or an error raised while
 * iterating) are reported to the client as a JSON string instead of being
 * thrown, so the returned promise resolves in those cases.
 *
 * @param message - Raw socket payload; expected to be JSON text.
 * @param options - Request options; `ws` is the socket to reply on, the
 *   remaining properties are forwarded to `this._client.request`.
 * @returns A promise that resolves once the subscription has been started
 *   (it does not wait for the subscription to finish).
 */
private async _messageHandler(message: Data, { ws, ...rest }: ServerSocketRequestOptions): Promise<void> {
  // Declared outside the try block so an error report can still be correlated
  // with the subscription when the failure happens after parsing.
  let subscriptionID: unknown;

  // Sends a failure to the client as text. The raw Error object is not passed
  // to `ws.send`: presumably `ws` is a `ws`-library socket, whose send() only
  // accepts string/binary data - TODO confirm.
  // NOTE(review): the `{ errors, subscriptionID }` shape is an assumption;
  // confirm it against what the client expects.
  const sendError = (error: unknown): void => {
    // Nothing can be delivered once the socket is no longer open.
    if (ws.readyState !== ws.OPEN) return;
    const errorMessage = error instanceof Error ? error.message : String(error);
    ws.send(JSON.stringify({ errors: [{ message: errorMessage }], subscriptionID }));
  };

  try {
    const parsed = JSON.parse(message as string);
    subscriptionID = parsed.subscriptionID;
    const { context, subscription } = parsed;
    const subscribeResult = await this._client.request(subscription, rest, context);

    if (isAsyncIterable(subscribeResult)) {
      // Deliberately not awaited: the handler returns while the subscription
      // keeps streaming. The rejection is handled so that a failure during
      // iteration is reported instead of becoming an unhandled rejection.
      void forAwaitEach(subscribeResult, ({ _cacheMetadata, ...otherProps }: MaybeRequestResult) => {
        if (ws.readyState === ws.OPEN) {
          const result: MaybeRequestResultWithDehydratedCacheMetadata = { ...otherProps };
          // Cache metadata is converted by dehydrateCacheMetadata before it
          // is included in the outgoing JSON.
          if (_cacheMetadata) result._cacheMetadata = dehydrateCacheMetadata(_cacheMetadata);
          ws.send(JSON.stringify({ result, subscriptionID }));
        }
      }).catch(sendError);
    }
  } catch (error) {
    sendError(error);
  }
}