shadow_rs/core/logger/shadow_logger.rs

use std::cell::RefCell;
use std::sync::Arc;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Mutex, RwLock};
use std::time::Duration;

use crossbeam::queue::ArrayQueue;
use log::{Level, LevelFilter, Log, Metadata, Record, SetLoggerError};
use logger as c_log;
use once_cell::sync::{Lazy, OnceCell};
use shadow_shim_helper_rs::emulated_time::EmulatedTime;
use shadow_shim_helper_rs::util::time::TimeParts;

use crate::core::worker::Worker;
use crate::host::host::HostInfo;

/// Trigger an asynchronous flush when this many lines are queued.
const ASYNC_FLUSH_QD_LINES_THRESHOLD: usize = 100_000;

/// Perform a *synchronous* flush when this many lines are queued. I.e. if,
/// after reaching `ASYNC_FLUSH_QD_LINES_THRESHOLD`, log lines are still coming
/// in faster than they can be flushed, then once we reach this limit we pause
/// and let the flush finish rather than letting the queue keep growing.
const SYNC_FLUSH_QD_LINES_THRESHOLD: usize = 10 * ASYNC_FLUSH_QD_LINES_THRESHOLD;

/// The logging thread flushes at least this often.
const MIN_FLUSH_FREQUENCY: Duration = Duration::from_secs(10);
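
// Taken together with the queue capacity chosen in `ShadowLogger::new`
// (`SYNC_FLUSH_QD_LINES_THRESHOLD` records), these constants give the flush
// policy implemented in `ShadowLogger::log` and `logger_thread_fn`, roughly:
//
//   queue length > ASYNC_FLUSH_QD_LINES_THRESHOLD (100_000)
//       -> ask the logger thread for an asynchronous flush
//   queue full (SYNC_FLUSH_QD_LINES_THRESHOLD = 1_000_000 records)
//       -> the logging caller blocks on a synchronous flush, then retries
//   no records arriving
//       -> the logger thread still flushes every MIN_FLUSH_FREQUENCY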

static SHADOW_LOGGER: Lazy<ShadowLogger> = Lazy::new(ShadowLogger::new);

/// Initialize the Shadow logger.
pub fn init(
    max_log_level: LevelFilter,
    report_errors_to_stderr: bool,
) -> Result<(), SetLoggerError> {
    SHADOW_LOGGER.set_max_level(max_log_level);
    SHADOW_LOGGER.set_report_errors_to_stderr(report_errors_to_stderr);

    log::set_logger(&*SHADOW_LOGGER)?;

    // Shadow's logger has its own logic for deciding the max level (see `ShadowLogger::enabled`),
    // so the log crate should give us all log messages and we can decide whether to show each one.
    log::set_max_level(log::LevelFilter::Trace);

    // Start the thread that will receive log records and flush them to output.
    std::thread::Builder::new()
        .name("shadow-logger".to_string())
        .spawn(move || SHADOW_LOGGER.logger_thread_fn())
        .unwrap();

    // Arrange to flush the logger on panic.
    let default_panic_handler = std::panic::take_hook();
    std::panic::set_hook(Box::new(move |panic_info| {
        // Attempt to flush the logger. We want to avoid a recursive panic, so
        // we flush the queue on the current thread instead of trying to send
        // a command to the logger thread (because our thread-local sender
        // may already have been destroyed, and because the logger thread
        // itself may be in a bad state), and we ignore errors.
        SHADOW_LOGGER.flush_records(None).ok();
        default_panic_handler(panic_info);
    }));

    Ok(())
}
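
// A minimal usage sketch, assuming no other logger has been installed in the
// process (the call below is illustrative; Shadow's real call site lives
// elsewhere):
//
//     init(log::LevelFilter::Info, /* report_errors_to_stderr= */ true)
//         .expect("a logger was already installed");
//     log::info!("shadow logger ready");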

/// A logger specialized for Shadow.
///
/// It attaches simulation context to log entries (e.g. sim time, running
/// process, etc.). It's also designed for high performance, to accommodate
/// heavy logging from multiple threads.
pub struct ShadowLogger {
    // Channel used to send commands to the logger's thread.
    //
    // The Sender half of a channel isn't Sync, so we must protect it with a
    // Mutex to make ShadowLogger be Sync. It's only accessed once per thread,
    // though, to clone into the thread-local SENDER.
    command_sender: Mutex<Sender<LoggerCommand>>,

    // Like the sender, this needs a Mutex for ShadowLogger to be Sync.
    // The Mutex is only locked once, though, by the logger thread, which
    // keeps it locked for as long as it's running.
    command_receiver: Mutex<Receiver<LoggerCommand>>,

    // A lock-free queue for individual log records. We don't put the records
    // themselves in the `command_sender`, because `Sender` doesn't support
    // getting the queue length. Conversely, we don't put commands in this
    // queue because it doesn't support blocking operations.
    //
    // The size is roughly SYNC_FLUSH_QD_LINES_THRESHOLD *
    // size_of<ShadowLogRecord>; we might want to consider SegQueue (which
    // grows and shrinks dynamically) instead if we ever make
    // SYNC_FLUSH_QD_LINES_THRESHOLD very large.
    records: ArrayQueue<ShadowLogRecord>,

    // When false, sends a (still-asynchronous) flush command to the logger
    // thread every time a record is pushed into `records`.
    buffering_enabled: RwLock<bool>,

    // The maximum log level, unless overridden by a host-specific log level.
    max_log_level: OnceCell<LevelFilter>,

    // Whether to report errors to stderr in addition to logging to stdout.
    report_errors_to_stderr: OnceCell<bool>,
}
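
// Data-flow sketch: worker threads push `ShadowLogRecord`s onto `records` and
// occasionally send a `LoggerCommand::Flush` over the mpsc channel; the
// "shadow-logger" thread drains `records` to stdout.
//
//     log::info!(...) ----(records: ArrayQueue)----> logger thread --> stdout
//            \---------(command_sender: mpsc)------------^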

// Per-thread clone of `command_sender`, created lazily on first use.
thread_local!(static SENDER: RefCell<Option<Sender<LoggerCommand>>> = const { RefCell::new(None) });
// The thread name and kernel thread id, each captured once per thread.
thread_local!(static THREAD_NAME: String = get_thread_name());
thread_local!(static THREAD_ID: nix::unistd::Pid = nix::unistd::gettid());

fn get_thread_name() -> String {
    let mut thread_name = Vec::<i8>::with_capacity(16);
    let res = unsafe {
        thread_name.set_len(thread_name.capacity());
        // ~infallible when `thread_name` is at least 16 bytes.
        libc::pthread_getname_np(
            libc::pthread_self(),
            thread_name.as_mut_ptr(),
            thread_name.len(),
        )
    };
    // The most likely cause of failure is a bug in the caller.
    debug_assert_eq!(
        res,
        0,
        "pthread_getname_np: {}",
        nix::errno::Errno::from_raw(res),
    );
    if res == 0 {
        // SAFETY: We just initialized the input buffer `thread_name`, and
        // `thread_name_cstr` won't outlive it.
        let thread_name_cstr = unsafe { std::ffi::CStr::from_ptr(thread_name.as_ptr()) };
        return thread_name_cstr.to_owned().to_string_lossy().to_string();
    }

    // Another potential reason for failure is that it couldn't open
    // /proc/self/task/[tid]/comm. We're probably in a bad state if that
    // happens, but try to recover anyway.

    // Empty string
    String::new()
}

impl ShadowLogger {
    fn new() -> ShadowLogger {
        let (sender, receiver) = std::sync::mpsc::channel();

        ShadowLogger {
            records: ArrayQueue::new(SYNC_FLUSH_QD_LINES_THRESHOLD),
            command_sender: Mutex::new(sender),
            command_receiver: Mutex::new(receiver),
            buffering_enabled: RwLock::new(false),
            max_log_level: OnceCell::new(),
            report_errors_to_stderr: OnceCell::new(),
        }
    }

    // Function executed by the logger's helper thread, onto which we offload
    // as much work as we can.
    fn logger_thread_fn(&self) {
        let command_receiver = self.command_receiver.lock().unwrap();

        loop {
            use std::sync::mpsc::RecvTimeoutError;
            match command_receiver.recv_timeout(MIN_FLUSH_FREQUENCY) {
                Ok(LoggerCommand::Flush(done_sender)) => self.flush_records(done_sender).unwrap(),
                Err(RecvTimeoutError::Timeout) => {
                    // No command arrived recently; flush anyway.
                    self.flush_records(None).unwrap();
                }
                Err(e) => panic!("Unexpected error {e}"),
            }
        }
    }

    // Function called by the logger's helper thread to flush the contents of
    // self.records. If `done_sender` is provided, it's notified after the
    // flush has completed.
    fn flush_records(&self, done_sender: Option<Sender<()>>) -> std::io::Result<()> {
        use std::io::Write;

        // Only flush records that are already in the queue, not ones that
        // arrive while we're flushing. Otherwise callers who perform a
        // synchronous flush (whether this flush operation or another one that
        // arrives while we're flushing) will be left waiting longer than
        // necessary. This also keeps us from holding the stdout lock
        // indefinitely.
        let mut toflush = self.records.len();

        let stdout_unlocked = std::io::stdout();
        let stdout_locked = stdout_unlocked.lock();
        let mut stdout = std::io::BufWriter::new(stdout_locked);

        while toflush > 0 {
            let record = match self.records.pop() {
                Some(r) => r,
                None => {
                    // This can happen if another thread panics while the
                    // logging thread is flushing. In that case both threads
                    // will be consuming from the queue.
                    break;
                }
            };
            toflush -= 1;

            write!(stdout, "{record}")?;

            if record.level <= Level::Error && *self.report_errors_to_stderr.get().unwrap() {
                // *Also* summarize on stderr.

                // First flush stdout to avoid confusing interleaving if stdout
                // and stderr are merged.
                stdout.flush()?;

                // Summarize on stderr. We use a `BufWriter` to try to help
                // ensure we ultimately make a single `write` syscall, though
                // the flushes above and below *should* already prevent any
                // interleaving with stdout.
                let stderr_unlocked = std::io::stderr();
                let stderr_locked = stderr_unlocked.lock();
                let mut stderr = std::io::BufWriter::new(stderr_locked);
                writeln!(stderr, "Error: {}", record.message)?;

                // Explicitly flush before dropping to detect errors.
                stderr.flush()?;
                drop(stderr);
            }
        }

        // Explicitly flush before dropping to detect errors.
        stdout.flush()?;
        drop(stdout);

        if let Some(done_sender) = done_sender {
            // We can't log from this thread without risking deadlock, so in
            // the unlikely case that the calling thread has gone away, just
            // print directly.
            done_sender.send(()).unwrap_or_else(|e| {
                println!("WARNING: Logger couldn't notify calling thread: {e:?}")
            });
        }
        Ok(())
    }

    /// When disabled, the logger thread is notified to write each record as
    /// soon as it's created. The calling thread still isn't blocked on the
    /// record actually being written, though.
    pub fn set_buffering_enabled(&self, buffering_enabled: bool) {
        let mut writer = self.buffering_enabled.write().unwrap();
        *writer = buffering_enabled;
    }

    /// Returns the maximum log level, or `LevelFilter::Trace` if it hasn't
    /// been set yet.
    pub fn max_level(&self) -> LevelFilter {
        self.max_log_level
            .get()
            .copied()
            .unwrap_or(LevelFilter::Trace)
    }

    /// Set the default maximum log level; this can be overridden per-host.
    /// Only intended to be called from `init()`. Panics if called more than
    /// once.
    fn set_max_level(&self, level: LevelFilter) {
        self.max_log_level.set(level).unwrap()
    }

    /// Set whether to report errors to stderr in addition to logging them on
    /// stdout.
    ///
    /// Only intended to be called from `init()`. Panics if called more than
    /// once.
    fn set_report_errors_to_stderr(&self, val: bool) {
        self.report_errors_to_stderr.set(val).unwrap()
    }

    // Send a flush command to the logger thread.
    fn flush_impl(&self, notify_done: Option<Sender<()>>) {
        self.send_command(LoggerCommand::Flush(notify_done))
    }

    // Send a flush command to the logger thread and block until it's completed.
    fn flush_sync(&self) {
        let (done_sender, done_receiver) = std::sync::mpsc::channel();
        self.flush_impl(Some(done_sender));
        done_receiver.recv().unwrap();
    }

    // Send a flush command to the logger thread without waiting for it to
    // complete.
    fn flush_async(&self) {
        self.flush_impl(None);
    }

    // Send a command to the logger thread.
    fn send_command(&self, cmd: LoggerCommand) {
        SENDER
            .try_with(|thread_sender| {
                if thread_sender.borrow().is_none() {
                    let lock = self.command_sender.lock().unwrap();
                    *thread_sender.borrow_mut() = Some(lock.clone());
                }
                thread_sender
                    .borrow()
                    .as_ref()
                    .unwrap()
                    .send(cmd)
                    .unwrap_or_else(|e| {
                        println!("WARNING: Couldn't send command to logger thread: {e:?}");
                    });
            })
            .unwrap_or_else(|e| {
                println!("WARNING: Couldn't get sender channel to logger thread: {e:?}");
            });
    }
}

impl Log for ShadowLogger {
    fn enabled(&self, metadata: &Metadata) -> bool {
        let filter = match Worker::with_active_host(|host| host.info().log_level) {
            Some(Some(level)) => level,
            _ => self.max_level(),
        };
        metadata.level() <= filter
    }

    fn log(&self, record: &Record) {
        if !self.enabled(record.metadata()) {
            return;
        }

        let message = std::fmt::format(*record.args());

        let host_info = Worker::with_active_host(|host| host.info().clone());

        let mut shadowrecord = ShadowLogRecord {
            level: record.level(),
            file: record.file_static(),
            module_path: record.module_path_static(),
            line: record.line(),
            message,
            wall_time: Duration::from_micros(unsafe {
                u64::try_from(c_log::logger_elapsed_micros()).unwrap()
            }),

            emu_time: Worker::current_time(),
            thread_name: THREAD_NAME
                .try_with(|name| (*name).clone())
                .unwrap_or_else(|_| get_thread_name()),
            thread_id: THREAD_ID
                .try_with(|id| *id)
                .unwrap_or_else(|_| nix::unistd::gettid()),
            host_info,
        };

        loop {
            match self.records.push(shadowrecord) {
                Ok(()) => break,
                Err(r) => {
                    // Queue is full. Flush it and try again.
                    shadowrecord = r;
                    self.flush_sync();
                }
            }
        }

        if record.level() == Level::Error {
            // Unlike in Shadow's C code, we don't abort the program on Error
            // logs. In Rust the same purpose is served by `panic` and
            // `unwrap`. C callers will still exit or abort via the lib/logger
            // wrapper.
            //
            // Flush *synchronously*, since we're likely about to crash one way
            // or another.
            self.flush_sync();
        } else if self.records.len() > ASYNC_FLUSH_QD_LINES_THRESHOLD
            || !*self.buffering_enabled.read().unwrap()
        {
            self.flush_async();
        }
    }

    fn flush(&self) {
        self.flush_sync();
    }
}
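
// Sketch: callers outside this module reach the synchronous flush through the
// `log` crate's facade, e.g.:
//
//     log::error!("something went wrong");  // already triggers a sync flush
//     log::logger().flush();                // explicit flush; blocks until the
//                                           // logger thread drains the queue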

struct ShadowLogRecord {
    level: Level,
    file: Option<&'static str>,
    module_path: Option<&'static str>,
    line: Option<u32>,
    message: String,
    wall_time: Duration,

    emu_time: Option<EmulatedTime>,
    thread_name: String,
    thread_id: nix::unistd::Pid,
    host_info: Option<Arc<HostInfo>>,
}

impl std::fmt::Display for ShadowLogRecord {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        {
            let parts = TimeParts::from_nanos(self.wall_time.as_nanos());
            write!(
                f,
                "{:02}:{:02}:{:02}.{:06}",
                parts.hours,
                parts.mins,
                parts.secs,
                parts.nanos / 1000
            )?;
        }
        write!(f, " [{}:{}]", self.thread_id, self.thread_name)?;
        if let Some(emu_time) = self.emu_time {
            let sim_time = emu_time.duration_since(&EmulatedTime::SIMULATION_START);
            let parts = TimeParts::from_nanos(sim_time.as_nanos());
            write!(
                f,
                " {:02}:{:02}:{:02}.{:09}",
                parts.hours, parts.mins, parts.secs, parts.nanos
            )?;
        } else {
            write!(f, " n/a")?;
        }
        write!(f, " [{level}]", level = self.level)?;
        if let Some(host) = &self.host_info {
            write!(
                f,
                " [{hostname}:{ip}]",
                hostname = host.name,
                ip = host.default_ip,
            )?;
        } else {
            write!(f, " [n/a]")?;
        }
        write!(
            f,
            " [{file}:",
            file = self
                .file
                .map(|f| if let Some(sep_pos) = f.rfind('/') {
                    &f[(sep_pos + 1)..]
                } else {
                    f
                })
                .unwrap_or("n/a"),
        )?;
        if let Some(line) = self.line {
            write!(f, "{line}")?;
        } else {
            write!(f, "n/a")?;
        }
        writeln!(
            f,
            "] [{module}] {msg}",
            module = self.module_path.unwrap_or("n/a"),
            msg = self.message
        )?;
        Ok(())
    }
}
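
// Illustrative output of the `Display` impl above (values are made up):
//
//     00:00:02.123456 [12345:shadow-worker] 00:00:10.000000500 [DEBUG] [host1:11.0.0.1] [main.rs:42] [shadow_rs::core::worker] message text
//
// i.e. wall-clock time, [thread id:thread name], simulated time, [level],
// [hostname:ip], [file:line], [module], then the message itself.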

enum LoggerCommand {
    // Flush; takes an optional one-shot channel to notify that the flush has
    // completed.
    Flush(Option<Sender<()>>),
}

/// Sets whether record buffering is enabled in the global Shadow logger; see
/// [`ShadowLogger::set_buffering_enabled`].
pub fn set_buffering_enabled(buffering_enabled: bool) {
    SHADOW_LOGGER.set_buffering_enabled(buffering_enabled);
}
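
// A usage sketch, assuming the logger was initialized via `init` (when to
// buffer during a simulation is a policy decision left to the caller):
//
//     set_buffering_enabled(true);   // batch records while the workload runs
//     // ... run simulation ...
//     set_buffering_enabled(false);  // flush each record promptly again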

mod export {
    use super::*;

    /// When disabled, the logger thread is notified to write each record as
    /// soon as it's created. The calling thread still isn't blocked on the
    /// record actually being written, though.
    #[unsafe(no_mangle)]
    pub unsafe extern "C-unwind" fn shadow_logger_setEnableBuffering(buffering_enabled: i32) {
        set_buffering_enabled(buffering_enabled != 0)
    }
}