🛈 Note: This is pre-release documentation for the upcoming tracing 0.2.0 ecosystem.

For the release documentation, please see docs.rs instead.

tracing_futures/
lib.rs

1//! Futures compatibility for [`tracing`].
2//!
3//! # Overview
4//!
5//! [`tracing`] is a framework for instrumenting Rust programs to collect
6//! structured, event-based diagnostic information. This crate provides utilities
7//! for using `tracing` to instrument asynchronous code written using futures and
8//! async/await.
9//!
10//! The crate provides the following traits:
11//!
12//! * [`Instrument`] allows a `tracing` [span] to be attached to a future, sink,
13//!   stream, or executor.
14//!
15//! * [`WithCollector`] allows a `tracing` [collector] to be attached to a
16//!   future, sink, stream, or executor.
17//!
18//! *Compiler support: [requires `rustc` 1.63+][msrv]*
19//!
20//! [msrv]: #supported-rust-versions
21//!
22//! # Feature flags
23//!
24//! This crate provides a number of feature flags that enable compatibility
25//! features with other crates in the asynchronous ecosystem:
26//!
27//! - `tokio`: Enables compatibility with the `tokio` 0.1 crate, including
28//!   [`Instrument`] and [`WithCollector`] implementations for
29//!   `tokio::executor::Executor`, `tokio::runtime::Runtime`, and
30//!   `tokio::runtime::current_thread`. This is not needed for compatibility
31//!   with `tokio` v1.
32//! - `tokio-executor`: Enables compatibility with the `tokio-executor`
33//!   crate, including [`Instrument`] and [`WithCollector`]
34//!   implementations for types implementing `tokio_executor::Executor`.
35//!   This is intended primarily for use in crates which depend on
36//!   `tokio-executor` rather than `tokio`; in general the `tokio` feature
37//!   should be used instead.
38//! - `std-future`: Enables compatibility with `std::future::Future`.
39//! - `futures-01`: Enables compatibility with version 0.1.x of the [`futures`]
40//!   crate.
41//! - `futures-03`: Enables compatibility with version 0.3.x of the `futures`
42//!   crate's `Spawn` and `LocalSpawn` traits.
43//! - `std`: Depend on the Rust standard library.
44//!
45//!   `no_std` users may disable this feature with `default-features = false`:
46//!
47//!   ```toml
48//!   [dependencies]
49//!   tracing-futures = { version = "0.2.3", default-features = false }
50//!   ```
51//!
52//! The `std-future` and `std` features are enabled by default.
53//!
54//! [`tracing`]: https://crates.io/crates/tracing
55//! [span]: mod@tracing::span
56//! [collector]: tracing::collect
57//! [`futures`]: https://crates.io/crates/futures
58//!
59//! ## Supported Rust Versions
60//!
61//! Tracing is built against the latest stable release. The minimum supported
62//! version is 1.63. The current Tracing version is not guaranteed to build on
63//! Rust versions earlier than the minimum supported version.
64//!
65//! Tracing follows the same compiler support policies as the rest of the Tokio
66//! project. The current stable Rust compiler and the three most recent minor
67//! versions before it will always be supported. For example, if the current
68//! stable compiler version is 1.69, the minimum supported version will not be
69//! increased past 1.66, three minor versions prior. Increasing the minimum
70//! supported compiler version is not considered a semver breaking change as
71//! long as doing so complies with this policy.
72//!
73#![doc(
74    html_logo_url = "https://raw.githubusercontent.com/tokio-rs/tracing/master/assets/logo-type.png",
75    html_favicon_url = "https://raw.githubusercontent.com/tokio-rs/tracing/master/assets/favicon.ico",
76    issue_tracker_base_url = "https://github.com/tokio-rs/tracing/issues/"
77)]
78#![warn(
79    missing_debug_implementations,
80    missing_docs,
81    rust_2018_idioms,
82    unreachable_pub,
83    bad_style,
84    dead_code,
85    improper_ctypes,
86    non_shorthand_field_patterns,
87    no_mangle_generic_items,
88    overflowing_literals,
89    path_statements,
90    patterns_in_fns_without_body,
91    private_interfaces,
92    private_bounds,
93    unconditional_recursion,
94    unused,
95    unused_allocation,
96    unused_comparisons,
97    unused_parens,
98    while_true
99)]
100#![cfg_attr(not(feature = "std"), no_std)]
101#![cfg_attr(docsrs, feature(doc_cfg))]
102#[cfg(feature = "std-future")]
103use pin_project_lite::pin_project;
104
105#[cfg(feature = "std-future")]
106use core::{
107    mem::{self, ManuallyDrop},
108    pin::Pin,
109    task::Context,
110};
111
112#[cfg(feature = "std")]
113use tracing::{dispatch, Dispatch};
114
115use tracing::Span;
116
117/// Implementations for `Instrument`ed future executors.
118pub mod executor;
119
120/// Extension trait allowing futures, streams, sinks, and executors to be
121/// instrumented with a `tracing` [span].
122///
123/// [span]: mod@tracing::span
124pub trait Instrument: Sized {
125    /// Instruments this type with the provided [`Span`], returning an
126    /// [`Instrumented`] wrapper.
127    ///
128    /// If the instrumented type is a future, stream, or sink, the attached
129    /// [`Span`] will be [entered] every time it is polled or [`Drop`]ped. If
130    /// the instrumented type is a future executor, every future spawned on that
131    /// executor will be instrumented by the attached [`Span`].
132    ///
133    /// # Examples
134    ///
135    /// Instrumenting a future:
136    ///
137    // TODO: ignored until async-await is stable...
138    /// ```rust,ignore
139    /// use tracing_futures::Instrument;
140    ///
141    /// # async fn doc() {
142    /// let my_future = async {
143    ///     // ...
144    /// };
145    ///
146    /// my_future
147    ///     .instrument(tracing::info_span!("my_future"))
148    ///     .await
149    /// # }
150    /// ```
151    ///
152    /// [entered]: Span::enter()
153    fn instrument(self, span: Span) -> Instrumented<Self> {
154        #[cfg(feature = "std-future")]
155        let inner = ManuallyDrop::new(self);
156        #[cfg(not(feature = "std-future"))]
157        let inner = self;
158        Instrumented { inner, span }
159    }
160
161    /// Instruments this type with the [current] [`Span`], returning an
162    /// [`Instrumented`] wrapper.
163    ///
164    /// If the instrumented type is a future, stream, or sink, the attached
165    /// [`Span`] will be [entered] every time it is polled or [`Drop`]ped. If
166    /// the instrumented type is a future executor, every future spawned on that
167    /// executor will be instrumented by the attached [`Span`].
168    ///
169    /// This can be used to propagate the current span when spawning a new future.
170    ///
171    /// # Examples
172    ///
173    // TODO: ignored until async-await is stable...
174    /// ```rust,ignore
175    /// use tracing_futures::Instrument;
176    ///
177    /// # async fn doc() {
178    /// let span = tracing::info_span!("my_span");
179    /// let _enter = span.enter();
180    ///
181    /// // ...
182    ///
183    /// let future = async {
184    ///     tracing::debug!("this event will occur inside `my_span`");
185    ///     // ...
186    /// };
187    /// tokio::spawn(future.in_current_span());
188    /// # }
189    /// ```
190    ///
191    /// [current]: Span::current()
192    /// [entered]: Span::enter()
193    #[inline]
194    fn in_current_span(self) -> Instrumented<Self> {
195        self.instrument(Span::current())
196    }
197}
198
/// Extension trait that lets futures, streams, and sinks be instrumented with
/// a `tracing` [collector].
///
/// [collector]: tracing::collect::Collect
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
pub trait WithCollector: Sized {
    /// Attaches the provided [collector] to this type, returning a
    /// `WithDispatch` wrapper.
    ///
    /// When the wrapped type is a future, stream, or sink, the attached
    /// collector is set as the [default] while it is being polled. When the
    /// wrapped type is an executor, the collector is set as the default for
    /// any futures spawned on that executor.
    ///
    /// [collector]: tracing::collect::Collect
    /// [default]: tracing::dispatch#setting-the-default-collector
    fn with_collector<C>(self, collector: C) -> WithDispatch<Self>
    where
        C: Into<Dispatch>,
    {
        let dispatch = collector.into();
        WithDispatch {
            inner: self,
            dispatch,
        }
    }

    /// Attaches the current [default] [collector] to this type, returning a
    /// `WithDispatch` wrapper.
    ///
    /// When the wrapped type is a future, stream, or sink, the attached
    /// collector is set as the [default] while it is being polled. When the
    /// wrapped type is an executor, the collector is set as the default for
    /// any futures spawned on that executor.
    ///
    /// This is useful for propagating the current dispatcher context when
    /// spawning a new future.
    ///
    /// [collector]: tracing::collect::Collect
    /// [default]: tracing::dispatch#setting-the-default-collector
    #[inline]
    fn with_current_collector(self) -> WithDispatch<Self> {
        // Clone the dispatcher that is the default at the time of this call.
        let dispatch = dispatch::get_default(|current| current.clone());
        WithDispatch {
            inner: self,
            dispatch,
        }
    }
}
247
#[cfg(feature = "std-future")]
pin_project! {
    /// A future, stream, sink, or executor that has been instrumented with a `tracing` span.
    #[project = InstrumentedProj]
    #[project_ref = InstrumentedProjRef]
    #[derive(Clone, Debug)]
    pub struct Instrumented<T> {
        // The value is held in `ManuallyDrop` so that the `PinnedDrop` impl
        // below can enter the span first and only then run the value's
        // destructor via `ManuallyDrop::drop`.
        #[pin]
        inner: ManuallyDrop<T>,
        span: Span,
    }

    impl<T> PinnedDrop for Instrumented<T> {
        fn drop(this: Pin<&mut Self>) {
            let projected = this.project();
            // The guard stays alive until the end of this function, so the
            // wrapped value is dropped while the span is entered.
            let _span_guard = projected.span.enter();
            // SAFETY: 1. `Pin::get_unchecked_mut()` is sound here: dropping the
            //            value in place is equivalent to storing `T` in an
            //            `Option` and calling `Pin::set(&mut this.inner, None)`,
            //            just without the extra memory overhead.
            //         2. `ManuallyDrop::drop()` is sound because
            //            `PinnedDrop::drop()` is guaranteed to be called only
            //            once.
            unsafe { ManuallyDrop::drop(projected.inner.get_unchecked_mut()) }
        }
    }
}
277
#[cfg(feature = "std-future")]
impl<'a, T> InstrumentedProj<'a, T> {
    /// Splits the projection into a mutable reference to the [`Span`] and a
    /// pinned mutable reference to the wrapped type.
    fn span_and_inner_pin_mut(self) -> (&'a mut Span, Pin<&'a mut T>) {
        let pinned_slot = self.inner;
        // SAFETY: `T` does not move as long as the `ManuallyDrop<T>` holding
        //         it does not move, and the value is still valid because
        //         `ManuallyDrop::drop` is only called from the `Drop` impl of
        //         `Instrumented`.
        let inner = unsafe { pinned_slot.map_unchecked_mut(|slot| &mut **slot) };
        (self.span, inner)
    }
}
290
#[cfg(feature = "std-future")]
impl<'a, T> InstrumentedProjRef<'a, T> {
    /// Splits the projection into a reference to the [`Span`] and a pinned
    /// reference to the wrapped type.
    fn span_and_inner_pin_ref(self) -> (&'a Span, Pin<&'a T>) {
        let pinned_slot = self.inner;
        // SAFETY: `T` does not move as long as the `ManuallyDrop<T>` holding
        //         it does not move, and the value is still valid because
        //         `ManuallyDrop::drop` is only called from the `Drop` impl of
        //         `Instrumented`.
        let inner = unsafe { pinned_slot.map_unchecked(|slot| &**slot) };
        (self.span, inner)
    }
}
302
303/// A future, stream, sink, or executor that has been instrumented with a `tracing` span.
304#[cfg(not(feature = "std-future"))]
305#[derive(Debug, Clone)]
306pub struct Instrumented<T> {
307    inner: T,
308    span: Span,
309}
310
#[cfg(all(feature = "std", feature = "std-future"))]
pin_project! {
    /// A future, stream, sink, or executor that has been instrumented with a
    /// `tracing` collector.
    #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
    #[derive(Debug, Clone)]
    pub struct WithDispatch<T> {
        // The wrapped value; structurally pinned so it can be polled in place.
        #[pin]
        inner: T,
        // The dispatcher attached to the wrapped value.
        dispatch: Dispatch,
    }
}
323
/// A future, stream, sink, or executor that has been instrumented with a
/// `tracing` collector.
///
/// This definition is used when the `std-future` feature is disabled.
#[cfg(all(feature = "std", not(feature = "std-future")))]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
#[derive(Debug, Clone)]
pub struct WithDispatch<T> {
    // The wrapped value.
    inner: T,
    // The dispatcher attached to the wrapped value.
    dispatch: Dispatch,
}
333
334impl<T: Sized> Instrument for T {}
335
#[cfg(feature = "std-future")]
#[cfg_attr(docsrs, doc(cfg(feature = "std-future")))]
impl<T> core::future::Future for Instrumented<T>
where
    T: core::future::Future,
{
    type Output = T::Output;

    /// Polls the wrapped future while the attached span is entered.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> core::task::Poll<Self::Output> {
        let (span, fut) = self.project().span_and_inner_pin_mut();
        // `in_scope` enters the span, runs the closure, and exits afterwards.
        span.in_scope(|| fut.poll(cx))
    }
}
347
#[cfg(feature = "futures-01")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures-01")))]
impl<T> futures_01::Future for Instrumented<T>
where
    T: futures_01::Future,
{
    type Item = T::Item;
    type Error = T::Error;

    /// Polls the wrapped `futures` 0.1 future while the attached span is
    /// entered.
    fn poll(&mut self) -> futures_01::Poll<Self::Item, Self::Error> {
        // Borrow the two fields separately so the closure only captures the
        // wrapped future.
        let Self { inner, span } = self;
        span.in_scope(|| inner.poll())
    }
}
359
#[cfg(feature = "futures-01")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures-01")))]
impl<T> futures_01::Stream for Instrumented<T>
where
    T: futures_01::Stream,
{
    type Item = T::Item;
    type Error = T::Error;

    /// Polls the wrapped `futures` 0.1 stream while the attached span is
    /// entered.
    fn poll(&mut self) -> futures_01::Poll<Option<Self::Item>, Self::Error> {
        // Borrow the two fields separately so the closure only captures the
        // wrapped stream.
        let Self { inner, span } = self;
        span.in_scope(|| inner.poll())
    }
}
371
#[cfg(feature = "futures-01")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures-01")))]
impl<T> futures_01::Sink for Instrumented<T>
where
    T: futures_01::Sink,
{
    type SinkItem = T::SinkItem;
    type SinkError = T::SinkError;

    /// Forwards `start_send` to the wrapped sink while the attached span is
    /// entered.
    fn start_send(
        &mut self,
        item: Self::SinkItem,
    ) -> futures_01::StartSend<Self::SinkItem, Self::SinkError> {
        let Self { inner, span } = self;
        span.in_scope(|| inner.start_send(item))
    }

    /// Forwards `poll_complete` to the wrapped sink while the attached span
    /// is entered.
    fn poll_complete(&mut self) -> futures_01::Poll<(), Self::SinkError> {
        let Self { inner, span } = self;
        span.in_scope(|| inner.poll_complete())
    }
}
391
#[cfg(all(feature = "futures-03", feature = "std-future"))]
#[cfg_attr(docsrs, doc(cfg(all(feature = "futures-03", feature = "std-future"))))]
impl<T> futures::Stream for Instrumented<T>
where
    T: futures::Stream,
{
    type Item = T::Item;

    /// Polls the wrapped stream for its next item while the attached span is
    /// entered.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> futures::task::Poll<Option<Self::Item>> {
        let (span, stream) = self.project().span_and_inner_pin_mut();
        span.in_scope(|| T::poll_next(stream, cx))
    }
}
406
#[cfg(all(feature = "futures-03", feature = "std-future"))]
#[cfg_attr(docsrs, doc(cfg(all(feature = "futures-03", feature = "std-future"))))]
impl<I, T> futures::Sink<I> for Instrumented<T>
where
    // The bound is stated once here; it was previously repeated in the
    // generics list as well.
    T: futures::Sink<I>,
{
    type Error = T::Error;

    /// Forwards `poll_ready` to the wrapped sink while the attached span is
    /// entered.
    fn poll_ready(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> futures::task::Poll<Result<(), Self::Error>> {
        let (span, inner) = self.project().span_and_inner_pin_mut();
        let _enter = span.enter();
        T::poll_ready(inner, cx)
    }

    /// Forwards `start_send` to the wrapped sink while the attached span is
    /// entered.
    fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
        let (span, inner) = self.project().span_and_inner_pin_mut();
        let _enter = span.enter();
        T::start_send(inner, item)
    }

    /// Forwards `poll_flush` to the wrapped sink while the attached span is
    /// entered.
    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> futures::task::Poll<Result<(), Self::Error>> {
        let (span, inner) = self.project().span_and_inner_pin_mut();
        let _enter = span.enter();
        T::poll_flush(inner, cx)
    }

    /// Forwards `poll_close` to the wrapped sink while the attached span is
    /// entered.
    fn poll_close(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> futures::task::Poll<Result<(), Self::Error>> {
        let (span, inner) = self.project().span_and_inner_pin_mut();
        let _enter = span.enter();
        T::poll_close(inner, cx)
    }
}
448
449impl<T> Instrumented<T> {
450    /// Borrows the `Span` that this type is instrumented by.
451    pub fn span(&self) -> &Span {
452        &self.span
453    }
454
455    /// Mutably borrows the `Span` that this type is instrumented by.
456    pub fn span_mut(&mut self) -> &mut Span {
457        &mut self.span
458    }
459
460    /// Borrows the wrapped type.
461    pub fn inner(&self) -> &T {
462        &self.inner
463    }
464
465    /// Mutably borrows the wrapped type.
466    pub fn inner_mut(&mut self) -> &mut T {
467        &mut self.inner
468    }
469
470    /// Get a pinned reference to the wrapped type.
471    #[cfg(feature = "std-future")]
472    #[cfg_attr(docsrs, doc(cfg(feature = "std-future")))]
473    pub fn inner_pin_ref(self: Pin<&Self>) -> Pin<&T> {
474        self.project_ref().span_and_inner_pin_ref().1
475    }
476
477    /// Get a pinned mutable reference to the wrapped type.
478    #[cfg(feature = "std-future")]
479    #[cfg_attr(docsrs, doc(cfg(feature = "std-future")))]
480    pub fn inner_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
481        self.project().span_and_inner_pin_mut().1
482    }
483
484    /// Consumes the `Instrumented`, returning the wrapped type.
485    ///
486    /// Note that this drops the span.
487    pub fn into_inner(self) -> T {
488        #[cfg(feature = "std-future")]
489        {
490            // To manually destructure `Instrumented` without `Drop`, we save
491            // pointers to the fields and use `mem::forget` to leave those pointers
492            // valid.
493            let span: *const Span = &self.span;
494            let inner: *const ManuallyDrop<T> = &self.inner;
495            mem::forget(self);
496            // SAFETY: Those pointers are valid for reads, because `Drop` didn't
497            //         run, and properly aligned, because `Instrumented` isn't
498            //         `#[repr(packed)]`.
499            let _span = unsafe { span.read() };
500            let inner = unsafe { inner.read() };
501            ManuallyDrop::into_inner(inner)
502        }
503        #[cfg(not(feature = "std-future"))]
504        self.inner
505    }
506}
507
// Blanket impl: every sized type gets the provided `WithCollector` methods.
#[cfg(feature = "std")]
impl<T> WithCollector for T {}
510
#[cfg(all(feature = "futures-01", feature = "std"))]
#[cfg_attr(docsrs, doc(cfg(all(feature = "futures-01", feature = "std"))))]
impl<T> futures_01::Future for WithDispatch<T>
where
    T: futures_01::Future,
{
    type Item = T::Item;
    type Error = T::Error;

    /// Polls the wrapped `futures` 0.1 future with the attached dispatcher
    /// set as the default.
    fn poll(&mut self) -> futures_01::Poll<Self::Item, Self::Error> {
        // Borrow the fields separately so the closure only captures the
        // wrapped future.
        let fut = &mut self.inner;
        let attached = &self.dispatch;
        dispatch::with_default(attached, || fut.poll())
    }
}
522
#[cfg(all(feature = "std-future", feature = "std"))]
#[cfg_attr(docsrs, doc(cfg(all(feature = "std-future", feature = "std"))))]
impl<T> core::future::Future for WithDispatch<T>
where
    T: core::future::Future,
{
    type Output = T::Output;

    /// Polls the wrapped future with the attached dispatcher set as the
    /// default.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> core::task::Poll<Self::Output> {
        let projected = self.project();
        // Take the two projected fields apart before building the closure so
        // it only captures the pinned future.
        let (attached, fut) = (projected.dispatch, projected.inner);
        dispatch::with_default(attached, || fut.poll(cx))
    }
}
535
#[cfg(feature = "std")]
impl<T> WithDispatch<T> {
    /// Wraps another future, stream, sink, or executor with a clone of the
    /// dispatcher attached to this `WithDispatch`.
    pub fn with_dispatch<U>(&self, inner: U) -> WithDispatch<U> {
        let dispatch = self.dispatch.clone();
        WithDispatch { inner, dispatch }
    }

    /// Borrows the `Dispatch` that this type is instrumented by.
    pub fn dispatch(&self) -> &Dispatch {
        &self.dispatch
    }

    /// Get a pinned reference to the wrapped type.
    #[cfg(feature = "std-future")]
    #[cfg_attr(docsrs, doc(cfg(feature = "std-future")))]
    pub fn inner_pin_ref(self: Pin<&Self>) -> Pin<&T> {
        let projected = self.project_ref();
        projected.inner
    }

    /// Get a pinned mutable reference to the wrapped type.
    #[cfg(feature = "std-future")]
    #[cfg_attr(docsrs, doc(cfg(feature = "std-future")))]
    pub fn inner_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
        let projected = self.project();
        projected.inner
    }

    /// Borrows the wrapped type.
    pub fn inner(&self) -> &T {
        &self.inner
    }

    /// Mutably borrows the wrapped type.
    pub fn inner_mut(&mut self) -> &mut T {
        &mut self.inner
    }

    /// Consumes the `WithDispatch`, returning the wrapped type.
    pub fn into_inner(self) -> T {
        self.inner
    }
}
580
#[cfg(test)]
mod tests {
    use super::*;
    use tracing_mock::*;

    #[cfg(feature = "futures-01")]
    mod futures_01_tests {
        use futures_01::{future, stream, task, Async, Future, Stream};
        use tracing::collect::with_default;

        use super::*;

        /// Test future that returns `NotReady` (after notifying the current
        /// task) on every poll except the `finish_at`-th, on which it yields
        /// the stored result.
        struct PollN<T, E> {
            and_return: Option<Result<T, E>>,
            finish_at: usize,
            polls: usize,
        }

        impl PollN<(), ()> {
            /// A future that resolves to `Ok(())` on poll number `finish_at`.
            fn new_ok(finish_at: usize) -> Self {
                Self {
                    and_return: Some(Ok(())),
                    finish_at,
                    polls: 0,
                }
            }

            /// A future that resolves to `Err(())` on poll number `finish_at`.
            fn new_err(finish_at: usize) -> Self {
                Self {
                    and_return: Some(Err(())),
                    finish_at,
                    polls: 0,
                }
            }
        }

        impl<T, E> futures_01::Future for PollN<T, E> {
            type Item = T;
            type Error = E;
            fn poll(&mut self) -> futures_01::Poll<Self::Item, Self::Error> {
                self.polls += 1;
                if self.polls != self.finish_at {
                    // Not done yet: ask to be polled again right away.
                    task::current().notify();
                    return Ok(Async::NotReady);
                }
                let outcome = self.and_return.take().expect("polled after ready");
                outcome.map(Async::Ready)
            }
        }

        #[test]
        fn future_enter_exit_is_reasonable() {
            // Expectation matching the span named "foo".
            let foo = || expect::span().named("foo");
            let (collector, handle) = collector::mock()
                .enter(foo())
                .exit(foo())
                .enter(foo())
                .exit(foo())
                .enter(foo())
                .exit(foo())
                .drop_span(foo())
                .only()
                .run_with_handle();
            with_default(collector, || {
                let instrumented = PollN::new_ok(2).instrument(tracing::trace_span!("foo"));
                instrumented.wait().unwrap();
            });
            handle.assert_finished();
        }

        #[test]
        fn future_error_ends_span() {
            // Expectation matching the span named "foo".
            let foo = || expect::span().named("foo");
            let (collector, handle) = collector::mock()
                .enter(foo())
                .exit(foo())
                .enter(foo())
                .exit(foo())
                .enter(foo())
                .exit(foo())
                .drop_span(foo())
                .only()
                .run_with_handle();
            with_default(collector, || {
                let instrumented = PollN::new_err(2).instrument(tracing::trace_span!("foo"));
                instrumented.wait().unwrap_err();
            });

            handle.assert_finished();
        }

        #[test]
        fn stream_enter_exit_is_reasonable() {
            // Expectation matching the span named "foo".
            let foo = || expect::span().named("foo");
            let (collector, handle) = collector::mock()
                .enter(foo())
                .exit(foo())
                .enter(foo())
                .exit(foo())
                .enter(foo())
                .exit(foo())
                .enter(foo())
                .exit(foo())
                .enter(foo())
                .exit(foo())
                .drop_span(foo())
                .run_with_handle();
            with_default(collector, || {
                let instrumented =
                    stream::iter_ok::<_, ()>(&[1, 2, 3]).instrument(tracing::trace_span!("foo"));
                instrumented.for_each(|_| future::ok(())).wait().unwrap();
            });
            handle.assert_finished();
        }

        // NOTE(review): the test below is disabled. It still uses the older
        // `span::mock()` API and a tokio runtime; port it to `expect::span()`
        // or delete it.
        //
        // #[test]
        // fn span_follows_future_onto_threadpool() {
        //     let (collector, handle) = collector::mock()
        //         .enter(span::mock().named("a"))
        //         .enter(span::mock().named("b"))
        //         .exit(span::mock().named("b"))
        //         .enter(span::mock().named("b"))
        //         .exit(span::mock().named("b"))
        //         .drop_span(span::mock().named("b"))
        //         .exit(span::mock().named("a"))
        //         .drop_span(span::mock().named("a"))
        //         .only()
        //         .run_with_handle();
        //     let mut runtime = tokio::runtime::Runtime::new().unwrap();
        //     with_default(collector, || {
        //         tracing::trace_span!("a").in_scope(|| {
        //             let future = PollN::new_ok(2)
        //                 .instrument(tracing::trace_span!("b"))
        //                 .map(|_| {
        //                     tracing::trace_span!("c").in_scope(|| {
        //                         // "c" happens _outside_ of the instrumented future's
        //                         // span, so we don't expect it.
        //                     })
        //                 });
        //             runtime.block_on(Box::new(future)).unwrap();
        //         })
        //     });
        //     handle.assert_finished();
        // }
    }

    #[cfg(all(feature = "futures-03", feature = "std-future"))]
    mod futures_03_tests {
        use futures::{future, sink, stream, FutureExt, SinkExt, StreamExt};
        use tracing::collect::with_default;

        use super::*;

        #[test]
        fn stream_enter_exit_is_reasonable() {
            // Expectation matching the span named "foo".
            let foo = || expect::span().named("foo");
            let (collector, handle) = collector::mock()
                .enter(foo())
                .exit(foo())
                .enter(foo())
                .exit(foo())
                .enter(foo())
                .exit(foo())
                .enter(foo())
                .exit(foo())
                .drop_span(foo())
                .run_with_handle();
            with_default(collector, || {
                let instrumented =
                    Instrument::instrument(stream::iter(&[1, 2, 3]), tracing::trace_span!("foo"));
                instrumented
                    .for_each(|_| future::ready(()))
                    .now_or_never()
                    .unwrap();
            });
            handle.assert_finished();
        }

        #[test]
        fn sink_enter_exit_is_reasonable() {
            // Expectation matching the span named "foo".
            let foo = || expect::span().named("foo");
            let (collector, handle) = collector::mock()
                .enter(foo())
                .exit(foo())
                .enter(foo())
                .exit(foo())
                .enter(foo())
                .exit(foo())
                .drop_span(foo())
                .run_with_handle();
            with_default(collector, || {
                Instrument::instrument(sink::drain(), tracing::trace_span!("foo"))
                    .send(1u8)
                    .now_or_never()
                    .unwrap()
                    .unwrap()
            });
            handle.assert_finished();
        }
    }
}