shadow_rs/core/logger/shadow_logger.rs

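//! A logger specialized for Shadow: it tags each record with simulation
//! context (emulated time, active host, thread), buffers records in a
//! lock-free queue, and offloads formatting and I/O to a dedicated logger
//! thread.
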
use std::cell::RefCell;
use std::sync::Arc;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Mutex, RwLock};
use std::time::Duration;

use crossbeam::queue::ArrayQueue;
use log::{Level, LevelFilter, Log, Metadata, Record, SetLoggerError};
use logger as c_log;
use once_cell::sync::{Lazy, OnceCell};
use shadow_shim_helper_rs::emulated_time::EmulatedTime;
use shadow_shim_helper_rs::util::time::TimeParts;

use crate::core::worker::Worker;
use crate::host::host::HostInfo;

/// Trigger an asynchronous flush when this many lines are queued.
const ASYNC_FLUSH_QD_LINES_THRESHOLD: usize = 100_000;

/// Perform a *synchronous* flush when this many lines are queued. I.e. if,
/// after reaching `ASYNC_FLUSH_QD_LINES_THRESHOLD`, log lines are still coming
/// in faster than they can be flushed, we pause at this limit and let the
/// flush finish rather than letting the queue continue to grow.
const SYNC_FLUSH_QD_LINES_THRESHOLD: usize = 10 * ASYNC_FLUSH_QD_LINES_THRESHOLD;

/// The logging thread flushes at least this often.
const MIN_FLUSH_FREQUENCY: Duration = Duration::from_secs(10);

static SHADOW_LOGGER: Lazy<ShadowLogger> = Lazy::new(ShadowLogger::new);

/// Initialize the Shadow logger.
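///
/// A usage sketch (illustrative; assumes this module's `init` is in scope):
///
/// ```ignore
/// // Install the logger once at startup, before any log macros run.
/// init(log::LevelFilter::Info, true).unwrap();
/// log::info!("shadow logger installed");
/// ```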
pub fn init(
    max_log_level: LevelFilter,
    report_errors_to_stderr: bool,
) -> Result<(), SetLoggerError> {
    SHADOW_LOGGER.set_max_level(max_log_level);
    SHADOW_LOGGER.set_report_errors_to_stderr(report_errors_to_stderr);

    log::set_logger(&*SHADOW_LOGGER)?;

    // Shadow's logger has its own logic for deciding the max level (see
    // `ShadowLogger::enabled`), so the log crate should give us all log
    // messages and let us decide whether to show them.
    log::set_max_level(log::LevelFilter::Trace);

    // Start the thread that will receive log records and flush them to output.
    std::thread::Builder::new()
        .name("shadow-logger".to_string())
        .spawn(move || SHADOW_LOGGER.logger_thread_fn())
        .unwrap();

    // Arrange to flush the logger on panic.
    let default_panic_handler = std::panic::take_hook();
    std::panic::set_hook(Box::new(move |panic_info| {
        // Attempt to flush the logger. We want to avoid a recursive panic, so
        // we flush the queue on the current thread instead of trying to send
        // a command to the logger thread (because our thread-local sender
        // may have already been destructed, and because the logger thread
        // itself may be in a bad state), and ignore errors.
        SHADOW_LOGGER.flush_records(None).ok();
        default_panic_handler(panic_info);
    }));

    Ok(())
}

/// A logger specialized for Shadow.
///
/// It attaches simulation context to log entries (e.g. sim time, running
/// process, etc.). It's also designed for high performance to accommodate
/// heavy logging from multiple threads.
pub struct ShadowLogger {
    // Channel used to send commands to the logger's thread.
    //
    // The Sender half of a channel isn't Sync, so we must protect it with a
    // Mutex to make ShadowLogger be Sync. This is only accessed once per
    // thread, though, to clone into the thread-local SENDER.
    command_sender: Mutex<Sender<LoggerCommand>>,

    // Like the sender, needs a Mutex for ShadowLogger to be Sync.
    // The Mutex is only locked once, though: by the logger thread, which keeps
    // it locked for as long as it's running.
    command_receiver: Mutex<Receiver<LoggerCommand>>,

    // A lock-free queue for individual log records. We don't put the records
    // themselves in the `command_sender`, because `Sender` doesn't support
    // getting the queue length. Conversely we don't put commands in this queue
    // because it doesn't support blocking operations.
    //
    // The size is roughly SYNC_FLUSH_QD_LINES_THRESHOLD *
    // size_of<ShadowLogRecord>; we might want to consider SegQueue (which
    // grows and shrinks dynamically) instead if we ever make
    // SYNC_FLUSH_QD_LINES_THRESHOLD very large.
    records: ArrayQueue<ShadowLogRecord>,

    // When false, sends a (still-asynchronous) flush command to the logger
    // thread every time a record is pushed into `records`.
    buffering_enabled: RwLock<bool>,

    // The maximum log level, unless overridden by a host-specific log level.
    max_log_level: OnceCell<LevelFilter>,

    // Whether to report errors to stderr in addition to logging to stdout.
    report_errors_to_stderr: OnceCell<bool>,
}

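// Per-thread caches: each thread lazily clones the command sender (see
// `ShadowLogger::send_command`) and memoizes its own name and ID, avoiding a
// Mutex acquisition and OS queries on every log call.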
thread_local!(static SENDER: RefCell<Option<Sender<LoggerCommand>>> = const { RefCell::new(None) });
thread_local!(static THREAD_NAME: String = get_thread_name());
thread_local!(static THREAD_ID: nix::unistd::Pid = nix::unistd::gettid());

fn get_thread_name() -> String {
    let mut thread_name = Vec::<libc::c_char>::with_capacity(16);
    let res = unsafe {
        thread_name.set_len(thread_name.capacity());
        // ~infallible when `thread_name` is at least 16 bytes.
        libc::pthread_getname_np(
            libc::pthread_self(),
            thread_name.as_mut_ptr(),
            thread_name.len(),
        )
    };
    // The most likely cause of failure is a bug in the caller.
    debug_assert_eq!(
        res,
        0,
        "pthread_getname_np: {}",
        nix::errno::Errno::from_raw(res),
    );
    if res == 0 {
        // SAFETY: We just initialized the input buffer `thread_name`, and
        // `thread_name_cstr` won't outlive it.
        let thread_name_cstr = unsafe { std::ffi::CStr::from_ptr(thread_name.as_ptr()) };
        return thread_name_cstr.to_string_lossy().to_string();
    }

    // Another potential cause of failure is if it couldn't open
    // /proc/self/task/[tid]/comm. We're probably in a bad state anyway if that
    // happens, but try to recover by returning an empty name.
    String::new()
}

impl ShadowLogger {
    fn new() -> ShadowLogger {
        let (sender, receiver) = std::sync::mpsc::channel();

        ShadowLogger {
            records: ArrayQueue::new(SYNC_FLUSH_QD_LINES_THRESHOLD),
            command_sender: Mutex::new(sender),
            command_receiver: Mutex::new(receiver),
            buffering_enabled: RwLock::new(false),
            max_log_level: OnceCell::new(),
            report_errors_to_stderr: OnceCell::new(),
        }
    }

    // Function executed by the logger's helper thread, onto which we offload as
    // much work as we can.
    fn logger_thread_fn(&self) {
        let command_receiver = self.command_receiver.lock().unwrap();

        loop {
            use std::sync::mpsc::RecvTimeoutError;
            match command_receiver.recv_timeout(MIN_FLUSH_FREQUENCY) {
                Ok(LoggerCommand::Flush(done_sender)) => self.flush_records(done_sender).unwrap(),
                Err(RecvTimeoutError::Timeout) => {
                    // Timed out waiting for a command; flush anyway so that we
                    // flush at least every MIN_FLUSH_FREQUENCY.
                    self.flush_records(None).unwrap();
                }
                Err(e) => panic!("Unexpected error {}", e),
            }
        }
    }

    // Function called by the logger's helper thread to flush the contents of
    // self.records. If `done_sender` is provided, it's notified after the flush
    // has completed.
    fn flush_records(&self, done_sender: Option<Sender<()>>) -> std::io::Result<()> {
        use std::io::Write;

        // Only flush records that are already in the queue, not ones that
        // arrive while we're flushing. Otherwise callers who perform a
        // synchronous flush (whether this flush operation or another one that
        // arrives while we're flushing) will be left waiting longer than
        // necessary. Also keeps us from holding the stdout lock indefinitely.
        let mut toflush = self.records.len();

        let stdout_unlocked = std::io::stdout();
        let stdout_locked = stdout_unlocked.lock();
        let mut stdout = std::io::BufWriter::new(stdout_locked);

        while toflush > 0 {
            let record = match self.records.pop() {
                Some(r) => r,
                None => {
                    // This can happen if another thread panics while the
                    // logging thread is flushing. In that case both threads
                    // will be consuming from the queue.
                    break;
                }
            };
            toflush -= 1;

            write!(stdout, "{record}")?;

            if record.level <= Level::Error && *self.report_errors_to_stderr.get().unwrap() {
                // *Also* summarize on stderr.

                // First flush stdout to avoid confusing interleaving if stdout
                // and stderr are merged.
                stdout.flush()?;

                // Summarize on stderr. We use a `BufWriter` to try to help
                // ensure we ultimately make a single `write` syscall, though
                // the flushes above and below *should* already prevent any
                // interleaving with stdout.
                let stderr_unlocked = std::io::stderr();
                let stderr_locked = stderr_unlocked.lock();
                let mut stderr = std::io::BufWriter::new(stderr_locked);
                writeln!(stderr, "Error: {}", record.message)?;

                // Explicitly flush before dropping to detect errors.
                stderr.flush()?;
                drop(stderr);
            }
        }

        // Explicitly flush before dropping to detect errors.
        stdout.flush()?;
        drop(stdout);

        if let Some(done_sender) = done_sender {
            // We can't log from this thread without risking deadlock, so in the
            // unlikely case that the calling thread has gone away, just print
            // directly.
            done_sender.send(()).unwrap_or_else(|e| {
                println!("WARNING: Logger couldn't notify calling thread: {:?}", e);
            });
        }
        Ok(())
    }

    /// When buffering is disabled, the logger thread is notified to write each
    /// record as soon as it's created. The calling thread still isn't blocked
    /// on the record actually being written, though.
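    ///
    /// A usage sketch (hypothetical `logger` value; in Shadow the global
    /// logger is reached via the module-level `set_buffering_enabled`):
    ///
    /// ```ignore
    /// // Write records out as soon as they arrive during setup...
    /// logger.set_buffering_enabled(false);
    /// // ...and batch them for throughput once the simulation is running.
    /// logger.set_buffering_enabled(true);
    /// ```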
    pub fn set_buffering_enabled(&self, buffering_enabled: bool) {
        let mut writer = self.buffering_enabled.write().unwrap();
        *writer = buffering_enabled;
    }

    /// Returns the maximum log level, or `LevelFilter::Trace` if the maximum
    /// log level has not yet been set.
    pub fn max_level(&self) -> LevelFilter {
        self.max_log_level
            .get()
            .copied()
            .unwrap_or(LevelFilter::Trace)
    }

    /// Set the default maximum log level; it can be overridden per-host. Only
    /// intended to be called from `init()`. Will panic if called more than
    /// once.
    fn set_max_level(&self, level: LevelFilter) {
        self.max_log_level.set(level).unwrap()
    }

    /// Set whether to report errors to stderr in addition to logging on stdout.
    ///
    /// Only intended to be called from `init()`. Will panic if called more
    /// than once.
    fn set_report_errors_to_stderr(&self, val: bool) {
        self.report_errors_to_stderr.set(val).unwrap()
    }

    // Send a flush command to the logger thread.
    fn flush_impl(&self, notify_done: Option<Sender<()>>) {
        self.send_command(LoggerCommand::Flush(notify_done))
    }

    // Send a flush command to the logger thread and block until it's completed.
    fn flush_sync(&self) {
        let (done_sender, done_receiver) = std::sync::mpsc::channel();
        self.flush_impl(Some(done_sender));
        done_receiver.recv().unwrap();
    }

    // Send a flush command to the logger thread without waiting for completion.
    fn flush_async(&self) {
        self.flush_impl(None);
    }

    // Send a command to the logger thread.
    fn send_command(&self, cmd: LoggerCommand) {
        SENDER
            .try_with(|thread_sender| {
                if thread_sender.borrow().is_none() {
                    let lock = self.command_sender.lock().unwrap();
                    *thread_sender.borrow_mut() = Some(lock.clone());
                }
                thread_sender
                    .borrow()
                    .as_ref()
                    .unwrap()
                    .send(cmd)
                    .unwrap_or_else(|e| {
                        println!("WARNING: Couldn't send command to logger thread: {:?}", e);
                    });
            })
            .unwrap_or_else(|e| {
                println!(
                    "WARNING: Couldn't get sender channel to logger thread: {:?}",
                    e
                );
            });
    }
}

impl Log for ShadowLogger {
    fn enabled(&self, metadata: &Metadata) -> bool {
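        // Use the active host's log level if it has one configured; otherwise
        // fall back to the logger's global maximum level.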
        let filter = match Worker::with_active_host(|host| host.info().log_level) {
            Some(Some(level)) => level,
            _ => self.max_level(),
        };
        metadata.level() <= filter
    }

    fn log(&self, record: &Record) {
        if !self.enabled(record.metadata()) {
            return;
        }

        let message = std::fmt::format(*record.args());

        let host_info = Worker::with_active_host(|host| host.info().clone());

        let mut shadowrecord = ShadowLogRecord {
            level: record.level(),
            file: record.file_static(),
            module_path: record.module_path_static(),
            line: record.line(),
            message,
            wall_time: Duration::from_micros(unsafe {
                u64::try_from(c_log::logger_elapsed_micros()).unwrap()
            }),

            emu_time: Worker::current_time(),
            thread_name: THREAD_NAME
                .try_with(|name| (*name).clone())
                .unwrap_or_else(|_| get_thread_name()),
            thread_id: THREAD_ID
                .try_with(|id| *id)
                .unwrap_or_else(|_| nix::unistd::gettid()),
            host_info,
        };

        loop {
            match self.records.push(shadowrecord) {
                Ok(()) => break,
                Err(r) => {
                    // Queue is full. Flush it and try again.
                    shadowrecord = r;
                    self.flush_sync();
                }
            }
        }

        if record.level() == Level::Error {
            // Unlike in Shadow's C code, we don't abort the program on Error
            // logs. In Rust that purpose is instead served by `panic` and
            // `unwrap`. C callers will still exit or abort via the lib/logger
            // wrapper.
            //
            // Flush *synchronously*, since we're likely about to crash one way
            // or another.
            self.flush_sync();
        } else if self.records.len() > ASYNC_FLUSH_QD_LINES_THRESHOLD
            || !*self.buffering_enabled.read().unwrap()
        {
            self.flush_async();
        }
    }

    fn flush(&self) {
        self.flush_sync();
    }
}

struct ShadowLogRecord {
    level: Level,
    file: Option<&'static str>,
    module_path: Option<&'static str>,
    line: Option<u32>,
    message: String,
    wall_time: Duration,

    emu_time: Option<EmulatedTime>,
    thread_name: String,
    thread_id: nix::unistd::Pid,
    host_info: Option<Arc<HostInfo>>,
}

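// The `Display` impl below produces one line per record: wall time, thread,
// sim time, level, host, source location, module, and message. An illustrative
// example (field values are made up):
//
//   00:00:05.123456 [1234:shadow-worker] 00:00:03.000000000 [DEBUG] [server:11.0.0.1] [process.rs:42] [shadow_rs::host::process] opened descriptor 3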
impl std::fmt::Display for ShadowLogRecord {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        {
            let parts = TimeParts::from_nanos(self.wall_time.as_nanos());
            write!(
                f,
                "{:02}:{:02}:{:02}.{:06}",
                parts.hours,
                parts.mins,
                parts.secs,
                parts.nanos / 1000
            )?;
        }
        write!(f, " [{}:{}]", self.thread_id, self.thread_name)?;
        if let Some(emu_time) = self.emu_time {
            let sim_time = emu_time.duration_since(&EmulatedTime::SIMULATION_START);
            let parts = TimeParts::from_nanos(sim_time.as_nanos());
            write!(
                f,
                " {:02}:{:02}:{:02}.{:09}",
                parts.hours, parts.mins, parts.secs, parts.nanos
            )?;
        } else {
            write!(f, " n/a")?;
        }
        write!(f, " [{level}]", level = self.level)?;
        if let Some(host) = &self.host_info {
            write!(
                f,
                " [{hostname}:{ip}]",
                hostname = host.name,
                ip = host.default_ip,
            )?;
        } else {
            write!(f, " [n/a]")?;
        }
        write!(
            f,
            " [{file}:",
            file = self
                .file
                .map(|f| if let Some(sep_pos) = f.rfind('/') {
                    &f[(sep_pos + 1)..]
                } else {
                    f
                })
                .unwrap_or("n/a"),
        )?;
        if let Some(line) = self.line {
            write!(f, "{line}")?;
        } else {
            write!(f, "n/a")?;
        }
        writeln!(
            f,
            "] [{module}] {msg}",
            module = self.module_path.unwrap_or("n/a"),
            msg = self.message
        )?;
        Ok(())
    }
}

enum LoggerCommand {
    // Flush; takes an optional one-shot channel to notify that the flush has
    // completed.
    Flush(Option<Sender<()>>),
}

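/// Enable or disable buffering of log records on the global Shadow logger.
/// See [`ShadowLogger::set_buffering_enabled`].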
pub fn set_buffering_enabled(buffering_enabled: bool) {
    SHADOW_LOGGER.set_buffering_enabled(buffering_enabled);
}

mod export {
    use super::*;

    /// When buffering is disabled, the logger thread is notified to write each
    /// record as soon as it's created. The calling thread still isn't blocked
    /// on the record actually being written, though.
    #[unsafe(no_mangle)]
    pub unsafe extern "C-unwind" fn shadow_logger_setEnableBuffering(buffering_enabled: i32) {
        set_buffering_enabled(buffering_enabled != 0)
    }
}