fix: defer async stream transforms for large payloads#1239
Open
giulio-leone wants to merge 5 commits into anthropics:main from
Conversation
…edundant dispatch

The _transform_recursive function and its async variant performed type introspection (strip_annotated_type, get_origin, is_typeddict, is_list_type, is_union_type, etc.) on every recursive call, even though the type annotation is the same for all values of a given field. On large payloads (~90K messages), this consumed ~6.6% of total CPU time with zero transformation output, since Messages API types have no PropertyInfo annotations.

Changes:
- Add _cached_transform_dispatch(): an LRU-cached function that precomputes the dispatch path (typeddict/dict/sequence/union/other) and extracts type args once per annotation type. Subsequent calls are O(1) dict lookups instead of re-running type introspection.
- Add _get_field_key_map(): an LRU-cached function that precomputes the key alias mapping for each TypedDict type, replacing per-field _maybe_transform_key calls with a single dict.get() lookup.
- Expand _no_transform_needed() to include str and bool, allowing lists of strings/bools to skip per-element recursion.
- Apply the same optimizations to _async_transform_recursive and _async_transform_typeddict.

Fixes anthropics#1195
Address review feedback:
1. The dict branch in _async_transform_recursive called the synchronous _transform_recursive, defeating the async benefits. Changed to await _async_transform_recursive.
2. Relaxed the wall-clock assertion in the performance test from 2 s to 10 s to avoid flakiness in CI environments with variable load.
Use sync-vs-async relative timing instead of a fixed threshold so the test is resilient to CI environments with variable load while still catching performance regressions. Refs: anthropics#1195
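One way to express such a relative assertion is sketched below. This is an illustrative pattern only, not the repo's actual test; the helper name and the ratio are assumptions.

```python
# Illustrative relative-timing check: assert the candidate path is no slower
# than some multiple of a baseline run, instead of a fixed wall-clock bound.
import time
from typing import Callable


def assert_not_slower(baseline: Callable[[], None],
                      candidate: Callable[[], None],
                      max_ratio: float = 5.0) -> None:
    start = time.perf_counter()
    baseline()
    baseline_s = time.perf_counter() - start

    start = time.perf_counter()
    candidate()
    candidate_s = time.perf_counter() - start

    # A ratio tolerates slow CI machines that a fixed threshold does not:
    # if the whole machine is loaded, both measurements scale together.
    assert candidate_s <= baseline_s * max_ratio
```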
Move async stream request construction behind an awaited coroutine so large payload transforms do not block the event loop during stream() manager creation. Also add regression coverage for eager transform calls and tighten the cached transform typing so the existing transform optimization branch passes the repo lint/type gate cleanly.
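The deferral pattern this commit describes can be sketched as below. The class and function names are hypothetical stand-ins, not the SDK's actual AsyncMessageStreamManager: the sync call only stores a coroutine factory, and the expensive work runs when the manager is awaited.

```python
# Hedged sketch: the synchronous stream() call does no transform work; the
# request is built inside __aenter__, i.e. inside the awaited path.
import asyncio
from typing import Any, Awaitable, Callable


class DeferredStreamManager:
    def __init__(self, build_request: Callable[[], Awaitable[dict]]) -> None:
        self._build_request = build_request  # stored, not invoked, here

    async def __aenter__(self) -> dict:
        # Heavy payload transforms happen here, cooperatively on the loop.
        return await self._build_request()

    async def __aexit__(self, *exc: Any) -> None:
        return None


async def demo() -> dict:
    async def build() -> dict:
        await asyncio.sleep(0)  # stand-in for large-payload transforms
        return {"messages": [], "tools": []}

    async with DeferredStreamManager(build) as request:
        return request
```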
Author
Quick heads-up: both PR workflows are currently in […]. For convenience, I already validated this branch locally twice with […]. That benchmark showed […].
Author
Friendly ping — rebased on latest main and ready for review. Happy to address any feedback!
Force-pushed from a766bb4 to 11cff01 (Compare)
Fixes #1195.
Summary
- Defer AsyncMessages.stream() request construction so large payload transforms run inside the awaited path instead of synchronously during stream() manager creation
- Apply the same deferral to client.beta.messages.stream()
- Add regression coverage that maybe_transform is not invoked eagerly at stream() call time

Why
AsyncMessages.stream() and beta.messages.stream() are synchronous methods that return async context managers. Before this patch, both methods performed expensive request-body transforms up front, so large messages/tools payloads could block the event loop before the first await.

This change moves that work into the coroutine consumed by AsyncMessageStreamManager.__aenter__, which preserves behavior while removing the unexpected synchronous blocking at stream() call time.

Real runtime verification
I validated this against a live local HTTP server using the public SDK APIs, with large real payloads (3000 messages, 400 tools), comparing:
- main (49d639a)
- cache-only (0767668)
- final (0767668 + deferred stream construction)

Median results across two consecutive quality-gate passes:
- client.messages.stream(...) construction on main: ~823-833 ms blocking on the event loop
- client.beta.messages.stream(...) construction on main: ~1010-1031 ms
- full async with client.messages.stream(...) time stays in the same general range (~437 ms cache-only vs ~480 ms final), which is expected: the work is deferred into the awaited path rather than silently removed.

Validation
Run twice consecutively on this branch:
./scripts/lint
uv run pytest tests/lib/streaming/test_messages.py tests/lib/streaming/test_beta_messages.py tests/lib/_parse/test_beta_messages.py tests/test_transform.py -q