Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
/**
 * Creates an Observable that emits the messages received on a Redis
 * pub/sub channel.
 *
 * Each payload is passed through `JSON.parse`; if parsing fails, the raw
 * message is emitted unchanged. The sentinel message `'__done'` completes
 * the Observable instead of being emitted.
 *
 * @param channelName Channel to subscribe to.
 * @returns An Observable of parsed (or raw) messages. Unsubscribing
 *   unsubscribes the client from the channel and detaches the handler.
 */
from(channelName: string): Observable {
  let client = this.getClient();
  return new Observable(o => {
    // Named so that the exact same function can be detached on teardown.
    // NOTE(review): `channel` is not compared with `channelName`; if
    // getClient() can return a client subscribed to other channels, their
    // messages are forwarded too - confirm against getClient().
    const onMessage = (channel, message) => {
      if (message === '__done') {
        return o.complete();
      }
      let payload = message;
      try {
        // If this is parseable, do that
        // Otherwise, just silently catch the error and move on
        payload = JSON.parse(message);
      } catch (e) {
        // Deliberate best-effort: non-JSON payloads are emitted as-is.
      }
      o.next(payload);
    };
    client.on('message', onMessage);
    client.subscribe(channelName);
    return () => {
      client.unsubscribe(channelName);
      // Without this the handler stays registered on the client after
      // every unsubscribe, leaking one listener per subscription.
      client.removeListener('message', onMessage);
    };
  });
}
/**
 * Creates an Observable that emits the `value` of every message a Kafka
 * consumer receives on `topic`.
 *
 * A message whose value is the sentinel `'__done'` completes the
 * Observable; consumer `'error'` events are forwarded as Observable errors.
 *
 * @param topic Topic to consume.
 * @param offset Optional offset to start reading from. When omitted the
 *   consumer is created exactly as before (no explicit offset or options).
 * @returns An Observable of message values. Unsubscribing closes the
 *   consumer.
 */
from(topic: string, offset?: number): Observable {
  let client = this.getClient();
  // `offset` used to be accepted but ignored. kafka-node only honours a
  // payload offset when the `fromOffset` option is set, so both are passed
  // together and only when the caller actually supplied an offset.
  const consumer = offset == null
    ? new kafka.Consumer(client, [{
      topic
    }])
    : new kafka.Consumer(client, [{
      topic,
      offset
    }], {fromOffset: true});
  return new Observable(o => {
    consumer.on('message', m => {
      if (m.value === '__done') {
        return o.complete();
      }
      o.next(m.value);
    });
    consumer.on('error', err => o.error(err));
    // The consumer is created once per from() call, outside this function,
    // so closing it here ends delivery for every subscriber of the
    // returned Observable.
    return () => consumer.close(() => {});
  });
}
to(topic: string, iO: Observable): Subscription {
// Publishes 'hello' through gr.to() and expects a plain kafka consumer on
// the same topic to receive it.
kit('publishes to a kafka topic', (done) => {
  const topicName = 'gustavTest-publish';
  const listener = new kafka.Consumer(client, [{topic: topicName}]);
  // Emit after a short delay rather than synchronously on subscribe.
  const source$ = new Observable(subscriber => {
    setTimeout(() => {
      subscriber.next('hello');
    }, 15);
  });
  gr.to(topicName, source$);
  listener.on('message', (received) => {
    expect(received.value).to.equal('hello');
    done();
  });
});
});
// Publishes 'hello' through gr.to() and expects the redis client, once
// subscribed to the same channel, to receive it on that channel.
it('publishes to a redis channel', (done) => {
  const channelName = 'test-1';
  // Emit after a short delay rather than synchronously on subscribe.
  const source$ = new Observable(subscriber => {
    setTimeout(() => {
      subscriber.next('hello');
    }, 15);
  });
  gr.to(channelName, source$);
  client.on('message', (receivedChannel, payload) => {
    expect(receivedChannel).to.equal(channelName);
    expect(payload).to.equal('hello');
    done();
  });
  client.subscribe(channelName);
});
// Registers a callback on the channel via gm.subscribe(), then publishes
// 'hello' through gm.to() and expects the callback to receive it.
it('publishes to a channel', (done) => {
  const channelName = 'test-1';
  // Emit after a short delay rather than synchronously on subscribe.
  const source$ = new Observable(subscriber => {
    setTimeout(() => {
      subscriber.next('hello');
    }, 15);
  });
  gm.subscribe(channelName, (payload) => {
    expect(payload).to.equal('hello');
    done();
  });
  gm.to(channelName, source$);
});
});
connProm
.then(ch => {
ch.assertExchange(exchange, 'fanout', {durable: true});
ch.assertQueue(queue, {durable: true});
ch.bindQueue(queue, exchange, '');
ch.consume(queue, msg => {
expect(msg.content.toString()).to.equal('hello');
done();
ch.ack(msg);
}, {noAck: false});
})
.catch(handleErr);
let obs = new Observable(o => {
setTimeout(() => o.next('hello'), 15);
});
gr.to(exchange, obs);
});
});
/**
 * Builds the activity stream: wraps activitiesGenerator in an Observable,
 * flattens the inner observables it emits in order (concatAll), and logs
 * each resulting activity to the console as it passes through.
 */
DirectLine3.prototype.getActivity$ = function () {
    var self = this;
    // activitiesGenerator is looked up and invoked at subscription time,
    // with the subscriber as its only argument.
    var batches$ = new rxjs_1.Observable(function (subscriber) {
        return self.activitiesGenerator(subscriber);
    });
    var activities$ = batches$.concatAll();
    return activities$.do(function (activity) {
        return console.log("Activity", activity);
    });
};
DirectLine3.prototype.activitiesGenerator = function (subscriber, watermark) {
DirectLine.prototype.getActivities = function () {
var _this = this;
return new rxjs_1.Observable(function (subscriber) {
return _this.activitiesGenerator(subscriber);
})
.concatAll()
.do(function (dlm) { return console.log("DL Message", dlm); })
.map(function (dlm) {
if (dlm.channelData) {
var channelData = dlm.channelData;
switch (channelData.type) {
case "message":
return Object.assign({}, channelData, {
id: dlm.id,
conversation: { id: dlm.conversationId },
timestamp: dlm.created,
from: { id: dlm.from },
channelData: null,
});
/**
 * Creates an Observable that emits every item delivered to an in-memory
 * channel.
 *
 * A listener is appended to `this.channels[channelName]` on subscription.
 * The sentinel item `'__done'` completes the Observable instead of being
 * emitted.
 *
 * @param channelName Channel to listen on; `initChannel` is called for it
 *   before the Observable is created.
 * @returns An Observable of channel items. Unsubscribing removes the
 *   listener from the channel.
 */
from(channelName: string): Observable {
  this.initChannel(channelName);
  return new Observable(o => {
    // Named so that teardown can find and remove this exact listener.
    const listener = item => {
      if (item === '__done') {
        return o.complete();
      }
      o.next(item);
    };
    this.channels[channelName].push(listener);
    // Previously no teardown was returned, so every subscription left its
    // listener in the channel forever.
    return () => {
      const listeners = this.channels[channelName];
      if (Array.isArray(listeners)) {
        // Assign a filtered copy instead of splicing in place: teardown can
        // run synchronously from o.complete() while a dispatcher is still
        // iterating the current array, and mutating it mid-iteration could
        // skip other listeners.
        this.channels[channelName] = listeners.filter(l => l !== listener);
      }
    };
  });
}
to(channelName: string, iO: Observable): Subscription {
from(exchange: string): Observable {
let queue = uuid.v4();
return new Observable(o => {
let conn;
connect(this.config.connString)
.then(c => conn = c && c.createChannel())
.then(ch => {
ch.assertExchange(exchange, 'fanout', {durable: true});
ch.assertQueue(queue, {durable: true});
ch.bindQueue(queue, exchange, '');
ch.consume(queue, msg => {
let msgStr = msg.content.toString();
ch.ack(msg);
if (msgStr === '__done') {
return o.complete();
}
o.next(msgStr);
}, {noAck: false});