/** Copyright (c) 2013 "Marco Rovere" This code is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . This code is simple: its sole purpose is to either dump or add ProtocolBuffer-gzipped files that are meant to replace ordinary ROOT files containing only hierarchies of histograms, arranged in arbitrarily complex levels of directories. The merging logic is such that plots present in all files are added, while plots present in some of the files are anyway tracked and added, if similar ones are found in other files. The logic of the merging algorithm is trivial and fully rely on the ordered nature of the ProtocolBuffer files read in input. An internal set container of MicroME is used to host the final results. The relational ordering of the set must be guaranteed to match the one used to order the ProtocolBuffer files for optimal performance and correctness. A dependency on protocolbuffer is needed and should be alrady included out of the box into any recent CMSSW release. In case the protoclBuffer package is not avaialble, you need to install it as an external toolfile. Therefore, in order to be able to compile and run the code, you need to locally install protocol buffer 2.4.1 and add it as a scram tool to your preferred CMSSW development area. The toolfile I used is: To register it into your development area you can simply do: scram setup protocolbuf.xml To verify the correctness of the information, do: scram tool info protocolbuf. You should see an output similar to the following: Tool info as configured in location /afs/cern.ch/work/r/rovere/fastHistoMergingPB/CMSSW_7_0_X_2013-07-08-0200 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Name : protocolbuf Version : 2.4.1 ++++++++++++++++++++ SCRAM_PROJECT=no PROTOCOLBUF_CLIENT_BASE=/afs/cern.ch/work/r/rovere/protocolbuf LIB=protobuf LIBDIR=/afs/cern.ch/work/r/rovere/protocolbuf/lib INCLUDE=/afs/cern.ch/work/r/rovere/protocolbuf/include USE=zlib PATH=/afs/cern.ch/work/r/rovere/protocolbuf/bin */ #include #include #include #include #include #include #include #include #include #include #include #include "DQMServices/Core/src/ROOTFilePB.pb.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define DEBUG(x, msg) if (debug >= x) std::cout << "DEBUG: " << msg << std::flush int debug = 0; struct MicroME { MicroME( TObject *o, const std::string& dir, const std::string& obj, uint32_t flags = 0) : obj(o), dirname(dir), objname(obj), flags(flags) {} mutable TObject * obj; const std::string dirname; const std::string objname; uint32_t flags; bool operator<(const MicroME &rhs) const { const MicroME &lhs = *this; int diff = lhs.dirname.compare(rhs.dirname); return (diff < 0 ? true : diff == 0 ? lhs.objname < rhs.objname : false); }; void add(TObject *obj_to_add) const { DEBUG(1, "Merging: " << obj->GetName() << " << " << obj_to_add->GetName() << std::endl); if (dynamic_cast(obj) && dynamic_cast(obj_to_add)) { dynamic_cast(obj)->Add(dynamic_cast(obj_to_add)); } else if (dynamic_cast(obj) && dynamic_cast(obj_to_add)) { } else { DEBUG(1, "Cannot merge (different types): " << obj->GetName() << " << " << obj_to_add->GetName() << std::endl); } }; const std::string fullname() const { return dirname + '/' + objname; }; }; using MEStore = std::set; enum TaskType { TASK_ADD, TASK_DUMP, TASK_CONVERT, TASK_ENCODE }; enum ErrType { ERR_BADCFG=1, ERR_NOFILE }; using google::protobuf::io::FileInputStream; using google::protobuf::io::FileOutputStream; using google::protobuf::io::GzipInputStream; using google::protobuf::io::GzipOutputStream; using google::protobuf::io::CodedInputStream; using google::protobuf::io::ArrayInputStream; /** Extract the next serialised ROOT object from @a buf. Returns null if there are no more objects in the buffer, or a null pointer was serialised at this location. */ inline TObject * extractNextObject(TBufferFile &buf) { if (buf.Length() == buf.BufferSize()) return nullptr; buf.InitMap(); return reinterpret_cast(buf.ReadObjectAny(nullptr)); } static void get_info(const dqmstorepb::ROOTFilePB::Histo &h, std::string &dirname, std::string &objname, TObject ** obj) { size_t slash = h.full_pathname().rfind('/'); size_t dirpos = (slash == std::string::npos ? 0 : slash); size_t namepos = (slash == std::string::npos ? 0 : slash+1); dirname.assign(h.full_pathname(), 0, dirpos); objname.assign(h.full_pathname(), namepos, std::string::npos); TBufferFile buf(TBufferFile::kRead, h.size(), (void*)h.streamed_histo().data(), kFALSE); buf.Reset(); *obj = extractNextObject(buf); if (!*obj) { std::cerr << "Error reading element: " << h.full_pathname() << std::endl; } } void writeMessageFD(const dqmstorepb::ROOTFilePB &dqmstore_output_msg, int out_fd) { FileOutputStream out_stream(out_fd); GzipOutputStream::Options options; options.format = GzipOutputStream::GZIP; options.compression_level = 2; GzipOutputStream gzip_stream(&out_stream, options); dqmstore_output_msg.SerializeToZeroCopyStream(&gzip_stream); // make sure we flush before close gzip_stream.Close(); out_stream.Close(); } void writeMessage(const dqmstorepb::ROOTFilePB &dqmstore_output_msg, const std::string &output_filename) { DEBUG(1, "Writing file" << std::endl); int out_fd = ::open(output_filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH); writeMessageFD(dqmstore_output_msg, out_fd); ::close(out_fd); } void fillMessage(dqmstorepb::ROOTFilePB &dqmstore_output_msg, const MEStore & micromes) { auto mi = micromes.begin(); auto me = micromes.end(); DEBUG(1, "Streaming ROOT objects" << std::endl); for (; mi != me; ++mi) { dqmstorepb::ROOTFilePB::Histo* h = dqmstore_output_msg.add_histo(); DEBUG(2, "Streaming ROOT object " << mi->fullname() << "\n"); h->set_full_pathname(mi->fullname()); TBufferFile buffer(TBufferFile::kWrite); buffer.WriteObject(mi->obj); h->set_size(buffer.Length()); h->set_flags(mi->flags); h->set_streamed_histo((const void*)buffer.Buffer(), buffer.Length()); delete mi->obj; } } void processDirectory(TFile *file, const std::string& curdir, MEStore& micromes) { DEBUG(1, "Processing directory " << curdir << "\n"); file->cd(curdir.c_str()); TKey *key; TIter next (gDirectory->GetListOfKeys()); while ((key = (TKey *) next())) { TObject * obj = key->ReadObj(); if (dynamic_cast(obj)) { std::string subdir; subdir.reserve(curdir.size() + strlen(obj->GetName()) + 2); subdir += curdir; if (! curdir.empty()) subdir += '/'; subdir += obj->GetName(); processDirectory(file, subdir, micromes); } else if ((dynamic_cast(obj)) || (dynamic_cast(obj))) { if (dynamic_cast(obj)) { dynamic_cast(obj)->SetDirectory(nullptr); } DEBUG(2, curdir << "/" << obj->GetName() << "\n"); MicroME mme(obj, curdir, obj->GetName()); micromes.insert(mme); } } } int encodeFile(const std::string &output_filename, const std::vector &filenames) { assert(filenames.size() == 1); TFile input(filenames[0].c_str()); DEBUG(0, "Encoding file " << filenames[0] << std::endl); MEStore micromes; dqmstorepb::ROOTFilePB dqmstore_message; processDirectory(&input, "", micromes); fillMessage(dqmstore_message, micromes); writeMessage(dqmstore_message, output_filename); return 0; } int convertFile(const std::string &output_filename, const std::vector &filenames) { assert(filenames.size() == 1); TFile output(output_filename.c_str(), "RECREATE"); DEBUG(0, "Converting file " << filenames[0] << std::endl); dqmstorepb::ROOTFilePB dqmstore_message; int filedescriptor = ::open(filenames[0].c_str(), O_RDONLY); FileInputStream fin(filedescriptor); GzipInputStream input(&fin); CodedInputStream input_coded(&input); input_coded.SetTotalBytesLimit(1024*1024*1024, -1); if (!dqmstore_message.ParseFromCodedStream(&input_coded)) { std::cout << "Fatal Error opening file " << filenames[0] << std::endl; return ERR_NOFILE; } ::close(filedescriptor); for (int i = 0; i < dqmstore_message.histo_size(); i++) { const dqmstorepb::ROOTFilePB::Histo& h = dqmstore_message.histo(i); DEBUG(1, h.full_pathname() << std::endl); DEBUG(1, h.size() << std::endl); TBufferFile buf(TBufferFile::kRead, h.size(), (void*)h.streamed_histo().data(), kFALSE); buf.Reset(); TObject *obj = extractNextObject(buf); std::string path,objname; get_info(h, path, objname, &obj); gDirectory->cd("/"); // Find the first path component. size_t start = 0; size_t end = path.find('/', start); if (end == std::string::npos) end = path.size(); while (true) { std::string part(path, start, end-start); if (! gDirectory->Get(part.c_str())) gDirectory->mkdir(part.c_str()); gDirectory->cd(part.c_str()); // Stop if we reached the end, ignoring any trailing '/'. if (end+1 >= path.size()) break; // Find the next path component. start = end+1; end = path.find('/', start); if (end == std::string::npos) end = path.size(); } obj->Write(); DEBUG(1, obj->GetName() << std::endl); } output.Close(); return 0; } int dumpFiles(const std::vector &filenames) { assert(!filenames.empty()); for (int i = 0, e = filenames.size(); i != e; ++i) { DEBUG(0, "Dumping file " << filenames[i] << std::endl); dqmstorepb::ROOTFilePB dqmstore_message; int filedescriptor = ::open(filenames[0].c_str(), O_RDONLY); FileInputStream fin(filedescriptor); GzipInputStream input(&fin); CodedInputStream input_coded(&input); input_coded.SetTotalBytesLimit(1024*1024*1024, -1); if (!dqmstore_message.ParseFromCodedStream(&input_coded)) { std::cout << "Fatal Error opening file " << filenames[0] << std::endl; return ERR_NOFILE; } ::close(filedescriptor); for (int i = 0; i < dqmstore_message.histo_size(); i++) { const dqmstorepb::ROOTFilePB::Histo& h = dqmstore_message.histo(i); DEBUG(1, h.full_pathname() << std::endl); DEBUG(1, h.size() << std::endl); TBufferFile buf(TBufferFile::kRead, h.size(), (void*)h.streamed_histo().data(), kFALSE); buf.Reset(); TObject *obj = extractNextObject(buf); DEBUG(1, obj->GetName() << std::endl); DEBUG(1, "Flags: " << h.flags() << std::endl); } } return 0; } int addFile(MEStore& micromes, int fd) { dqmstorepb::ROOTFilePB dqmstore_msg; FileInputStream fin(fd); GzipInputStream input(&fin); CodedInputStream input_coded(&input); input_coded.SetTotalBytesLimit(1024*1024*1024, -1); if (!dqmstore_msg.ParseFromCodedStream(&input_coded)) { std::cout << "Fatal decoding stream: " << fd << std::endl; return ERR_NOFILE; } auto hint = micromes.begin(); for (int i = 0; i < dqmstore_msg.histo_size(); i++) { std::string path; std::string objname; TObject *obj = nullptr; const dqmstorepb::ROOTFilePB::Histo &h = dqmstore_msg.histo(i); get_info(h, path, objname, &obj); MicroME mme(nullptr, path, objname, h.flags()); auto ir = micromes.insert(hint, mme); if (ir->obj != nullptr) { // new element was not added // so we merge ir->add(obj); delete obj; DEBUG(2, "Merged MicroME " << mme.fullname() << std::endl); } else { ir->obj = obj; DEBUG(2, "Inserted MicroME " << mme.fullname() << std::endl); } hint = ir; ++hint; } return 0; } // The idea is to preload root library (before forking). // Which is significant for performance and especially memory usage, // because root aakes a long time to init (and somehow manages to launch a subshell). void tryRootPreload() { // write a single histogram TH1F obj_th1f("preload_th1f", "preload_th1f", 2, 0, 1); TBufferFile write_buffer(TBufferFile::kWrite); write_buffer.WriteObject(&obj_th1f); dqmstorepb::ROOTFilePB preload_file; dqmstorepb::ROOTFilePB::Histo* hw = preload_file.add_histo(); hw->set_size(write_buffer.Length()); hw->set_flags(0); hw->set_streamed_histo((const void*)write_buffer.Buffer(), write_buffer.Length()); // now load this th1f const dqmstorepb::ROOTFilePB::Histo &hr = preload_file.histo(0); std::string path; std::string objname; TObject *obj = nullptr; get_info(hr, path, objname, &obj); delete obj; // all done } /* fork_id represents the position in a node (node number). */ void addFilesWithFork(int parent_fd, const int fork_id, const int fork_total, const std::vector& filenames) { DEBUG(1, "Start process: " << fork_id << " parent: " << (fork_id / 2) << std::endl); std::list > children; // if this node has a subtree, start it for (int i = 0; i < 2; ++i) { int child_id = fork_id*2 + i; if (child_id > fork_total) continue; int fd[2]; ::pipe(fd); int child_pid = ::fork(); if (child_pid == 0) { ::prctl(PR_SET_PDEATHSIG, SIGKILL); ::close(fd[0]); // close read end addFilesWithFork(fd[1], child_id, fork_total, filenames); ::close(fd[1]); ::_exit(0); } else { ::close(fd[1]); // close write end children.push_back(std::make_pair(fd[0], child_pid)); } } // merge all my files MEStore microme; // select the filenames to process // with threads=1, this just selects all the files for (unsigned int fi = fork_id - 1; fi < filenames.size(); fi += fork_total) { const std::string& file = filenames[fi]; DEBUG(1, "Adding file " << file << std::endl); int filedescriptor; if ((filedescriptor = ::open(file.c_str(), O_RDONLY)) == -1) { std::cout << "Fatal Error opening file " << file << std::endl; exit(ERR_NOFILE); } addFile(microme, filedescriptor); ::close(filedescriptor); } // merge all children for (auto& chpair : children) { int fd = chpair.first; addFile(microme, fd); ::close(fd); // collect the child, not necessary, but avoids int status; ::waitpid(chpair.second, &status, 0); } // output everything to fd dqmstorepb::ROOTFilePB dqmstore_output_msg; fillMessage(dqmstore_output_msg, microme); writeMessageFD(dqmstore_output_msg, parent_fd); }; int addFiles(const std::string &output_filename, const std::vector &filenames, int nthreads) { tryRootPreload(); DEBUG(1, "Writing file" << std::endl); int out_fd = ::open(output_filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH); addFilesWithFork(out_fd, 1, nthreads, filenames); ::close(out_fd); return 0; } static int showusage() { static const std::string app_name("fasthadd"); std::cerr << "Usage: " << app_name << " [--[no-]debug] TASK OPTIONS\n\n " << app_name << " [OPTIONS] add [-j NUM_THREADS] -o OUTPUT_FILE [DAT FILE...]\n " << app_name << " [OPTIONS] convert -o ROOT_FILE DAT_FILE\n " << app_name << " [OPTIONS] encode -o DAT_FILE ROOT_FILE\n " << app_name << " [OPTIONS] dump [DAT FILE...]\n "; return ERR_BADCFG; } int main(int argc, char * argv[]) { int arg; int ret = 0; int jobs = 1; std::string output_file; std::vector filenames; TaskType task; filenames.reserve(argc); for (arg = 1; arg < argc; ++arg) { if (! strcmp(argv[arg], "--no-debug")) debug = 0; else if (! strcmp(argv[arg], "--debug") || ! strcmp(argv[arg], "-d")) debug++; else break; } if (arg < argc) { if (! strcmp(argv[arg], "add")) { ++arg; task = TASK_ADD; } else if (! strcmp(argv[arg], "dump")) { ++arg; task = TASK_DUMP; } else if (! strcmp(argv[arg], "convert")) { ++arg; task = TASK_CONVERT; } else if (! strcmp(argv[arg], "encode")) { ++arg; task = TASK_ENCODE; } else { std::cerr << "Unknown action: " << argv[arg] << std::endl; return showusage(); } } else { std::cerr << "Not enough arguments\n"; return showusage(); } if (task == TASK_ADD) { if ((arg != argc) && (strcmp(argv[arg], "-j") == 0)) { jobs = atoi(argv[arg+1]); if ((jobs < 1) || (jobs > 128)) { std::cerr << "Invalid argument for -j\n"; return showusage(); }; arg += 2; } } if (task == TASK_ADD || task == TASK_CONVERT || task == TASK_ENCODE) { if (arg == argc) { std::cerr << "add|convert|encode actions requires a -o option to be set\n"; return showusage(); } if (! strcmp(argv[arg], "-o")) { if (arg < argc-1) { output_file = argv[++arg]; } else { std::cerr << " -o option requires a value\n"; return showusage(); } } } else if (task == TASK_DUMP) { if (arg == argc) { std::cerr << "Missing input file(s)\n"; return showusage(); } for (; arg < argc; ++arg) { filenames.emplace_back(argv[arg]); } } if (task == TASK_ADD || task == TASK_CONVERT || task == TASK_ENCODE) { if (++arg == argc) { std::cerr << "Missing input file(s)\n"; return showusage(); } for (; arg < argc; ++arg) { filenames.emplace_back(argv[arg]); } } if (task == TASK_ADD) ret = addFiles(output_file, filenames, jobs); else if (task == TASK_DUMP) ret = dumpFiles(filenames); else if (task == TASK_CONVERT) ret = convertFile(output_file, filenames); else if (task == TASK_ENCODE) ret = encodeFile(output_file, filenames); google::protobuf::ShutdownProtobufLibrary(); return ret; }