Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
// Turns the current subscription source (if there is one) into a stream of
// partial state updates, switching over whenever a new source arrives.
switchMap(subscription$ => {
  // No active source: report that nothing is being fetched
  if (!subscription$) {
    return fromValue({ fetching: false });
  }

  // Emitted first, before any result has come in
  const started$ = fromValue({ fetching: true });

  // Every result from the source is passed on with fetching kept at true
  const updates$ = pipe(
    subscription$,
    map(result => ({
      fetching: true,
      stale: !!result.stale,
      data: result.data,
      error: result.error,
      extensions: result.extensions,
    }))
  );

  // Emitted last, once the source has closed on its own
  const finished$ = fromValue({ fetching: false });

  return concat([started$, updates$, finished$]);
}),
// The individual partial results are merged into each previous result
// Turns the current query source (if there is one) into a stream of partial
// state updates, switching over whenever a new source arrives.
switchMap(query$ => {
  // No active source: report that nothing is being fetched
  if (!query$) {
    return fromValue({ fetching: false });
  }

  // Emitted first, before any result has come in
  const started$ = fromValue({ fetching: true });

  // Every result from the source is passed on with fetching set to false
  const updates$ = pipe(
    query$,
    map(result => ({
      fetching: false,
      stale: !!result.stale,
      data: result.data,
      error: result.error,
      extensions: result.extensions,
    }))
  );

  // Emitted last, once the source has closed on its own
  const finished$ = fromValue({ fetching: false });

  return concat([started$, updates$, finished$]);
}),
// The individual partial results are merged into each previous result
// Stream transformer: operations are mapped through addOperationContext and
// passed to handleOperation before being forwarded; the forwarded output is
// mapped through addOperationResponseContext and passed to handleOperation too.
return ops$ => {
  const outgoing$ = pipe(
    ops$,
    map(addOperationContext),
    tap(handleOperation)
  );
  const incoming$ = forward(outgoing$);
  return pipe(
    incoming$,
    map(addOperationResponseContext),
    tap(handleOperation)
  );
};
};
// Stream transformer: each operation is shown to handleIncomingQuery and
// handleIncomingTeardown, replaced by the return value of
// handleIncomingMutation, and the resulting stream is handed to forward.
return ops$ => {
  const handled$ = pipe(
    ops$,
    tap(handleIncomingQuery),
    tap(handleIncomingTeardown),
    map(handleIncomingMutation)
  );
  return forward(handled$);
};
};
if (operation.context.requestPolicy === 'cache-and-network') {
result.fetching = true;
reexecuteOperation(client, operation);
}
return result;
})
);
const forwardedOps$ = pipe(
merge([
pipe(
sharedOps$,
filter(op => !shouldSkip(op) && !isOperationCached(op)),
map(mapTypeNames)
),
pipe(
sharedOps$,
filter(op => shouldSkip(op))
),
]),
map(op => addMetadata(op, { cacheOutcome: 'miss' })),
forward,
tap(response => {
if (
response.operation &&
response.operation.operationName === 'mutation'
) {
handleAfterMutation(response);
} else if (
response.operation &&
return ops$ => {
const sharedOps$ = share(ops$);
// Results for operations that are not skipped and are reported as cached.
// Each one is built from the resultCache entry for the operation's key.
const cachedOps$ = pipe(
  sharedOps$,
  filter(op => {
    if (shouldSkip(op)) return false;
    return isOperationCached(op);
  }),
  map(operation => {
    const entry = resultCache.get(operation.key);
    const outcome = entry ? 'hit' : 'miss';
    const result: OperationResult = {
      ...entry,
      operation: addMetadata(operation, { cacheOutcome: outcome }),
    };

    // cache-and-network: hand out the cached result flagged as still
    // fetching, and re-execute the operation as well
    const policy = operation.context.requestPolicy;
    if (policy === 'cache-and-network') {
      result.fetching = true;
      reexecuteOperation(client, operation);
    }

    return result;
  })
);
})
);
// Operations that go on to `forward`: those that are neither skipped nor
// cached (passed through mapTypeNames first) merged with the skipped ones.
// All of them are tagged with cacheOutcome 'miss' before forwarding.
const forwardedOps$ = pipe(
  merge([
    pipe(
      sharedOps$,
      filter(op => !(shouldSkip(op) || isOperationCached(op))),
      map(mapTypeNames)
    ),
    pipe(
      sharedOps$,
      filter(skipped => shouldSkip(skipped))
    ),
  ]),
  map(op => addMetadata(op, { cacheOutcome: 'miss' })),
  forward,
  // Dispatch each forwarded response to the handler for its operation kind
  tap(response => {
    const kind = response.operation && response.operation.operationName;
    switch (kind) {
      case 'mutation':
        handleAfterMutation(response);
        break;
      case 'query':
        handleAfterQuery(response);
        break;
    }
  })
);
return ops$ => {
const sharedOps$ = share(ops$);
const targetOperationTypes = ['query', 'mutation', 'subscription'];
const executedOps$ = pipe(
sharedOps$,
filter(f => targetOperationTypes.includes(f.operationName)),
map(async o => {
try {
const r = await execute(
schema,
o.query,
rootValue,
contextValue,
o.variables,
o.operationName,
fieldResolver,
typeResolver
);
return makeResult(o, r);
} catch (err) {
return makeErrorResult(o, err);
}
}),
/**
 * Builds a result stream from a stream of query props.
 *
 * Each query is turned into a source via `client.executeQuery`; an
 * `undefined` query maps to the `empty` source instead. A source that is the
 * same object as the one seen immediately before is dropped, and the
 * remaining sources are flattened with `switchAll`.
 *
 * @param client - Client whose `executeQuery` produces a source per query.
 * @param queryProp$ - Stream of queries (or `undefined` for "no query").
 * @returns The flattened stream of whatever the inner sources emit.
 */
const makeQueryResults$ = (
  client: Client,
  queryProp$: Source
): Source => {
  const idle$ = empty as Source;
  // Most recent source let through by the filter below
  let previous$: void | Source;

  return pipe(
    queryProp$,
    map(query => {
      if (query === undefined) return idle$;
      return client.executeQuery(query);
    }),
    filter(candidate$ => {
      // Identity comparison: only a different source object counts as new
      if (candidate$ === previous$) return false;
      previous$ = candidate$;
      return true;
    }),
    switchAll
  );
};
// Mocked executeQuery: for every value i produced by interval(400) it emits
// an object with data set to i and error set to i + 1.
executeQuery: jest.fn(() => {
  const ticks$ = interval(400);
  return pipe(
    ticks$,
    map(tick => ({ data: tick, error: tick + 1 }))
  );
}),