903 lines
28 KiB
TypeScript
903 lines
28 KiB
TypeScript
import { type Readable, Transform, type TransformCallback } from 'stream';
|
|
import { clearTimeout, setTimeout } from 'timers';
|
|
|
|
import {
|
|
type BSONSerializeOptions,
|
|
deserialize,
|
|
type DeserializeOptions,
|
|
type Document,
|
|
type ObjectId
|
|
} from '../bson';
|
|
import { type AutoEncrypter } from '../client-side-encryption/auto_encrypter';
|
|
import {
|
|
CLOSE,
|
|
CLUSTER_TIME_RECEIVED,
|
|
COMMAND_FAILED,
|
|
COMMAND_STARTED,
|
|
COMMAND_SUCCEEDED,
|
|
kDecorateResult,
|
|
PINNED,
|
|
UNPINNED
|
|
} from '../constants';
|
|
import {
|
|
MongoCompatibilityError,
|
|
MONGODB_ERROR_CODES,
|
|
MongoMissingDependencyError,
|
|
MongoNetworkError,
|
|
MongoNetworkTimeoutError,
|
|
MongoOperationTimeoutError,
|
|
MongoParseError,
|
|
MongoServerError,
|
|
MongoUnexpectedServerResponseError
|
|
} from '../error';
|
|
import type { ServerApi, SupportedNodeConnectionOptions } from '../mongo_client';
|
|
import { type MongoClientAuthProviders } from '../mongo_client_auth_providers';
|
|
import { MongoLoggableComponent, type MongoLogger, SeverityLevel } from '../mongo_logger';
|
|
import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
|
|
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
|
|
import { ServerType } from '../sdam/common';
|
|
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
|
|
import { type TimeoutContext, TimeoutError } from '../timeout';
|
|
import {
|
|
BufferPool,
|
|
calculateDurationInMs,
|
|
type Callback,
|
|
decorateDecryptionResult,
|
|
HostAddress,
|
|
maxWireVersion,
|
|
type MongoDBNamespace,
|
|
now,
|
|
once,
|
|
squashError,
|
|
uuidV4
|
|
} from '../utils';
|
|
import type { WriteConcern } from '../write_concern';
|
|
import type { AuthContext } from './auth/auth_provider';
|
|
import type { MongoCredentials } from './auth/mongo_credentials';
|
|
import {
|
|
CommandFailedEvent,
|
|
CommandStartedEvent,
|
|
CommandSucceededEvent
|
|
} from './command_monitoring_events';
|
|
import {
|
|
OpCompressedRequest,
|
|
OpMsgRequest,
|
|
type OpMsgResponse,
|
|
OpQueryRequest,
|
|
type OpReply,
|
|
type WriteProtocolMessageType
|
|
} from './commands';
|
|
import type { Stream } from './connect';
|
|
import type { ClientMetadata } from './handshake/client_metadata';
|
|
import { StreamDescription, type StreamDescriptionOptions } from './stream_description';
|
|
import { type CompressorName, decompressResponse } from './wire_protocol/compression';
|
|
import { onData } from './wire_protocol/on_data';
|
|
import {
|
|
CursorResponse,
|
|
MongoDBResponse,
|
|
type MongoDBResponseConstructor
|
|
} from './wire_protocol/responses';
|
|
import { getReadPreference, isSharded } from './wire_protocol/shared';
|
|
|
|
/** @internal */
|
|
export interface CommandOptions extends BSONSerializeOptions {
|
|
secondaryOk?: boolean;
|
|
/** Specify read preference if command supports it */
|
|
readPreference?: ReadPreferenceLike;
|
|
monitoring?: boolean;
|
|
socketTimeoutMS?: number;
|
|
/** Session to use for the operation */
|
|
session?: ClientSession;
|
|
documentsReturnedIn?: string;
|
|
noResponse?: boolean;
|
|
omitReadPreference?: boolean;
|
|
omitMaxTimeMS?: boolean;
|
|
|
|
// TODO(NODE-2802): Currently the CommandOptions take a property willRetryWrite which is a hint
|
|
// from executeOperation that the txnNum should be applied to this command.
|
|
// Applying a session to a command should happen as part of command construction,
|
|
// most likely in the CommandOperation#executeCommand method, where we have access to
|
|
// the details we need to determine if a txnNum should also be applied.
|
|
willRetryWrite?: boolean;
|
|
|
|
writeConcern?: WriteConcern;
|
|
|
|
directConnection?: boolean;
|
|
|
|
/** @internal */
|
|
timeoutContext?: TimeoutContext;
|
|
}
|
|
|
|
/** @public */
|
|
export interface ProxyOptions {
|
|
proxyHost?: string;
|
|
proxyPort?: number;
|
|
proxyUsername?: string;
|
|
proxyPassword?: string;
|
|
}
|
|
|
|
/** @public */
|
|
export interface ConnectionOptions
|
|
extends SupportedNodeConnectionOptions,
|
|
StreamDescriptionOptions,
|
|
ProxyOptions {
|
|
// Internal creation info
|
|
id: number | '<monitor>';
|
|
generation: number;
|
|
hostAddress: HostAddress;
|
|
/** @internal */
|
|
autoEncrypter?: AutoEncrypter;
|
|
serverApi?: ServerApi;
|
|
monitorCommands: boolean;
|
|
/** @internal */
|
|
connectionType?: any;
|
|
credentials?: MongoCredentials;
|
|
/** @internal */
|
|
authProviders: MongoClientAuthProviders;
|
|
connectTimeoutMS?: number;
|
|
tls: boolean;
|
|
noDelay?: boolean;
|
|
socketTimeoutMS?: number;
|
|
cancellationToken?: CancellationToken;
|
|
metadata: ClientMetadata;
|
|
/** @internal */
|
|
extendedMetadata: Promise<Document>;
|
|
/** @internal */
|
|
mongoLogger?: MongoLogger | undefined;
|
|
}
|
|
|
|
/** @public */
|
|
export type ConnectionEvents = {
|
|
commandStarted(event: CommandStartedEvent): void;
|
|
commandSucceeded(event: CommandSucceededEvent): void;
|
|
commandFailed(event: CommandFailedEvent): void;
|
|
clusterTimeReceived(clusterTime: Document): void;
|
|
close(): void;
|
|
pinned(pinType: string): void;
|
|
unpinned(pinType: string): void;
|
|
};
|
|
|
|
/** @internal */
|
|
export function hasSessionSupport(conn: Connection): boolean {
|
|
const description = conn.description;
|
|
return description.logicalSessionTimeoutMinutes != null;
|
|
}
|
|
|
|
function streamIdentifier(stream: Stream, options: ConnectionOptions): string {
|
|
if (options.proxyHost) {
|
|
// If proxy options are specified, the properties of `stream` itself
|
|
// will not accurately reflect what endpoint this is connected to.
|
|
return options.hostAddress.toString();
|
|
}
|
|
|
|
const { remoteAddress, remotePort } = stream;
|
|
if (typeof remoteAddress === 'string' && typeof remotePort === 'number') {
|
|
return HostAddress.fromHostPort(remoteAddress, remotePort).toString();
|
|
}
|
|
|
|
return uuidV4().toString('hex');
|
|
}
|
|
|
|
/** @internal */
|
|
export class Connection extends TypedEventEmitter<ConnectionEvents> {
|
|
public id: number | '<monitor>';
|
|
public address: string;
|
|
public lastHelloMS = -1;
|
|
public serverApi?: ServerApi;
|
|
public helloOk = false;
|
|
public authContext?: AuthContext;
|
|
public delayedTimeoutId: NodeJS.Timeout | null = null;
|
|
public generation: number;
|
|
public accessToken?: string;
|
|
public readonly description: Readonly<StreamDescription>;
|
|
/**
|
|
* Represents if the connection has been established:
|
|
* - TCP handshake
|
|
* - TLS negotiated
|
|
* - mongodb handshake (saslStart, saslContinue), includes authentication
|
|
*
|
|
* Once connection is established, command logging can log events (if enabled)
|
|
*/
|
|
public established: boolean;
|
|
/** Indicates that the connection (including underlying TCP socket) has been closed. */
|
|
public closed = false;
|
|
|
|
private lastUseTime: number;
|
|
private clusterTime: Document | null = null;
|
|
private error: Error | null = null;
|
|
private dataEvents: AsyncGenerator<Buffer, void, void> | null = null;
|
|
|
|
private readonly socketTimeoutMS: number;
|
|
private readonly monitorCommands: boolean;
|
|
private readonly socket: Stream;
|
|
private readonly messageStream: Readable;
|
|
|
|
/** @event */
|
|
static readonly COMMAND_STARTED = COMMAND_STARTED;
|
|
/** @event */
|
|
static readonly COMMAND_SUCCEEDED = COMMAND_SUCCEEDED;
|
|
/** @event */
|
|
static readonly COMMAND_FAILED = COMMAND_FAILED;
|
|
/** @event */
|
|
static readonly CLUSTER_TIME_RECEIVED = CLUSTER_TIME_RECEIVED;
|
|
/** @event */
|
|
static readonly CLOSE = CLOSE;
|
|
/** @event */
|
|
static readonly PINNED = PINNED;
|
|
/** @event */
|
|
static readonly UNPINNED = UNPINNED;
|
|
|
|
constructor(stream: Stream, options: ConnectionOptions) {
|
|
super();
|
|
|
|
this.socket = stream;
|
|
this.id = options.id;
|
|
this.address = streamIdentifier(stream, options);
|
|
this.socketTimeoutMS = options.socketTimeoutMS ?? 0;
|
|
this.monitorCommands = options.monitorCommands;
|
|
this.serverApi = options.serverApi;
|
|
this.mongoLogger = options.mongoLogger;
|
|
this.established = false;
|
|
|
|
this.description = new StreamDescription(this.address, options);
|
|
this.generation = options.generation;
|
|
this.lastUseTime = now();
|
|
|
|
this.messageStream = this.socket
|
|
.on('error', this.onError.bind(this))
|
|
.pipe(new SizedMessageTransform({ connection: this }))
|
|
.on('error', this.onError.bind(this));
|
|
this.socket.on('close', this.onClose.bind(this));
|
|
this.socket.on('timeout', this.onTimeout.bind(this));
|
|
|
|
this.messageStream.pause();
|
|
}
|
|
|
|
public get hello() {
|
|
return this.description.hello;
|
|
}
|
|
|
|
// the `connect` method stores the result of the handshake hello on the connection
|
|
public set hello(response: Document | null) {
|
|
this.description.receiveResponse(response);
|
|
Object.freeze(this.description);
|
|
}
|
|
|
|
public get serviceId(): ObjectId | undefined {
|
|
return this.hello?.serviceId;
|
|
}
|
|
|
|
public get loadBalanced(): boolean {
|
|
return this.description.loadBalanced;
|
|
}
|
|
|
|
public get idleTime(): number {
|
|
return calculateDurationInMs(this.lastUseTime);
|
|
}
|
|
|
|
private get hasSessionSupport(): boolean {
|
|
return this.description.logicalSessionTimeoutMinutes != null;
|
|
}
|
|
|
|
private get supportsOpMsg(): boolean {
|
|
return (
|
|
this.description != null &&
|
|
maxWireVersion(this) >= 6 &&
|
|
!this.description.__nodejs_mock_server__
|
|
);
|
|
}
|
|
|
|
private get shouldEmitAndLogCommand(): boolean {
|
|
return (
|
|
(this.monitorCommands ||
|
|
(this.established &&
|
|
!this.authContext?.reauthenticating &&
|
|
this.mongoLogger?.willLog(MongoLoggableComponent.COMMAND, SeverityLevel.DEBUG))) ??
|
|
false
|
|
);
|
|
}
|
|
|
|
public markAvailable(): void {
|
|
this.lastUseTime = now();
|
|
}
|
|
|
|
public onError(error: Error) {
|
|
this.cleanup(error);
|
|
}
|
|
|
|
private onClose() {
|
|
const message = `connection ${this.id} to ${this.address} closed`;
|
|
this.cleanup(new MongoNetworkError(message));
|
|
}
|
|
|
|
private onTimeout() {
|
|
this.delayedTimeoutId = setTimeout(() => {
|
|
const message = `connection ${this.id} to ${this.address} timed out`;
|
|
const beforeHandshake = this.hello == null;
|
|
this.cleanup(new MongoNetworkTimeoutError(message, { beforeHandshake }));
|
|
}, 1).unref(); // No need for this timer to hold the event loop open
|
|
}
|
|
|
|
public destroy(): void {
|
|
if (this.closed) {
|
|
return;
|
|
}
|
|
|
|
// load balanced mode requires that these listeners remain on the connection
|
|
// after cleanup on timeouts, errors or close so we remove them before calling
|
|
// cleanup.
|
|
this.removeAllListeners(Connection.PINNED);
|
|
this.removeAllListeners(Connection.UNPINNED);
|
|
const message = `connection ${this.id} to ${this.address} closed`;
|
|
this.cleanup(new MongoNetworkError(message));
|
|
}
|
|
|
|
/**
|
|
* A method that cleans up the connection. When `force` is true, this method
|
|
* forcibly destroys the socket.
|
|
*
|
|
* If an error is provided, any in-flight operations will be closed with the error.
|
|
*
|
|
* This method does nothing if the connection is already closed.
|
|
*/
|
|
private cleanup(error: Error): void {
|
|
if (this.closed) {
|
|
return;
|
|
}
|
|
|
|
this.socket.destroy();
|
|
this.error = error;
|
|
|
|
this.dataEvents?.throw(error).then(undefined, squashError);
|
|
this.closed = true;
|
|
this.emit(Connection.CLOSE);
|
|
}
|
|
|
|
private prepareCommand(db: string, command: Document, options: CommandOptions) {
|
|
let cmd = { ...command };
|
|
|
|
const readPreference = getReadPreference(options);
|
|
const session = options?.session;
|
|
|
|
let clusterTime = this.clusterTime;
|
|
|
|
if (this.serverApi) {
|
|
const { version, strict, deprecationErrors } = this.serverApi;
|
|
cmd.apiVersion = version;
|
|
if (strict != null) cmd.apiStrict = strict;
|
|
if (deprecationErrors != null) cmd.apiDeprecationErrors = deprecationErrors;
|
|
}
|
|
|
|
if (this.hasSessionSupport && session) {
|
|
if (
|
|
session.clusterTime &&
|
|
clusterTime &&
|
|
session.clusterTime.clusterTime.greaterThan(clusterTime.clusterTime)
|
|
) {
|
|
clusterTime = session.clusterTime;
|
|
}
|
|
|
|
const sessionError = applySession(session, cmd, options);
|
|
if (sessionError) throw sessionError;
|
|
} else if (session?.explicit) {
|
|
throw new MongoCompatibilityError('Current topology does not support sessions');
|
|
}
|
|
|
|
// if we have a known cluster time, gossip it
|
|
if (clusterTime) {
|
|
cmd.$clusterTime = clusterTime;
|
|
}
|
|
|
|
// For standalone, drivers MUST NOT set $readPreference.
|
|
if (this.description.type !== ServerType.Standalone) {
|
|
if (
|
|
!isSharded(this) &&
|
|
!this.description.loadBalanced &&
|
|
this.supportsOpMsg &&
|
|
options.directConnection === true &&
|
|
readPreference?.mode === 'primary'
|
|
) {
|
|
// For mongos and load balancers with 'primary' mode, drivers MUST NOT set $readPreference.
|
|
// For all other types with a direct connection, if the read preference is 'primary'
|
|
// (driver sets 'primary' as default if no read preference is configured),
|
|
// the $readPreference MUST be set to 'primaryPreferred'
|
|
// to ensure that any server type can handle the request.
|
|
cmd.$readPreference = ReadPreference.primaryPreferred.toJSON();
|
|
} else if (isSharded(this) && !this.supportsOpMsg && readPreference?.mode !== 'primary') {
|
|
// When sending a read operation via OP_QUERY and the $readPreference modifier,
|
|
// the query MUST be provided using the $query modifier.
|
|
cmd = {
|
|
$query: cmd,
|
|
$readPreference: readPreference.toJSON()
|
|
};
|
|
} else if (readPreference?.mode !== 'primary') {
|
|
// For mode 'primary', drivers MUST NOT set $readPreference.
|
|
// For all other read preference modes (i.e. 'secondary', 'primaryPreferred', ...),
|
|
// drivers MUST set $readPreference
|
|
cmd.$readPreference = readPreference.toJSON();
|
|
}
|
|
}
|
|
|
|
const commandOptions = {
|
|
numberToSkip: 0,
|
|
numberToReturn: -1,
|
|
checkKeys: false,
|
|
// This value is not overridable
|
|
secondaryOk: readPreference.secondaryOk(),
|
|
...options
|
|
};
|
|
|
|
options.timeoutContext?.addMaxTimeMSToCommand(cmd, options);
|
|
|
|
const message = this.supportsOpMsg
|
|
? new OpMsgRequest(db, cmd, commandOptions)
|
|
: new OpQueryRequest(db, cmd, commandOptions);
|
|
|
|
return message;
|
|
}
|
|
|
|
private async *sendWire(
|
|
message: WriteProtocolMessageType,
|
|
options: CommandOptions,
|
|
responseType?: MongoDBResponseConstructor
|
|
): AsyncGenerator<MongoDBResponse> {
|
|
this.throwIfAborted();
|
|
|
|
const timeout =
|
|
options.socketTimeoutMS ??
|
|
options?.timeoutContext?.getSocketTimeoutMS() ??
|
|
this.socketTimeoutMS;
|
|
this.socket.setTimeout(timeout);
|
|
|
|
try {
|
|
await this.writeCommand(message, {
|
|
agreedCompressor: this.description.compressor ?? 'none',
|
|
zlibCompressionLevel: this.description.zlibCompressionLevel,
|
|
timeoutContext: options.timeoutContext
|
|
});
|
|
|
|
if (options.noResponse || message.moreToCome) {
|
|
yield MongoDBResponse.empty;
|
|
return;
|
|
}
|
|
|
|
this.throwIfAborted();
|
|
|
|
if (
|
|
options.timeoutContext?.csotEnabled() &&
|
|
options.timeoutContext.minRoundTripTime != null &&
|
|
options.timeoutContext.remainingTimeMS < options.timeoutContext.minRoundTripTime
|
|
) {
|
|
throw new MongoOperationTimeoutError(
|
|
'Server roundtrip time is greater than the time remaining'
|
|
);
|
|
}
|
|
|
|
for await (const response of this.readMany({ timeoutContext: options.timeoutContext })) {
|
|
this.socket.setTimeout(0);
|
|
const bson = response.parse();
|
|
|
|
const document = (responseType ?? MongoDBResponse).make(bson);
|
|
|
|
yield document;
|
|
this.throwIfAborted();
|
|
|
|
this.socket.setTimeout(timeout);
|
|
}
|
|
} finally {
|
|
this.socket.setTimeout(0);
|
|
}
|
|
}
|
|
|
|
private async *sendCommand(
|
|
ns: MongoDBNamespace,
|
|
command: Document,
|
|
options: CommandOptions,
|
|
responseType?: MongoDBResponseConstructor
|
|
) {
|
|
const message = this.prepareCommand(ns.db, command, options);
|
|
let started = 0;
|
|
if (this.shouldEmitAndLogCommand) {
|
|
started = now();
|
|
this.emitAndLogCommand(
|
|
this.monitorCommands,
|
|
Connection.COMMAND_STARTED,
|
|
message.databaseName,
|
|
this.established,
|
|
new CommandStartedEvent(this, message, this.description.serverConnectionId)
|
|
);
|
|
}
|
|
|
|
// If `documentsReturnedIn` not set or raw is not enabled, use input bson options
|
|
// Otherwise, support raw flag. Raw only works for cursors that hardcode firstBatch/nextBatch fields
|
|
const bsonOptions: DeserializeOptions =
|
|
options.documentsReturnedIn == null || !options.raw
|
|
? options
|
|
: {
|
|
...options,
|
|
raw: false,
|
|
fieldsAsRaw: { [options.documentsReturnedIn]: true }
|
|
};
|
|
|
|
/** MongoDBResponse instance or subclass */
|
|
let document: MongoDBResponse | undefined = undefined;
|
|
/** Cached result of a toObject call */
|
|
let object: Document | undefined = undefined;
|
|
try {
|
|
this.throwIfAborted();
|
|
for await (document of this.sendWire(message, options, responseType)) {
|
|
object = undefined;
|
|
if (options.session != null) {
|
|
updateSessionFromResponse(options.session, document);
|
|
}
|
|
|
|
if (document.$clusterTime) {
|
|
this.clusterTime = document.$clusterTime;
|
|
this.emit(Connection.CLUSTER_TIME_RECEIVED, document.$clusterTime);
|
|
}
|
|
|
|
if (document.ok === 0) {
|
|
if (options.timeoutContext?.csotEnabled() && document.isMaxTimeExpiredError) {
|
|
throw new MongoOperationTimeoutError('Server reported a timeout error', {
|
|
cause: new MongoServerError((object ??= document.toObject(bsonOptions)))
|
|
});
|
|
}
|
|
throw new MongoServerError((object ??= document.toObject(bsonOptions)));
|
|
}
|
|
|
|
if (this.shouldEmitAndLogCommand) {
|
|
this.emitAndLogCommand(
|
|
this.monitorCommands,
|
|
Connection.COMMAND_SUCCEEDED,
|
|
message.databaseName,
|
|
this.established,
|
|
new CommandSucceededEvent(
|
|
this,
|
|
message,
|
|
options.noResponse
|
|
? undefined
|
|
: message.moreToCome
|
|
? { ok: 1 }
|
|
: (object ??= document.toObject(bsonOptions)),
|
|
started,
|
|
this.description.serverConnectionId
|
|
)
|
|
);
|
|
}
|
|
|
|
if (responseType == null) {
|
|
yield (object ??= document.toObject(bsonOptions));
|
|
} else {
|
|
yield document;
|
|
}
|
|
|
|
this.throwIfAborted();
|
|
}
|
|
} catch (error) {
|
|
if (this.shouldEmitAndLogCommand) {
|
|
this.emitAndLogCommand(
|
|
this.monitorCommands,
|
|
Connection.COMMAND_FAILED,
|
|
message.databaseName,
|
|
this.established,
|
|
new CommandFailedEvent(this, message, error, started, this.description.serverConnectionId)
|
|
);
|
|
}
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
public async command<T extends MongoDBResponseConstructor>(
|
|
ns: MongoDBNamespace,
|
|
command: Document,
|
|
options: CommandOptions | undefined,
|
|
responseType: T
|
|
): Promise<InstanceType<T>>;
|
|
|
|
public async command<T extends MongoDBResponseConstructor>(
|
|
ns: MongoDBNamespace,
|
|
command: Document,
|
|
options: CommandOptions | undefined,
|
|
responseType: T | undefined
|
|
): Promise<typeof responseType extends undefined ? Document : InstanceType<T>>;
|
|
|
|
public async command(
|
|
ns: MongoDBNamespace,
|
|
command: Document,
|
|
options?: CommandOptions
|
|
): Promise<Document>;
|
|
|
|
public async command(
|
|
ns: MongoDBNamespace,
|
|
command: Document,
|
|
options: CommandOptions = {},
|
|
responseType?: MongoDBResponseConstructor
|
|
): Promise<Document> {
|
|
this.throwIfAborted();
|
|
for await (const document of this.sendCommand(ns, command, options, responseType)) {
|
|
if (options.timeoutContext?.csotEnabled()) {
|
|
if (MongoDBResponse.is(document)) {
|
|
if (document.isMaxTimeExpiredError) {
|
|
throw new MongoOperationTimeoutError('Server reported a timeout error', {
|
|
cause: new MongoServerError(document.toObject())
|
|
});
|
|
}
|
|
} else {
|
|
if (
|
|
(Array.isArray(document?.writeErrors) &&
|
|
document.writeErrors.some(
|
|
error => error?.code === MONGODB_ERROR_CODES.MaxTimeMSExpired
|
|
)) ||
|
|
document?.writeConcernError?.code === MONGODB_ERROR_CODES.MaxTimeMSExpired
|
|
) {
|
|
throw new MongoOperationTimeoutError('Server reported a timeout error', {
|
|
cause: new MongoServerError(document)
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
return document;
|
|
}
|
|
throw new MongoUnexpectedServerResponseError('Unable to get response from server');
|
|
}
|
|
|
|
public exhaustCommand(
|
|
ns: MongoDBNamespace,
|
|
command: Document,
|
|
options: CommandOptions,
|
|
replyListener: Callback
|
|
) {
|
|
const exhaustLoop = async () => {
|
|
this.throwIfAborted();
|
|
for await (const reply of this.sendCommand(ns, command, options)) {
|
|
replyListener(undefined, reply);
|
|
this.throwIfAborted();
|
|
}
|
|
throw new MongoUnexpectedServerResponseError('Server ended moreToCome unexpectedly');
|
|
};
|
|
|
|
exhaustLoop().then(undefined, replyListener);
|
|
}
|
|
|
|
private throwIfAborted() {
|
|
if (this.error) throw this.error;
|
|
}
|
|
|
|
/**
|
|
* @internal
|
|
*
|
|
* Writes an OP_MSG or OP_QUERY request to the socket, optionally compressing the command. This method
|
|
* waits until the socket's buffer has emptied (the Nodejs socket `drain` event has fired).
|
|
*/
|
|
private async writeCommand(
|
|
command: WriteProtocolMessageType,
|
|
options: {
|
|
agreedCompressor?: CompressorName;
|
|
zlibCompressionLevel?: number;
|
|
timeoutContext?: TimeoutContext;
|
|
}
|
|
): Promise<void> {
|
|
const finalCommand =
|
|
options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command)
|
|
? command
|
|
: new OpCompressedRequest(command, {
|
|
agreedCompressor: options.agreedCompressor ?? 'none',
|
|
zlibCompressionLevel: options.zlibCompressionLevel ?? 0
|
|
});
|
|
|
|
const buffer = Buffer.concat(await finalCommand.toBin());
|
|
|
|
if (options.timeoutContext?.csotEnabled()) {
|
|
if (
|
|
options.timeoutContext.minRoundTripTime != null &&
|
|
options.timeoutContext.remainingTimeMS < options.timeoutContext.minRoundTripTime
|
|
) {
|
|
throw new MongoOperationTimeoutError(
|
|
'Server roundtrip time is greater than the time remaining'
|
|
);
|
|
}
|
|
}
|
|
|
|
if (this.socket.write(buffer)) return;
|
|
|
|
const drainEvent = once<void>(this.socket, 'drain');
|
|
const timeout = options?.timeoutContext?.timeoutForSocketWrite;
|
|
if (timeout) {
|
|
try {
|
|
return await Promise.race([drainEvent, timeout]);
|
|
} catch (error) {
|
|
let err = error;
|
|
if (TimeoutError.is(error)) {
|
|
err = new MongoOperationTimeoutError('Timed out at socket write');
|
|
this.cleanup(err);
|
|
}
|
|
throw error;
|
|
} finally {
|
|
timeout.clear();
|
|
}
|
|
}
|
|
return await drainEvent;
|
|
}
|
|
|
|
/**
|
|
* @internal
|
|
*
|
|
* Returns an async generator that yields full wire protocol messages from the underlying socket. This function
|
|
* yields messages until `moreToCome` is false or not present in a response, or the caller cancels the request
|
|
* by calling `return` on the generator.
|
|
*
|
|
* Note that `for-await` loops call `return` automatically when the loop is exited.
|
|
*/
|
|
private async *readMany(options: {
|
|
timeoutContext?: TimeoutContext;
|
|
}): AsyncGenerator<OpMsgResponse | OpReply> {
|
|
try {
|
|
this.dataEvents = onData(this.messageStream, options);
|
|
this.messageStream.resume();
|
|
|
|
for await (const message of this.dataEvents) {
|
|
const response = await decompressResponse(message);
|
|
yield response;
|
|
|
|
if (!response.moreToCome) {
|
|
return;
|
|
}
|
|
}
|
|
} catch (readError) {
|
|
const err = readError;
|
|
if (TimeoutError.is(readError)) {
|
|
const error = new MongoOperationTimeoutError(
|
|
`Timed out during socket read (${readError.duration}ms)`
|
|
);
|
|
this.dataEvents = null;
|
|
this.onError(error);
|
|
throw error;
|
|
}
|
|
throw err;
|
|
} finally {
|
|
this.dataEvents = null;
|
|
this.messageStream.pause();
|
|
this.throwIfAborted();
|
|
}
|
|
}
|
|
}
|
|
|
|
/** @internal */
|
|
export class SizedMessageTransform extends Transform {
|
|
bufferPool: BufferPool;
|
|
connection: Connection;
|
|
|
|
constructor({ connection }: { connection: Connection }) {
|
|
super({ writableObjectMode: false, readableObjectMode: true });
|
|
this.bufferPool = new BufferPool();
|
|
this.connection = connection;
|
|
}
|
|
|
|
override _transform(chunk: Buffer, encoding: unknown, callback: TransformCallback): void {
|
|
if (this.connection.delayedTimeoutId != null) {
|
|
clearTimeout(this.connection.delayedTimeoutId);
|
|
this.connection.delayedTimeoutId = null;
|
|
}
|
|
|
|
this.bufferPool.append(chunk);
|
|
const sizeOfMessage = this.bufferPool.getInt32();
|
|
|
|
if (sizeOfMessage == null) {
|
|
return callback();
|
|
}
|
|
|
|
if (sizeOfMessage < 0) {
|
|
return callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}, too small`));
|
|
}
|
|
|
|
if (sizeOfMessage > this.bufferPool.length) {
|
|
return callback();
|
|
}
|
|
|
|
const message = this.bufferPool.read(sizeOfMessage);
|
|
return callback(null, message);
|
|
}
|
|
}
|
|
|
|
/** @internal */
|
|
export class CryptoConnection extends Connection {
|
|
/** @internal */
|
|
autoEncrypter?: AutoEncrypter;
|
|
|
|
constructor(stream: Stream, options: ConnectionOptions) {
|
|
super(stream, options);
|
|
this.autoEncrypter = options.autoEncrypter;
|
|
}
|
|
|
|
public override async command<T extends MongoDBResponseConstructor>(
|
|
ns: MongoDBNamespace,
|
|
command: Document,
|
|
options: CommandOptions | undefined,
|
|
responseType: T
|
|
): Promise<InstanceType<T>>;
|
|
|
|
public override async command(
|
|
ns: MongoDBNamespace,
|
|
command: Document,
|
|
options?: CommandOptions
|
|
): Promise<Document>;
|
|
|
|
override async command<T extends MongoDBResponseConstructor>(
|
|
ns: MongoDBNamespace,
|
|
cmd: Document,
|
|
options?: CommandOptions,
|
|
responseType?: T | undefined
|
|
): Promise<Document> {
|
|
const { autoEncrypter } = this;
|
|
if (!autoEncrypter) {
|
|
// TODO(NODE-6065): throw a MongoRuntimeError in Node V7
|
|
// @ts-expect-error No cause provided because there is no underlying error.
|
|
throw new MongoMissingDependencyError('No AutoEncrypter available for encryption', {
|
|
dependencyName: 'n/a'
|
|
});
|
|
}
|
|
|
|
const serverWireVersion = maxWireVersion(this);
|
|
if (serverWireVersion === 0) {
|
|
// This means the initial handshake hasn't happened yet
|
|
return await super.command<T>(ns, cmd, options, responseType);
|
|
}
|
|
|
|
if (serverWireVersion < 8) {
|
|
throw new MongoCompatibilityError(
|
|
'Auto-encryption requires a minimum MongoDB version of 4.2'
|
|
);
|
|
}
|
|
|
|
// Save sort or indexKeys based on the command being run
|
|
// the encrypt API serializes our JS objects to BSON to pass to the native code layer
|
|
// and then deserializes the encrypted result, the protocol level components
|
|
// of the command (ex. sort) are then converted to JS objects potentially losing
|
|
// import key order information. These fields are never encrypted so we can save the values
|
|
// from before the encryption and replace them after encryption has been performed
|
|
const sort: Map<string, number> | null = cmd.find || cmd.findAndModify ? cmd.sort : null;
|
|
const indexKeys: Map<string, number>[] | null = cmd.createIndexes
|
|
? cmd.indexes.map((index: { key: Map<string, number> }) => index.key)
|
|
: null;
|
|
|
|
const encrypted = await autoEncrypter.encrypt(ns.toString(), cmd, options);
|
|
|
|
// Replace the saved values
|
|
if (sort != null && (cmd.find || cmd.findAndModify)) {
|
|
encrypted.sort = sort;
|
|
}
|
|
|
|
if (indexKeys != null && cmd.createIndexes) {
|
|
for (const [offset, index] of indexKeys.entries()) {
|
|
// @ts-expect-error `encrypted` is a generic "command", but we've narrowed for only `createIndexes` commands here
|
|
encrypted.indexes[offset].key = index;
|
|
}
|
|
}
|
|
|
|
const encryptedResponse = await super.command(
|
|
ns,
|
|
encrypted,
|
|
options,
|
|
// Eventually we want to require `responseType` which means we would satisfy `T` as the return type.
|
|
// In the meantime, we want encryptedResponse to always be _at least_ a MongoDBResponse if not a more specific subclass
|
|
// So that we can ensure we have access to the on-demand APIs for decorate response
|
|
responseType ?? MongoDBResponse
|
|
);
|
|
|
|
const result = await autoEncrypter.decrypt(encryptedResponse.toBytes(), options);
|
|
|
|
const decryptedResponse = responseType?.make(result) ?? deserialize(result, options);
|
|
|
|
if (autoEncrypter[kDecorateResult]) {
|
|
if (responseType == null) {
|
|
decorateDecryptionResult(decryptedResponse, encryptedResponse.toObject(), true);
|
|
} else if (decryptedResponse instanceof CursorResponse) {
|
|
decryptedResponse.encryptedResponse = encryptedResponse;
|
|
}
|
|
}
|
|
|
|
return decryptedResponse;
|
|
}
|
|
}
|