diff --git a/lib/internal/blob.js b/lib/internal/blob.js index e42773a8d03dcc..e1b1dceabd629d 100644 --- a/lib/internal/blob.js +++ b/lib/internal/blob.js @@ -439,63 +439,76 @@ function createBlobReaderStream(reader) { // There really should only be one read at a time so using an // array here is purely defensive. this.pendingPulls = []; + // Register a wakeup callback that the C++ side can invoke + // when new data is available after a STATUS_BLOCK. + reader.setWakeup(() => { + if (this.pendingPulls.length > 0) { + this.readNext(c); + } + }); }, pull(c) { const { promise, resolve, reject } = PromiseWithResolvers(); this.pendingPulls.push({ resolve, reject }); - const readNext = () => { - reader.pull((status, buffer) => { - // If pendingPulls is empty here, the stream had to have - // been canceled, and we don't really care about the result. - // We can simply exit. - if (this.pendingPulls.length === 0) { - return; - } - if (status === 0) { - // EOS - c.close(); - // This is to signal the end for byob readers - // see https://streams.spec.whatwg.org/#example-rbs-pull - c.byobRequest?.respond(0); - const pending = this.pendingPulls.shift(); - pending.resolve(); - return; - } else if (status < 0) { - // The read could fail for many different reasons when reading - // from a non-memory resident blob part (e.g. file-backed blob). - // The error details the system error code. - const error = lazyDOMException('The blob could not be read', 'NotReadableError'); - const pending = this.pendingPulls.shift(); - c.error(error); - pending.reject(error); + this.readNext(c); + return promise; + }, + readNext(c) { + reader.pull((status, buffer) => { + // If pendingPulls is empty here, the stream had to have + // been canceled, and we don't really care about the result. + // We can simply exit. + if (this.pendingPulls.length === 0) { + return; + } + if (status === 0) { + // EOS + c.close(); + // This is to signal the end for byob readers + // see https://streams.spec.whatwg.org/#example-rbs-pull + c.byobRequest?.respond(0); + const pending = this.pendingPulls.shift(); + pending.resolve(); + return; + } else if (status < 0) { + // The read could fail for many different reasons when reading + // from a non-memory resident blob part (e.g. file-backed blob). + // The error details the system error code. + const error = + lazyDOMException('The blob could not be read', + 'NotReadableError'); + const pending = this.pendingPulls.shift(); + c.error(error); + pending.reject(error); + return; + } else if (status === 2) { + // STATUS_BLOCK: No data available yet. The wakeup callback + // registered in start() will re-invoke readNext when data + // arrives. + return; + } + // ReadableByteStreamController.enqueue errors if we submit a + // 0-length buffer. We need to check for that here. + if (buffer !== undefined && buffer.byteLength !== 0) { + c.enqueue(new Uint8Array(buffer)); + } + // We keep reading until we either reach EOS, some error, or + // we hit the flow rate of the stream (c.desiredSize). + // We use setImmediate here because we have to allow the event + // loop to turn in order to process any pending i/o. Using + // queueMicrotask won't allow the event loop to turn. + setImmediate(() => { + if (c.desiredSize < 0) { + // A manual backpressure check. + if (this.pendingPulls.length !== 0) { + const pending = this.pendingPulls.shift(); + pending.resolve(); + } return; } - // ReadableByteStreamController.enqueue errors if we submit a 0-length - // buffer. We need to check for that here. - if (buffer !== undefined && buffer.byteLength !== 0) { - c.enqueue(new Uint8Array(buffer)); - } - // We keep reading until we either reach EOS, some error, or we - // hit the flow rate of the stream (c.desiredSize). - // We use set immediate here because we have to allow the event - // loop to turn in order to process any pending i/o. Using - // queueMicrotask won't allow the event loop to turn. - setImmediate(() => { - if (c.desiredSize < 0) { - // A manual backpressure check. - if (this.pendingPulls.length !== 0) { - // A case of waiting pull finished (= not yet canceled) - const pending = this.pendingPulls.shift(); - pending.resolve(); - } - return; - } - readNext(); - }); + this.readNext(c); }); - }; - readNext(); - return promise; + }); }, cancel(reason) { // Reject any currently pending pulls here. diff --git a/lib/internal/quic/quic.js b/lib/internal/quic/quic.js index 026c0784f0bbe8..39afeb42781af7 100644 --- a/lib/internal/quic/quic.js +++ b/lib/internal/quic/quic.js @@ -480,6 +480,18 @@ setCallbacks({ this[kOwner][kSessionTicket](ticket); }, + /** + * Called when the client receives a NEW_TOKEN frame from the server. + * The token can be used for future connections to the same server + * address to skip address validation. + * @param {Buffer} token The opaque token data + * @param {SocketAddress} address The remote server address + */ + onSessionNewToken(token, address) { + debug('session new token callback', this[kOwner]); + // TODO(@jasnell): Emit to JS for storage and future reconnection use + }, + /** * Called when the session receives a session version negotiation request * @param {*} version diff --git a/lib/internal/quic/state.js b/lib/internal/quic/state.js index 052749945a21df..2c768efa881658 100644 --- a/lib/internal/quic/state.js +++ b/lib/internal/quic/state.js @@ -77,7 +77,6 @@ const { IDX_STATE_STREAM_FIN_RECEIVED, IDX_STATE_STREAM_READ_ENDED, IDX_STATE_STREAM_WRITE_ENDED, - IDX_STATE_STREAM_PAUSED, IDX_STATE_STREAM_RESET, IDX_STATE_STREAM_HAS_OUTBOUND, IDX_STATE_STREAM_HAS_READER, @@ -113,7 +112,6 @@ assert(IDX_STATE_STREAM_FIN_SENT !== undefined); assert(IDX_STATE_STREAM_FIN_RECEIVED !== undefined); assert(IDX_STATE_STREAM_READ_ENDED !== undefined); assert(IDX_STATE_STREAM_WRITE_ENDED !== undefined); -assert(IDX_STATE_STREAM_PAUSED !== undefined); assert(IDX_STATE_STREAM_RESET !== undefined); assert(IDX_STATE_STREAM_HAS_OUTBOUND !== undefined); assert(IDX_STATE_STREAM_HAS_READER !== undefined); @@ -475,11 +473,6 @@ class QuicStreamState { return !!DataViewPrototypeGetUint8(this.#handle, IDX_STATE_STREAM_WRITE_ENDED); } - /** @type {boolean} */ - get paused() { - if (this.#handle.byteLength === 0) return undefined; - return !!DataViewPrototypeGetUint8(this.#handle, IDX_STATE_STREAM_PAUSED); - } /** @type {boolean} */ get reset() { @@ -561,7 +554,6 @@ class QuicStreamState { finReceived: this.finReceived, readEnded: this.readEnded, writeEnded: this.writeEnded, - paused: this.paused, reset: this.reset, hasOutbound: this.hasOutbound, hasReader: this.hasReader, @@ -590,7 +582,6 @@ class QuicStreamState { finReceived: this.finReceived, readEnded: this.readEnded, writeEnded: this.writeEnded, - paused: this.paused, reset: this.reset, hasOutbound: this.hasOutbound, hasReader: this.hasReader, diff --git a/src/node_blob.cc b/src/node_blob.cc index 417cd8cbd307b9..cb2706eed92dcf 100644 --- a/src/node_blob.cc +++ b/src/node_blob.cc @@ -321,6 +321,7 @@ Local Blob::Reader::GetConstructorTemplate(Environment* env) { BaseObject::kInternalFieldCount); tmpl->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "BlobReader")); SetProtoMethod(env->isolate(), tmpl, "pull", Pull); + SetProtoMethod(env->isolate(), tmpl, "setWakeup", SetWakeup); env->set_blob_reader_constructor_template(tmpl); } return tmpl; @@ -411,6 +412,21 @@ void Blob::Reader::Pull(const FunctionCallbackInfo& args) { std::move(next), node::bob::OPTIONS_END, nullptr, 0)); } +void Blob::Reader::SetWakeup( + const FunctionCallbackInfo& args) { + Blob::Reader* reader; + ASSIGN_OR_RETURN_UNWRAP(&reader, args.This()); + CHECK(args[0]->IsFunction()); + reader->wakeup_.Reset(args.GetIsolate(), args[0].As()); +} + +void Blob::Reader::NotifyPull() { + if (wakeup_.IsEmpty() || !env()->can_call_into_js()) return; + HandleScope handle_scope(env()->isolate()); + Local fn = wakeup_.Get(env()->isolate()); + MakeCallback(fn, 0, nullptr); +} + BaseObjectPtr Blob::BlobTransferData::Deserialize( Environment* env, @@ -591,6 +607,7 @@ void Blob::RegisterExternalReferences(ExternalReferenceRegistry* registry) { registry->Register(Blob::GetDataObject); registry->Register(Blob::RevokeObjectURL); registry->Register(Blob::Reader::Pull); + registry->Register(Blob::Reader::SetWakeup); registry->Register(Concat); registry->Register(BlobFromFilePath); } diff --git a/src/node_blob.h b/src/node_blob.h index c601015d9af47b..88a56c7ec9a453 100644 --- a/src/node_blob.h +++ b/src/node_blob.h @@ -82,6 +82,8 @@ class Blob : public BaseObject { static BaseObjectPtr Create(Environment* env, BaseObjectPtr blob); static void Pull(const v8::FunctionCallbackInfo& args); + static void SetWakeup(const v8::FunctionCallbackInfo& args); + void NotifyPull(); explicit Reader(Environment* env, v8::Local obj, @@ -95,6 +97,7 @@ class Blob : public BaseObject { std::shared_ptr inner_; BaseObjectPtr strong_ptr_; bool eos_ = false; + v8::Global wakeup_; }; BaseObject::TransferMode GetTransferMode() const override; diff --git a/src/quic/application.cc b/src/quic/application.cc index 36f11fb9131464..d4daeccc14fd1c 100644 --- a/src/quic/application.cc +++ b/src/quic/application.cc @@ -452,6 +452,7 @@ ssize_t Session::Application::WriteVStream(PathStorage* path, if (stream_data.fin) flags |= NGTCP2_WRITE_STREAM_FLAG_FIN; return ngtcp2_conn_writev_stream(*session_, &path->path, + // TODO(@jasnell): ECN blocked on libuv nullptr, dest, max_packet_size, @@ -511,10 +512,13 @@ class DefaultApplication final : public Session::Application { stream_data->count = 0; stream_data->fin = 0; stream_data->stream.reset(); - stream_data->remaining = 0; Debug(&session(), "Default application getting stream data"); DCHECK_NOT_NULL(stream_data); // If the queue is empty, there aren't any streams with data yet + + // If the connection-level flow control window is exhausted, + // there is no point in pulling stream data. + if (!session().max_data_left()) return 0; if (stream_queue_.IsEmpty()) return 0; const auto get_length = [](auto vec, size_t count) { @@ -554,9 +558,7 @@ class DefaultApplication final : public Session::Application { if (count > 0) { stream->Schedule(&stream_queue_); - stream_data->remaining = get_length(data, count); } else { - stream_data->remaining = 0; } // Not calling done here because we defer committing @@ -581,15 +583,6 @@ class DefaultApplication final : public Session::Application { void ResumeStream(int64_t id) override { ScheduleStream(id); } - bool ShouldSetFin(const StreamData& stream_data) override { - auto const is_empty = [](const ngtcp2_vec* vec, size_t cnt) { - size_t i = 0; - for (size_t n = 0; n < cnt; n++) i += vec[n].len; - return i > 0; - }; - - return stream_data.stream && is_empty(stream_data, stream_data.count); - } void BlockStream(int64_t id) override { if (auto stream = session().FindStream(id)) [[likely]] { @@ -598,10 +591,9 @@ class DefaultApplication final : public Session::Application { } bool StreamCommit(StreamData* stream_data, size_t datalen) override { - if (datalen == 0) return true; DCHECK_NOT_NULL(stream_data); CHECK(stream_data->stream); - stream_data->stream->Commit(datalen); + stream_data->stream->Commit(datalen, stream_data->fin); return true; } @@ -616,11 +608,6 @@ class DefaultApplication final : public Session::Application { } } - void UnscheduleStream(int64_t id) { - if (auto stream = session().FindStream(id)) [[likely]] { - stream->Unschedule(); - } - } Stream::Queue stream_queue_; }; diff --git a/src/quic/application.h b/src/quic/application.h index b1f1131e1a7242..52e5f314518c60 100644 --- a/src/quic/application.h +++ b/src/quic/application.h @@ -120,7 +120,6 @@ class Session::Application : public MemoryRetainer { virtual int GetStreamData(StreamData* data) = 0; virtual bool StreamCommit(StreamData* data, size_t datalen) = 0; - virtual bool ShouldSetFin(const StreamData& data) = 0; inline Environment* env() const { return session().env(); } inline Session& session() { @@ -148,7 +147,6 @@ class Session::Application : public MemoryRetainer { struct Session::Application::StreamData final { // The actual number of vectors in the struct, up to kMaxVectorCount. size_t count = 0; - size_t remaining = 0; // The stream identifier. If this is a negative value then no stream is // identified. int64_t id = -1; @@ -156,6 +154,11 @@ struct Session::Application::StreamData final { ngtcp2_vec data[kMaxVectorCount]{}; BaseObjectPtr stream; + static_assert(sizeof(ngtcp2_vec) == sizeof(nghttp3_vec) && + alignof(ngtcp2_vec) == alignof(nghttp3_vec) && + offsetof(ngtcp2_vec, base) == offsetof(nghttp3_vec, base) && + offsetof(ngtcp2_vec, len) == offsetof(nghttp3_vec, len), + "ngtcp2_vec and nghttp3_vec must have identical layout"); inline operator nghttp3_vec*() { return reinterpret_cast(data); } diff --git a/src/quic/bindingdata.h b/src/quic/bindingdata.h index 1b29a54a8c1199..9e778eee408ec3 100644 --- a/src/quic/bindingdata.h +++ b/src/quic/bindingdata.h @@ -43,6 +43,7 @@ class Packet; V(session_datagram_status, SessionDatagramStatus) \ V(session_handshake, SessionHandshake) \ V(session_new, SessionNew) \ + V(session_new_token, SessionNewToken) \ V(session_path_validation, SessionPathValidation) \ V(session_ticket, SessionTicket) \ V(session_version_negotiation, SessionVersionNegotiation) \ @@ -70,6 +71,7 @@ class Packet; V(cubic, "cubic") \ V(disable_stateless_reset, "disableStatelessReset") \ V(enable_connect_protocol, "enableConnectProtocol") \ + V(enable_early_data, "enableEarlyData") \ V(enable_datagrams, "enableDatagrams") \ V(enable_tls_trace, "tlsTrace") \ V(endpoint, "Endpoint") \ @@ -121,6 +123,7 @@ class Packet; V(stream, "Stream") \ V(success, "success") \ V(tls_options, "tls") \ + V(token, "token") \ V(token_expiration, "tokenExpiration") \ V(token_secret, "tokenSecret") \ V(transport_params, "transportParams") \ diff --git a/src/quic/cid.cc b/src/quic/cid.cc index 7255bf2f39adf3..16db80485f108b 100644 --- a/src/quic/cid.cc +++ b/src/quic/cid.cc @@ -85,10 +85,14 @@ const CID CID::kInvalid{}; // CID::Hash size_t CID::Hash::operator()(const CID& cid) const { + // Uses the Boost hash_combine strategy: XOR each byte with the golden + // ratio constant 0x9e3779b9 (derived from the fractional part of the + // golden ratio, (sqrt(5)-1)/2 * 2^32) plus bit-shifted accumulator + // state. This provides good avalanche properties for short byte + // sequences like connection IDs (1-20 bytes). size_t hash = 0; for (size_t n = 0; n < cid.length(); n++) { - hash ^= std::hash{}(cid.ptr_->data[n] + 0x9e3779b9 + (hash << 6) + - (hash >> 2)); + hash ^= cid.ptr_->data[n] + 0x9e3779b9 + (hash << 6) + (hash >> 2); } return hash; } diff --git a/src/quic/data.cc b/src/quic/data.cc index 99a2b5af3c4f6a..f43ae4ce6edbc4 100644 --- a/src/quic/data.cc +++ b/src/quic/data.cc @@ -28,7 +28,7 @@ using v8::Undefined; using v8::Value; namespace quic { -int DebugIndentScope::indent_ = 0; +thread_local int DebugIndentScope::indent_ = 0; Path::Path(const SocketAddress& local, const SocketAddress& remote) { ngtcp2_addr_init(&this->local, local.data(), local.length()); diff --git a/src/quic/defs.h b/src/quic/defs.h index 74505d8e401f8d..b26ca5f9a4f12e 100644 --- a/src/quic/defs.h +++ b/src/quic/defs.h @@ -338,7 +338,7 @@ class DebugIndentScope final { } private: - static int indent_; + static thread_local int indent_; }; } // namespace node::quic diff --git a/src/quic/endpoint.cc b/src/quic/endpoint.cc index f7baac6fa8e38a..ff877a5d3ba2dc 100644 --- a/src/quic/endpoint.cc +++ b/src/quic/endpoint.cc @@ -319,23 +319,33 @@ class Endpoint::UDP::Impl final : public HandleWrap { const uv_buf_t* buf, const sockaddr* addr, unsigned int flags) { - // Nothing to do in these cases. Specifically, if the nread - // is zero or we've received a partial packet, we're just - // going to ignore it. - if (nread == 0 || flags & UV_UDP_PARTIAL) return; - auto impl = From(handle); DCHECK_NOT_NULL(impl); DCHECK_NOT_NULL(impl->endpoint_); + auto release_buf = [&]() { + if (buf->base != nullptr) + impl->env()->release_managed_buffer(*buf); + }; + + // Nothing to do in these cases. Specifically, if the nread + // is zero or we have received a partial packet, we are just + // going to ignore it. + if (nread == 0 || flags & UV_UDP_PARTIAL) { + release_buf(); + return; + } + if (nread < 0) { + release_buf(); impl->endpoint_->Destroy(CloseContext::RECEIVE_FAILURE, static_cast(nread)); return; } - impl->endpoint_->Receive(uv_buf_init(buf->base, static_cast(nread)), - SocketAddress(addr)); + impl->endpoint_->Receive( + uv_buf_init(buf->base, static_cast(nread)), + SocketAddress(addr)); } uv_udp_t handle_; @@ -739,6 +749,7 @@ void Endpoint::Send(const BaseObjectPtr& packet) { Debug(this, "Sending packet failed with error %d", err); packet->Done(err); Destroy(CloseContext::SEND_FAILURE, err); + return; } STAT_INCREMENT_N(Stats, bytes_sent, packet->length()); STAT_INCREMENT(Stats, packets_sent); @@ -988,7 +999,6 @@ void Endpoint::Destroy(CloseContext context, int status) { this, "Destroying endpoint due to \"%s\" with status %d", ctx, status); } - STAT_RECORD_TIMESTAMP(Stats, destroyed_at); state_->listening = 0; @@ -1007,6 +1017,7 @@ void Endpoint::Destroy(CloseContext context, int status) { DCHECK(sessions_.empty()); token_map_.clear(); dcid_to_scid_.clear(); + server_state_.reset(); udp_.Close(); state_->closing = 0; @@ -1345,29 +1356,33 @@ void Endpoint::Receive(const uv_buf_t& buf, } break; case NGTCP2_PKT_0RTT: + // 0-RTT packets are inherently replayable and could be sent + // from a spoofed source address to trigger amplification. + // When address validation is enabled, we send a Retry to + // force the client to prove it can receive at its claimed + // address. This adds a round trip but prevents amplification + // attacks. When address validation is disabled (e.g., on + // trusted networks), we skip the Retry and allow 0-RTT to + // proceed without additional validation. + if (options_.validate_address) { + Debug(this, + "Sending retry to %s due to 0RTT packet", + remote_address); + SendRetry(PathDescriptor{ + version, + dcid, + scid, + local_address, + remote_address, + }); + STAT_INCREMENT(Stats, packets_received); + return; + } Debug(this, - "Sending retry to %s due to initial 0RTT packet", + "Accepting 0RTT packet from %s without " + "address validation", remote_address); - // If it's a 0RTT packet, we're always going to perform path - // validation no matter what. This is a bit unfortunate since - // ORTT is supposed to be, you know, 0RTT, but sending a retry - // forces a round trip... but if the remote address is not - // validated, there's a possibility that this 0RTT is forged - // or otherwise suspicious. Before we can do anything with it, - // we have to validate it. Keep in mind that this means the - // client needs to respond with a proper initial packet in - // order to proceed. - // TODO(@jasnell): Validate this further to ensure this is - // the correct behavior. - SendRetry(PathDescriptor{ - version, - dcid, - scid, - local_address, - remote_address, - }); - STAT_INCREMENT(Stats, packets_received); - return; + break; } } diff --git a/src/quic/endpoint.h b/src/quic/endpoint.h index 4c218656227aef..7ed9fee81e46b6 100644 --- a/src/quic/endpoint.h +++ b/src/quic/endpoint.h @@ -311,7 +311,6 @@ class Endpoint final : public AsyncWrap, public Packet::Listener { // be prevented. void CloseGracefully(); - void Release(); void PacketDone(int status) override; @@ -346,7 +345,6 @@ class Endpoint final : public AsyncWrap, public Packet::Listener { // packets. // @param bool on - If true, mark the Endpoint as busy. JS_METHOD(MarkBusy); - static void FastMarkBusy(v8::Local receiver, bool on); // DoCloseGracefully is the signal that endpoint should close. Any packets // that are already in the queue or in flight will be allowed to finish, but @@ -360,7 +358,6 @@ class Endpoint final : public AsyncWrap, public Packet::Listener { // Ref() causes a listening Endpoint to keep the event loop active. JS_METHOD(Ref); - static void FastRef(v8::Local receiver, bool on); void Receive(const uv_buf_t& buf, const SocketAddress& from); diff --git a/src/quic/http3.cc b/src/quic/http3.cc index b3244fdad365be..805ea531148ee5 100644 --- a/src/quic/http3.cc +++ b/src/quic/http3.cc @@ -319,14 +319,20 @@ class Http3ApplicationImpl final : public Session::Application { void CollectSessionTicketAppData( SessionTicket::AppData* app_data) const override { - // TODO(@jasnell): There's currently nothing to store but there may be - // later. + // TODO(@jasnell): When HTTP/3 settings become dynamic or + // configurable per-connection, store them here so they can be + // validated on 0-RTT resumption. Candidates include: + // max_field_section_size, qpack_max_dtable_capacity, + // qpack_encoder_max_dtable_capacity, qpack_blocked_streams, + // enable_connect_protocol, and enable_datagrams. On extraction, + // compare stored values against current settings and return + // TICKET_IGNORE_RENEW if incompatible. } SessionTicket::AppData::Status ExtractSessionTicketAppData( const SessionTicket::AppData& app_data, SessionTicket::AppData::Source::Flag flag) override { - // There's currently nothing stored here but we might do so later. + // See CollectSessionTicketAppData above. return flag == SessionTicket::AppData::Source::Flag::STATUS_RENEW ? SessionTicket::AppData::Status::TICKET_USE_RENEW : SessionTicket::AppData::Status::TICKET_USE; @@ -448,10 +454,42 @@ class Http3ApplicationImpl final : public Session::Application { return false; } + void SetStreamPriority(const Stream& stream, + StreamPriority priority, + StreamPriorityFlags flags) override { + nghttp3_pri pri; + pri.inc = (flags == StreamPriorityFlags::NON_INCREMENTAL) ? 0 : 1; + switch (priority) { + case StreamPriority::HIGH: + pri.urgency = NGHTTP3_URGENCY_HIGH; + break; + case StreamPriority::LOW: + pri.urgency = NGHTTP3_URGENCY_LOW; + break; + default: + pri.urgency = NGHTTP3_DEFAULT_URGENCY; + break; + } + if (session().is_server()) { + nghttp3_conn_set_server_stream_priority( + *this, stream.id(), &pri); + } + // Client-side priority is set at request submission time via + // nghttp3_conn_submit_request and is not typically changed + // after the fact. The client API takes a serialized RFC 9218 + // field value rather than an nghttp3_pri struct. + } + StreamPriority GetStreamPriority(const Stream& stream) override { nghttp3_pri pri; if (nghttp3_conn_get_stream_priority(*this, &pri, stream.id()) == 0) { - // TODO(@jasnell): Support the incremental flag + // TODO(@jasnell): The nghttp3_pri.inc (incremental) flag is + // not yet exposed. When priority-based stream scheduling is + // implemented, GetStreamPriority should return both urgency + // and the incremental flag (making get/set symmetrical). + // The inc flag determines whether the server should interleave + // data from this stream with others of the same urgency + // (inc=1) or complete it first (inc=0). switch (pri.urgency) { case NGHTTP3_URGENCY_HIGH: return StreamPriority::HIGH; @@ -498,12 +536,11 @@ class Http3ApplicationImpl final : public Session::Application { nghttp3_err_infer_quic_app_error_code(err))); return false; } + if (data->stream) + data->stream->Commit(datalen, data->fin); return true; } - bool ShouldSetFin(const StreamData& data) override { - return data.id > -1 && !is_control_stream(data.id) && data.fin == 1; - } SET_NO_MEMORY_INFO() SET_MEMORY_INFO_NAME(Http3ApplicationImpl) @@ -674,22 +711,23 @@ class Http3ApplicationImpl final : public Session::Application { stream->ReceiveStreamReset(0, QuicError::ForApplication(app_error_code)); } - void OnShutdown() { - // This callback is invoked when we receive a request to gracefully shutdown - // the http3 connection. For client, the id is the stream id of a client - // initiated stream. For server, the id is the stream id of a server - // initiated stream. Once received, the other side is guaranteed not to - // process any more data. - - // On the client side, if id is equal to NGHTTP3_SHUTDOWN_NOTICE_STREAM_ID, - // or on the server if the id is equal to NGHTTP3_SHUTDOWN_NOTICE_PUSH_ID, - // then this is a request to begin a graceful shutdown. - - // This can be called multiple times but the id can only stay the same or - // *decrease*. - - // TODO(@jasnell): Need to determine exactly how to handle. - Debug(&session(), "HTTP/3 application received shutdown notice"); + void OnShutdown(int64_t id) { + // The peer has sent a GOAWAY frame initiating a graceful shutdown. + // For a client, id is the stream ID beyond which the server will + // not process requests. For a server, id is a push ID (server + // push is not implemented). Streams/pushes with IDs >= id will + // not be processed by the peer. + // + // When id equals NGHTTP3_SHUTDOWN_NOTICE_STREAM_ID (client) or + // NGHTTP3_SHUTDOWN_NOTICE_PUSH_ID (server), this is a notice of + // intent to shut down rather than an immediate refusal. + // + // This can be called multiple times with a decreasing id as the + // peer progressively reduces the set of streams it will process. + Debug(&session(), + "HTTP/3 received GOAWAY (id=%" PRIi64 ")", + id); + session().Close(Session::CloseMethod::GRACEFUL); } void OnReceiveSettings(const nghttp3_settings* settings) { @@ -748,9 +786,55 @@ class Http3ApplicationImpl final : public Session::Application { uint32_t* pflags, void* conn_user_data, void* stream_user_data) { - return NGTCP2_SUCCESS; + auto ptr = From(conn, conn_user_data); + CHECK_NOT_NULL(ptr); + auto& app = *ptr; + NgHttp3CallbackScope scope(app.env()); + + auto stream = app.session().FindStream(stream_id); + if (!stream) return NGHTTP3_ERR_CALLBACK_FAILURE; + + if (stream->is_eos()) { + *pflags |= NGHTTP3_DATA_FLAG_EOF; + return 0; + } + + size_t max_count = std::min(veccnt, + static_cast(kMaxVectorCount)); + nghttp3_ssize result = 0; + + auto next = [&](int status, + const ngtcp2_vec* data, + size_t count, + bob::Done done) { + switch (status) { + case bob::Status::STATUS_BLOCK: + case bob::Status::STATUS_WAIT: + result = NGHTTP3_ERR_WOULDBLOCK; + return; + case bob::Status::STATUS_EOS: + *pflags |= NGHTTP3_DATA_FLAG_EOF; + break; + } + count = std::min(count, max_count); + for (size_t n = 0; n < count; n++) { + vec[n].base = data[n].base; + vec[n].len = data[n].len; + } + result = static_cast(count); + }; + + ngtcp2_vec data[kMaxVectorCount]; + stream->Pull(std::move(next), + bob::Options::OPTIONS_SYNC, + data, + max_count, + max_count); + + return result; } + static int on_acked_stream_data(nghttp3_conn* conn, int64_t stream_id, uint64_t datalen, @@ -935,7 +1019,7 @@ class Http3ApplicationImpl final : public Session::Application { static int on_shutdown(nghttp3_conn* conn, int64_t id, void* conn_user_data) { NGHTTP3_CALLBACK_SCOPE(app); - app.OnShutdown(); + app.OnShutdown(id); return NGTCP2_SUCCESS; } @@ -951,14 +1035,14 @@ class Http3ApplicationImpl final : public Session::Application { const uint8_t* origin, size_t originlen, void* conn_user_data) { - // TODO(@jasnell): Handle the origin callback. This is called - // when a single origin in an ORIGIN frame is received. + // ORIGIN frames (RFC 8336) are used for connection coalescing + // across multiple origins. Not yet implemented u2014 requires + // connection pooling and multi-origin reuse support. return NGTCP2_SUCCESS; } static int on_end_origin(nghttp3_conn* conn, void* conn_user_data) { - // TODO(@jasnell): Handle the end of origin callback. This is called - // when the end of an ORIGIN frame is received. + // See on_receive_origin above. return NGTCP2_SUCCESS; } diff --git a/src/quic/logstream.cc b/src/quic/logstream.cc index 4705d75bdafac0..511b2a1ef46ebe 100644 --- a/src/quic/logstream.cc +++ b/src/quic/logstream.cc @@ -46,22 +46,22 @@ void LogStream::Emit(const uint8_t* data, size_t len, EmitOption option) { // If the len is greater than the size of the buffer returned by // EmitAlloc then EmitRead will be called multiple times. while (remaining != 0) { - uv_buf_t buf = EmitAlloc(len); - size_t len = std::min(remaining, buf.len); - memcpy(buf.base, data, len); - remaining -= len; - data += len; + uv_buf_t buf = EmitAlloc(remaining); + size_t chunk_len = std::min(remaining, buf.len); + memcpy(buf.base, data, chunk_len); + remaining -= chunk_len; + data += chunk_len; // If we are actively reading from the stream, we'll call emit // read immediately. Otherwise we buffer the chunk and will push // the chunks out the next time ReadStart() is called. if (reading_) { - EmitRead(len, buf); + EmitRead(chunk_len, buf); } else { // The total measures the total memory used so we always // increment but buf.len and not chunk len. ensure_space(buf.len); total_ += buf.len; - buffer_.push_back(Chunk{len, buf}); + buffer_.push_back(Chunk{chunk_len, buf}); } } diff --git a/src/quic/preferredaddress.cc b/src/quic/preferredaddress.cc index b45e4689a21c81..51dddd5ecd7f94 100644 --- a/src/quic/preferredaddress.cc +++ b/src/quic/preferredaddress.cc @@ -30,20 +30,15 @@ std::optional get_address_info( if constexpr (FAMILY == AF_INET) { if (!paddr.ipv4_present) return std::nullopt; address.port = paddr.ipv4.sin_port; - if (uv_inet_ntop( - FAMILY, &paddr.ipv4.sin_addr, address.host, sizeof(address.host)) == - 0) { - address.address = address.host; - } + if (uv_inet_ntop(FAMILY, &paddr.ipv4.sin_addr, + address.host, sizeof(address.host)) != 0) + return std::nullopt; } else { if (!paddr.ipv6_present) return std::nullopt; address.port = paddr.ipv6.sin6_port; - if (uv_inet_ntop(FAMILY, - &paddr.ipv6.sin6_addr, - address.host, - sizeof(address.host)) == 0) { - address.address = address.host; - } + if (uv_inet_ntop(FAMILY, &paddr.ipv6.sin6_addr, + address.host, sizeof(address.host)) != 0) + return std::nullopt; } return address; } diff --git a/src/quic/preferredaddress.h b/src/quic/preferredaddress.h index c121322a07bfbc..a7df606cf5696d 100644 --- a/src/quic/preferredaddress.h +++ b/src/quic/preferredaddress.h @@ -41,7 +41,6 @@ class PreferredAddress final { char host[NI_MAXHOST]; int family; uint16_t port; - std::string_view address; }; explicit PreferredAddress(ngtcp2_path* dest, diff --git a/src/quic/quic.cc b/src/quic/quic.cc index e36879e4e7d36b..d39215a06827d3 100644 --- a/src/quic/quic.cc +++ b/src/quic/quic.cc @@ -12,6 +12,8 @@ #include "endpoint.h" #include "node_external_reference.h" +#include +#include namespace node { using v8::Context; @@ -22,6 +24,10 @@ using v8::Value; namespace quic { +namespace { +std::once_flag crypto_init_flag; +} // namespace + void CreatePerIsolateProperties(IsolateData* isolate_data, Local target) { Endpoint::InitPerIsolate(isolate_data, target); @@ -34,6 +40,7 @@ void CreatePerContextProperties(Local target, Local context, void* priv) { Realm* realm = Realm::GetCurrent(context); + std::call_once(crypto_init_flag, ngtcp2_crypto_ossl_init); BindingData::InitPerContext(realm, target); Endpoint::InitPerContext(realm, target); Session::InitPerContext(realm, target); diff --git a/src/quic/session.cc b/src/quic/session.cc index 39ffad3e09faa8..351d5c61a417b8 100644 --- a/src/quic/session.cc +++ b/src/quic/session.cc @@ -345,7 +345,10 @@ Session::Config::Config(Environment* env, ngtcp2_settings_default(&settings); settings.initial_ts = uv_hrtime(); - // We currently do not support Path MTU Discovery. Once we do, unset this. + // TODO(@jasnell): Path MTU Discovery is disabled because libuv does not + // currently expose the IP_DONTFRAG / IP_MTU_DISCOVER socket options + // needed for PMTUD probes to work correctly. Revisit when libuv adds + // support or if we bypass libuv for the UDP socket options. settings.no_pmtud = 1; // Per the ngtcp2 documentation, when no_tx_udp_payload_size_shaping is set // to a non-zero value, it tells ngtcp2 not to limit the UDP payload size to @@ -372,6 +375,11 @@ Session::Config::Config(Environment* env, settings.max_window = options.max_window; settings.ack_thresh = options.unacknowledged_packet_threshold; settings.cc_algo = options.cc_algorithm; + + if (side == Side::CLIENT && options.token.has_value()) { + ngtcp2_vec vec = options.token.value(); + set_token(vec.base, vec.len, NGTCP2_TOKEN_TYPE_NEW_TOKEN); + } } Session::Config::Config(Environment* env, @@ -465,6 +473,17 @@ Maybe Session::Options::From(Environment* env, // TODO(@jasnell): Later we will also support setting the CID::Factory. // For now, we're just using the default random factory. + // Parse the optional NEW_TOKEN for address validation on reconnection. + Local token_val; + if (params->Get(env->context(), state.token_string()) + .ToLocal(&token_val) && + token_val->IsArrayBufferView()) { + Store token_store; + if (Store::From(token_val.As()).To(&token_store)) { + options.token = std::move(token_store); + } + } + return Just(options); } @@ -771,16 +790,16 @@ struct Session::Impl final : public MemoryRetainer { Session* session; ASSIGN_OR_RETURN_UNWRAP(&session, args.This()); - if (session->is_destroyed()) { + if (session->is_destroyed()) [[unlikely]] { return THROW_ERR_INVALID_STATE(env, "Session is destroyed"); } DCHECK(args[0]->IsUint32()); // GetDataQueueFromSource handles type validation. - std::shared_ptr data_source = - Stream::GetDataQueueFromSource(env, args[1]).ToChecked(); - if (data_source == nullptr) { + std::shared_ptr data_source; + if (!Stream::GetDataQueueFromSource(env, args[1]).To(&data_source) || + data_source == nullptr) [[unlikely]] { THROW_ERR_INVALID_ARG_VALUE(env, "Invalid data source"); } @@ -878,7 +897,9 @@ struct Session::Impl final : public MemoryRetainer { uint64_t max_streams, void* user_data) { NGTCP2_CALLBACK_SCOPE(session) - // TODO(@jasnell): Do anything here? + Debug(session, + "Max remote bidi streams increased to %" PRIu64, + max_streams); return NGTCP2_SUCCESS; } @@ -886,7 +907,7 @@ struct Session::Impl final : public MemoryRetainer { uint64_t max_streams, void* user_data) { NGTCP2_CALLBACK_SCOPE(session) - // TODO(@jasnell): Do anything here? + Debug(session, "Max remote uni streams increased to %" PRIu64, max_streams); return NGTCP2_SUCCESS; } @@ -993,7 +1014,8 @@ struct Session::Impl final : public MemoryRetainer { size_t tokenlen, void* user_data) { NGTCP2_CALLBACK_SCOPE(session) - // We currently do nothing with this callback. + Debug(session, "Received NEW_TOKEN (%zu bytes)", tokenlen); + session->EmitNewToken(token, tokenlen); return NGTCP2_SUCCESS; } @@ -1155,8 +1177,9 @@ struct Session::Impl final : public MemoryRetainer { } static int on_early_data_rejected(ngtcp2_conn* conn, void* user_data) { - // TODO(@jasnell): Called when early data was rejected by server during the - // TLS handshake or client decided not to attempt early data. + auto session = Impl::From(conn, user_data); + if (session == nullptr) return NGTCP2_ERR_CALLBACK_FAILURE; + Debug(session, "Early data was rejected"); return NGTCP2_SUCCESS; } @@ -1165,7 +1188,8 @@ struct Session::Impl final : public MemoryRetainer { const ngtcp2_path* path, const ngtcp2_path* fallback_path, void* user_data) { - // TODO(@jasnell): Implement? + NGTCP2_CALLBACK_SCOPE(session) + Debug(session, "Path validation started"); return NGTCP2_SUCCESS; } @@ -1607,12 +1631,15 @@ bool Session::Receive(Store&& store, // session is not destroyed before we try doing anything with it // (like updating stats, sending pending data, etc). int err = ngtcp2_conn_read_pkt( - *this, &path, nullptr, vec.base, vec.len, uv_hrtime()); + *this, &path, + // TODO(@jasnell): ECN pkt_info blocked on libuv + nullptr, + vec.base, vec.len, uv_hrtime()); switch (err) { case 0: { Debug(this, "Session successfully received %zu-byte packet", vec.len); - if (!is_destroyed()) [[unlikely]] { + if (!is_destroyed()) [[likely]] { auto& stats_ = impl_->stats_; STAT_INCREMENT_N(Stats, bytes_received, vec.len); } @@ -2400,9 +2427,10 @@ bool Session::HandshakeCompleted() { STAT_RECORD_TIMESTAMP(Stats, handshake_completed_at); SetStreamOpenAllowed(); - // TODO(@jasnel): Not yet supporting early data... - // if (!tls_session().early_data_was_accepted()) - // ngtcp2_conn_tls_early_data_rejected(*this); + // If early data was attempted but rejected by the server, + // tell ngtcp2 so it can retransmit the data as 1-RTT. + if (!is_server() && !tls_session().early_data_was_accepted()) + ngtcp2_conn_tls_early_data_rejected(*this); // When in a server session, handshake completed == handshake confirmed. if (is_server()) { @@ -2448,9 +2476,9 @@ void Session::SelectPreferredAddress(PreferredAddress* preferredAddress) { Debug(this, "Selecting preferred address for AF_INET"); auto ipv4 = preferredAddress->ipv4(); if (ipv4.has_value()) { - if (ipv4->address.empty() || ipv4->port == 0) return; + if (ipv4->host[0] == '\0' || ipv4->port == 0) return; CHECK(SocketAddress::New(AF_INET, - std::string(ipv4->address).c_str(), + ipv4->host, ipv4->port, &impl_->remote_address_)); preferredAddress->Use(ipv4.value()); @@ -2461,9 +2489,9 @@ void Session::SelectPreferredAddress(PreferredAddress* preferredAddress) { Debug(this, "Selecting preferred address for AF_INET6"); auto ipv6 = preferredAddress->ipv6(); if (ipv6.has_value()) { - if (ipv6->address.empty() || ipv6->port == 0) return; - CHECK(SocketAddress::New(AF_INET, - std::string(ipv6->address).c_str(), + if (ipv6->host[0] == '\0' || ipv6->port == 0) return; + CHECK(SocketAddress::New(AF_INET6, + ipv6->host, ipv6->port, &impl_->remote_address_)); preferredAddress->Use(ipv6.value()); @@ -2528,8 +2556,8 @@ void Session::ProcessPendingUniStreams() { } case NGTCP2_ERR_STREAM_ID_BLOCKED: { // This case really should not happen since we've checked the number - // of bidi streams left above. However, if it does happen we'll treat - // it the same as if the get_streams_bidi_left call returned zero. + // of uni streams left above. However, if it does happen we'll treat + // it the same as if the get_streams_uni_left call returned zero. return; } default: { @@ -2732,6 +2760,25 @@ void Session::EmitSessionTicket(Store&& ticket) { } } +void Session::EmitNewToken(const uint8_t* token, size_t len) { + DCHECK(!is_destroyed()); + if (!env()->can_call_into_js()) return; + + CallbackScope cb_scope(this); + + Local argv[2]; + auto buf = Buffer::Copy( + env(), reinterpret_cast(token), len); + if (!buf.ToLocal(&argv[0])) return; + argv[1] = SocketAddressBase::Create( + env(), + std::make_shared(remote_address()))->object(); + MakeCallback( + BindingData::Get(env()).session_new_token_callback(), + arraysize(argv), + argv); +} + void Session::EmitStream(const BaseObjectWeakPtr& stream) { DCHECK(!is_destroyed()); diff --git a/src/quic/session.h b/src/quic/session.h index ddaddb8d18a7a7..1b7cb49d9e373e 100644 --- a/src/quic/session.h +++ b/src/quic/session.h @@ -58,6 +58,7 @@ class Endpoint; // object itself is closed/destroyed by user code. class Session final : public AsyncWrap, private SessionTicket::AppData::Source { public: + SessionTicket::AppData::Source& ticket_app_data_source() { return *this; } // For simplicity, we use the same Application::Options struct for all // Application types. This may change in the future. Not all of the options // are going to be relevant for all Application types. @@ -176,6 +177,11 @@ class Session final : public AsyncWrap, private SessionTicket::AppData::Source { // is the better of the two for our needs. ngtcp2_cc_algo cc_algorithm = CC_ALGO_CUBIC; + // An optional NEW_TOKEN from a previous connection to the same + // server. When set, the token is included in the Initial packet + // to skip address validation. Client-side only. + std::optional token; + void MemoryInfo(MemoryTracker* tracker) const override; SET_MEMORY_INFO_NAME(Session::Options) SET_SELF_SIZE(Options) @@ -477,6 +483,7 @@ class Session final : public AsyncWrap, private SessionTicket::AppData::Source { const ValidatedPath& newPath, const std::optional& oldPath); void EmitSessionTicket(Store&& ticket); + void EmitNewToken(const uint8_t* token, size_t len); void EmitStream(const BaseObjectWeakPtr& stream); void EmitVersionNegotiation(const ngtcp2_pkt_hd& hd, const uint32_t* sv, diff --git a/src/quic/sessionticket.cc b/src/quic/sessionticket.cc index 8956b44068e2ee..2fe44c1f56e48a 100644 --- a/src/quic/sessionticket.cc +++ b/src/quic/sessionticket.cc @@ -2,6 +2,9 @@ #include "guard.h" #ifndef OPENSSL_NO_QUIC #include "sessionticket.h" +#include "tlscontext.h" +#include "session.h" +#include #include #include #include @@ -25,12 +28,8 @@ namespace quic { namespace { SessionTicket::AppData::Source* GetAppDataSource(SSL* ssl) { - ngtcp2_crypto_conn_ref* ref = - static_cast(SSL_get_app_data(ssl)); - if (ref != nullptr && ref->user_data != nullptr) { - return static_cast(ref->user_data); - } - return nullptr; + auto& tls_session = TLSSession::From(ssl); + return &tls_session.session().ticket_app_data_source(); } } // namespace diff --git a/src/quic/streams.cc b/src/quic/streams.cc index f84bf4252e4877..4b1536629580de 100644 --- a/src/quic/streams.cc +++ b/src/quic/streams.cc @@ -30,6 +30,7 @@ using v8::Nothing; using v8::Object; using v8::ObjectTemplate; using v8::SharedArrayBuffer; +using v8::Uint8Array; using v8::Value; namespace quic { @@ -41,7 +42,6 @@ namespace quic { V(FIN_RECEIVED, fin_received, uint8_t) \ V(READ_ENDED, read_ended, uint8_t) \ V(WRITE_ENDED, write_ended, uint8_t) \ - V(PAUSED, paused, uint8_t) \ V(RESET, reset, uint8_t) \ V(HAS_OUTBOUND, has_outbound, uint8_t) \ V(HAS_READER, has_reader, uint8_t) \ @@ -83,7 +83,10 @@ namespace quic { V(ResetStream, resetStream, false) \ V(SetPriority, setPriority, false) \ V(GetPriority, getPriority, true) \ - V(GetReader, getReader, false) + V(GetReader, getReader, false) \ + V(InitStreamingSource, initStreamingSource, false) \ + V(Write, write, false) \ + V(EndWrite, endWrite, false) // ============================================================================ @@ -140,6 +143,38 @@ STAT_STRUCT(Stream, STREAM) // ============================================================================ +namespace { +// Creates an in-memory DataQueue entry from an ArrayBuffer by either +// detaching it (zero-copy) or copying its contents if detach is not +// possible (e.g., SharedArrayBuffer-backed or non-detachable). +// Returns nullptr on failure (error already thrown if allocation failed). +std::unique_ptr CreateEntryFromBuffer( + Environment* env, + Local buffer, + size_t offset, + size_t length) { + if (length == 0) return nullptr; + std::shared_ptr backing; + if (buffer->IsDetachable()) { + backing = buffer->GetBackingStore(); + if (buffer->Detach(Local()).IsNothing()) { + backing.reset(); + } + } + if (!backing) { + // Buffer is not detachable or detach failed. Copy the data. + JS_TRY_ALLOCATE_BACKING_OR_RETURN(env, copy, length, nullptr); + memcpy(copy->Data(), + static_cast(buffer->Data()) + offset, + length); + offset = 0; + backing = std::move(copy); + } + return DataQueue::CreateInMemoryEntryFromBackingStore( + std::move(backing), offset, length); +} +} // namespace + Maybe> Stream::GetDataQueueFromSource( Environment* env, Local value) { DCHECK_IMPLIES(!value->IsUndefined(), value->IsObject()); @@ -148,59 +183,41 @@ Maybe> Stream::GetDataQueueFromSource( // Return an empty DataQueue. return Just(std::shared_ptr()); } else if (value->IsArrayBuffer()) { - // DataQueue is created from an ArrayBuffer. auto buffer = value.As(); - // We require that the ArrayBuffer be detachable. This ensures that the - // underlying memory can be transferred to the DataQueue without risk - // of the memory being modified by JavaScript code while it is owned - // by the DataQueue. - if (!buffer->IsDetachable()) { - THROW_ERR_INVALID_ARG_TYPE(env, "Data source not detachable"); - return Nothing>(); - } - auto backing = buffer->GetBackingStore(); - uint64_t offset = 0; - uint64_t length = buffer->ByteLength(); - if (buffer->Detach(Local()).IsNothing()) { - THROW_ERR_INVALID_ARG_TYPE(env, "Data source not detachable"); - return Nothing>(); + auto length = buffer->ByteLength(); + if (length > 0) { + auto entry = CreateEntryFromBuffer(env, buffer, 0, length); + if (!entry) { + return Nothing>(); + } + entries.push_back(std::move(entry)); } - entries.push_back(DataQueue::CreateInMemoryEntryFromBackingStore( - std::move(backing), offset, length)); return Just(DataQueue::CreateIdempotent(std::move(entries))); } else if (value->IsSharedArrayBuffer()) { - // We aren't going to allow use of SharedArrayBuffer as a data source. - // The reason is that SharedArrayBuffer memory is possibly shared with - // other JavaScript code and we cannot detach it, making it impossible - // for us to guarantee that the memory will not be modified while it - // is owned by the DataQueue. - THROW_ERR_INVALID_ARG_TYPE(env, "SharedArrayBuffer is not allowed"); - return Nothing>(); + auto sab = value.As(); + auto length = sab->ByteLength(); + if (length > 0) { + // SharedArrayBuffer cannot be detached, so we always copy. Note that + // because of the nature of SAB, another thread can end up modifying + // the SAB while we're copying, which is racy but unavoidable. + JS_TRY_ALLOCATE_BACKING_OR_RETURN( + env, backing, length, Nothing>()); + memcpy(backing->Data(), sab->Data(), length); + entries.push_back(DataQueue::CreateInMemoryEntryFromBackingStore( + std::move(backing), 0, length)); + } + return Just(DataQueue::CreateIdempotent(std::move(entries))); } else if (value->IsArrayBufferView()) { auto view = value.As(); - auto buffer = view->Buffer(); - if (buffer->IsSharedArrayBuffer()) { - // We aren't going to allow use of SharedArrayBuffer as a data source. - // The reason is that SharedArrayBuffer memory is possibly shared with - // other JavaScript code and we cannot detach it, making it impossible - // for us to guarantee that the memory will not be modified while it - // is owned by the DataQueue. - THROW_ERR_INVALID_ARG_TYPE(env, "SharedArrayBuffer is not allowed"); - return Nothing>(); - } - if (!buffer->IsDetachable()) { - THROW_ERR_INVALID_ARG_TYPE(env, "Data source not detachable"); - return Nothing>(); - } - if (buffer->Detach(Local()).IsNothing()) { - THROW_ERR_INVALID_ARG_TYPE(env, "Data source not detachable"); - return Nothing>(); - } - auto backing = buffer->GetBackingStore(); auto offset = view->ByteOffset(); auto length = view->ByteLength(); - entries.push_back(DataQueue::CreateInMemoryEntryFromBackingStore( - std::move(backing), offset, length)); + if (length > 0) { + auto entry = CreateEntryFromBuffer(env, view->Buffer(), offset, length); + if (!entry) { + return Nothing>(); + } + entries.push_back(std::move(entry)); + } return Just(DataQueue::CreateIdempotent(std::move(entries))); } else if (Blob::HasInstance(env, value)) { Blob* blob; @@ -242,9 +259,15 @@ struct Stream::Impl { ASSIGN_OR_RETURN_UNWRAP(&stream, args.This()); if (args.Length() > 1) { CHECK(args[0]->IsBigInt()); - bool unused = false; - stream->Destroy(QuicError::ForApplication( - args[0].As()->Uint64Value(&unused))); + bool lossless = false; + uint64_t code = args[0].As()->Uint64Value(&lossless); + // If the code cannot be represented in 64 bits, it is too large to be + // a valid QUIC error code, error! + if (!lossless) { + THROW_ERR_INVALID_ARG_TYPE(stream->env(), "Error code is too large"); + return; + } + stream->Destroy(QuicError::ForApplication(code)); } else { stream->Destroy(); } @@ -375,17 +398,43 @@ struct Stream::Impl { THROW_ERR_INVALID_STATE(Environment::GetCurrent(args), "Unable to get a reader for the stream"); } + + JS_METHOD(InitStreamingSource) { + Stream* stream; + ASSIGN_OR_RETURN_UNWRAP(&stream, args.This()); + stream->InitStreaming(); + } + + JS_METHOD(Write) { + Stream* stream; + ASSIGN_OR_RETURN_UNWRAP(&stream, args.This()); + stream->WriteStreamData(args); + } + + JS_METHOD(EndWrite) { + Stream* stream; + ASSIGN_OR_RETURN_UNWRAP(&stream, args.This()); + stream->EndWriting(); + } }; // ============================================================================ class Stream::Outbound final : public MemoryRetainer { public: - Outbound(Stream* stream, std::shared_ptr queue) + explicit Outbound(Stream* stream, std::shared_ptr queue) : stream_(stream), queue_(std::move(queue)), reader_(queue_->get_reader()) {} + // Creates an Outbound in streaming mode with a non-idempotent DataQueue + // that can be appended to via AppendEntry(). + explicit Outbound(Stream* stream) + : stream_(stream), + queue_(DataQueue::Create()), + reader_(queue_->get_reader()), + streaming_(true) {} + void Acknowledge(size_t amount) { size_t remaining = std::min(amount, total_ - uncommitted_); while (remaining > 0 && head_ != nullptr) { @@ -457,6 +506,17 @@ class Stream::Outbound final : public MemoryRetainer { if (queue_) queue_->cap(); } + bool is_streaming() const { return streaming_; } + size_t total() const { return total_; } + + // Appends an entry to the underlying DataQueue. Only valid when + // the Outbound was created in streaming mode. + bool AppendEntry(std::unique_ptr entry) { + if (!streaming_ || !queue_) return false; + auto result = queue_->append(std::move(entry)); + return result.has_value() && result.value(); + } + int Pull(bob::Next next, int options, ngtcp2_vec* data, @@ -703,6 +763,9 @@ class Stream::Outbound final : public MemoryRetainer { bool errored_ = false; + // True when in streaming mode (non-idempotent queue, appendable). + bool streaming_ = false; + // Will be set to true if the reader_ ends up providing a pull result // asynchronously. bool next_pending_ = false; @@ -1024,7 +1087,9 @@ bool Stream::is_readable() const { BaseObjectPtr Stream::get_reader() { if (!is_readable() || state_->has_reader) return {}; state_->has_reader = 1; - return Blob::Reader::Create(env(), Blob::Create(env(), inbound_)); + auto reader = Blob::Reader::Create(env(), Blob::Create(env(), inbound_)); + reader_ = reader; + return reader; } void Stream::set_final_size(uint64_t final_size) { @@ -1043,11 +1108,86 @@ void Stream::set_outbound(std::shared_ptr source) { if (!is_pending()) session_->ResumeStream(id()); } +void Stream::InitStreaming() { + auto env = this->env(); + if (outbound_ != nullptr) { + return THROW_ERR_INVALID_STATE(env, + "Outbound data source is already initialized"); + } + if (!is_writable()) { + return THROW_ERR_INVALID_STATE(env, "Stream is not writable"); + } + Debug(this, "Initializing streaming outbound source"); + outbound_ = std::make_unique(this); + state_->has_outbound = 1; + if (!is_pending()) session_->ResumeStream(id()); +} + +void Stream::WriteStreamData(const v8::FunctionCallbackInfo& args) { + auto env = this->env(); + if (outbound_ == nullptr || !outbound_->is_streaming()) { + return THROW_ERR_INVALID_STATE(env, + "Streaming source is not initialized"); + } + + if (!is_writable()) { + return THROW_ERR_INVALID_STATE(env, "Stream is no longer writable"); + } + + auto append_view = [&](Local value) -> bool { + if (!value->IsUint8Array()) { + THROW_ERR_INVALID_ARG_TYPE(env, "Expected Uint8Array"); + return false; + } + auto view = value.As(); + auto length = view->ByteLength(); + if (length == 0) return true; + auto entry = CreateEntryFromBuffer( + env, view->Buffer(), view->ByteOffset(), length); + if (!entry) { + return false; + } + return outbound_->AppendEntry(std::move(entry)); + }; + + // There must always be exactly one argument to WriteStreamData. + CHECK_EQ(args.Length(), 1); + + // The args[0] must always be an Array of Uint8Arrays + CHECK(args[0]->IsArray()); + + auto array = args[0].As(); + for (uint32_t i = 0; i < array->Length(); i++) { + Local item; + if (!array->Get(env->context(), i).ToLocal(&item)) return; + if (!append_view(item)) return; + } + + if (!is_pending()) session_->ResumeStream(id()); + + args.GetReturnValue().Set(static_cast(outbound_->total())); +} + +void Stream::EndWriting() { + auto env = this->env(); + if (outbound_ == nullptr || !outbound_->is_streaming()) { + return THROW_ERR_INVALID_STATE(env, "Streaming source is not initialized"); + } + + if (!is_writable()) { + return THROW_ERR_INVALID_STATE(env, "Stream is no longer writable"); + } + Debug(this, "Ending streaming outbound source"); + EndWritable(); + if (!is_pending()) session_->ResumeStream(id()); +} + void Stream::EntryRead(size_t amount) { // Tells us that amount bytes we're reading from inbound_ // We use this as a signal to extend the flow control // window to receive more bytes. session().ExtendStreamOffset(id(), amount); + session().ExtendOffset(amount); } int Stream::DoPull(bob::Next next, @@ -1119,12 +1259,14 @@ void Stream::Acknowledge(size_t datalen) { // Consumes the given number of bytes in the buffer. outbound_->Acknowledge(datalen); + STAT_RECORD_TIMESTAMP(Stats, acked_at); } -void Stream::Commit(size_t datalen) { +void Stream::Commit(size_t datalen, bool fin) { Debug(this, "Committing %zu bytes", datalen); - STAT_RECORD_TIMESTAMP(Stats, acked_at); + STAT_INCREMENT_N(Stats, bytes_sent, datalen); if (outbound_) outbound_->Commit(datalen); + if (fin) state_->fin_sent = 1; } void Stream::EndWritable() { @@ -1142,6 +1284,8 @@ void Stream::EndReadable(std::optional maybe_final_size) { state_->read_ended = 1; set_final_size(maybe_final_size.value_or(STAT_GET(Stats, bytes_received))); inbound_->cap(STAT_GET(Stats, final_size)); + // Notify the JS reader so it can see EOS. + if (reader_) reader_->NotifyPull(); } void Stream::Destroy(QuicError error) { @@ -1176,6 +1320,7 @@ void Stream::Destroy(QuicError error) { // which may keep that data alive a bit longer. inbound_->removeBackpressureListener(this); inbound_.reset(); + reader_.reset(); // Notify the JavaScript side that our handle is being destroyed. The // JavaScript side should clean up any state that it needs to and should @@ -1210,12 +1355,16 @@ void Stream::ReceiveData(const uint8_t* data, } STAT_INCREMENT_N(Stats, bytes_received, len); + STAT_SET(Stats, max_offset_received, STAT_GET(Stats, bytes_received)); STAT_RECORD_TIMESTAMP(Stats, received_at); JS_TRY_ALLOCATE_BACKING(env(), backing, len) memcpy(backing->Data(), data, len); inbound_->append(DataQueue::CreateInMemoryEntryFromBackingStore( std::move(backing), 0, len)); + // Notify the JS reader that data is available. + if (reader_) reader_->NotifyPull(); + if (flags.fin) EndReadable(); } @@ -1313,10 +1462,6 @@ void Stream::Schedule(Queue* queue) { if (outbound_ && stream_queue_.IsEmpty()) queue->PushBack(this); } -void Stream::Unschedule() { - Debug(this, "Unscheduled"); - stream_queue_.Remove(); -} } // namespace quic } // namespace node diff --git a/src/quic/streams.h b/src/quic/streams.h index c230815d78e4be..610aac2de334f4 100644 --- a/src/quic/streams.h +++ b/src/quic/streams.h @@ -37,9 +37,9 @@ using Ngtcp2Source = bob::SourceImpl; // Note that only locally initiated streams can be created in a pending state. class PendingStream final { public: - PendingStream(Direction direction, - Stream* stream, - BaseObjectWeakPtr session); + explicit PendingStream(Direction direction, + Stream* stream, + BaseObjectWeakPtr session); DISALLOW_COPY_AND_MOVE(PendingStream) ~PendingStream(); @@ -233,7 +233,7 @@ class Stream final : public AsyncWrap, // indication occuring the first time data is sent. It does not indicate // that the data has been retransmitted due to loss or has been // acknowledged to have been received by the peer. - void Commit(size_t datalen); + void Commit(size_t datalen, bool fin = false); void EndWritable(); void EndReadable(std::optional maybe_final_size = std::nullopt); @@ -280,6 +280,8 @@ class Stream final : public AsyncWrap, // have already been added, or the maximum total header length is reached. bool AddHeader(const Header& header); + // TODO(@jasnell): Implement MemoryInfo to track outbound_, inbound_, + // reader_, headers_, and pending_headers_queue_. SET_NO_MEMORY_INFO() SET_MEMORY_INFO_NAME(Stream) SET_SELF_SIZE(Stream) @@ -299,6 +301,11 @@ class Stream final : public AsyncWrap, void set_final_size(uint64_t amount); void set_outbound(std::shared_ptr source); + // Streaming outbound support + void InitStreaming(); + void WriteStreamData(const v8::FunctionCallbackInfo& args); + void EndWriting(); + bool is_local_unidirectional() const; bool is_remote_unidirectional() const; @@ -337,6 +344,7 @@ class Stream final : public AsyncWrap, BaseObjectWeakPtr session_; std::unique_ptr outbound_; std::shared_ptr inbound_; + BaseObjectWeakPtr reader_; // If the stream cannot be opened yet, it will be created in a pending state. // Once the owning session is able to, it will complete opening of the stream @@ -372,7 +380,7 @@ class Stream final : public AsyncWrap, friend class DefaultApplication; public: - // The Queue/Schedule/Unschedule here are part of the mechanism used to + // The Queue/Schedule here are part of the mechanism used to // determine which streams have data to send on the session. When a stream // potentially has data available, it will be scheduled in the Queue. Then, // when the Session::Application starts sending pending data, it will check @@ -385,7 +393,6 @@ class Stream final : public AsyncWrap, using Queue = ListHead; void Schedule(Queue* queue); - void Unschedule(); }; } // namespace node::quic diff --git a/src/quic/tlscontext.cc b/src/quic/tlscontext.cc index f9eb9d8de16393..b2210dd902a920 100644 --- a/src/quic/tlscontext.cc +++ b/src/quic/tlscontext.cc @@ -43,16 +43,6 @@ namespace quic { // ============================================================================ namespace { -// Performance optimization recommended by ngtcp2. Need to investigate why -// this causes some tests to fail. -// auto _ = []() { -// if (ngtcp2_crypto_ossl_init() != 0) { -// assert(0); -// abort(); -// } - -// return 0; -// }(); // Temporarily wraps an SSL pointer but does not take ownership. // Use by a few of the TLSSession methods that need access to the SSL* @@ -99,7 +89,7 @@ void EnableTrace(Environment* env, BIOPointer* bio, SSL* ssl) { #endif } -template Opt::*member> +template Opt::* member> bool SetOption(Environment* env, Opt* options, const Local& object, @@ -230,7 +220,8 @@ std::string OSSLContext::get_selected_alpn() const { const unsigned char* alpn = nullptr; unsigned int len; SSL_get0_alpn_selected(*this, &alpn, &len); - return std::string(alpn, alpn + len); + if (alpn == nullptr) return {}; + return std::string(reinterpret_cast(alpn), len); } std::string_view OSSLContext::get_negotiated_group() const { @@ -267,6 +258,12 @@ bool OSSLContext::get_early_data_accepted() const { return SSL_get_early_data_status(*this) == SSL_EARLY_DATA_ACCEPTED; } +bool OSSLContext::set_session_ticket(const ncrypto::SSLSessionPointer& ticket) { + if (!ticket) return false; + if (SSL_set_session(*this, ticket.get()) != 1) return false; + return SSL_SESSION_get_max_early_data(ticket.get()) != 0; +} + bool OSSLContext::ConfigureServer() const { if (ngtcp2_crypto_ossl_configure_server_session(*this) != 0) return false; SSL_set_accept_state(*this); @@ -364,8 +361,40 @@ void TLSContext::OnKeylog(const SSL* ssl, const char* line) { int TLSContext::OnVerifyClientCertificate(int preverify_ok, X509_STORE_CTX* ctx) { - // TODO(@jasnell): Implement the logic to verify the client certificate - return 1; + // This callback is invoked by OpenSSL for each certificate in the + // client's chain during the TLS handshake. The preverify_ok + // parameter reflects OpenSSL's own chain validation result for + // the current certificate. Failures include: + // - Expired or not-yet-valid certificates + // - Self-signed certificates not in the trusted CA list + // - Broken chain (signature verification failure) + // - Untrusted CA (chain does not terminate at a configured CA) + // - Revoked certificates (if CRL is configured) + // - Invalid basic constraints or key usage + // + // If preverify_ok is 1, validation passed for this cert and we + // always continue. If it is 0, the behavior depends on the + // reject_unauthorized option: + // - true (default): return 0 to abort the handshake immediately, + // avoiding wasted work on an untrusted client. + // - false: return 1 to let the handshake complete. The validation + // error is still recorded by OpenSSL and will be reported to JS + // via VerifyPeerIdentity() in the handshake callback, allowing + // the application to make its own decision. + // + // Note that even when preverify_ok is 1 (chain validation passed), + // the application may need to perform additional verification after + // the handshake — for example, checking the certificate's common + // name or subject alternative names against an allowlist, verifying + // application-specific fields or extensions, or enforcing certificate + // pinning. Chain validation only confirms cryptographic integrity + // and trust anchor; it does not confirm authorization. + if (preverify_ok) return 1; + + SSL* ssl = static_cast( + X509_STORE_CTX_get_ex_data(ctx, SSL_get_ex_data_X509_STORE_CTX_idx())); + auto& tls_session = TLSSession::From(ssl); + return tls_session.context().options().reject_unauthorized ? 0 : 1; } std::unique_ptr TLSContext::NewSession( @@ -387,12 +416,17 @@ SSLCtxPointer TLSContext::Initialize() { return {}; } - if (SSL_CTX_set_max_early_data(ctx.get(), UINT32_MAX) != 1) { + if (SSL_CTX_set_max_early_data( + ctx.get(), + options_.enable_early_data ? UINT32_MAX : 0) != 1) { validation_error_ = "Failed to set max early data"; return {}; } + // ngtcp2 handles replay protection at the QUIC layer, + // so we disable OpenSSL's built-in anti-replay. SSL_CTX_set_options(ctx.get(), - (SSL_OP_ALL & ~SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS) | + (SSL_OP_ALL & + ~SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS) | SSL_OP_SINGLE_ECDH_USE | SSL_OP_CIPHER_SERVER_PREFERENCE | SSL_OP_NO_ANTI_REPLAY); @@ -406,20 +440,18 @@ SSLCtxPointer TLSContext::Initialize() { return {}; } - if (options_.verify_client) [[likely]] { + if (options_.verify_client) [[unlikely]] { SSL_CTX_set_verify(ctx.get(), SSL_VERIFY_PEER | SSL_VERIFY_CLIENT_ONCE | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, OnVerifyClientCertificate); } - // TODO(@jasnell): There's a bug int the GenerateCallback flow somewhere. - // Need to update in order to support session tickets. - // CHECK_EQ(SSL_CTX_set_session_ticket_cb(ctx.get(), - // SessionTicket::GenerateCallback, - // SessionTicket::DecryptedCallback, - // nullptr), - // 1); + CHECK_EQ(SSL_CTX_set_session_ticket_cb(ctx.get(), + SessionTicket::GenerateCallback, + SessionTicket::DecryptedCallback, + nullptr), + 1); break; } case Side::CLIENT: { @@ -580,11 +612,13 @@ Maybe TLSContext::Options::From(Environment* env, SetOption( \ env, &options, params, state.name##_string()) - if (!SET(verify_client) || !SET(enable_tls_trace) || !SET(protocol) || - !SET(servername) || !SET(ciphers) || !SET(groups) || - !SET(verify_private_key) || !SET(keylog) || - !SET_VECTOR(crypto::KeyObjectData, keys) || !SET_VECTOR(Store, certs) || - !SET_VECTOR(Store, ca) || !SET_VECTOR(Store, crl)) { + if (!SET(verify_client) || !SET(reject_unauthorized) || + !SET(enable_early_data) || !SET(enable_tls_trace) || + !SET(enable_tls_trace) || !SET(protocol) || !SET(servername) || + !SET(ciphers) || !SET(groups) || !SET(verify_private_key) || + !SET(keylog) || !SET_VECTOR(crypto::KeyObjectData, keys) || + !SET_VECTOR(Store, certs) || !SET_VECTOR(Store, ca) || + !SET_VECTOR(Store, crl)) { return Nothing(); } @@ -601,6 +635,10 @@ std::string TLSContext::Options::ToString() const { prefix + "keylog: " + (keylog ? std::string("yes") : std::string("no")); res += prefix + "verify client: " + (verify_client ? std::string("yes") : std::string("no")); + res += prefix + "reject unauthorized: " + + (reject_unauthorized ? std::string("yes") : std::string("no")); + res += prefix + "enable early data: " + + (enable_early_data ? std::string("yes") : std::string("no")); res += prefix + "enable_tls_trace: " + (enable_tls_trace ? std::string("yes") : std::string("no")); res += prefix + "verify private key: " + @@ -710,8 +748,7 @@ void TLSSession::Initialize( reinterpret_cast(buf.base), buf.len); // The early data will just be ignored if it's invalid. - if (ssl.setSession(ticket) && - SSL_SESSION_get_max_early_data(ticket.get()) != 0) { + if (ossl_context_.set_session_ticket(ticket)) { ngtcp2_vec rtp = sessionTicket.transport_params(); if (ngtcp2_conn_decode_and_set_0rtt_transport_params( *session_, rtp.base, rtp.len) == 0) { diff --git a/src/quic/tlscontext.h b/src/quic/tlscontext.h index 528085b00497a7..2fee7a318be436 100644 --- a/src/quic/tlscontext.h +++ b/src/quic/tlscontext.h @@ -55,6 +55,10 @@ class OSSLContext final { bool get_early_data_accepted() const; + // Sets the session ticket for 0-RTT resumption. Returns true if the + // ticket was set successfully and the ticket supports early data. + bool set_session_ticket(const ncrypto::SSLSessionPointer& ticket); + bool ConfigureServer() const; bool ConfigureClient() const; @@ -190,6 +194,21 @@ class TLSContext final : public MemoryRetainer, // option is only used by the server side. bool verify_client = false; + // When true (the default), client certificates that fail chain + // validation are rejected during the handshake. When false, the + // handshake completes and the validation result is passed to JS + // via the handshake callback for the application to decide. + // This option is only used by the server side. + bool reject_unauthorized = true; + + // When true (the default), the server accepts 0-RTT early data + // from clients with valid session tickets. When false, early data + // is disabled and clients must complete a full handshake before + // sending application data. Disabling early data prevents replay + // attacks at the cost of an additional round trip. + // This option is only used by the server side. + bool enable_early_data = true; + // When true, enables TLS tracing for the session. This should only be used // for debugging. // JavaScript option name "tlsTrace". diff --git a/src/quic/tokens.cc b/src/quic/tokens.cc index 962e321786fe08..1019b43a534809 100644 --- a/src/quic/tokens.cc +++ b/src/quic/tokens.cc @@ -1,7 +1,6 @@ #if HAVE_OPENSSL && HAVE_QUIC #include "guard.h" #ifndef OPENSSL_NO_QUIC -#include "tokens.h" #include #include #include @@ -10,6 +9,7 @@ #include #include "nbytes.h" #include "ncrypto.h" +#include "tokens.h" namespace node::quic { @@ -108,7 +108,7 @@ bool StatelessResetToken::operator==(const StatelessResetToken& other) const { (ptr_ != nullptr && other.ptr_ == nullptr)) { return false; } - return memcmp(ptr_, other.ptr_, kStatelessTokenLen) == 0; + return CRYPTO_memcmp(ptr_, other.ptr_, kStatelessTokenLen) == 0; } bool StatelessResetToken::operator!=(const StatelessResetToken& other) const { @@ -126,11 +126,12 @@ std::string StatelessResetToken::ToString() const { size_t StatelessResetToken::Hash::operator()( const StatelessResetToken& token) const { + // See CID::Hash for details on this hash combine strategy. size_t hash = 0; if (token.ptr_ == nullptr) return hash; - for (size_t n = 0; n < kStatelessTokenLen; n++) - hash ^= std::hash{}(token.ptr_[n]) + 0x9e3779b9 + (hash << 6) + - (hash >> 2); + for (size_t n = 0; n < kStatelessTokenLen; n++) { + hash ^= token.ptr_[n] + 0x9e3779b9 + (hash << 6) + (hash >> 2); + } return hash; } @@ -195,7 +196,7 @@ RetryToken::RetryToken(uint32_t version, RetryToken::RetryToken(const uint8_t* token, size_t size) : ptr_(ngtcp2_vec{const_cast(token), size}) { DCHECK_LE(size, RetryToken::kRetryTokenLen); - DCHECK_IMPLIES(token == nullptr, size = 0); + DCHECK_IMPLIES(token == nullptr, size == 0); } std::optional RetryToken::Validate(uint32_t version, @@ -215,7 +216,9 @@ std::optional RetryToken::Validate(uint32_t version, addr.data(), addr.length(), dcid, - std::min(verification_expiration, QUIC_MIN_RETRYTOKEN_EXPIRATION), + std::clamp(verification_expiration, + QUIC_MIN_RETRYTOKEN_EXPIRATION, + QUIC_MAX_RETRYTOKEN_EXPIRATION), uv_hrtime()); if (ret != 0) return std::nullopt; return std::optional(ocid); @@ -256,7 +259,7 @@ RegularToken::RegularToken(uint32_t version, RegularToken::RegularToken(const uint8_t* token, size_t size) : ptr_(ngtcp2_vec{const_cast(token), size}) { DCHECK_LE(size, RegularToken::kRegularTokenLen); - DCHECK_IMPLIES(token == nullptr, size = 0); + DCHECK_IMPLIES(token == nullptr, size == 0); } RegularToken::operator bool() const { @@ -275,8 +278,9 @@ bool RegularToken::Validate(uint32_t version, TokenSecret::QUIC_TOKENSECRET_LEN, addr.data(), addr.length(), - std::min(verification_expiration, - QUIC_MIN_REGULARTOKEN_EXPIRATION), + std::clamp(verification_expiration, + QUIC_MIN_REGULARTOKEN_EXPIRATION, + QUIC_MAX_REGULARTOKEN_EXPIRATION), uv_hrtime()) == 0; } diff --git a/src/quic/tokens.h b/src/quic/tokens.h index 5949cd58640d15..cfbaa94e344f8d 100644 --- a/src/quic/tokens.h +++ b/src/quic/tokens.h @@ -161,6 +161,8 @@ class RetryToken final : public MemoryRetainer { static constexpr uint64_t QUIC_DEFAULT_RETRYTOKEN_EXPIRATION = 10 * NGTCP2_SECONDS; static constexpr uint64_t QUIC_MIN_RETRYTOKEN_EXPIRATION = 1 * NGTCP2_SECONDS; + static constexpr uint64_t QUIC_MAX_RETRYTOKEN_EXPIRATION = + 60 * NGTCP2_SECONDS; // Generates a new retry token. RetryToken(uint32_t version, @@ -214,18 +216,20 @@ class RegularToken final : public MemoryRetainer { 10 * NGTCP2_SECONDS; static constexpr uint64_t QUIC_MIN_REGULARTOKEN_EXPIRATION = 1 * NGTCP2_SECONDS; + static constexpr uint64_t QUIC_MAX_REGULARTOKEN_EXPIRATION = + 5 * 60 * NGTCP2_SECONDS; RegularToken(); - // Generates a new retry token. + // Generates a new regular token. RegularToken(uint32_t version, const SocketAddress& address, const TokenSecret& token_secret); - // Wraps the given retry token + // Wraps the given regular token RegularToken(const uint8_t* token, size_t length); - // Validates the retry token given the input. + // Validates the regular token given the input. bool Validate( uint32_t version, const SocketAddress& address, @@ -240,8 +244,8 @@ class RegularToken final : public MemoryRetainer { std::string ToString() const; SET_NO_MEMORY_INFO() - SET_MEMORY_INFO_NAME(RetryToken) - SET_SELF_SIZE(RetryToken) + SET_MEMORY_INFO_NAME(RegularToken) + SET_SELF_SIZE(RegularToken) private: operator const char*() const; diff --git a/src/quic/transportparams.h b/src/quic/transportparams.h index ff37299c27ddb5..67e1e5deec00fb 100644 --- a/src/quic/transportparams.h +++ b/src/quic/transportparams.h @@ -120,7 +120,8 @@ class TransportParams final { // connection migration. See the QUIC specification for more details on // connection migration. // https://www.rfc-editor.org/rfc/rfc9000.html#section-18.2-4.30.1 - // TODO(@jasnell): We currently do not implement active migration. + // TODO(@jasnell): Active connection migration is not yet implemented. + // This will be revisited in a future update. bool disable_active_migration = true; static const Options kDefault; diff --git a/test/cctest/test_quic_preferredaddress.cc b/test/cctest/test_quic_preferredaddress.cc index 0b7abc423f5abd..a1a1ad2ed007b5 100644 --- a/test/cctest/test_quic_preferredaddress.cc +++ b/test/cctest/test_quic_preferredaddress.cc @@ -48,7 +48,7 @@ TEST(PreferredAddress, Basic) { const auto ipv4 = preferred_address.ipv4().value(); CHECK_EQ(ipv4.family, AF_INET); CHECK_EQ(htons(ipv4.port), 443); - CHECK_EQ(ipv4.address, "123.123.123.123"); + CHECK_EQ(std::string(ipv4.host), "123.123.123.123"); memcpy(&paddr.ipv6, &storage6, sizeof(sockaddr_in6)); paddr.ipv6_present = 1; @@ -57,7 +57,7 @@ TEST(PreferredAddress, Basic) { const auto ipv6 = preferred_address.ipv6().value(); CHECK_EQ(ipv6.family, AF_INET6); CHECK_EQ(htons(ipv6.port), 123); - CHECK_EQ(ipv6.address, "2001:db8::1"); + CHECK_EQ(std::string(ipv6.host), "2001:db8::1"); CHECK_EQ(preferred_address.cid(), cid); } @@ -78,7 +78,7 @@ TEST(PreferredAddress, SetTransportParams) { const auto ipv4_2 = paddr2.ipv4().value(); CHECK_EQ(ipv4_2.family, AF_INET); CHECK_EQ(htons(ipv4_2.port), 443); - CHECK_EQ(ipv4_2.address, "123.123.123.123"); + CHECK_EQ(std::string(ipv4_2.host), "123.123.123.123"); } #endif // OPENSSL_NO_QUIC #endif // HAVE_OPENSSL && HAVE_QUIC diff --git a/test/parallel/test-quic-internal-endpoint-stats-state.mjs b/test/parallel/test-quic-internal-endpoint-stats-state.mjs index 52ed8ba5275301..94b8167c2d751a 100644 --- a/test/parallel/test-quic-internal-endpoint-stats-state.mjs +++ b/test/parallel/test-quic-internal-endpoint-stats-state.mjs @@ -138,7 +138,6 @@ assert.strictEqual(streamState.finSent, false); assert.strictEqual(streamState.finReceived, false); assert.strictEqual(streamState.readEnded, false); assert.strictEqual(streamState.writeEnded, false); -assert.strictEqual(streamState.paused, false); assert.strictEqual(streamState.reset, false); assert.strictEqual(streamState.hasReader, false); assert.strictEqual(streamState.wantsBlock, false); diff --git a/test/parallel/test-quic-internal-setcallbacks.mjs b/test/parallel/test-quic-internal-setcallbacks.mjs index 88fb2624dd33f6..cebbee43376d6e 100644 --- a/test/parallel/test-quic-internal-setcallbacks.mjs +++ b/test/parallel/test-quic-internal-setcallbacks.mjs @@ -18,6 +18,7 @@ const callbacks = { onSessionHandshake() {}, onSessionPathValidation() {}, onSessionTicket() {}, + onSessionNewToken() {}, onSessionVersionNegotiation() {}, onStreamCreated() {}, onStreamBlocked() {},