Skip to content

Commit

Permalink
src: initialize file trace writer on tracing thread
Browse files Browse the repository at this point in the history
Run the initialization for the file trace writer’s `uv_async_t`s
on the same thread as `uv_run()` for their loop to avoid race
conditions.

PR-URL: #21867
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Eugene Ostroukhov <eostroukhov@google.com>
Reviewed-By: Ali Ijaz Sheikh <ofrobots@google.com>
  • Loading branch information
addaleax authored and targos committed Aug 1, 2018
1 parent 56edd5f commit 89e2302
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 23 deletions.
3 changes: 1 addition & 2 deletions src/node.cc
Expand Up @@ -434,8 +434,7 @@ static struct {
tracing_file_writer_ = tracing_agent_->AddClient(
ParseCommaSeparatedSet(trace_enabled_categories),
std::unique_ptr<tracing::AsyncTraceWriter>(
new tracing::NodeTraceWriter(trace_file_pattern,
tracing_agent_->loop())),
new tracing::NodeTraceWriter(trace_file_pattern)),
tracing::Agent::kUseDefaultCategories);
}
}
Expand Down
31 changes: 31 additions & 0 deletions src/tracing/agent.cc
Expand Up @@ -53,9 +53,27 @@ Agent::Agent() {
tracing_controller_->Initialize(nullptr);

CHECK_EQ(uv_loop_init(&tracing_loop_), 0);
CHECK_EQ(uv_async_init(&tracing_loop_,
&initialize_writer_async_,
[](uv_async_t* async) {
Agent* agent = ContainerOf(&Agent::initialize_writer_async_, async);
agent->InitializeWritersOnThread();
}), 0);
}

void Agent::InitializeWritersOnThread() {
Mutex::ScopedLock lock(initialize_writer_mutex_);
while (!to_be_initialized_.empty()) {
AsyncTraceWriter* head = *to_be_initialized_.begin();
head->InitializeOnThread(&tracing_loop_);
to_be_initialized_.erase(head);
}
initialize_writer_condvar_.Broadcast(lock);
}

Agent::~Agent() {
uv_close(reinterpret_cast<uv_handle_t*>(&initialize_writer_async_), nullptr);
uv_run(&tracing_loop_, UV_RUN_ONCE);
CheckedUvLoopClose(&tracing_loop_);
}

Expand Down Expand Up @@ -95,9 +113,18 @@ AgentWriterHandle Agent::AddClient(

ScopedSuspendTracing suspend(tracing_controller_, this);
int id = next_writer_id_++;
AsyncTraceWriter* raw = writer.get();
writers_[id] = std::move(writer);
categories_[id] = { use_categories->begin(), use_categories->end() };

{
Mutex::ScopedLock lock(initialize_writer_mutex_);
to_be_initialized_.insert(raw);
uv_async_send(&initialize_writer_async_);
while (to_be_initialized_.count(raw) > 0)
initialize_writer_condvar_.Wait(lock);
}

return AgentWriterHandle(this, id);
}

Expand All @@ -120,6 +147,10 @@ void Agent::StopTracing() {

void Agent::Disconnect(int client) {
if (client == kDefaultHandleId) return;
{
Mutex::ScopedLock lock(initialize_writer_mutex_);
to_be_initialized_.erase(writers_[client].get());
}
ScopedSuspendTracing suspend(tracing_controller_, this);
writers_.erase(client);
categories_.erase(client);
Expand Down
13 changes: 10 additions & 3 deletions src/tracing/agent.h
Expand Up @@ -5,6 +5,7 @@
#include "uv.h"
#include "v8.h"
#include "util.h"
#include "node_mutex.h"

#include <set>
#include <string>
Expand All @@ -23,6 +24,7 @@ class AsyncTraceWriter {
virtual ~AsyncTraceWriter() {}
virtual void AppendTraceEvent(TraceObject* trace_event) = 0;
virtual void Flush(bool blocking) = 0;
virtual void InitializeOnThread(uv_loop_t* loop) {}
};

class TracingController : public v8::platform::tracing::TracingController {
Expand Down Expand Up @@ -92,13 +94,11 @@ class Agent {

TraceConfig* CreateTraceConfig() const;

// TODO(addaleax): This design is broken and inherently thread-unsafe.
inline uv_loop_t* loop() { return &tracing_loop_; }

private:
friend class AgentWriterHandle;

static void ThreadCb(void* arg);
void InitializeWritersOnThread();

void Start();
void StopTracing();
Expand All @@ -120,6 +120,13 @@ class Agent {
std::unordered_map<int, std::multiset<std::string>> categories_;
std::unordered_map<int, std::unique_ptr<AsyncTraceWriter>> writers_;
TracingController* tracing_controller_ = nullptr;

// Variables related to initializing per-event-loop properties of individual
// writers, such as libuv handles.
Mutex initialize_writer_mutex_;
ConditionVariable initialize_writer_condvar_;
uv_async_t initialize_writer_async_;
std::set<AsyncTraceWriter*> to_be_initialized_;
};

void AgentWriterHandle::reset() {
Expand Down
32 changes: 19 additions & 13 deletions src/tracing/node_trace_writer.cc
Expand Up @@ -3,16 +3,25 @@
#include <string.h>
#include <fcntl.h>

#include "util.h"
#include "util-inl.h"

namespace node {
namespace tracing {

NodeTraceWriter::NodeTraceWriter(const std::string& log_file_pattern,
uv_loop_t* tracing_loop)
: tracing_loop_(tracing_loop), log_file_pattern_(log_file_pattern) {
NodeTraceWriter::NodeTraceWriter(const std::string& log_file_pattern)
: log_file_pattern_(log_file_pattern) {}

void NodeTraceWriter::InitializeOnThread(uv_loop_t* loop) {
CHECK_NULL(tracing_loop_);
tracing_loop_ = loop;

flush_signal_.data = this;
int err = uv_async_init(tracing_loop_, &flush_signal_, FlushSignalCb);
int err = uv_async_init(tracing_loop_, &flush_signal_,
[](uv_async_t* signal) {
NodeTraceWriter* trace_writer =
ContainerOf(&NodeTraceWriter::flush_signal_, signal);
trace_writer->FlushPrivate();
});
CHECK_EQ(err, 0);

exit_signal_.data = this;
Expand Down Expand Up @@ -126,11 +135,6 @@ void NodeTraceWriter::FlushPrivate() {
WriteToFile(std::move(str), highest_request_id);
}

void NodeTraceWriter::FlushSignalCb(uv_async_t* signal) {
NodeTraceWriter* trace_writer = static_cast<NodeTraceWriter*>(signal->data);
trace_writer->FlushPrivate();
}

void NodeTraceWriter::Flush(bool blocking) {
Mutex::ScopedLock scoped_lock(request_mutex_);
if (!json_trace_writer_) {
Expand Down Expand Up @@ -170,7 +174,7 @@ void NodeTraceWriter::WriteToFile(std::string&& str, int highest_request_id) {
}

void NodeTraceWriter::WriteCb(uv_fs_t* req) {
WriteRequest* write_req = reinterpret_cast<WriteRequest*>(req);
WriteRequest* write_req = ContainerOf(&WriteRequest::req, req);
CHECK_GE(write_req->req.result, 0);

NodeTraceWriter* writer = write_req->writer;
Expand All @@ -187,13 +191,15 @@ void NodeTraceWriter::WriteCb(uv_fs_t* req) {

// static
void NodeTraceWriter::ExitSignalCb(uv_async_t* signal) {
NodeTraceWriter* trace_writer = static_cast<NodeTraceWriter*>(signal->data);
NodeTraceWriter* trace_writer =
ContainerOf(&NodeTraceWriter::exit_signal_, signal);
uv_close(reinterpret_cast<uv_handle_t*>(&trace_writer->flush_signal_),
nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&trace_writer->exit_signal_),
[](uv_handle_t* signal) {
NodeTraceWriter* trace_writer =
static_cast<NodeTraceWriter*>(signal->data);
ContainerOf(&NodeTraceWriter::exit_signal_,
reinterpret_cast<uv_async_t*>(signal));
Mutex::ScopedLock scoped_lock(trace_writer->request_mutex_);
trace_writer->exited_ = true;
trace_writer->exit_cond_.Signal(scoped_lock);
Expand Down
8 changes: 3 additions & 5 deletions src/tracing/node_trace_writer.h
Expand Up @@ -4,7 +4,6 @@
#include <sstream>
#include <queue>

#include "node_mutex.h"
#include "libplatform/v8-tracing.h"
#include "tracing/agent.h"
#include "uv.h"
Expand All @@ -17,10 +16,10 @@ using v8::platform::tracing::TraceWriter;

class NodeTraceWriter : public AsyncTraceWriter {
public:
explicit NodeTraceWriter(const std::string& log_file_pattern,
uv_loop_t* tracing_loop);
explicit NodeTraceWriter(const std::string& log_file_pattern);
~NodeTraceWriter();

void InitializeOnThread(uv_loop_t* loop) override;
void AppendTraceEvent(TraceObject* trace_event) override;
void Flush(bool blocking) override;

Expand All @@ -38,11 +37,10 @@ class NodeTraceWriter : public AsyncTraceWriter {
void OpenNewFileForStreaming();
void WriteToFile(std::string&& str, int highest_request_id);
void WriteSuffix();
static void FlushSignalCb(uv_async_t* signal);
void FlushPrivate();
static void ExitSignalCb(uv_async_t* signal);

uv_loop_t* tracing_loop_;
uv_loop_t* tracing_loop_ = nullptr;
// Triggers callback to initiate writing the contents of stream_ to disk.
uv_async_t flush_signal_;
// Triggers callback to close async objects, ending the tracing thread.
Expand Down

0 comments on commit 89e2302

Please sign in to comment.