HJOEIMLRDVQ2KZI5HGL2HKGBM3AHP7YIKGKDAGFUNKRUXVRB24NAC HPJKBFZ4C24DNBFESAVVFHOBHXOPYOP4XNPFVGOLA7EPLZCIRS6QC WFYMBNWBSHJ6GF7MPAGKT5H2CMNLNTQLBVJGVA5N6J4IWEL4ND3QC FULDVXE2XVKVC3BB2GOE2N2YZ4GW7LT26TIBXKKOVPUXGDOA5WYQC 4D7CHQ345ZSI52GPNW73JOYALRLFEGI2YUDWKG4UCDE55MO5LQ2AC 7VQ4ALFYKJBFR46T3WZDMGOXNRR3QNJEJQVBYJM4HSJUOOUD6WBQC A2GL5FOZ3UJ2NM5RPRWTNPFTKLBA54B2UC6UIYO4M3N3RFNC4BTAC SK6WHODMWAUL7LNH6QTUXZW7GQFNKKDYN4BU45ODTMMKYMG6O7KAC 24BMQDZAWDQ7VNIA7TIROXSOYLOJBNZ2E4264WHWNJAEN6ZB3UOAC NJJ7H64SZOX5EGACDCQAUQ7R6UEWD5IIC35A2MWFOOJV55DJYPHAC GS4BE6TB6GH2JUZJHDPHL6YG7J7YYESF3YOZJZ2CFABXUTO4VYPQC OCZ4LSGGSCMSLGC3C32D5JUYYHS5CIPOKOAMADEFAFZOFXJ3YY3AC 5AIYUMTBY6TFQTBRP3MJ2PYWUMRF57I77NIVWYE74UMEVQMBWZVQC ENXUSMSVOU3AZFMH2ZXR4ZVPV2LRRQYQJ6IFX33YN6IH2ORSNSAAC YZAI5GQU3HNMK5MEGF2Y7WS445AN4YKD3HNJQVQP545ODN3F5DLAC 7LB6QBXYOGU43UDLJDJQZFGT4XDALULXDF3WX3WWHL7X3JTB54CQC MB3TISH2KYBIGY6XJKMN4HO2S6TCN2GORJENMECCKLXGGIRS2O2AC PLOZBRTR6USSGJX7GR2RZKNPVYG2Q6QM7LW6IA35MKL63ZTQVD7QC K5G5GZY7D7KWVR5RAGZFHH3ZPG5OCLZT4HZ6XIJJ7YYVUMC2CTZQC PQFOMNTLFY4HINJFAQYIFTBTSDRST6W2WNCVKE5ITR2IDF4SWWXQC WKJFPR77WNFJHZ65NV3DDMCYYUUWUSI3BHTVCZZDWQI6L3FVP5AQC HHOMBU7GGRAEXODSDY3WUHQGOSQ35OTGRNBWKKAS2D4YEIZTTNUAC UQQ4IL55WHYMXNSPOXEFBTZAPMP7LQ726THOR7INRCJDSYVOP3ZAC CCHPYTCPXUHLSQWDBDVZNFTUSMRLW45DKYTVCU3XKYE3OU5RPUEQC XV4AEKJCFTNCOR52IRFYHORCNNFKIHOADIUARTC2U5Z6ZQBEEFYQC HUUZFPPKGHTXFZMZCO2UGWYNGEED3E2CFHQRFQVVBJGPQVGVY4UAC GKZN4UV75GV7GEHQGBM2O6UHN6CVSHA7BMASCUQSDIDYVUKKZL7AC HLSHCK3CP4RB6VO6N44FBHWMXBJYB6XPNWM5EKCKWU3KISU37N3AC SODOV2CMWA4JMIKRQNJ6MD3U3BS2XTSLINLRAG4SFY742IIJNI5QC NNOCZ4ROWC64ZKSAE2MPHZ3LGLI34C5TJJFW4MHXN6OELK6VMWOQC N5O7VEEOY2IE27VCOYRBG7YCY3K7JMQEDEMRT4OQ2MUE3NWULHHQC O64P4XJSK56UN73VA5JUAM4RHFDWOZJDDYKVOKMUB736ODOAUYNAC T2EIYJNGPIANHKJ4HBJIPTINWKG7RDLHR3PVHFYAPPLHZAJQBVWAC RQUAATWBGEP3YT4F555XLJYRRRGHDTEILHFORES7AM2XAOVMVJSAC LE4VZIY5VZ52FOP5QQRIJINWIMWTAPRTZTGO77JXUEPGRPRSQYMAC 63W4T5PUSRHU53CVVTVRWAQX6T74RIHH636NCGUGPN3YFVMC3VTAC IE2PRAQUCQVFPJ4CAIJRPXXEFC5VBAE3EO5I5FG4XWEDRNONNHKQC FQQRJUO4C7655SFAKPRPILALVEVRQSIIVBLMQUFPODUBXPIMWYSAC KBZHIGLGHGLST5AZZDEJTYBJSQNE2XYNHEN2FN6XMAMY5BJYZR6QC 2IQRXLWETU2RLXPYKBSZIUULDHS346BAHE2NLOMOWXC6WDAX2BYQC ATJ54SPXPE2IIFRERUOBFF42HBSEADP4QOI743ZBUNBQX3PYKRXQC 22LDPAIPRVSZVZXEDCM54GUOH72VQ52EIY47PQV2VELZ3NORC5IQC IWB3F4Z6QZYHQFJ6FWZTGWLCPBYEUTFLUS3F7QT7JOA4DV4YLYGAC #pragma once#include <pqxx/pqxx>#include "util.hh"using namespace nix;struct Connection : pqxx::connection{Connection() : pqxx::connection(getFlags()) { };string getFlags(){string s = getEnv("HYDRA_DBI", "dbi:Pg:dbname=hydra;");string prefix = "dbi:Pg:";if (string(s, 0, prefix.size()) != prefix)throw Error("$HYDRA_DBI does not denote a PostgreSQL database");return concatStringsSep(" ", tokenizeString<Strings>(string(s, prefix.size()), ";"));}};struct receiver : public pqxx::notification_receiver{bool status = false;receiver(pqxx::connection_base & c, const std::string & channel): pqxx::notification_receiver(c, channel) { }void operator() (const string & payload, int pid) override{status = true;};bool get() {bool b = status;status = false;return b;}};
typedef enum {bsSuccess = 0,bsFailed = 1,bsDepFailed = 2,bsAborted = 3,bsFailedWithOutput = 6,bsTimedOut = 7,bsUnsupported = 9,} BuildStatus;typedef enum {bssSuccess = 0,bssFailed = 1,bssAborted = 4,bssTimedOut = 7,bssUnsupported = 9,bssBusy = 100, // not stored} BuildStepStatus;struct Connection : pqxx::connection{Connection() : pqxx::connection(getFlags()) { };string getFlags(){string s = getEnv("HYDRA_DBI", "dbi:Pg:dbname=hydra;");string prefix = "dbi:Pg:";if (string(s, 0, prefix.size()) != prefix)throw Error("$HYDRA_DBI does not denote a PostgreSQL database");return concatStringsSep(" ", tokenizeString<Strings>(string(s, prefix.size()), ";"));}};struct receiver : public pqxx::notification_receiver{bool status = false;receiver(pqxx::connection_base & c, const std::string & channel): pqxx::notification_receiver(c, channel) { }void operator() (const string & payload, int pid) override{status = true;};bool get() {bool b = status;status = false;return b;}};typedef unsigned int BuildID;struct Step;struct Build{typedef std::shared_ptr<Build> ptr;typedef std::weak_ptr<Build> wptr;BuildID id;Path drvPath;std::map<string, Path> outputs;std::string fullJobName;unsigned int maxSilentTime, buildTimeout;std::shared_ptr<Step> toplevel;std::atomic_bool finishedInDB{false};~Build(){printMsg(lvlDebug, format("destroying build %1%") % id);}};struct Step{typedef std::shared_ptr<Step> ptr;typedef std::weak_ptr<Step> wptr;Path drvPath;Derivation drv;std::set<std::string> requiredSystemFeatures;bool preferLocalBuild;struct State{/* Whether the step has finished initialisation. */bool created = false;/* The build steps on which this step depends. */std::set<Step::ptr> deps;/* The build steps that depend on this step. */std::vector<Step::wptr> rdeps;/* Builds that have this step as the top-level derivation. */std::vector<Build::wptr> builds;
/* Point in time after which the step can be retried. */system_time after;};std::atomic_bool finished{false}; // debuggingSync<State> state;~Step(){//printMsg(lvlError, format("destroying step %1%") % drvPath);}};struct Machine{typedef std::shared_ptr<Machine> ptr;std::string sshName, sshKey;std::set<std::string> systemTypes, supportedFeatures, mandatoryFeatures;unsigned int maxJobs = 1;float speedFactor = 1.0;struct State {typedef std::shared_ptr<State> ptr;counter currentJobs{0};counter nrStepsDone{0};counter totalStepTime{0}; // total time for steps, including closure copyingcounter totalStepBuildTime{0}; // total build time for steps};State::ptr state;bool supportsStep(Step::ptr step){if (systemTypes.find(step->drv.platform) == systemTypes.end()) return false;for (auto & f : mandatoryFeatures)if (step->requiredSystemFeatures.find(f) == step->requiredSystemFeatures.end()&& !(step->preferLocalBuild && f == "local"))return false;for (auto & f : step->requiredSystemFeatures)if (supportedFeatures.find(f) == supportedFeatures.end()) return false;return true;}};class State{private:Path hydraData, logDir;StringSet localPlatforms;/* The queued builds. */typedef std::map<BuildID, Build::ptr> Builds;Sync<Builds> builds;/* All active or pending build steps (i.e. dependencies of thequeued builds). Note that these are weak pointers. Steps arekept alive by being reachable from Builds or by being inprogress. */typedef std::map<Path, Step::wptr> Steps;Sync<Steps> steps;/* Build steps that have no unbuilt dependencies. */typedef std::list<Step::wptr> Runnable;Sync<Runnable> runnable;/* CV for waking up the dispatcher. */std::condition_variable dispatcherWakeup;std::mutex dispatcherMutex;/* PostgreSQL connection pool. */Pool<Connection> dbPool;/* The build machines. */typedef std::map<string, Machine::ptr> Machines;Sync<Machines> machines; // FIXME: use atomic_shared_ptrPath machinesFile;struct stat machinesFileStat;/* Token server limiting the number of threads copying closures inparallel to prevent excessive I/O load. */TokenServer copyClosureTokenServer{maxParallelCopyClosure};/* Various stats. */time_t startedAt;counter nrBuildsRead{0};counter nrBuildsDone{0};counter nrStepsDone{0};counter nrActiveSteps{0};counter nrStepsBuilding{0};counter nrStepsCopyingTo{0};counter nrStepsCopyingFrom{0};counter nrRetries{0};counter maxNrRetries{0};counter totalStepTime{0}; // total time for steps, including closure copyingcounter totalStepBuildTime{0}; // total build time for stepscounter nrQueueWakeups{0};counter nrDispatcherWakeups{0};counter bytesSent{0};counter bytesReceived{0};/* Log compressor work queue. */Sync<std::queue<Path>> logCompressorQueue;std::condition_variable_any logCompressorWakeup;/* Notification sender work queue. FIXME: if hydra-queue-runner iskilled before it has finished sending notifications about abuild, then the notifications may be lost. It would be betterto mark builds with pending notification in the database. */typedef std::pair<BuildID, std::vector<BuildID>> NotificationItem;Sync<std::queue<NotificationItem>> notificationSenderQueue;std::condition_variable_any notificationSenderWakeup;/* Specific build to do for --build-one (testing only). */BuildID buildOne;public:State();private:void clearBusy(Connection & conn, time_t stopTime);/* (Re)load /etc/nix/machines. */void loadMachinesFile();/* Thread to reload /etc/nix/machines periodically. */void monitorMachinesFile();int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step,const std::string & machine, BuildStepStatus status, const std::string & errorMsg = "",BuildID propagatedFrom = 0);void finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, BuildID buildId, int stepNr,const std::string & machine, BuildStepStatus status, const string & errorMsg = "",BuildID propagatedFrom = 0);void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status);void queueMonitor();void queueMonitorLoop();void getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId);void removeCancelledBuilds(Connection & conn);Step::ptr createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,Build::ptr referringBuild, Step::ptr referringStep, std::set<Path> & finishedDrvs,std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable);void makeRunnable(Step::ptr step);/* The thread that selects and starts runnable builds. */void dispatcher();void wakeDispatcher();void builder(Step::ptr step, Machine::ptr machine, std::shared_ptr<MaintainCount> reservation);/* Perform the given build step. Return true if the step is to beretried. */bool doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,Machine::ptr machine);void markSucceededBuild(pqxx::work & txn, Build::ptr build,const BuildResult & res, bool isCachedBuild, time_t startTime, time_t stopTime);bool checkCachedFailure(Step::ptr step, Connection & conn);/* Thread that asynchronously bzips logs of finished steps. */void logCompressor();/* Thread that asynchronously invokes hydra-notify to send buildnotifications. */void notificationSender();/* Acquire the global queue runner lock, or null if somebody elsehas it. */std::shared_ptr<PathLocks> acquireGlobalLock();void dumpStatus(Connection & conn, bool log);public:void showStatus();void unlock();void run(BuildID buildOne = 0);};
#pragma once#include <atomic>#include <chrono>#include <condition_variable>#include <map>#include <memory>#include <queue>#include "db.hh"#include "counter.hh"#include "pathlocks.hh"#include "pool.hh"#include "sync.hh"#include "token-server.hh"#include "store-api.hh"#include "derivations.hh"using namespace nix;typedef unsigned int BuildID;typedef std::chrono::time_point<std::chrono::system_clock> system_time;typedef enum {bsSuccess = 0,bsFailed = 1,bsDepFailed = 2,bsAborted = 3,bsFailedWithOutput = 6,bsTimedOut = 7,bsUnsupported = 9,} BuildStatus;typedef enum {bssSuccess = 0,bssFailed = 1,bssAborted = 4,bssTimedOut = 7,bssUnsupported = 9,bssBusy = 100, // not stored} BuildStepStatus;struct Step;struct BuildResult;struct Build{typedef std::shared_ptr<Build> ptr;typedef std::weak_ptr<Build> wptr;BuildID id;Path drvPath;std::map<string, Path> outputs;std::string fullJobName;unsigned int maxSilentTime, buildTimeout;std::shared_ptr<Step> toplevel;std::atomic_bool finishedInDB{false};};struct Step{typedef std::shared_ptr<Step> ptr;typedef std::weak_ptr<Step> wptr;Path drvPath;Derivation drv;std::set<std::string> requiredSystemFeatures;bool preferLocalBuild;struct State{/* Whether the step has finished initialisation. */bool created = false;/* The build steps on which this step depends. */std::set<Step::ptr> deps;/* The build steps that depend on this step. */std::vector<Step::wptr> rdeps;/* Builds that have this step as the top-level derivation. */std::vector<Build::wptr> builds;/* Number of times we've tried this step. */unsigned int tries = 0;/* Point in time after which the step can be retried. */system_time after;};std::atomic_bool finished{false}; // debuggingSync<State> state;~Step(){//printMsg(lvlError, format("destroying step %1%") % drvPath);}};struct Machine{typedef std::shared_ptr<Machine> ptr;std::string sshName, sshKey;std::set<std::string> systemTypes, supportedFeatures, mandatoryFeatures;unsigned int maxJobs = 1;float speedFactor = 1.0;struct State {typedef std::shared_ptr<State> ptr;counter currentJobs{0};counter nrStepsDone{0};counter totalStepTime{0}; // total time for steps, including closure copyingcounter totalStepBuildTime{0}; // total build time for steps};State::ptr state;bool supportsStep(Step::ptr step){if (systemTypes.find(step->drv.platform) == systemTypes.end()) return false;for (auto & f : mandatoryFeatures)if (step->requiredSystemFeatures.find(f) == step->requiredSystemFeatures.end()&& !(step->preferLocalBuild && f == "local"))return false;for (auto & f : step->requiredSystemFeatures)if (supportedFeatures.find(f) == supportedFeatures.end()) return false;return true;}};class State{private:Path hydraData, logDir;StringSet localPlatforms;/* The queued builds. */typedef std::map<BuildID, Build::ptr> Builds;Sync<Builds> builds;/* All active or pending build steps (i.e. dependencies of thequeued builds). Note that these are weak pointers. Steps arekept alive by being reachable from Builds or by being inprogress. */typedef std::map<Path, Step::wptr> Steps;Sync<Steps> steps;/* Build steps that have no unbuilt dependencies. */typedef std::list<Step::wptr> Runnable;Sync<Runnable> runnable;/* CV for waking up the dispatcher. */std::condition_variable dispatcherWakeup;std::mutex dispatcherMutex;/* PostgreSQL connection pool. */Pool<Connection> dbPool;/* The build machines. */typedef std::map<string, Machine::ptr> Machines;Sync<Machines> machines; // FIXME: use atomic_shared_ptrPath machinesFile;struct stat machinesFileStat;/* Token server limiting the number of threads copying closures inparallel to prevent excessive I/O load. */TokenServer copyClosureTokenServer;/* Various stats. */time_t startedAt;counter nrBuildsRead{0};counter nrBuildsDone{0};counter nrStepsDone{0};counter nrActiveSteps{0};counter nrStepsBuilding{0};counter nrStepsCopyingTo{0};counter nrStepsCopyingFrom{0};counter nrRetries{0};counter maxNrRetries{0};counter totalStepTime{0}; // total time for steps, including closure copyingcounter totalStepBuildTime{0}; // total build time for stepscounter nrQueueWakeups{0};counter nrDispatcherWakeups{0};counter bytesSent{0};counter bytesReceived{0};/* Log compressor work queue. */Sync<std::queue<Path>> logCompressorQueue;std::condition_variable_any logCompressorWakeup;/* Notification sender work queue. FIXME: if hydra-queue-runner iskilled before it has finished sending notifications about abuild, then the notifications may be lost. It would be betterto mark builds with pending notification in the database. */typedef std::pair<BuildID, std::vector<BuildID>> NotificationItem;Sync<std::queue<NotificationItem>> notificationSenderQueue;std::condition_variable_any notificationSenderWakeup;/* Specific build to do for --build-one (testing only). */BuildID buildOne;public:State();private:void clearBusy(Connection & conn, time_t stopTime);/* (Re)load /etc/nix/machines. */void loadMachinesFile();/* Thread to reload /etc/nix/machines periodically. */void monitorMachinesFile();int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step,const std::string & machine, BuildStepStatus status, const std::string & errorMsg = "",BuildID propagatedFrom = 0);void finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, BuildID buildId, int stepNr,const std::string & machine, BuildStepStatus status, const string & errorMsg = "",BuildID propagatedFrom = 0);void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status);void queueMonitor();void queueMonitorLoop();void getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId);void removeCancelledBuilds(Connection & conn);Step::ptr createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,Build::ptr referringBuild, Step::ptr referringStep, std::set<Path> & finishedDrvs,std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable);void makeRunnable(Step::ptr step);/* The thread that selects and starts runnable builds. */void dispatcher();void wakeDispatcher();void builder(Step::ptr step, Machine::ptr machine, std::shared_ptr<MaintainCount> reservation);/* Perform the given build step. Return true if the step is to beretried. */bool doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,Machine::ptr machine);void markSucceededBuild(pqxx::work & txn, Build::ptr build,const BuildResult & res, bool isCachedBuild, time_t startTime, time_t stopTime);bool checkCachedFailure(Step::ptr step, Connection & conn);/* Thread that asynchronously bzips logs of finished steps. */void logCompressor();/* Thread that asynchronously invokes hydra-notify to send buildnotifications. */void notificationSender();/* Acquire the global queue runner lock, or null if somebody elsehas it. */std::shared_ptr<PathLocks> acquireGlobalLock();void dumpStatus(Connection & conn, bool log);public:void showStatus();void unlock();void run(BuildID buildOne = 0);};