Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
it('sub close should stop getting messages', (done) => {
const stan = STAN.connect(cluster, nuid.next(), PORT);
stan.on('connect', () => {
// server needs to support close requests
if (!stan.subCloseRequests || stan.subCloseRequests.length === 0) {
stan.close();
// skipped
done();
return;
}
const subject = nuid.next();
const opts = stan.subscriptionOptions();
opts.setDeliverAllAvailable();
const sub = stan.subscribe(subject, '', opts);
let counter = 0;
sub.on('message', () => {
counter++;
describe('Connect', () => {
const PORT = 9876;
const cluster = 'test-cluster';
const uri = 'nats://localhost:' + PORT;
let server;
const serverDir = path.join(os.tmpdir(), nuid.next());
/**
 * Launches the streaming server used by this suite and stores its handle in
 * the enclosing `server` variable.
 *
 * The server is started on PORT with a FILE store rooted at `serverDir`.
 * `done` is invoked 250 ms after `start_server` reports back
 * (presumably to give the server time to settle - confirm).
 *
 * @param done callback invoked with no arguments once the delay has elapsed
 */
function startServer(done) {
  const serverArgs = ['--store', 'FILE', '--dir', serverDir];
  const onStarted = () => {
    timers.setTimeout(() => {
      done();
    }, 250);
  };
  server = ssc.start_server(PORT, serverArgs, onStarted);
}
// Bring up a fresh server before every test; the hook completes when
// startServer invokes the supplied callback.
beforeEach((ready) => {
  startServer(ready);
});
// Shutdown our server after we are done
afterEach((done) => {
if (server) {
async function pubsub(t: any, input: any): Promise {
t.plan(1);
let lock = new Lock();
try {
let sc = t.context as SC;
let nc = await connect({url: sc.server.nats, payload: Payload.JSON});
let subj = next();
nc.subscribe(subj, (err, msg) => {
if (err) {
t.fail(err);
}
// in JSON undefined is translated to null
if (input === undefined) {
input = null;
}
//@ts-ignore
t.deepEqual(msg.data, input);
// t.log([input, '===', msg.data]);
lock.unlock();
});
nc.publish(subj, input);
} catch (err) {
/**
 * Stan client constructor.
 *
 * Initializes the EventEmitter base, validates both identifiers, records
 * them on the instance, generates a unique ack subject, and then runs
 * parseOptions / initState / createConnection in that order.
 *
 * @param clusterID non-empty string naming the cluster
 * @param clientID non-empty string naming this client
 * @param opts options forwarded to `parseOptions`
 * @throws Error with message BAD_CLUSTER_ID when clusterID is not a non-empty string
 * @throws Error with message BAD_CLIENT_ID when clientID is not a non-empty string
 * @returns the instance itself
 */
function Stan(clusterID, clientID, opts) {
  events.EventEmitter.call(this);

  // Both identifiers must be strings holding at least one character.
  const isNonEmptyString = (candidate) =>
    typeof candidate === 'string' && candidate.length >= 1;

  if (!isNonEmptyString(clusterID)) {
    throw new Error(BAD_CLUSTER_ID);
  }
  if (!isNonEmptyString(clientID)) {
    throw new Error(BAD_CLIENT_ID);
  }

  this.clusterID = clusterID;
  this.clientID = clientID;
  // subject on which publish acks arrive
  this.ackSubject = `${DEFAULT_ACK_PREFIX}.${nuid.next()}`;

  // The following are filled in later by stan.
  this.pubPrefix = null;        // publish prefix appended to subject
  this.subRequests = null;      // subject for subscription requests
  this.unsubRequests = null;    // subject for unsubscribe requests
  this.subCloseRequests = null; // subject for subscription close requests
  this.closeRequests = null;    // subject for close requests

  this.parseOptions(opts);
  this.initState();
  this.createConnection();

  return this;
}
// Suite setup: write a server configuration with an authorization section
// to a uniquely named file in the OS temp directory, then start the server
// with that file. A write failure is reported to the test runner via done(err).
before((done) => {
  // Named permission set, referenced by the user entry below as '$SUB'.
  const subPermissions = {
    subscribe: "bar",
    publish: "bar"
  };
  const users = [{
    user: 'bar',
    password: 'bar',
    permission: '$SUB'
  }];
  const conf = {
    authorization: {
      SUB: subPermissions,
      users: users
    }
  };

  const cf = path.resolve(os.tmpdir(), 'conf-' + next() + '.conf');
  console.log(cf);

  fs.writeFile(cf, ncu.jsonToYaml(conf), (err) => {
    if (err) {
      done(err);
      return;
    }
    server = nsc.start_server(PORT, ['-c', cf], done);
  });
});
// A request whose payload contains a circular reference must reject with
// ErrorCode.BAD_JSON when the connection uses the JSON payload type.
test('reqrep should fail circular json', async (t) => {
  t.plan(1);
  const ctx = t.context as SC;

  // Build an object that refers to itself.
  const circular: Record<string, unknown> = {};
  circular.a = circular;

  const nc = await connect({url: ctx.server.nats, payload: Payload.JSON});
  const pending = nc.request(next(), 1000, circular);
  await t.throwsAsync(pending, {code: ErrorCode.BAD_JSON});
  nc.close();
});
function clusterTest(noRandomize, done) {
const latch = latcher(2, done);
const opts = {
servers: ['nats://localhost:22222', uri, 'nats://localhost:22223']
};
if (noRandomize) {
opts.noRandomize = true;
}
const sca = STAN.connect(cluster, nuid.next(), opts);
const scb = STAN.connect(cluster, nuid.next(), opts);
const subject = nuid.next();
sca.on('connect', () => {
sca.publish(subject, 'bar', (err, guid) => {
should.not.exist(err);
should.exist(guid);
sca.close();
});
});
sca.on('close', latch);
scb.on('connect', () => {
const so = scb.subscriptionOptions();
so.setStartAt(STAN.StartPosition.FIRST);
const sub = scb.subscribe(subject, so);
sub.on('error', (err) => {
// Suite setup: generate a server config with a single user ('derek') who may
// subscribe to 'bar' and publish to 'foo', write it under CONF_DIR, start a
// server with it, and expose the server handle through the test context.
test.before(async (t) => {
  const derek = {
    user: 'derek',
    password: 'foobar',
    permission: {
      subscribe: 'bar',
      publish: 'foo'
    }
  };
  const conf = {
    authorization: {
      users: [derek]
    }
  };

  //@ts-ignore
  const fp = join(CONF_DIR, next() + '.conf');
  writeFile(fp, jsonToNatsConf(conf));

  const server = await startServer(['-c', fp]);
  t.context = {server};
});
let dir = (process.env.TRAVIS) ? process.env.TRAVIS_BUILD_DIR : process.env.TMPDIR;
//@ts-ignore
let operatorJwtPath = path.join(dir, next() + '.jwt');
writeFile(operatorJwtPath, opJWT);
let conf = {
operator: operatorJwtPath,
resolver: 'MEMORY',
resolver_preload: {}
};
//@ts-ignore
conf.resolver_preload[accountPK] = accountJWT;
//@ts-ignore
let confPath = path.join(dir, next() + '.conf');
writeFile(confPath, jsonToNatsConf(conf));
let server = await startServer(['-c', confPath]);
t.context = {server: server, confPath: confPath, opJWT: operatorJwtPath};
});
if (msg) {
const pingResponse = proto.pb.PingResponse.deserializeBinary(Buffer.from(msg, 'binary'));
const err = pingResponse.getError();
if (err) {
this.closeWithError('connection_lost', err);
return;
}
}
this.pingOut = 0;
});
this.ackSubscription = this.nc.subscribe(this.ackSubject, this.processAck());
const discoverSubject = this.options.discoverPrefix + '.' + this.clusterID;
//noinspection JSUnresolvedFunction
this.connId = Buffer.from(nuid.next(), "utf8");
const req = new proto.pb.ConnectRequest();
req.setClientId(this.clientID);
req.setHeartbeatInbox(hbInbox);
req.setProtocol(PROTOCOL_ONE);
req.setConnId(this.connId);
req.setPingInterval(Math.ceil(this.options.stanPingInterval / 1000));
req.setPingMaxOut(this.options.stanMaxPingOut);
this.nc.requestOne(discoverSubject, Buffer.from(req.serializeBinary()), this.options.connectTimeout, (msg) => {
if (msg instanceof nats.NatsError) {
let err = msg;
if (msg.code === nats.REQ_TIMEOUT) {
err = new nats.NatsError(CONNECT_REQ_TIMEOUT, CONNECT_REQ_TIMEOUT, err);
}
this.closeWithError('error', err);