/** Copyright (c) 2013 "Marco Rovere" This code is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . This code is simple: its sole purpose is to either dump or add ProtocolBuffer-gzipped files that are meant to replace ordinary ROOT files containing only hierarchies of histograms, arranged in arbitrarily complex levels of directories. The merging logic is such that plots present in all files are added, while plots present in some of the files are anyway tracked and added, if similar ones are found in other files. The logic of the merging algorithm is trivial and fully rely on the ordered nature of the ProtocolBuffer files read in input. An internal set container of MicroME is used to host the final results. The relational ordering of the set must be guaranteed to match the one used to order the ProtocolBuffer files for optimal performance and correctness. A dependency on protocolbuffer is needed and should be alrady included out of the box into any recent CMSSW release. In case the protoclBuffer package is not avaialble, you need to install it as an external toolfile. Therefore, in order to be able to compile and run the code, you need to locally install protocol buffer 2.4.1 and add it as a scram tool to your preferred CMSSW development area. The toolfile I used is: To register it into your development area you can simply do: scram setup protocolbuf.xml To verify the correctness of the information, do: scram tool info protocolbuf. You should see an output similar to the following: Tool info as configured in location /afs/cern.ch/work/r/rovere/fastHistoMergingPB/CMSSW_7_0_X_2013-07-08-0200 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Name : protocolbuf Version : 2.4.1 ++++++++++++++++++++ SCRAM_PROJECT=no PROTOCOLBUF_CLIENT_BASE=/afs/cern.ch/work/r/rovere/protocolbuf LIB=protobuf LIBDIR=/afs/cern.ch/work/r/rovere/protocolbuf/lib INCLUDE=/afs/cern.ch/work/r/rovere/protocolbuf/include USE=zlib PATH=/afs/cern.ch/work/r/rovere/protocolbuf/bin */ #include #include #include #include #include #include #include #include #include "DQMServices/Core/src/ROOTFilePB.pb.h" #include #include #include #include #include #include #include #include #include #include #define DEBUG(x, msg) if (debug >= x) std::cout << "DEBUG: " << msg << std::flush int debug = 0; static bool lessThanMME(const std::string &lhs_dirname, const std::string &lhs_objname, const std::string &rhs_dirname, const std::string &rhs_objname) { int diff = lhs_dirname.compare(rhs_dirname); return (diff < 0 ? true : diff == 0 ? lhs_objname < rhs_objname : false); }; struct MicroME { MicroME(const std::string * full, const std::string * dir, const std::string * obj, uint32_t flags = 0) :fullname(full), dirname(dir), objname(obj), flags(flags) {} const std::string * fullname; const std::string * dirname; const std::string * objname; mutable TObject * obj; uint32_t flags; bool operator<(const MicroME &rhs) const { return lessThanMME(*this->dirname, *this->objname, *rhs.dirname, *rhs.objname); }; void add(TObject *obj_to_add) const { DEBUG(1, "Merging: " << obj->GetName() << " << " << obj_to_add->GetName() << std::endl); if (dynamic_cast(obj) && dynamic_cast(obj_to_add)) { dynamic_cast(obj)->Add(dynamic_cast(obj_to_add)); } else if (dynamic_cast(obj) && dynamic_cast(obj_to_add)) { } else { DEBUG(1, "Cannot merge (different types): " << obj->GetName() << " << " << obj_to_add->GetName() << std::endl); } }; }; enum TaskType { TASK_ADD, TASK_DUMP, TASK_CONVERT, TASK_ENCODE }; enum ErrType { ERR_BADCFG=1, ERR_NOFILE }; using google::protobuf::io::FileInputStream; using google::protobuf::io::FileOutputStream; using google::protobuf::io::GzipInputStream; using google::protobuf::io::GzipOutputStream; using google::protobuf::io::CodedInputStream; using google::protobuf::io::ArrayInputStream; /** Extract the next serialised ROOT object from @a buf. Returns null if there are no more objects in the buffer, or a null pointer was serialised at this location. */ inline TObject * extractNextObject(TBufferFile &buf) { if (buf.Length() == buf.BufferSize()) return 0; buf.InitMap(); return reinterpret_cast(buf.ReadObjectAny(0)); } static void get_info(const dqmstorepb::ROOTFilePB::Histo &h, std::string &dirname, std::string &objname, TObject ** obj) { size_t slash = h.full_pathname().rfind('/'); size_t dirpos = (slash == std::string::npos ? 0 : slash); size_t namepos = (slash == std::string::npos ? 0 : slash+1); dirname.assign(h.full_pathname(), 0, dirpos); objname.assign(h.full_pathname(), namepos, std::string::npos); TBufferFile buf(TBufferFile::kRead, h.size(), (void*)h.streamed_histo().data(), kFALSE); buf.Reset(); *obj = extractNextObject(buf); if (!*obj) { std::cerr << "Error reading element: " << h.full_pathname() << std::endl; } } void writeMessage(const dqmstorepb::ROOTFilePB &dqmstore_output_msg, const std::string &output_filename) { DEBUG(1, "Writing file" << std::endl); int out_fd = ::open(output_filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH); FileOutputStream out_stream(out_fd); GzipOutputStream::Options options; options.format = GzipOutputStream::GZIP; options.compression_level = 2; GzipOutputStream gzip_stream(&out_stream, options); dqmstore_output_msg.SerializeToZeroCopyStream(&gzip_stream); // make sure we flush before close gzip_stream.Close(); out_stream.Close(); ::close(out_fd); } void fillMessage(dqmstorepb::ROOTFilePB &dqmstore_output_msg, const std::set & micromes) { std::set::iterator mi = micromes.begin(); std::set::iterator me = micromes.end(); DEBUG(1, "Streaming ROOT objects" << std::endl); for (; mi != me; ++mi) { dqmstorepb::ROOTFilePB::Histo* h = dqmstore_output_msg.add_histo(); DEBUG(2, "Streaming ROOT object " << *(mi->fullname) << "\n"); h->set_full_pathname(*(mi->fullname)); TBufferFile buffer(TBufferFile::kWrite); buffer.WriteObject(mi->obj); h->set_size(buffer.Length()); h->set_flags(mi->flags); h->set_streamed_histo((const void*)buffer.Buffer(), buffer.Length()); delete mi->obj; } } void processDirectory(TFile *file, const std::string& curdir, std::set &dirs, std::set &objs, std::set &fullnames, std::set& micromes) { DEBUG(1, "Processing directory " << curdir << "\n"); file->cd(curdir.c_str()); TKey *key; TIter next (gDirectory->GetListOfKeys()); while ((key = (TKey *) next())) { TObject * obj = key->ReadObj(); if (dynamic_cast(obj)) { std::string subdir; subdir.reserve(curdir.size() + strlen(obj->GetName()) + 2); subdir += curdir; if (! curdir.empty()) subdir += '/'; subdir += obj->GetName(); processDirectory(file, subdir, dirs, objs, fullnames, micromes); } else if ((dynamic_cast(obj)) || (dynamic_cast(obj))) { if (dynamic_cast(obj)) { dynamic_cast(obj)->SetDirectory(0); } DEBUG(2, curdir << "/" << obj->GetName() << "\n"); MicroME mme(&*(fullnames.insert(curdir + '/' + std::string(obj->GetName())).first), &*(dirs.insert(curdir).first), &*(objs.insert(obj->GetName()).first)); if (obj) { mme.obj = obj; micromes.insert(mme); } } } } int encodeFile(const std::string &output_filename, const std::vector &filenames) { assert(filenames.size() == 1); TFile input(filenames[0].c_str()); DEBUG(0, "Encoding file " << filenames[0] << std::endl); std::set dirs; std::set objs; std::set fullnames; std::set micromes; dqmstorepb::ROOTFilePB dqmstore_message; processDirectory(&input, "", dirs, objs, fullnames, micromes); fillMessage(dqmstore_message, micromes); writeMessage(dqmstore_message, output_filename); return 0; } int convertFile(const std::string &output_filename, const std::vector &filenames) { assert(filenames.size() == 1); TFile output(output_filename.c_str(), "RECREATE"); DEBUG(0, "Converting file " << filenames[0] << std::endl); dqmstorepb::ROOTFilePB dqmstore_message; int filedescriptor = ::open(filenames[0].c_str(), O_RDONLY); FileInputStream fin(filedescriptor); GzipInputStream input(&fin); CodedInputStream input_coded(&input); input_coded.SetTotalBytesLimit(1024*1024*1024, -1); if (!dqmstore_message.ParseFromCodedStream(&input_coded)) { std::cout << "Fatal Error opening file " << filenames[0] << std::endl; return ERR_NOFILE; } ::close(filedescriptor); for (int i = 0; i < dqmstore_message.histo_size(); i++) { const dqmstorepb::ROOTFilePB::Histo& h = dqmstore_message.histo(i); DEBUG(1, h.full_pathname() << std::endl); DEBUG(1, h.size() << std::endl); TBufferFile buf(TBufferFile::kRead, h.size(), (void*)h.streamed_histo().data(), kFALSE); buf.Reset(); TObject *obj = extractNextObject(buf); std::string path,objname; get_info(h, path, objname, &obj); gDirectory->cd("/"); // Find the first path component. size_t start = 0; size_t end = path.find('/', start); if (end == std::string::npos) end = path.size(); while (true) { std::string part(path, start, end-start); if (! gDirectory->Get(part.c_str())) gDirectory->mkdir(part.c_str()); gDirectory->cd(part.c_str()); // Stop if we reached the end, ignoring any trailing '/'. if (end+1 >= path.size()) break; // Find the next path component. start = end+1; end = path.find('/', start); if (end == std::string::npos) end = path.size(); } obj->Write(); DEBUG(1, obj->GetName() << std::endl); } output.Close(); return 0; } int dumpFiles(const std::vector &filenames) { assert(filenames.size() > 0); for (int i = 0, e = filenames.size(); i != e; ++i) { DEBUG(0, "Dumping file " << filenames[i] << std::endl); dqmstorepb::ROOTFilePB dqmstore_message; int filedescriptor = ::open(filenames[0].c_str(), O_RDONLY); FileInputStream fin(filedescriptor); GzipInputStream input(&fin); CodedInputStream input_coded(&input); input_coded.SetTotalBytesLimit(1024*1024*1024, -1); if (!dqmstore_message.ParseFromCodedStream(&input_coded)) { std::cout << "Fatal Error opening file " << filenames[0] << std::endl; return ERR_NOFILE; } ::close(filedescriptor); for (int i = 0; i < dqmstore_message.histo_size(); i++) { const dqmstorepb::ROOTFilePB::Histo& h = dqmstore_message.histo(i); DEBUG(1, h.full_pathname() << std::endl); DEBUG(1, h.size() << std::endl); TBufferFile buf(TBufferFile::kRead, h.size(), (void*)h.streamed_histo().data(), kFALSE); buf.Reset(); TObject *obj = extractNextObject(buf); DEBUG(1, obj->GetName() << std::endl); DEBUG(1, "Flags: " << h.flags() << std::endl); } } return 0; } int addFiles(const std::string &output_filename, const std::vector &filenames) { dqmstorepb::ROOTFilePB dqmstore_outputmessage; std::set dirs; std::set objs; std::set fullnames; std::set micromes; assert(filenames.size() > 0); DEBUG(1, "Adding file " << filenames[0] << std::endl); { dqmstorepb::ROOTFilePB dqmstore_message; int filedescriptor; if ((filedescriptor = ::open(filenames[0].c_str(), O_RDONLY)) == -1) { std::cout << "Fatal Error opening file " << filenames[0] << std::endl; return ERR_NOFILE; } FileInputStream fin(filedescriptor); GzipInputStream input(&fin); CodedInputStream input_coded(&input); input_coded.SetTotalBytesLimit(1024*1024*1024, -1); if (!dqmstore_message.ParseFromCodedStream(&input_coded)) { std::cout << "Fatal Error opening file " << filenames[0] << std::endl; return ERR_NOFILE; } ::close(filedescriptor); for (int i = 0; i < dqmstore_message.histo_size(); i++) { std::string path; std::string objname; TObject *obj = NULL; const dqmstorepb::ROOTFilePB::Histo &h = dqmstore_message.histo(i); get_info(h, path, objname, &obj); MicroME * mme = new MicroME(&*(fullnames.insert(h.full_pathname()).first), &*(dirs.insert(path).first), &*(objs.insert(objname).first), h.flags()); if (obj) { mme->obj = obj; micromes.insert(*mme); DEBUG(2, "Inserting MicroME " << *mme->fullname << std::endl); } } } for (int i = 1, e = filenames.size(); i != e; ++i) { DEBUG(1, "Adding file " << filenames[i] << std::endl); dqmstorepb::ROOTFilePB dqmstore_msg; int filedescriptor; if ((filedescriptor = ::open(filenames[i].c_str(), O_RDONLY)) == -1) { std::cout << "Fatal Error opening file " << filenames[i] << std::endl; return ERR_NOFILE; } FileInputStream fin(filedescriptor); GzipInputStream input(&fin); CodedInputStream input_coded(&input); input_coded.SetTotalBytesLimit(1024*1024*1024, -1); if (!dqmstore_msg.ParseFromCodedStream(&input_coded)) { std::cout << "Fatal Error opening file " << filenames[0] << std::endl; return ERR_NOFILE; } ::close(filedescriptor); std::set::iterator mi = micromes.begin(); std::set::iterator me = micromes.end(); int elem = 0; for (; mi != me; ++mi) { std::string path; std::string objname; dqmstorepb::ROOTFilePB::Histo h; TObject *obj = NULL; if (elem < dqmstore_msg.histo_size()) { dqmstorepb::ROOTFilePB::Histo &h = const_cast(dqmstore_msg.histo(elem)); get_info(h, path, objname, &obj); DEBUG(2, "Comparing " << *(*mi).dirname << "/" << *(*mi).objname << " vs " << h.full_pathname() << std::endl); int diff = (*mi).fullname->compare(h.full_pathname()); if (diff == 0 && obj != NULL) { mi->add(obj); delete obj; ++elem; } else if (! lessThanMME(*(*mi).dirname, *(*mi).objname, path, objname)) { // loop over elem till they are no longer less than iterator. bool loop = true; while (loop) { DEBUG(2, "Adding Missing histogram " << h.full_pathname() << std::endl); // That's fine since we add elements to the left of the // current node, so we do not screw up the iteration // process. MicroME * mme = new MicroME(&*(fullnames.insert(h.full_pathname()).first), &*(dirs.insert(path).first), &*(objs.insert(objname).first)); if (obj) { mme->obj = obj; micromes.insert(*mme); ++elem; } if (elem < dqmstore_msg.histo_size()) { h = const_cast(dqmstore_msg.histo(elem)); get_info(h, path, objname, &obj); DEBUG(2, "Comparing " << *(*mi).dirname << "/" << *(*mi).objname << " vs " << h.full_pathname() << std::endl); loop = ! lessThanMME(*(*mi).dirname, *(*mi).objname, path, objname); } else { loop = false; } } } } } // Transfer whatever else is left pending in the new file. while (elem < dqmstore_msg.histo_size()) { std::string path; std::string objname; TObject *obj = NULL; const dqmstorepb::ROOTFilePB::Histo &h = dqmstore_msg.histo(elem); get_info(h, path, objname, &obj); DEBUG(2, "Adding Missing histogram " << h.full_pathname() << std::endl); MicroME * mme = new MicroME(&*(fullnames.insert(h.full_pathname()).first), &*(dirs.insert(path).first), &*(objs.insert(objname).first)); if (obj) { mme->obj = obj; micromes.insert(*mme); ++elem; } } } dqmstorepb::ROOTFilePB dqmstore_output_msg; fillMessage(dqmstore_output_msg, micromes); writeMessage(dqmstore_output_msg, output_filename); return 0; } static int showusage(void) { static const std::string app_name("fasthadd"); std::cerr << "Usage: " << app_name << " [--[no-]debug] TASK OPTIONS\n\n " << app_name << " [OPTIONS] add -o OUTPUT_FILE [DAT FILE...]\n " << app_name << " [OPTIONS] convert -o ROOT_FILE DAT_FILE\n " << app_name << " [OPTIONS] encode -o DAT_FILE ROOT_FILE\n " << app_name << " [OPTIONS] dump [DAT FILE...]\n "; return ERR_BADCFG; } int main(int argc, char * argv[]) { int arg; int ret = 0; std::string output_file; std::vector filenames; TaskType task; filenames.reserve(argc); for (arg = 1; arg < argc; ++arg) { if (! strcmp(argv[arg], "--no-debug")) debug = 0; else if (! strcmp(argv[arg], "--debug") || ! strcmp(argv[arg], "-d")) debug++; else break; } if (arg < argc) { if (! strcmp(argv[arg], "add")) { ++arg; task = TASK_ADD; } else if (! strcmp(argv[arg], "dump")) { ++arg; task = TASK_DUMP; } else if (! strcmp(argv[arg], "convert")) { ++arg; task = TASK_CONVERT; } else if (! strcmp(argv[arg], "encode")) { ++arg; task = TASK_ENCODE; } else { std::cerr << "Unknown action: " << argv[arg] << std::endl; return showusage(); } } else { std::cerr << "Not enough arguments\n"; return showusage(); } if (task == TASK_ADD || task == TASK_CONVERT || task == TASK_ENCODE) { if (arg == argc) { std::cerr << "add|convert|encode actions requires a -o option to be set\n"; return showusage(); } if (! strcmp(argv[arg], "-o")) { if (arg < argc-1) { output_file = argv[++arg]; } else { std::cerr << " -o option requires a value\n"; return showusage(); } } } else if (task == TASK_DUMP) { if (arg == argc) { std::cerr << "Missing input file(s)\n"; return showusage(); } for (; arg < argc; ++arg) { filenames.push_back(argv[arg]); } } if (task == TASK_ADD || task == TASK_CONVERT || task == TASK_ENCODE) { if (++arg == argc) { std::cerr << "Missing input file(s)\n"; return showusage(); } for (; arg < argc; ++arg) { filenames.push_back(argv[arg]); } } if (task == TASK_ADD) ret = addFiles(output_file, filenames); else if (task == TASK_DUMP) ret = dumpFiles(filenames); else if (task == TASK_CONVERT) ret = convertFile(output_file, filenames); else if (task == TASK_ENCODE) ret = encodeFile(output_file, filenames); google::protobuf::ShutdownProtobufLibrary(); return ret; }