Skip to content

Commit

Permalink
src: use only one tracing write fs req at a time
Browse files Browse the repository at this point in the history
Concurrent writes to the same fd are generally not ideal,
since it’s not generally guaranteed that data from those
writes will end up on disk in the right order.

PR-URL: #21867
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Eugene Ostroukhov <eostroukhov@google.com>
Reviewed-By: Ali Ijaz Sheikh <ofrobots@google.com>
  • Loading branch information
addaleax authored and targos committed Aug 1, 2018
1 parent 6b58746 commit ba480d3
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 30 deletions.
71 changes: 45 additions & 26 deletions src/tracing/node_trace_writer.cc
Expand Up @@ -158,38 +158,57 @@ void NodeTraceWriter::Flush(bool blocking) {

void NodeTraceWriter::WriteToFile(std::string&& str, int highest_request_id) {
if (fd_ == -1) return;
WriteRequest* write_req = new WriteRequest();
write_req->str = std::move(str);
write_req->writer = this;
write_req->highest_request_id = highest_request_id;
uv_buf_t uv_buf = uv_buf_init(const_cast<char*>(write_req->str.c_str()),
write_req->str.length());
request_mutex_.Lock();
// Manage a queue of WriteRequest objects because the behavior of uv_write is
// undefined if the same WriteRequest object is used more than once
// between WriteCb calls. In addition, this allows us to keep track of the id
// of the latest write request that actually been completed.
write_req_queue_.push(write_req);
request_mutex_.Unlock();
int err = uv_fs_write(tracing_loop_, reinterpret_cast<uv_fs_t*>(write_req),
fd_, &uv_buf, 1, -1, WriteCb);

uv_buf_t buf = uv_buf_init(nullptr, 0);
{
Mutex::ScopedLock lock(request_mutex_);
write_req_queue_.emplace(WriteRequest {
std::move(str), highest_request_id
});
if (write_req_queue_.size() == 1) {
buf = uv_buf_init(
const_cast<char*>(write_req_queue_.front().str.c_str()),
write_req_queue_.front().str.length());
}
}
// Only one write request for the same file descriptor should be active at
// a time.
if (buf.base != nullptr && fd_ != -1) {
StartWrite(buf);
}
}

void NodeTraceWriter::StartWrite(uv_buf_t buf) {
int err = uv_fs_write(
tracing_loop_, &write_req_, fd_, &buf, 1, -1,
[](uv_fs_t* req) {
NodeTraceWriter* writer =
ContainerOf(&NodeTraceWriter::write_req_, req);
writer->AfterWrite();
});
CHECK_EQ(err, 0);
}

void NodeTraceWriter::WriteCb(uv_fs_t* req) {
WriteRequest* write_req = ContainerOf(&WriteRequest::req, req);
CHECK_GE(write_req->req.result, 0);
void NodeTraceWriter::AfterWrite() {
CHECK_GE(write_req_.result, 0);
uv_fs_req_cleanup(&write_req_);

NodeTraceWriter* writer = write_req->writer;
int highest_request_id = write_req->highest_request_id;
uv_buf_t buf = uv_buf_init(nullptr, 0);
{
Mutex::ScopedLock scoped_lock(writer->request_mutex_);
CHECK_EQ(write_req, writer->write_req_queue_.front());
writer->write_req_queue_.pop();
writer->highest_request_id_completed_ = highest_request_id;
writer->request_cond_.Broadcast(scoped_lock);
Mutex::ScopedLock scoped_lock(request_mutex_);
int highest_request_id = write_req_queue_.front().highest_request_id;
write_req_queue_.pop();
highest_request_id_completed_ = highest_request_id;
request_cond_.Broadcast(scoped_lock);
if (!write_req_queue_.empty()) {
buf = uv_buf_init(
const_cast<char*>(write_req_queue_.front().str.c_str()),
write_req_queue_.front().str.length());
}
}
if (buf.base != nullptr && fd_ != -1) {
StartWrite(buf);
}
delete write_req;
}

// static
Expand Down
8 changes: 4 additions & 4 deletions src/tracing/node_trace_writer.h
Expand Up @@ -27,13 +27,12 @@ class NodeTraceWriter : public AsyncTraceWriter {

private:
struct WriteRequest {
uv_fs_t req;
NodeTraceWriter* writer;
std::string str;
int highest_request_id;
};

static void WriteCb(uv_fs_t* req);
void AfterWrite();
void StartWrite(uv_buf_t buf);
void OpenNewFileForStreaming();
void WriteToFile(std::string&& str, int highest_request_id);
void WriteSuffix();
Expand All @@ -56,7 +55,8 @@ class NodeTraceWriter : public AsyncTraceWriter {
// Used to wait until async handles have been closed.
ConditionVariable exit_cond_;
int fd_ = -1;
std::queue<WriteRequest*> write_req_queue_;
uv_fs_t write_req_;
std::queue<WriteRequest> write_req_queue_;
int num_write_requests_ = 0;
int highest_request_id_completed_ = 0;
int total_traces_ = 0;
Expand Down

0 comments on commit ba480d3

Please sign in to comment.