Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
const currentConnection = this._currentConnection;
if (!currentConnection || !isCurrent()) {
return;
}
const currentBlockIndex = currentConnection.remainingBlockRange.start;
// Only request topics that we don't already have.
const topics = this._blocks[currentBlockIndex]
? currentConnection.topics.filter(
(topic) => !this._blocks[currentBlockIndex] || !this._blocks[currentBlockIndex].messagesByTopic[topic]
)
: currentConnection.topics;
// Get messages from the underlying provider.
const startTime = TimeUtil.add(this._startTime, fromNanoSec(currentBlockIndex * MEM_CACHE_BLOCK_SIZE_NS));
const endTime = TimeUtil.add(
this._startTime,
fromNanoSec(Math.min(this._totalNs, (currentBlockIndex + 1) * MEM_CACHE_BLOCK_SIZE_NS) - 1) // endTime is inclusive.
);
const messages = topics.length ? await this._provider.getMessages(startTime, endTime, topics) : [];
// If we're not current any more, discard the messages, because otherwise we might write duplicate messages.
if (!isCurrent()) {
return;
}
// Create a new block if necessary.
this._blocks[currentBlockIndex] = this._blocks[currentBlockIndex] || { messagesByTopic: {}, sizeInBytes: 0 };
const currentBlock = this._blocks[currentBlockIndex];
if (!currentBlock) {
throw new Error("currentBlock should be set here");
}
this._setCurrentTime(time);
const seekTime = Date.now();
this._lastSeekTime = seekTime;
this._cancelSeekBackfill = false;
// cancel any queued _emitState that might later emit messages from before we seeked
this._messages = [];
// do not _emitState if subscriptions have changed, but time has not
if (isEqual(this._currentTime, time)) {
this._emitState();
}
if (!this._isPlaying) {
this._getMessages(
TimeUtil.add(clampTime(time, TimeUtil.add(this._start, { sec: 0, nsec: SEEK_BACK_NANOSECONDS }), this._end), {
sec: 0,
nsec: -SEEK_BACK_NANOSECONDS,
}),
time
).then((messages) => {
// Only emit the messages if we haven't seeked again / emitted messages since we
// started loading them. Note that for the latter part just checking for `isPlaying`
// is not enough because the user might have started playback and then paused again!
// Therefore we really need something like `this._cancelSeekBackfill`.
if (this._lastSeekTime === seekTime && !this._cancelSeekBackfill) {
this._messages = messages;
this._emitState();
}
});
}
}
while (true) {
const currentConnection = this._currentConnection;
if (!currentConnection || !isCurrent()) {
return;
}
const currentBlockIndex = currentConnection.remainingBlockRange.start;
// Only request topics that we don't already have.
const topics = this._blocks[currentBlockIndex]
? currentConnection.topics.filter(
(topic) => !this._blocks[currentBlockIndex] || !this._blocks[currentBlockIndex].messagesByTopic[topic]
)
: currentConnection.topics;
// Get messages from the underlying provider.
const startTime = TimeUtil.add(this._startTime, fromNanoSec(currentBlockIndex * MEM_CACHE_BLOCK_SIZE_NS));
const endTime = TimeUtil.add(
this._startTime,
fromNanoSec(Math.min(this._totalNs, (currentBlockIndex + 1) * MEM_CACHE_BLOCK_SIZE_NS) - 1) // endTime is inclusive.
);
const messages = topics.length ? await this._provider.getMessages(startTime, endTime, topics) : [];
// If we're not current any more, discard the messages, because otherwise we might write duplicate messages.
if (!isCurrent()) {
return;
}
// Create a new block if necessary.
this._blocks[currentBlockIndex] = this._blocks[currentBlockIndex] || { messagesByTopic: {}, sizeInBytes: 0 };
const currentBlock = this._blocks[currentBlockIndex];
if (!currentBlock) {
throw new Error("currentBlock should be set here");
async _getMessages(start: Time, end: Time, topics: string[]): Promise {
// if our topics change we need to clear out the cached ranges, or if we're
// reading from before the first range.
if (
intersection(this._topics, topics).length !== topics.length ||
!this._current ||
TimeUtil.isLessThan(start, this._current.start)
) {
this._topics = topics;
this._current = undefined;
this._next = undefined;
}
let messages = [];
const currentMatches = this._current && this._current.overlaps(start, end);
const nextMatches = this._next && this._next.overlaps(start, end);
if (/*:: this._current && */ currentMatches) {
messages = messages.concat(await this._current.getMessages(start, end));
}
if (/*:: this._next && */ nextMatches) {
messages = messages.concat(await this._next.getMessages(start, end));
}
if ((!currentMatches && !nextMatches) || (this._next && TimeUtil.isGreaterThan(end, this._next.end))) {
let startTime = start;
if (this._isPlaying) {
return; // Only run once
}
this._isPlaying = true;
logger.info("AutomatedRunPlayer._run()");
await this._emitState([], this._providerResult.start);
let currentTime = this._providerResult.start;
this._client.start({
bagLengthMs: toMillis(subtractTimes(this._providerResult.end, this._providerResult.start), "round-up"),
});
const nsBagTimePerFrame = Math.round(this._msPerFrame * this._speed * 1000000);
let frameCount = 0;
while (TimeUtil.isLessThan(currentTime, this._providerResult.end)) {
if (this._error) {
return;
}
const end = TimeUtil.add(currentTime, { sec: 0, nsec: nsBagTimePerFrame });
this._client.markTotalFrameStart();
const messages = await this._getMessages(currentTime, end);
this._client.markFrameRenderStart();
await this._emitState(messages, end);
this._client.markTotalFrameEnd();
this._client.markFrameRenderEnd();
await this._client.onFrameFinished(frameCount);
currentTime = TimeUtil.add(end, { sec: 0, nsec: 1 });
frameCount++;
async readFrames(onFrame) {
const bag = await open(this.bagPath);
let frame = {};
function flushFrame() {
if (frame.keyTopic) {
onFrame(frame);
frame = {};
}
}
await bag.readMessages({topics: this.topics}, result => {
// rosbag.js reuses the data buffer for subsequent messages, so we need to make a copy
if (result.message.data) {
result.message.data = Buffer.from(result.message.data);
}
if (result.topic === this.keyTopic) {
flushFrame();
async readFrameByKeyTopic(start, end) {
const bag = await open(this.bagPath);
let frame = {};
async function flushFrame() {
if (frame.keyTopic) {
// This needs to be address, was used to flush on keyTopic message to sync
// await onFrame(frame);
frame = {};
}
}
const options = {
startTime: TimeUtil.fromDate(new Date(start * 1e3)),
endTime: TimeUtil.fromDate(new Date(end * 1e3))
};
if (this.topics) {
extensionPoint.progressCallback({
fullyLoadedFractionRanges: ranges.map(({ start, end }) => ({
start: Math.max(0, start / approximateSize),
end: Math.min(1, end / approximateSize),
})),
});
}
},
});
await remoteReader.open(); // Important that we call this first, because it might throw an error if the file can't be read.
approximateSize = remoteReader.size() * 0.99; // Chop off the last percentage or so for the indexes.
this._bag = new Bag(new BagReader(remoteReader));
await this._bag.open();
} else {
this._bag = await open(bagPath.file);
extensionPoint.progressCallback({ fullyLoadedFractionRanges: [{ start: 0, end: 1 }] });
}
const { startTime, endTime } = this._bag;
const connections = ((Object.values(this._bag.connections): any): Connection[]);
if (!startTime || !endTime || !connections.length) {
// This will abort video generation:
reportError("Invalid bag", "Bag is empty or corrupt.", "user");
return new Promise(() => {}); // Just never finish initializing.
}
this._connectionsByTopic = keyBy(connections, "topic");
return {
start: startTime,
end: endTime,
topics: bagConnectionsToTopics(connections),
}
const {timestamp, message} = msg[msg.length - 1];
// Every frame *MUST* have a pose. The pose can be considered
// the core reference point for other data and usually drives the timing
// of the system.
// Position, decimal degrees
const rotation = quaternionToEuler(message.pose.orientation);
const {position} = message.pose;
xvizBuilder
.pose(this.xvizStream)
.mapOrigin(this.origin.longitude, this.origin.latitude, this.origin.altitude)
.position(position.x, position.y, 0)
.orientation(rotation.roll, rotation.pitch, rotation.yaw)
.timestamp(TimeUtil.toDate(timestamp).getTime() / 1e3);
}
isExpired(currentTime: Time) {
// cannot tell if a marker is expired if we don't have a clock yet
if (!currentTime) {
return false;
}
const lifetime = this.getLifetime();
// we use the receive time (clock) instead of the header stamp
// to match the behavior of rviz
const expiresAt = TimeUtil.add(this.receiveTime, lifetime);
return TimeUtil.isGreaterThan(currentTime, expiresAt);
}