🛈 Note: This is pre-release documentation for the upcoming tracing 0.2.0 ecosystem.

For the release documentation, please see docs.rs, instead.

tracing_appender/non_blocking.rs

//! A non-blocking, off-thread writer.
//!
//! This spawns a dedicated worker thread which is responsible for writing log
//! lines to the provided writer. When a line is written using the returned
//! `NonBlocking` struct's `make_writer` method, it will be enqueued to be
//! written by the worker thread.
//!
//! The queue has a fixed capacity, and if it becomes full, any logs written
//! to it will be dropped until capacity is once again available. This may
//! occur if logs are consistently produced faster than the worker thread can
//! output them. The queue capacity and behavior when full (i.e., whether to
//! drop logs or to exert backpressure to slow down senders) can be configured
//! using [`NonBlockingBuilder::default()`][builder].
//! This function returns the default configuration. It is equivalent to:
//!
//! ```rust
//! # use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
//! # fn doc() -> (NonBlocking, WorkerGuard) {
//! tracing_appender::non_blocking(std::io::stdout())
//! # }
//! ```
//! [builder]: NonBlockingBuilder::default()
//!
//! <br/> This function returns a tuple of `NonBlocking` and `WorkerGuard`.
//! `NonBlocking` implements [`MakeWriter`] which integrates with `tracing_subscriber`.
//! `WorkerGuard` is a drop guard that is responsible for flushing any remaining logs when
//! the program terminates.
//!
//! Note that the `WorkerGuard` returned by `non_blocking` _must_ be assigned to a binding that
//! is not `_`, as `_` will result in the `WorkerGuard` being dropped immediately.
//! Unintentional drops of `WorkerGuard` remove the guarantee that logs will be flushed
//! during a program's termination, in a panic or otherwise.
//!
//! See [`WorkerGuard`] for examples of using the guard.
//!
//! # Examples
//!
//! ``` rust
//! # fn docs() {
//! let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout());
//! let collector = tracing_subscriber::fmt().with_writer(non_blocking);
//! tracing::collect::with_default(collector.finish(), || {
//!     tracing::event!(tracing::Level::INFO, "Hello");
//! });
//! # }
//! ```
use crate::worker::Worker;
use crate::Msg;
use crossbeam_channel::{bounded, SendTimeoutError, Sender};
use std::io;
use std::io::Write;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::Duration;
use tracing_subscriber::fmt::MakeWriter;

/// The default maximum number of buffered log lines.
///
/// If [`NonBlocking`] is lossy, it will drop spans/events at capacity.
/// If [`NonBlocking`] is _not_ lossy, backpressure will be exerted on
/// senders, causing them to block their respective threads until there
/// is available capacity.
///
/// Recommended to be a power of 2.
pub const DEFAULT_BUFFERED_LINES_LIMIT: usize = 128_000;

/// A guard that flushes spans/events associated with a [`NonBlocking`] on drop.
///
/// Writing to a [`NonBlocking`] writer will **not** immediately write a span or event to the underlying
/// output. Instead, the span or event will be written by a dedicated logging thread at some later point.
/// To increase throughput, the non-blocking writer will flush to the underlying output on
/// a periodic basis rather than every time a span or event is written. This means that if the program
/// terminates abruptly (such as through an uncaught `panic` or a `std::process::exit`), some spans
/// or events may not be written.
///
/// Since spans and events recorded near a crash are often necessary for diagnosing the failure,
/// `WorkerGuard` provides a mechanism to ensure that _all_ buffered logs are flushed to their output.
/// `WorkerGuard` should be assigned in the `main` function or whatever the entrypoint of the program is.
/// This will ensure that the guard will be dropped during an unwinding or when `main` exits
/// successfully.
///
/// # Examples
///
/// ``` rust
/// # #[allow(clippy::needless_doctest_main)]
/// fn main() {
/// # fn doc() {
///     let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout());
///     let collector = tracing_subscriber::fmt().with_writer(non_blocking);
///     tracing::collect::with_default(collector.finish(), || {
///         // Emit some tracing events within the context of the non_blocking `_guard` and tracing subscriber
///         tracing::event!(tracing::Level::INFO, "Hello");
///     });
///     // Exiting the context of `main` will drop the `_guard` and any remaining logs should get flushed
/// # }
/// }
/// ```
#[must_use]
#[derive(Debug)]
pub struct WorkerGuard {
    handle: Option<JoinHandle<()>>,
    sender: Sender<Msg>,
    shutdown: Sender<()>,
}

/// A non-blocking writer.
///
/// While the line between "blocking" and "non-blocking" IO is fuzzy, writing to a file is typically
/// considered to be a _blocking_ operation. An application whose `Collector` writes spans and events
/// as they are emitted might find the resulting latency profile unacceptable.
/// `NonBlocking` moves the writing out of an application's data path by sending spans and events
/// to a dedicated logging thread.
///
/// This struct implements [`MakeWriter`] from the `tracing-subscriber`
/// crate. Therefore, it can be used with the [`tracing_subscriber::fmt`][fmt] module
/// or with any other collector/subscriber implementation that uses the `MakeWriter` trait.
///
/// [fmt]: mod@tracing_subscriber::fmt
#[derive(Clone, Debug)]
pub struct NonBlocking {
    error_counter: ErrorCounter,
    channel: Sender<Msg>,
    is_lossy: bool,
}

/// Tracks the number of times a log line was dropped by the background thread.
///
/// If the non-blocking writer is not configured in [lossy mode], the error
/// count should always be 0.
///
/// [lossy mode]: NonBlockingBuilder::lossy
#[derive(Clone, Debug)]
pub struct ErrorCounter(Arc<AtomicUsize>);

impl NonBlocking {
    /// Returns a new `NonBlocking` writer wrapping the provided `writer`.
    ///
    /// The returned `NonBlocking` writer will have the [default configuration][default] values.
    /// Other configurations can be specified using the [builder] interface.
    ///
    /// [default]: NonBlockingBuilder::default()
    /// [builder]: NonBlockingBuilder
    pub fn new<T: Write + Send + 'static>(writer: T) -> (NonBlocking, WorkerGuard) {
        NonBlockingBuilder::default().finish(writer)
    }

    fn create<T: Write + Send + 'static>(
        writer: T,
        buffered_lines_limit: usize,
        is_lossy: bool,
        thread_name: String,
    ) -> (NonBlocking, WorkerGuard) {
        let (sender, receiver) = bounded(buffered_lines_limit);

        let (shutdown_sender, shutdown_receiver) = bounded(0);

        let worker = Worker::new(receiver, writer, shutdown_receiver);
        let worker_guard = WorkerGuard::new(
            worker.worker_thread(thread_name),
            sender.clone(),
            shutdown_sender,
        );

        (
            Self {
                channel: sender,
                error_counter: ErrorCounter(Arc::new(AtomicUsize::new(0))),
                is_lossy,
            },
            worker_guard,
        )
    }

    /// Returns a counter for the number of times logs were dropped. This will always return zero if
    /// `NonBlocking` is not lossy.
    pub fn error_counter(&self) -> ErrorCounter {
        self.error_counter.clone()
    }
}

/// A builder for [`NonBlocking`].
#[derive(Debug)]
pub struct NonBlockingBuilder {
    buffered_lines_limit: usize,
    is_lossy: bool,
    thread_name: String,
}

impl NonBlockingBuilder {
    /// Sets the number of lines to buffer before dropping logs or exerting backpressure on senders.
    pub fn buffered_lines_limit(mut self, buffered_lines_limit: usize) -> NonBlockingBuilder {
        self.buffered_lines_limit = buffered_lines_limit;
        self
    }

    /// Sets whether `NonBlocking` should be lossy or not.
    ///
    /// If set to `true`, logs will be dropped when the buffered limit is reached. If `false`, backpressure
    /// will be exerted on senders, blocking them until the buffer has capacity again.
    ///
    /// By default, the built `NonBlocking` will be lossy.
    pub fn lossy(mut self, is_lossy: bool) -> NonBlockingBuilder {
        self.is_lossy = is_lossy;
        self
    }

    /// Overrides the worker thread's name.
    ///
    /// The default worker thread name is "tracing-appender".
    pub fn thread_name(mut self, name: &str) -> NonBlockingBuilder {
        self.thread_name = name.to_string();
        self
    }

    /// Completes the builder, returning the configured `NonBlocking`.
    pub fn finish<T: Write + Send + 'static>(self, writer: T) -> (NonBlocking, WorkerGuard) {
        NonBlocking::create(
            writer,
            self.buffered_lines_limit,
            self.is_lossy,
            self.thread_name,
        )
    }
}

impl Default for NonBlockingBuilder {
    fn default() -> Self {
        NonBlockingBuilder {
            buffered_lines_limit: DEFAULT_BUFFERED_LINES_LIMIT,
            is_lossy: true,
            thread_name: "tracing-appender".to_string(),
        }
    }
}

impl std::io::Write for NonBlocking {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let buf_size = buf.len();
        if self.is_lossy {
            if self.channel.try_send(Msg::Line(buf.to_vec())).is_err() {
                self.error_counter.incr_saturating();
            }
        } else {
            return match self.channel.send(Msg::Line(buf.to_vec())) {
                Ok(_) => Ok(buf_size),
                Err(_) => Err(io::Error::from(io::ErrorKind::Other)),
            };
        }
        Ok(buf_size)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }

    #[inline]
    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
        self.write(buf).map(|_| ())
    }
}

impl<'a> MakeWriter<'a> for NonBlocking {
    type Writer = NonBlocking;

    fn make_writer(&'a self) -> Self::Writer {
        self.clone()
    }
}

impl WorkerGuard {
    fn new(handle: JoinHandle<()>, sender: Sender<Msg>, shutdown: Sender<()>) -> Self {
        WorkerGuard {
            handle: Some(handle),
            sender,
            shutdown,
        }
    }
}

impl Drop for WorkerGuard {
    fn drop(&mut self) {
        let timeout = Duration::from_millis(100);
        match self.sender.send_timeout(Msg::Shutdown, timeout) {
            Ok(_) => {
                // Attempt to wait for `Worker` to flush all messages before dropping. This happens
                // when the `Worker` calls `recv()` on a zero-capacity channel. Use `send_timeout`
                // so that drop is not blocked indefinitely.
                // TODO: Make timeout configurable.
                let timeout = Duration::from_millis(1000);
                match self.shutdown.send_timeout((), timeout) {
                    Err(SendTimeoutError::Timeout(_)) => {
                        eprintln!(
                            "Shutting down logging worker timed out after {:?}.",
                            timeout
                        );
                    }
                    _ => {
                        // At this point it is safe to wait for `Worker` destruction without blocking
                        if let Some(handle) = self.handle.take() {
                            if handle.join().is_err() {
                                eprintln!("Logging worker thread panicked");
                            }
                        };
                    }
                }
            }
            Err(SendTimeoutError::Disconnected(_)) => (),
            Err(SendTimeoutError::Timeout(_)) => eprintln!(
                "Sending shutdown signal to logging worker timed out after {:?}",
                timeout
            ),
        }
    }
}

// === impl ErrorCounter ===

impl ErrorCounter {
    /// Returns the number of log lines that have been dropped.
    ///
    /// If the non-blocking writer is not configured in [lossy mode], the error
    /// count should always be 0.
    ///
    /// [lossy mode]: NonBlockingBuilder::lossy
    pub fn dropped_lines(&self) -> usize {
        self.0.load(Ordering::Acquire)
    }

    fn incr_saturating(&self) {
        let mut curr = self.0.load(Ordering::Acquire);
        // We don't need to enter the CAS loop if the current value is already
        // `usize::MAX`.
        if curr == usize::MAX {
            return;
        }

        // This is implemented as a CAS loop rather than as a simple
        // `fetch_add`, because we don't want to wrap on overflow. Instead, we
        // need to ensure that saturating addition is performed.
        loop {
            let val = curr.saturating_add(1);
            match self
                .0
                .compare_exchange(curr, val, Ordering::AcqRel, Ordering::Acquire)
            {
                Ok(_) => return,
                Err(actual) => curr = actual,
            }
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;

    struct MockWriter {
        tx: mpsc::SyncSender<String>,
    }

    impl MockWriter {
        fn new(capacity: usize) -> (Self, mpsc::Receiver<String>) {
            let (tx, rx) = mpsc::sync_channel(capacity);
            (Self { tx }, rx)
        }
    }

    impl std::io::Write for MockWriter {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            let buf_len = buf.len();
            let _ = self.tx.send(String::from_utf8_lossy(buf).to_string());
            Ok(buf_len)
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    #[test]
    fn backpressure_exerted() {
        let (mock_writer, rx) = MockWriter::new(1);

        let (mut non_blocking, _guard) = self::NonBlockingBuilder::default()
            .lossy(false)
            .buffered_lines_limit(1)
            .finish(mock_writer);

        let error_count = non_blocking.error_counter();

        non_blocking.write_all(b"Hello").expect("Failed to write");
        assert_eq!(0, error_count.dropped_lines());

        let handle = thread::spawn(move || {
            non_blocking.write_all(b", World").expect("Failed to write");
        });

        // Sleep a little to ensure previously spawned thread gets blocked on write.
        thread::sleep(Duration::from_millis(100));
        // We should not drop logs when blocked.
        assert_eq!(0, error_count.dropped_lines());

        // Read the first message to unblock sender.
        let mut line = rx.recv().unwrap();
        assert_eq!(line, "Hello");

        // Wait for thread to finish.
        handle.join().expect("thread should not panic");

        // Thread has joined, we should be able to read the message it sent.
        line = rx.recv().unwrap();
        assert_eq!(line, ", World");
    }

    fn write_non_blocking(non_blocking: &mut NonBlocking, msg: &[u8]) {
        non_blocking.write_all(msg).expect("Failed to write");

        // Sleep a bit to prevent races.
        thread::sleep(Duration::from_millis(200));
    }

    #[test]
    #[ignore] // flaky, see https://github.com/tokio-rs/tracing/issues/751
    fn logs_dropped_if_lossy() {
        let (mock_writer, rx) = MockWriter::new(1);

        let (mut non_blocking, _guard) = self::NonBlockingBuilder::default()
            .lossy(true)
            .buffered_lines_limit(1)
            .finish(mock_writer);

        let error_count = non_blocking.error_counter();

        // First write will not block
        write_non_blocking(&mut non_blocking, b"Hello");
        assert_eq!(0, error_count.dropped_lines());

        // Second write will not block as Worker will have called `recv` on channel.
        // "Hello" is not yet consumed. MockWriter call to write_all will block until
        // "Hello" is consumed.
        write_non_blocking(&mut non_blocking, b", World");
        assert_eq!(0, error_count.dropped_lines());

        // Will sit in NonBlocking channel's buffer.
        write_non_blocking(&mut non_blocking, b"Test");
        assert_eq!(0, error_count.dropped_lines());

        // Allow a line to be written. "Hello" message will be consumed.
        // ", World" will be able to write to MockWriter.
        // "Test" will block on call to MockWriter's `write_all`
        let line = rx.recv().unwrap();
        assert_eq!(line, "Hello");

        // This line will be dropped (not blocked) as the NonBlocking channel is full.
        write_non_blocking(&mut non_blocking, b"Universe");
        assert_eq!(1, error_count.dropped_lines());

        // Finally the second message sent will be consumed.
        let line = rx.recv().unwrap();
        assert_eq!(line, ", World");
        assert_eq!(1, error_count.dropped_lines());
    }

    #[test]
    fn multi_threaded_writes() {
        let (mock_writer, rx) = MockWriter::new(DEFAULT_BUFFERED_LINES_LIMIT);

        let (non_blocking, _guard) = self::NonBlockingBuilder::default()
            .lossy(true)
            .finish(mock_writer);

        let error_count = non_blocking.error_counter();
        let mut join_handles: Vec<JoinHandle<()>> = Vec::with_capacity(10);

        for _ in 0..10 {
            let cloned_non_blocking = non_blocking.clone();
            join_handles.push(thread::spawn(move || {
                let collector = tracing_subscriber::fmt().with_writer(cloned_non_blocking);
                tracing::collect::with_default(collector.finish(), || {
                    tracing::event!(tracing::Level::INFO, "Hello");
                });
            }));
        }

        for handle in join_handles {
            handle.join().expect("Failed to join thread");
        }

        let mut hello_count: u8 = 0;

        while let Ok(event_str) = rx.recv_timeout(Duration::from_secs(5)) {
            assert!(event_str.contains("Hello"));
            hello_count += 1;
        }

        assert_eq!(10, hello_count);
        assert_eq!(0, error_count.dropped_lines());
    }
}
498            hello_count += 1;
499        }
500
501        assert_eq!(10, hello_count);
502        assert_eq!(0, error_count.dropped_lines());
503    }
504}