
fix: defer async stream transforms for large payloads #1239

Open
giulio-leone wants to merge 5 commits into anthropics:main from giulio-leone:anthropic-1195-fresh

Conversation

@giulio-leone

Fixes #1195.

Summary

  • keep the existing transform-cache optimizations and make them pass the repo lint/type gate cleanly
  • defer AsyncMessages.stream() request construction so large payload transforms run inside the awaited path instead of synchronously during stream() manager creation
  • apply the same deferral to client.beta.messages.stream()
  • add regression tests proving the async stream helpers do not eagerly call maybe_transform
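The regression-test idea can be sketched with toy stand-ins (none of these names are the SDK's real internals; `maybe_transform` here is just a counting stub):

```python
import asyncio

transform_calls = []  # records each invocation of the stub transform

def maybe_transform(data):
    # Stub for the SDK's maybe_transform; only counts calls.
    transform_calls.append(data)
    return data

class Manager:
    """Toy async context manager mirroring the stream manager shape."""
    def __init__(self, make_request):
        self._make_request = make_request

    async def __aenter__(self):
        self._make_request()  # transform happens here, inside the await
        return self

    async def __aexit__(self, *exc):
        return False

def stream(payload):
    # Synchronous helper: must NOT call maybe_transform eagerly.
    return Manager(lambda: maybe_transform(payload))

async def check():
    mgr = stream({"messages": ["..."] * 1000})
    assert transform_calls == []  # nothing transformed at stream() time
    async with mgr:
        pass
    assert len(transform_calls) == 1  # exactly one transform after __aenter__

asyncio.run(check())
```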

Why

AsyncMessages.stream() and beta.messages.stream() are synchronous methods that return async context managers. Before this patch, both methods performed expensive request-body transforms up front, so large messages / tools payloads could block the event loop before the first await.

This change moves that work into the coroutine consumed by AsyncMessageStreamManager.__aenter__, which preserves behavior while removing the unexpected synchronous blocking at stream() call time.
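The before/after shapes can be contrasted with a minimal sketch (toy classes, not the SDK's actual `AsyncMessageStreamManager`):

```python
import asyncio

def expensive_transform(payload):
    # Stand-in for building the request body from a large payload.
    return {"body": list(payload)}

class EagerManager:
    """Old shape: transform runs synchronously at stream() call time."""
    def __init__(self, payload):
        self.request = expensive_transform(payload)  # blocks the event loop

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc):
        return False

class DeferredManager:
    """New shape: transform runs inside the awaited __aenter__."""
    def __init__(self, payload):
        self._payload = payload  # construction stores a reference only
        self.request = None

    async def __aenter__(self):
        self.request = expensive_transform(self._payload)
        return self

    async def __aexit__(self, *exc):
        return False

async def main():
    async with DeferredManager(range(3)) as s:
        return s.request

result = asyncio.run(main())
```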

Real runtime verification

I validated this against a live local HTTP server using the public SDK APIs, with large real payloads (3000 messages, 400 tools), comparing:

  • upstream main (49d639a)
  • the cache-only optimization branch (0767668)
  • this final branch (0767668 + deferred stream construction)

Median results across two consecutive quality-gate passes:

  • client.messages.stream(...) construction

    • main: ~823-833 ms blocking on the event loop
    • cache-only: ~421-428 ms
    • final branch: ~0.017-0.019 ms
  • client.beta.messages.stream(...) construction

    • main: ~1010-1031 ms
    • cache-only: ~536-539 ms
    • final branch: ~1.106-1.115 ms
  • full async with client.messages.stream(...) time stays in the same general range (~437 ms cache-only vs ~480 ms final), which is expected: the work is deferred into the awaited path rather than silently removed.
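The construction-time gap is easy to reproduce without a server. A self-contained sketch (toy classes; the real benchmark above used the public SDK APIs against a live local server):

```python
import time

def expensive_transform(payload):
    # Stand-in for maybe_transform over a large request body.
    return [dict(m) for m in payload]

class EagerManager:
    def __init__(self, payload):
        self.request = expensive_transform(payload)  # work at construction

class DeferredManager:
    def __init__(self, payload):
        self._payload = payload  # construction stores a reference only

payload = [{"role": "user", "content": "x"}] * 50_000

t0 = time.perf_counter()
EagerManager(payload)
eager_ms = (time.perf_counter() - t0) * 1000

t0 = time.perf_counter()
DeferredManager(payload)
deferred_ms = (time.perf_counter() - t0) * 1000

# Deferred construction is near-instant; the eager path pays for the
# whole transform up front.
assert deferred_ms < eager_ms
```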

Validation

Run twice consecutively on this branch:

  • ./scripts/lint
  • uv run pytest tests/lib/streaming/test_messages.py tests/lib/streaming/test_beta_messages.py tests/lib/_parse/test_beta_messages.py tests/test_transform.py -q
  • live runtime benchmark via local HTTP server using the public async streaming APIs

…edundant dispatch

The _transform_recursive function and its async variant performed type
introspection (strip_annotated_type, get_origin, is_typeddict,
is_list_type, is_union_type, etc.) on every recursive call, even though
the type annotation is the same for all values of a given field. On
large payloads (~90K messages), this consumed ~6.6% of total CPU time
with zero transformation output since Messages API types have no
PropertyInfo annotations.

Changes:
- Add _cached_transform_dispatch(): LRU-cached function that precomputes
  the dispatch path (typeddict/dict/sequence/union/other) and extracts
  type args once per annotation type. Subsequent calls are O(1) dict
  lookups instead of re-running type introspection.
- Add _get_field_key_map(): LRU-cached function that precomputes the
  key alias mapping for each TypedDict type, replacing per-field
  _maybe_transform_key calls with a single dict.get() lookup.
- Expand _no_transform_needed() to include str and bool, allowing
  lists of strings/bools to skip per-element recursion.
- Apply same optimizations to _async_transform_recursive and
  _async_transform_typeddict.
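The cached-dispatch idea can be sketched with functools.lru_cache (simplified; the real _cached_transform_dispatch handles more branches such as TypedDicts and PropertyInfo annotations):

```python
import functools
import typing

@functools.lru_cache(maxsize=None)
def cached_dispatch(annotation):
    """Precompute the dispatch path for an annotation once.

    Repeat calls for the same annotation are O(1) cache hits instead
    of re-running get_origin()/get_args() introspection per value.
    """
    origin = typing.get_origin(annotation)
    args = typing.get_args(annotation)
    if origin in (list, tuple):
        return ("sequence", args)
    if origin is typing.Union:
        return ("union", args)
    if isinstance(annotation, type) and issubclass(annotation, dict):
        return ("dict", args)
    return ("other", args)

# First call introspects; every later call for List[int] is a cache hit.
path = cached_dispatch(typing.List[int])
cached_dispatch(typing.List[int])
```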

Fixes anthropics#1195

Address review feedback:
1. The dict branch in _async_transform_recursive called the synchronous
   _transform_recursive, defeating async benefits. Changed to
   await _async_transform_recursive.
2. Relaxed wall-clock assertion in performance test from 2s to 10s to
   avoid flakiness in CI environments with variable load.

Use sync-vs-async relative timing instead of a fixed threshold so
the test is resilient to CI environments with variable load while
still catching performance regressions.
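The relative-timing assertion can be sketched like this (toy transforms; the real test compares the SDK's own sync and async helpers):

```python
import asyncio
import time

def sync_transform(items):
    return [x * 2 for x in items]

async def async_transform(items):
    # Same work, but yields to the event loop periodically.
    out = []
    for i, x in enumerate(items):
        if i % 10_000 == 0:
            await asyncio.sleep(0)
        out.append(x * 2)
    return out

items = list(range(100_000))

t0 = time.perf_counter()
expected = sync_transform(items)
sync_s = time.perf_counter() - t0

t0 = time.perf_counter()
got = asyncio.run(async_transform(items))
async_s = time.perf_counter() - t0

assert got == expected
# Bound the async path relative to the sync path instead of using a
# fixed wall-clock limit, so the test survives slow CI machines.
assert async_s < sync_s * 50
```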

Refs: anthropics#1195

Move async stream request construction behind an awaited coroutine so large payload transforms do not block the event loop during stream() manager creation.

Also add regression coverage for eager transform calls and tighten the cached transform typing so the existing transform optimization branch passes the repo lint/type gate cleanly.
@giulio-leone giulio-leone requested a review from a team as a code owner March 11, 2026 22:58
@giulio-leone
Author

Quick heads-up: both PR workflows are currently in action_required with zero jobs, so this looks like the standard forked-PR approval gate rather than a failing CI run.

For convenience, I already validated this branch locally twice with:

  • ./scripts/lint
  • uv run pytest tests/lib/streaming/test_messages.py tests/lib/streaming/test_beta_messages.py tests/lib/_parse/test_beta_messages.py tests/test_transform.py -q
  • a live local-server benchmark using the public async streaming APIs

That benchmark showed client.messages.stream(...) construction drop from ~421-428 ms on the cache-only branch to ~0.017-0.019 ms here, and beta stream construction drop from ~536-539 ms to ~1.106-1.115 ms.

@giulio-leone
Author

Friendly ping — rebased on latest and ready for review. Happy to address any feedback!

@giulio-leone giulio-leone force-pushed the anthropic-1195-fresh branch from a766bb4 to 11cff01 Compare March 15, 2026 16:21


Development

Successfully merging this pull request may close these issues.

Python SDK: _transform_recursive blocking event loop on large message payloads
