Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 63 additions & 50 deletions lib/internal/blob.js
Original file line number Diff line number Diff line change
Expand Up @@ -439,63 +439,76 @@ function createBlobReaderStream(reader) {
// There really should only be one read at a time so using an
// array here is purely defensive.
this.pendingPulls = [];
// Register a wakeup callback that the C++ side can invoke
// when new data is available after a STATUS_BLOCK.
reader.setWakeup(() => {
if (this.pendingPulls.length > 0) {
this.readNext(c);
}
});
},
pull(c) {
const { promise, resolve, reject } = PromiseWithResolvers();
this.pendingPulls.push({ resolve, reject });
const readNext = () => {
reader.pull((status, buffer) => {
// If pendingPulls is empty here, the stream had to have
// been canceled, and we don't really care about the result.
// We can simply exit.
if (this.pendingPulls.length === 0) {
return;
}
if (status === 0) {
// EOS
c.close();
// This is to signal the end for byob readers
// see https://streams.spec.whatwg.org/#example-rbs-pull
c.byobRequest?.respond(0);
const pending = this.pendingPulls.shift();
pending.resolve();
return;
} else if (status < 0) {
// The read could fail for many different reasons when reading
// from a non-memory resident blob part (e.g. file-backed blob).
// The error details the system error code.
const error = lazyDOMException('The blob could not be read', 'NotReadableError');
const pending = this.pendingPulls.shift();
c.error(error);
pending.reject(error);
this.readNext(c);
return promise;
},
readNext(c) {
reader.pull((status, buffer) => {
// If pendingPulls is empty here, the stream had to have
// been canceled, and we don't really care about the result.
// We can simply exit.
if (this.pendingPulls.length === 0) {
return;
}
if (status === 0) {
// EOS
c.close();
// This is to signal the end for byob readers
// see https://streams.spec.whatwg.org/#example-rbs-pull
c.byobRequest?.respond(0);
const pending = this.pendingPulls.shift();
pending.resolve();
return;
} else if (status < 0) {
// The read could fail for many different reasons when reading
// from a non-memory resident blob part (e.g. file-backed blob).
// The error details the system error code.
const error =
lazyDOMException('The blob could not be read',
'NotReadableError');
const pending = this.pendingPulls.shift();
c.error(error);
pending.reject(error);
return;
} else if (status === 2) {
// STATUS_BLOCK: No data available yet. The wakeup callback
// registered in start() will re-invoke readNext when data
// arrives.
return;
}
// ReadableByteStreamController.enqueue errors if we submit a
// 0-length buffer. We need to check for that here.
if (buffer !== undefined && buffer.byteLength !== 0) {
c.enqueue(new Uint8Array(buffer));
}
// We keep reading until we either reach EOS, some error, or
// we hit the flow rate of the stream (c.desiredSize).
// We use setImmediate here because we have to allow the event
// loop to turn in order to process any pending i/o. Using
// queueMicrotask won't allow the event loop to turn.
setImmediate(() => {
if (c.desiredSize < 0) {
// A manual backpressure check.
if (this.pendingPulls.length !== 0) {
const pending = this.pendingPulls.shift();
pending.resolve();
}
return;
}
// ReadableByteStreamController.enqueue errors if we submit a 0-length
// buffer. We need to check for that here.
if (buffer !== undefined && buffer.byteLength !== 0) {
c.enqueue(new Uint8Array(buffer));
}
// We keep reading until we either reach EOS, some error, or we
// hit the flow rate of the stream (c.desiredSize).
// We use set immediate here because we have to allow the event
// loop to turn in order to process any pending i/o. Using
// queueMicrotask won't allow the event loop to turn.
setImmediate(() => {
if (c.desiredSize < 0) {
// A manual backpressure check.
if (this.pendingPulls.length !== 0) {
// A case of waiting pull finished (= not yet canceled)
const pending = this.pendingPulls.shift();
pending.resolve();
}
return;
}
readNext();
});
this.readNext(c);
});
};
readNext();
return promise;
});
},
cancel(reason) {
// Reject any currently pending pulls here.
Expand Down
12 changes: 12 additions & 0 deletions lib/internal/quic/quic.js
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,18 @@ setCallbacks({
this[kOwner][kSessionTicket](ticket);
},

/**
* Called when the client receives a NEW_TOKEN frame from the server.
* The token can be used for future connections to the same server
* address to skip address validation.
* @param {Buffer} token The opaque token data
* @param {SocketAddress} address The remote server address
*/
onSessionNewToken(token, address) {
debug('session new token callback', this[kOwner]);
// TODO(@jasnell): Emit to JS for storage and future reconnection use
},

/**
* Called when the session receives a session version negotiation request
* @param {*} version
Expand Down
9 changes: 0 additions & 9 deletions lib/internal/quic/state.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ const {
IDX_STATE_STREAM_FIN_RECEIVED,
IDX_STATE_STREAM_READ_ENDED,
IDX_STATE_STREAM_WRITE_ENDED,
IDX_STATE_STREAM_PAUSED,
IDX_STATE_STREAM_RESET,
IDX_STATE_STREAM_HAS_OUTBOUND,
IDX_STATE_STREAM_HAS_READER,
Expand Down Expand Up @@ -113,7 +112,6 @@ assert(IDX_STATE_STREAM_FIN_SENT !== undefined);
assert(IDX_STATE_STREAM_FIN_RECEIVED !== undefined);
assert(IDX_STATE_STREAM_READ_ENDED !== undefined);
assert(IDX_STATE_STREAM_WRITE_ENDED !== undefined);
assert(IDX_STATE_STREAM_PAUSED !== undefined);
assert(IDX_STATE_STREAM_RESET !== undefined);
assert(IDX_STATE_STREAM_HAS_OUTBOUND !== undefined);
assert(IDX_STATE_STREAM_HAS_READER !== undefined);
Expand Down Expand Up @@ -475,11 +473,6 @@ class QuicStreamState {
return !!DataViewPrototypeGetUint8(this.#handle, IDX_STATE_STREAM_WRITE_ENDED);
}

/** @type {boolean} */
get paused() {
if (this.#handle.byteLength === 0) return undefined;
return !!DataViewPrototypeGetUint8(this.#handle, IDX_STATE_STREAM_PAUSED);
}

/** @type {boolean} */
get reset() {
Expand Down Expand Up @@ -561,7 +554,6 @@ class QuicStreamState {
finReceived: this.finReceived,
readEnded: this.readEnded,
writeEnded: this.writeEnded,
paused: this.paused,
reset: this.reset,
hasOutbound: this.hasOutbound,
hasReader: this.hasReader,
Expand Down Expand Up @@ -590,7 +582,6 @@ class QuicStreamState {
finReceived: this.finReceived,
readEnded: this.readEnded,
writeEnded: this.writeEnded,
paused: this.paused,
reset: this.reset,
hasOutbound: this.hasOutbound,
hasReader: this.hasReader,
Expand Down
17 changes: 17 additions & 0 deletions src/node_blob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ Local<FunctionTemplate> Blob::Reader::GetConstructorTemplate(Environment* env) {
BaseObject::kInternalFieldCount);
tmpl->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "BlobReader"));
SetProtoMethod(env->isolate(), tmpl, "pull", Pull);
SetProtoMethod(env->isolate(), tmpl, "setWakeup", SetWakeup);
env->set_blob_reader_constructor_template(tmpl);
}
return tmpl;
Expand Down Expand Up @@ -411,6 +412,21 @@ void Blob::Reader::Pull(const FunctionCallbackInfo<Value>& args) {
std::move(next), node::bob::OPTIONS_END, nullptr, 0));
}

void Blob::Reader::SetWakeup(
const FunctionCallbackInfo<Value>& args) {
Blob::Reader* reader;
ASSIGN_OR_RETURN_UNWRAP(&reader, args.This());
CHECK(args[0]->IsFunction());
reader->wakeup_.Reset(args.GetIsolate(), args[0].As<Function>());
}

void Blob::Reader::NotifyPull() {
if (wakeup_.IsEmpty() || !env()->can_call_into_js()) return;
HandleScope handle_scope(env()->isolate());
Local<Function> fn = wakeup_.Get(env()->isolate());
MakeCallback(fn, 0, nullptr);
}

BaseObjectPtr<BaseObject>
Blob::BlobTransferData::Deserialize(
Environment* env,
Expand Down Expand Up @@ -591,6 +607,7 @@ void Blob::RegisterExternalReferences(ExternalReferenceRegistry* registry) {
registry->Register(Blob::GetDataObject);
registry->Register(Blob::RevokeObjectURL);
registry->Register(Blob::Reader::Pull);
registry->Register(Blob::Reader::SetWakeup);
registry->Register(Concat);
registry->Register(BlobFromFilePath);
}
Expand Down
3 changes: 3 additions & 0 deletions src/node_blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ class Blob : public BaseObject {
static BaseObjectPtr<Reader> Create(Environment* env,
BaseObjectPtr<Blob> blob);
static void Pull(const v8::FunctionCallbackInfo<v8::Value>& args);
static void SetWakeup(const v8::FunctionCallbackInfo<v8::Value>& args);
void NotifyPull();

explicit Reader(Environment* env,
v8::Local<v8::Object> obj,
Expand All @@ -95,6 +97,7 @@ class Blob : public BaseObject {
std::shared_ptr<DataQueue::Reader> inner_;
BaseObjectPtr<Blob> strong_ptr_;
bool eos_ = false;
v8::Global<v8::Function> wakeup_;
};

BaseObject::TransferMode GetTransferMode() const override;
Expand Down
25 changes: 6 additions & 19 deletions src/quic/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ ssize_t Session::Application::WriteVStream(PathStorage* path,
if (stream_data.fin) flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
return ngtcp2_conn_writev_stream(*session_,
&path->path,
// TODO(@jasnell): ECN blocked on libuv
nullptr,
dest,
max_packet_size,
Expand Down Expand Up @@ -511,10 +512,13 @@ class DefaultApplication final : public Session::Application {
stream_data->count = 0;
stream_data->fin = 0;
stream_data->stream.reset();
stream_data->remaining = 0;
Debug(&session(), "Default application getting stream data");
DCHECK_NOT_NULL(stream_data);
// If the queue is empty, there aren't any streams with data yet

// If the connection-level flow control window is exhausted,
// there is no point in pulling stream data.
if (!session().max_data_left()) return 0;
if (stream_queue_.IsEmpty()) return 0;

const auto get_length = [](auto vec, size_t count) {
Expand Down Expand Up @@ -554,9 +558,7 @@ class DefaultApplication final : public Session::Application {

if (count > 0) {
stream->Schedule(&stream_queue_);
stream_data->remaining = get_length(data, count);
} else {
stream_data->remaining = 0;
}

// Not calling done here because we defer committing
Expand All @@ -581,15 +583,6 @@ class DefaultApplication final : public Session::Application {

void ResumeStream(int64_t id) override { ScheduleStream(id); }

bool ShouldSetFin(const StreamData& stream_data) override {
auto const is_empty = [](const ngtcp2_vec* vec, size_t cnt) {
size_t i = 0;
for (size_t n = 0; n < cnt; n++) i += vec[n].len;
return i > 0;
};

return stream_data.stream && is_empty(stream_data, stream_data.count);
}

void BlockStream(int64_t id) override {
if (auto stream = session().FindStream(id)) [[likely]] {
Expand All @@ -598,10 +591,9 @@ class DefaultApplication final : public Session::Application {
}

bool StreamCommit(StreamData* stream_data, size_t datalen) override {
if (datalen == 0) return true;
DCHECK_NOT_NULL(stream_data);
CHECK(stream_data->stream);
stream_data->stream->Commit(datalen);
stream_data->stream->Commit(datalen, stream_data->fin);
return true;
}

Expand All @@ -616,11 +608,6 @@ class DefaultApplication final : public Session::Application {
}
}

void UnscheduleStream(int64_t id) {
if (auto stream = session().FindStream(id)) [[likely]] {
stream->Unschedule();
}
}

Stream::Queue stream_queue_;
};
Expand Down
7 changes: 5 additions & 2 deletions src/quic/application.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ class Session::Application : public MemoryRetainer {

virtual int GetStreamData(StreamData* data) = 0;
virtual bool StreamCommit(StreamData* data, size_t datalen) = 0;
virtual bool ShouldSetFin(const StreamData& data) = 0;

inline Environment* env() const { return session().env(); }
inline Session& session() {
Expand Down Expand Up @@ -148,14 +147,18 @@ class Session::Application : public MemoryRetainer {
struct Session::Application::StreamData final {
// The actual number of vectors in the struct, up to kMaxVectorCount.
size_t count = 0;
size_t remaining = 0;
// The stream identifier. If this is a negative value then no stream is
// identified.
int64_t id = -1;
int fin = 0;
ngtcp2_vec data[kMaxVectorCount]{};
BaseObjectPtr<Stream> stream;

static_assert(sizeof(ngtcp2_vec) == sizeof(nghttp3_vec) &&
alignof(ngtcp2_vec) == alignof(nghttp3_vec) &&
offsetof(ngtcp2_vec, base) == offsetof(nghttp3_vec, base) &&
offsetof(ngtcp2_vec, len) == offsetof(nghttp3_vec, len),
"ngtcp2_vec and nghttp3_vec must have identical layout");
inline operator nghttp3_vec*() {
return reinterpret_cast<nghttp3_vec*>(data);
}
Expand Down
3 changes: 3 additions & 0 deletions src/quic/bindingdata.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class Packet;
V(session_datagram_status, SessionDatagramStatus) \
V(session_handshake, SessionHandshake) \
V(session_new, SessionNew) \
V(session_new_token, SessionNewToken) \
V(session_path_validation, SessionPathValidation) \
V(session_ticket, SessionTicket) \
V(session_version_negotiation, SessionVersionNegotiation) \
Expand Down Expand Up @@ -70,6 +71,7 @@ class Packet;
V(cubic, "cubic") \
V(disable_stateless_reset, "disableStatelessReset") \
V(enable_connect_protocol, "enableConnectProtocol") \
V(enable_early_data, "enableEarlyData") \
V(enable_datagrams, "enableDatagrams") \
V(enable_tls_trace, "tlsTrace") \
V(endpoint, "Endpoint") \
Expand Down Expand Up @@ -121,6 +123,7 @@ class Packet;
V(stream, "Stream") \
V(success, "success") \
V(tls_options, "tls") \
V(token, "token") \
V(token_expiration, "tokenExpiration") \
V(token_secret, "tokenSecret") \
V(transport_params, "transportParams") \
Expand Down
8 changes: 6 additions & 2 deletions src/quic/cid.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,14 @@ const CID CID::kInvalid{};
// CID::Hash

size_t CID::Hash::operator()(const CID& cid) const {
// Uses the Boost hash_combine strategy: XOR each byte with the golden
// ratio constant 0x9e3779b9 (derived from the fractional part of the
// golden ratio, (sqrt(5)-1)/2 * 2^32) plus bit-shifted accumulator
// state. This provides good avalanche properties for short byte
// sequences like connection IDs (1-20 bytes).
size_t hash = 0;
for (size_t n = 0; n < cid.length(); n++) {
hash ^= std::hash<uint8_t>{}(cid.ptr_->data[n] + 0x9e3779b9 + (hash << 6) +
(hash >> 2));
hash ^= cid.ptr_->data[n] + 0x9e3779b9 + (hash << 6) + (hash >> 2);
}
return hash;
}
Expand Down
2 changes: 1 addition & 1 deletion src/quic/data.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ using v8::Undefined;
using v8::Value;

namespace quic {
int DebugIndentScope::indent_ = 0;
thread_local int DebugIndentScope::indent_ = 0;

Path::Path(const SocketAddress& local, const SocketAddress& remote) {
ngtcp2_addr_init(&this->local, local.data(), local.length());
Expand Down
Loading
Loading