1use std::cell::RefCell;
2use std::sync::Arc;
3use std::sync::mpsc::{Receiver, Sender};
4use std::sync::{Mutex, RwLock};
5use std::time::Duration;
6
7use crossbeam::queue::ArrayQueue;
8use log::{Level, LevelFilter, Log, Metadata, Record, SetLoggerError};
9use logger as c_log;
10use once_cell::sync::{Lazy, OnceCell};
11use shadow_shim_helper_rs::emulated_time::EmulatedTime;
12use shadow_shim_helper_rs::util::time::TimeParts;
13
14use crate::core::worker::Worker;
15use crate::host::host::HostInfo;
16
/// Number of queued log records above which `ShadowLogger::log` asks the logger thread
/// for an asynchronous flush.
const ASYNC_FLUSH_QD_LINES_THRESHOLD: usize = 100_000;

/// Capacity of the record queue. Once the queue is full, `ShadowLogger::log` has to
/// flush synchronously before it can enqueue another record. Kept at ten times the
/// asynchronous threshold.
const SYNC_FLUSH_QD_LINES_THRESHOLD: usize = ASYNC_FLUSH_QD_LINES_THRESHOLD * 10;

/// Longest time the logger thread waits for a command before flushing on its own.
const MIN_FLUSH_FREQUENCY: Duration = Duration::from_secs(10);
29
/// The process-wide logger instance, built by `ShadowLogger::new` on first access.
static SHADOW_LOGGER: Lazy<ShadowLogger> = Lazy::new(ShadowLogger::new);
31
32pub fn init(
34 max_log_level: LevelFilter,
35 report_errors_to_stderr: bool,
36) -> Result<(), SetLoggerError> {
37 SHADOW_LOGGER.set_max_level(max_log_level);
38 SHADOW_LOGGER.set_report_errors_to_stderr(report_errors_to_stderr);
39
40 log::set_logger(&*SHADOW_LOGGER)?;
41
42 log::set_max_level(log::LevelFilter::Trace);
45
46 std::thread::Builder::new()
48 .name("shadow-logger".to_string())
49 .spawn(move || SHADOW_LOGGER.logger_thread_fn())
50 .unwrap();
51
52 let default_panic_handler = std::panic::take_hook();
54 std::panic::set_hook(Box::new(move |panic_info| {
55 SHADOW_LOGGER.flush_records(None).ok();
61 default_panic_handler(panic_info);
62 }));
63
64 Ok(())
65}
66
/// A logger that queues records and writes them out from a dedicated thread.
pub struct ShadowLogger {
    /// Sending half of the command channel. Each logging thread clones it (under this
    /// mutex) the first time it sends a command, and caches the clone thread-locally.
    command_sender: Mutex<Sender<LoggerCommand>>,

    /// Receiving half of the command channel. Locked by `logger_thread_fn` for as long
    /// as that function runs.
    command_receiver: Mutex<Receiver<LoggerCommand>>,

    /// Records waiting to be written. Filled by `log`, drained by `flush_records`;
    /// its capacity is `SYNC_FLUSH_QD_LINES_THRESHOLD`.
    records: ArrayQueue<ShadowLogRecord>,

    /// When `false`, `log` requests an asynchronous flush after every record.
    buffering_enabled: RwLock<bool>,

    /// Log level filter used when the active host does not provide one. Set at most
    /// once; `max_level` falls back to `LevelFilter::Trace` while it is unset.
    max_log_level: OnceCell<LevelFilter>,

    /// Whether error-level records are also written to stderr when flushed. Set at most
    /// once.
    report_errors_to_stderr: OnceCell<bool>,
}
106
107thread_local!(static SENDER: RefCell<Option<Sender<LoggerCommand>>> = const{ RefCell::new(None)});
108thread_local!(static THREAD_NAME: String = get_thread_name());
109thread_local!(static THREAD_ID: nix::unistd::Pid = nix::unistd::gettid());
110
111fn get_thread_name() -> String {
112 let mut thread_name = Vec::<i8>::with_capacity(16);
113 let res = unsafe {
114 thread_name.set_len(thread_name.capacity());
115 libc::pthread_getname_np(
117 libc::pthread_self(),
118 thread_name.as_mut_ptr(),
119 thread_name.len(),
120 )
121 };
122 debug_assert_eq!(
124 res,
125 0,
126 "pthread_getname_np: {}",
127 nix::errno::Errno::from_raw(res),
128 );
129 if res == 0 {
130 let thread_name_cstr = unsafe { std::ffi::CStr::from_ptr(thread_name.as_ptr()) };
133 return thread_name_cstr.to_owned().to_string_lossy().to_string();
134 }
135
136 String::new()
142}
143
144impl ShadowLogger {
145 fn new() -> ShadowLogger {
146 let (sender, receiver) = std::sync::mpsc::channel();
147
148 ShadowLogger {
149 records: ArrayQueue::new(SYNC_FLUSH_QD_LINES_THRESHOLD),
150 command_sender: Mutex::new(sender),
151 command_receiver: Mutex::new(receiver),
152 buffering_enabled: RwLock::new(false),
153 max_log_level: OnceCell::new(),
154 report_errors_to_stderr: OnceCell::new(),
155 }
156 }
157
158 fn logger_thread_fn(&self) {
161 let command_receiver = self.command_receiver.lock().unwrap();
162
163 loop {
164 use std::sync::mpsc::RecvTimeoutError;
165 match command_receiver.recv_timeout(MIN_FLUSH_FREQUENCY) {
166 Ok(LoggerCommand::Flush(done_sender)) => self.flush_records(done_sender).unwrap(),
167 Err(RecvTimeoutError::Timeout) => {
168 self.flush_records(None).unwrap();
170 }
171 Err(e) => panic!("Unexpected error {}", e),
172 }
173 }
174 }
175
176 fn flush_records(&self, done_sender: Option<Sender<()>>) -> std::io::Result<()> {
180 use std::io::Write;
181
182 let mut toflush = self.records.len();
188
189 let stdout_unlocked = std::io::stdout();
190 let stdout_locked = stdout_unlocked.lock();
191 let mut stdout = std::io::BufWriter::new(stdout_locked);
192
193 while toflush > 0 {
194 let record = match self.records.pop() {
195 Some(r) => r,
196 None => {
197 break;
201 }
202 };
203 toflush -= 1;
204
205 write!(stdout, "{record}")?;
206
207 if record.level <= Level::Error && *self.report_errors_to_stderr.get().unwrap() {
208 stdout.flush()?;
212
213 let stderr_unlocked = std::io::stderr();
218 let stderr_locked = stderr_unlocked.lock();
219 let mut stderr = std::io::BufWriter::new(stderr_locked);
220 writeln!(stderr, "Error: {}", record.message)?;
221
222 stderr.flush()?;
224 drop(stderr);
225 }
226 }
227
228 stdout.flush()?;
230 drop(stdout);
231
232 if let Some(done_sender) = done_sender {
233 done_sender.send(()).unwrap_or_else(|e| {
237 println!(
238 "WARNING: Logger couldn't notify
239 calling thread: {:?}",
240 e
241 )
242 });
243 }
244 Ok(())
245 }
246
247 pub fn set_buffering_enabled(&self, buffering_enabled: bool) {
251 let mut writer = self.buffering_enabled.write().unwrap();
252 *writer = buffering_enabled;
253 }
254
255 pub fn max_level(&self) -> LevelFilter {
257 self.max_log_level
258 .get()
259 .copied()
260 .unwrap_or(LevelFilter::Trace)
261 }
262
263 fn set_max_level(&self, level: LevelFilter) {
266 self.max_log_level.set(level).unwrap()
267 }
268
269 fn set_report_errors_to_stderr(&self, val: bool) {
274 self.report_errors_to_stderr.set(val).unwrap()
275 }
276
277 fn flush_impl(&self, notify_done: Option<Sender<()>>) {
279 self.send_command(LoggerCommand::Flush(notify_done))
280 }
281
282 fn flush_sync(&self) {
284 let (done_sender, done_receiver) = std::sync::mpsc::channel();
285 self.flush_impl(Some(done_sender));
286 done_receiver.recv().unwrap();
287 }
288
289 fn flush_async(&self) {
291 self.flush_impl(None);
292 }
293
294 fn send_command(&self, cmd: LoggerCommand) {
296 SENDER
297 .try_with(|thread_sender| {
298 if thread_sender.borrow().is_none() {
299 let lock = self.command_sender.lock().unwrap();
300 *thread_sender.borrow_mut() = Some(lock.clone());
301 }
302 thread_sender
303 .borrow()
304 .as_ref()
305 .unwrap()
306 .send(cmd)
307 .unwrap_or_else(|e| {
308 println!("WARNING: Couldn't send command to logger thread: {:?}", e);
309 });
310 })
311 .unwrap_or_else(|e| {
312 println!(
313 "WARNING: Couldn't get sender channel to logger thread: {:?}",
314 e
315 );
316 });
317 }
318}
319
320impl Log for ShadowLogger {
321 fn enabled(&self, metadata: &Metadata) -> bool {
322 let filter = match Worker::with_active_host(|host| host.info().log_level) {
323 Some(Some(level)) => level,
324 _ => self.max_level(),
325 };
326 metadata.level() <= filter
327 }
328
329 fn log(&self, record: &Record) {
330 if !self.enabled(record.metadata()) {
331 return;
332 }
333
334 let message = std::fmt::format(*record.args());
335
336 let host_info = Worker::with_active_host(|host| host.info().clone());
337
338 let mut shadowrecord = ShadowLogRecord {
339 level: record.level(),
340 file: record.file_static(),
341 module_path: record.module_path_static(),
342 line: record.line(),
343 message,
344 wall_time: Duration::from_micros(unsafe {
345 u64::try_from(c_log::logger_elapsed_micros()).unwrap()
346 }),
347
348 emu_time: Worker::current_time(),
349 thread_name: THREAD_NAME
350 .try_with(|name| (*name).clone())
351 .unwrap_or_else(|_| get_thread_name()),
352 thread_id: THREAD_ID
353 .try_with(|id| *id)
354 .unwrap_or_else(|_| nix::unistd::gettid()),
355 host_info,
356 };
357
358 loop {
359 match self.records.push(shadowrecord) {
360 Ok(()) => break,
361 Err(r) => {
362 shadowrecord = r;
364 self.flush_sync();
365 }
366 }
367 }
368
369 if record.level() == Level::Error {
370 self.flush_sync();
376 } else if self.records.len() > ASYNC_FLUSH_QD_LINES_THRESHOLD
377 || !*self.buffering_enabled.read().unwrap()
378 {
379 self.flush_async();
380 }
381 }
382
383 fn flush(&self) {
384 self.flush_sync();
385 }
386}
387
/// A captured log record together with the context needed to format it later.
struct ShadowLogRecord {
    /// Severity of the record.
    level: Level,
    /// Source file that emitted the record, if known.
    file: Option<&'static str>,
    /// Module path that emitted the record, if known.
    module_path: Option<&'static str>,
    /// Source line that emitted the record, if known.
    line: Option<u32>,
    /// The already formatted log message.
    message: String,
    /// Elapsed time reported by `logger_elapsed_micros` when the record was captured.
    wall_time: Duration,

    /// Value of `Worker::current_time()` when the record was captured, if any.
    emu_time: Option<EmulatedTime>,
    /// Name of the thread that logged the record.
    thread_name: String,
    /// Id of the thread that logged the record.
    thread_id: nix::unistd::Pid,
    /// Info of the host that was active when the record was captured, if any.
    host_info: Option<Arc<HostInfo>>,
}
401
402impl std::fmt::Display for ShadowLogRecord {
403 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
404 {
405 let parts = TimeParts::from_nanos(self.wall_time.as_nanos());
406 write!(
407 f,
408 "{:02}:{:02}:{:02}.{:06}",
409 parts.hours,
410 parts.mins,
411 parts.secs,
412 parts.nanos / 1000
413 )?;
414 }
415 write!(f, " [{}:{}]", self.thread_id, self.thread_name)?;
416 if let Some(emu_time) = self.emu_time {
417 let sim_time = emu_time.duration_since(&EmulatedTime::SIMULATION_START);
418 let parts = TimeParts::from_nanos(sim_time.as_nanos());
419 write!(
420 f,
421 " {:02}:{:02}:{:02}.{:09}",
422 parts.hours, parts.mins, parts.secs, parts.nanos
423 )?;
424 } else {
425 write!(f, " n/a")?;
426 }
427 write!(f, " [{level}]", level = self.level)?;
428 if let Some(host) = &self.host_info {
429 write!(
430 f,
431 " [{hostname}:{ip}]",
432 hostname = host.name,
433 ip = host.default_ip,
434 )?;
435 } else {
436 write!(f, " [n/a]",)?;
437 }
438 write!(
439 f,
440 " [{file}:",
441 file = self
442 .file
443 .map(|f| if let Some(sep_pos) = f.rfind('/') {
444 &f[(sep_pos + 1)..]
445 } else {
446 f
447 })
448 .unwrap_or("n/a"),
449 )?;
450 if let Some(line) = self.line {
451 write!(f, "{line}", line = line)?;
452 } else {
453 write!(f, "n/a")?;
454 }
455 writeln!(
456 f,
457 "] [{module}] {msg}",
458 module = self.module_path.unwrap_or("n/a"),
459 msg = self.message
460 )?;
461 Ok(())
462 }
463}
464
/// Commands sent to the logger thread over the command channel.
enum LoggerCommand {
    /// Flush the queued records. If a sender is supplied, `()` is sent on it once the
    /// flush has completed.
    Flush(Option<Sender<()>>),
}
469
/// Enables or disables buffering on the global logger.
///
/// While buffering is disabled, every logged record triggers an asynchronous flush
/// request.
pub fn set_buffering_enabled(buffering_enabled: bool) {
    SHADOW_LOGGER.set_buffering_enabled(buffering_enabled);
}
473
/// Unmangled, C-callable entry points.
mod export {
    use super::*;

    /// C entry point for `set_buffering_enabled`: any nonzero `buffering_enabled`
    /// enables buffering, zero disables it.
    ///
    /// # Safety
    ///
    /// NOTE(review): nothing in the body performs an unsafe operation, so no caller
    /// obligation is visible here - confirm whether `unsafe` is only a convention for
    /// exported functions.
    #[unsafe(no_mangle)]
    pub unsafe extern "C-unwind" fn shadow_logger_setEnableBuffering(buffering_enabled: i32) {
        set_buffering_enabled(buffering_enabled != 0)
    }
}