This removes the need for Nix's build-remote.pl.
Build logs are now written to $HYDRA_DATA/build-logs because hydra-queue-runner doesn't have write permission to /nix/var/log.
5AIYUMTBY6TFQTBRP3MJ2PYWUMRF57I77NIVWYE74UMEVQMBWZVQC O776XDS2CAQ6CMDEWAXU5ZGBH4ETZB77CODANQM233BARYWBRIXQC YZAI5GQU3HNMK5MEGF2Y7WS445AN4YKD3HNJQVQP545ODN3F5DLAC 24BMQDZAWDQ7VNIA7TIROXSOYLOJBNZ2E4264WHWNJAEN6ZB3UOAC ENXUSMSVOU3AZFMH2ZXR4ZVPV2LRRQYQJ6IFX33YN6IH2ORSNSAAC NJJ7H64SZOX5EGACDCQAUQ7R6UEWD5IIC35A2MWFOOJV55DJYPHAC 62MQPRXCZCP7ZQKOOAFU5V36P7DBS6RCFDYK53PDCFGI4FAOQWMQC T2EIYJNGPIANHKJ4HBJIPTINWKG7RDLHR3PVHFYAPPLHZAJQBVWAC 2GK5DOU7ODF4WBSN3QTD3WIO52VTL2LOAXKGCDEMMAQPTEO4A4HAC #include <algorithm>#include <sys/types.h>#include <sys/stat.h>#include <fcntl.h>#include "build-remote.hh"#include "util.hh"#include "misc.hh"#include "serve-protocol.hh"#include "worker-protocol.hh"using namespace nix;struct Child{Pid pid;AutoCloseFD to, from;};static void openConnection(const string & sshName, const string & sshKey,int stderrFD, Child & child){Pipe to, from;to.create();from.create();child.pid = startProcess([&]() {if (dup2(to.readSide, STDIN_FILENO) == -1)throw SysError("cannot dup input pipe to stdin");if (dup2(from.writeSide, STDOUT_FILENO) == -1)throw SysError("cannot dup output pipe to stdout");if (dup2(stderrFD, STDERR_FILENO) == -1)throw SysError("cannot dup stderr");Strings argv({"ssh", "-x", "-a", sshName, "--", "nix-store", "--serve", "--write"});execvp("ssh", (char * *) stringsToCharPtrs(argv).data()); // FIXME: remove castthrow SysError("cannot start ssh");});to.readSide.close();from.writeSide.close();child.to = to.writeSide.borrow();child.from = from.readSide.borrow();}static void copyClosureTo(std::shared_ptr<StoreAPI> store,FdSource & from, FdSink & to, const PathSet & paths,bool useSubstitutes = false){PathSet closure;for (auto & path : paths)computeFSClosure(*store, path, closure);Paths sorted = topoSortPaths(*store, closure);/* Send the "query valid paths" command with the "lock" optionenabled. This prevents a race where the remote hostgarbage-collect paths that are already there. Optionally, askthe remote host to substitute missing paths. 
*/writeInt(cmdQueryValidPaths, to);writeInt(1, to); // == lock pathswriteInt(useSubstitutes, to);writeStrings(sorted, to);to.flush();/* Get back the set of paths that are already valid on the remotehost. */auto present = readStorePaths<PathSet>(from);PathSet missing;std::set_difference(closure.begin(), closure.end(), present.begin(), present.end(),std::inserter(missing, missing.end()));printMsg(lvlError, format("sending %1% missing paths") % missing.size());if (missing.empty()) return;throw Error("NOT IMPL 1");}static void copyClosureFrom(std::shared_ptr<StoreAPI> store,FdSource & from, FdSink & to, const PathSet & paths){writeInt(cmdExportPaths, to);writeInt(0, to); // == don't signwriteStrings(paths, to);to.flush();store->importPaths(false, from);}void buildRemote(std::shared_ptr<StoreAPI> store,const string & sshName, const string & sshKey,const Path & drvPath, const Derivation & drv,const nix::Path & logDir, RemoteResult & result){string base = baseNameOf(drvPath);Path logFile = logDir + "/" + string(base, 0, 2) + "/" + string(base, 2);createDirs(dirOf(logFile));AutoCloseFD logFD(open(logFile.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0666));if (logFD == -1) throw SysError(format("creating log file ‘%1%’") % logFile);Child child;openConnection(sshName, sshKey, logFD, child);logFD.close();FdSource from(child.from);FdSink to(child.to);/* Handshake. */writeInt(SERVE_MAGIC_1, to);writeInt(SERVE_PROTOCOL_VERSION, to);to.flush();unsigned int magic = readInt(from);if (magic != SERVE_MAGIC_2)throw Error(format("protocol mismatch with ‘nix-store --serve’ on ‘%1%’") % sshName);unsigned int version = readInt(from);if (GET_PROTOCOL_MAJOR(version) != 0x200)throw Error(format("unsupported ‘nix-store --serve’ protocol version on ‘%1%’") % sshName);/* Copy the input closure. */printMsg(lvlError, format("sending closure of ‘%1%’ to ‘%2%’") % drvPath % sshName);copyClosureTo(store, from, to, PathSet({drvPath}));/* Do the build. 
*/printMsg(lvlError, format("building ‘%1%’ on ‘%2%’") % drvPath % sshName);writeInt(cmdBuildPaths, to);writeStrings(PathSet({drvPath}), to);writeInt(3600, to); // == maxSilentTime, FIXMEwriteInt(7200, to); // == buildTimeout, FIXMEto.flush();result.startTime = time(0);int res = readInt(from);result.stopTime = time(0);if (res) {result.errorMsg = (format("%1% on ‘%2%’") % readString(from) % sshName).str();if (res == 100) result.status = RemoteResult::rrPermanentFailure;else if (res == 101) result.status = RemoteResult::rrTimedOut;else result.status = RemoteResult::rrMiscFailure;return;}/* Copy the output paths. */printMsg(lvlError, format("copying outputs of ‘%1%’ from ‘%2%’") % drvPath % sshName);PathSet outputs;for (auto & output : drv.outputs)outputs.insert(output.second.path);copyClosureFrom(store, from, to, outputs);/* Shut down the connection. */child.to.close();child.pid.wait(true);result.status = RemoteResult::rrSuccess;}
#pragma once#include "store-api.hh"#include "derivations.hh"struct RemoteResult{enum {rrSuccess = 0,rrPermanentFailure = 1,rrTimedOut = 2,rrMiscFailure = 3} status = rrMiscFailure;std::string errorMsg;time_t startTime = 0, stopTime = 0;};void buildRemote(std::shared_ptr<nix::StoreAPI> store,const std::string & sshName, const std::string & sshKey,const nix::Path & drvPath, const nix::Derivation & drv,const nix::Path & logDir, RemoteResult & result);
}};struct Machine{typedef std::shared_ptr<Machine> ptr;std::string sshName, sshKey;std::set<std::string> systemTypes, supportedFeatures, mandatoryFeatures;unsigned int maxJobs = 1;float speedFactor = 1.0;Sync<unsigned int> currentJobs;Machine(){auto currentJobs_(currentJobs.lock());*currentJobs_ = 0;
/* A RAII helper that manages the currentJobs field of Machine
   objects: constructing a reservation increments the machine's
   currentJobs counter, destroying it decrements the counter again
   (never below zero).

   A reservation is not copyable.  A copy would decrement the counter
   a second time when destroyed, releasing a job slot that was never
   taken.  Share a reservation through MachineReservation::ptr
   instead. */
struct MachineReservation
{
    typedef std::shared_ptr<MachineReservation> ptr;

    Machine::ptr machine;

    explicit MachineReservation(Machine::ptr machine) : machine(machine)
    {
        auto currentJobs_(machine->currentJobs.lock());
        (*currentJobs_)++;
    }

    MachineReservation(const MachineReservation &) = delete;
    MachineReservation & operator =(const MachineReservation &) = delete;

    ~MachineReservation()
    {
        auto currentJobs_(machine->currentJobs.lock());
        /* Guard against underflow of the unsigned counter. */
        if (*currentJobs_ > 0) (*currentJobs_)--;
    }
};
Machines newMachines;if (pathExists(machinesFile)) {for (auto line : tokenizeString<Strings>(readFile(machinesFile), "\n")) {line = trim(string(line, 0, line.find('#')));auto tokens = tokenizeString<std::vector<std::string>>(line);if (tokens.size() < 3) continue;tokens.resize(7);auto machine = std::make_shared<Machine>();machine->sshName = tokens[0];machine->systemTypes = tokenizeString<StringSet>(tokens[1], ",");machine->sshKey = tokens[2];if (tokens[3] != "")string2Int(tokens[3], machine->maxJobs);elsemachine->maxJobs = 1;machine->speedFactor = atof(tokens[4].c_str());machine->supportedFeatures = tokenizeString<StringSet>(tokens[5], ",");machine->mandatoryFeatures = tokenizeString<StringSet>(tokens[6], ",");newMachines.push_back(machine);}
} else {auto machine = std::make_shared<Machine>();machine->sshName = "localhost";machine->systemTypes = StringSet({settings.thisSystem});if (settings.thisSystem == "x86_64-linux")machine->systemTypes.insert("i686-linux");machine->maxJobs = settings.maxBuildJobs;newMachines.push_back(machine);}auto machines_(machines.lock());*machines_ = newMachines;}
("update BuildSteps set busy = 0, status = $1, propagatedFrom = $4, errorMsg = $5, stopTime = $6 where build = $2 and stepnr = $3")
("update BuildSteps set busy = 0, status = $1, propagatedFrom = $4, errorMsg = $5, startTime = $6, stopTime = $7 where build = $2 and stepnr = $3")
while (runnable_->empty() && !exitRequested)runnable_.wait(runnableWakeup);if (exitRequested) break;auto weak = *runnable_->begin();runnable_->pop_front();step = weak.lock();if (!step) continue;
printMsg(lvlError, format("%1% runnable builds") % runnable_->size());/* FIXME: we're holding the runnable lock too longhere. This could be more efficient. */for (auto i = runnable_->begin(); i != runnable_->end(); ) {auto step = i->lock();/* Delete dead steps. */if (!step) {i = runnable_->erase(i);continue;}auto reservation = findMachine(step);if (!reservation) {printMsg(lvlError, format("cannot execute step ‘%1%’ right now") % step->drvPath);++i;continue;}printMsg(lvlInfo, format("WOOHOO: starting step ‘%1%’ on machine ‘%2%’")% step->drvPath % reservation->machine->sshName);i = runnable_->erase(i);auto builderThread = std::thread(&State::builder, this, step, reservation);builderThread.detach(); // FIXME?}
/* Build it. */printMsg(lvlError, format("slot %1%: got build step ‘%2%’") % slot % step->drvPath);doBuildStep(store, step);
/* Sleep until we're woken up (either because a runnable buildis added, or because a build finishes). */{std::unique_lock<std::mutex> lock(dispatcherMutex);dispatcherWakeup.wait(lock);}
void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step)
MachineReservation::ptr State::findMachine(Step::ptr step){auto machines_(machines.lock());for (auto & machine : *machines_) {if (!has(machine->systemTypes, step->drv.platform)) continue;// FIXME: check features{auto currentJobs_(machine->currentJobs.lock());if (*currentJobs_ >= machine->maxJobs) continue;}return std::make_shared<MachineReservation>(machine);}/* FIXME: distinguish between permanent failures (a matchingmachine doesn't exist) and temporary failures (a matchingmachine is not available). */return 0;}void State::builder(Step::ptr step, MachineReservation::ptr reservation){try {auto store = openStore(); // FIXME: pooldoBuildStep(store, step, reservation->machine);} catch (std::exception & e) {printMsg(lvlError, format("build thread for ‘%1%’: %2%") % step->drvPath % e.what());// FIXME: put step back in runnable and retry}/* Release the machine and wake up the dispatcher. */assert(reservation.unique());reservation = 0;wakeDispatcher();printMsg(lvlError, "builder exits");}void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,Machine::ptr machine)
std::vector<std::thread> builderThreads;for (int n = 0; n < 4; n++)builderThreads.push_back(std::thread(&State::builderThreadEntry, this, n));
auto queueMonitorThread = std::thread(&State::queueMonitor, this);auto dispatcherThread = std::thread(&State::dispatcher, this);
for ($fn . $bucketed . ".bz2", $fn . $bucketed, $fn . $base . ".bz2", $fn . $base) {return $_ if (-f $_);
my $fn2 = Hydra::Model::DB::getHydraPath . "/build-logs/";for ($fn2 . $bucketed, $fn . $bucketed . ".bz2", $fn . $bucketed, $fn . $base . ".bz2", $fn . $base) {return $_ if -f $_;