* Sync with the trunk.

This commit is contained in:
Eelco Dolstra 2011-12-16 23:33:01 +00:00
commit 194d21f9f6
45 changed files with 928 additions and 682 deletions

View file

@ -56,7 +56,8 @@ static void tunnelStderr(const unsigned char * buf, size_t count)
if (canSendStderr && myPid == getpid()) {
try {
writeInt(STDERR_NEXT, to);
writeString(string((char *) buf, count), to);
writeString(buf, count, to);
to.flush();
} catch (...) {
/* Write failed; that means that the other side is
gone. */
@ -200,26 +201,20 @@ static void stopWork(bool success = true, const string & msg = "", unsigned int
struct TunnelSink : Sink
{
Sink & to;
TunnelSink(Sink & to) : to(to)
{
}
virtual void operator ()
(const unsigned char * data, unsigned int len)
TunnelSink(Sink & to) : to(to) { }
virtual void operator () (const unsigned char * data, size_t len)
{
writeInt(STDERR_WRITE, to);
writeString(string((const char *) data, len), to);
writeString(data, len, to);
}
};
struct TunnelSource : Source
struct TunnelSource : BufferedSource
{
Source & from;
TunnelSource(Source & from) : from(from)
{
}
virtual void operator ()
(unsigned char * data, unsigned int len)
TunnelSource(Source & from) : from(from) { }
size_t readUnbuffered(unsigned char * data, size_t len)
{
/* Careful: we're going to receive data from the client now,
so we have to disable the SIGPOLL handler. */
@ -228,11 +223,12 @@ struct TunnelSource : Source
writeInt(STDERR_READ, to);
writeInt(len, to);
string s = readString(from);
if (s.size() != len) throw Error("not enough data");
memcpy(data, (const unsigned char *) s.c_str(), len);
to.flush();
size_t n = readString(data, len, from);
startWork();
if (n == 0) throw EndOfFile("unexpected end-of-file");
return n;
}
};
@ -241,11 +237,14 @@ struct TunnelSource : Source
the contents of the file to `s'. Otherwise barf. */
struct RetrieveRegularNARSink : ParseSink
{
bool regular;
string s;
RetrieveRegularNARSink() : regular(true) { }
void createDirectory(const Path & path)
{
throw Error("regular file expected");
regular = false;
}
void receiveContents(unsigned char * data, unsigned int len)
@ -255,7 +254,7 @@ struct RetrieveRegularNARSink : ParseSink
void createSymlink(const Path & path, const string & target)
{
throw Error("regular file expected");
regular = false;
}
};
@ -266,10 +265,11 @@ struct SavingSourceAdapter : Source
Source & orig;
string s;
SavingSourceAdapter(Source & orig) : orig(orig) { }
void operator () (unsigned char * data, unsigned int len)
size_t read(unsigned char * data, size_t len)
{
orig(data, len);
s.append((const char *) data, len);
size_t n = orig.read(data, len);
s.append((const char *) data, n);
return n;
}
};
@ -327,7 +327,7 @@ static void performOp(unsigned int clientVersion,
store->queryReferrers(path, paths);
else paths = store->queryDerivationOutputs(path);
stopWork();
writeStringSet(paths, to);
writeStrings(paths, to);
break;
}
@ -371,11 +371,11 @@ static void performOp(unsigned int clientVersion,
addToStoreFromDump(). */
ParseSink sink; /* null sink; just parse the NAR */
parseDump(sink, savedNAR);
} else {
} else
parseDump(savedRegular, from);
}
startWork();
if (!savedRegular.regular) throw Error("regular file expected");
Path path = dynamic_cast<LocalStore *>(store.get())
->addToStoreFromDump(recursive ? savedNAR.s : savedRegular.s, baseName, recursive, hashAlgo);
stopWork();
@ -387,7 +387,7 @@ static void performOp(unsigned int clientVersion,
case wopAddTextToStore: {
string suffix = readString(from);
string s = readString(from);
PathSet refs = readStorePaths(from);
PathSet refs = readStorePaths<PathSet>(from);
startWork();
Path path = store->addTextToStore(suffix, s, refs);
stopWork();
@ -406,17 +406,17 @@ static void performOp(unsigned int clientVersion,
break;
}
case wopImportPath: {
case wopImportPaths: {
startWork();
TunnelSource source(from);
Path path = store->importPath(true, source);
Paths paths = store->importPaths(true, source);
stopWork();
writeString(path, to);
writeStrings(paths, to);
break;
}
case wopBuildDerivations: {
PathSet drvs = readStorePaths(from);
PathSet drvs = readStorePaths<PathSet>(from);
startWork();
store->buildDerivations(drvs);
stopWork();
@ -474,7 +474,7 @@ static void performOp(unsigned int clientVersion,
case wopCollectGarbage: {
GCOptions options;
options.action = (GCOptions::GCAction) readInt(from);
options.pathsToDelete = readStorePaths(from);
options.pathsToDelete = readStorePaths<PathSet>(from);
options.ignoreLiveness = readInt(from);
options.maxFreed = readLongLong(from);
options.maxLinks = readInt(from);
@ -492,7 +492,7 @@ static void performOp(unsigned int clientVersion,
store->collectGarbage(options, results);
stopWork();
writeStringSet(results.paths, to);
writeStrings(results.paths, to);
writeLongLong(results.bytesFreed, to);
writeLongLong(results.blocksFreed, to);
@ -530,7 +530,7 @@ static void performOp(unsigned int clientVersion,
writeInt(res ? 1 : 0, to);
if (res) {
writeString(info.deriver, to);
writeStringSet(info.references, to);
writeStrings(info.references, to);
writeLongLong(info.downloadSize, to);
if (GET_PROTOCOL_MINOR(clientVersion) >= 7)
writeLongLong(info.narSize, to);
@ -542,7 +542,7 @@ static void performOp(unsigned int clientVersion,
startWork();
PathSet paths = store->queryValidPaths();
stopWork();
writeStringSet(paths, to);
writeStrings(paths, to);
break;
}
@ -550,12 +550,12 @@ static void performOp(unsigned int clientVersion,
startWork();
PathSet paths = store->queryFailedPaths();
stopWork();
writeStringSet(paths, to);
writeStrings(paths, to);
break;
}
case wopClearFailedPaths: {
PathSet paths = readStringSet(from);
PathSet paths = readStrings<PathSet>(from);
startWork();
store->clearFailedPaths(paths);
stopWork();
@ -570,7 +570,7 @@ static void performOp(unsigned int clientVersion,
stopWork();
writeString(info.deriver, to);
writeString(printHash(info.hash), to);
writeStringSet(info.references, to);
writeStrings(info.references, to);
writeInt(info.registrationTime, to);
writeLongLong(info.narSize, to);
break;
@ -603,8 +603,8 @@ static void processConnection()
unsigned int magic = readInt(from);
if (magic != WORKER_MAGIC_1) throw Error("protocol mismatch");
writeInt(WORKER_MAGIC_2, to);
writeInt(PROTOCOL_VERSION, to);
to.flush();
unsigned int clientVersion = readInt(from);
/* Send startup error messages to the client. */
@ -626,9 +626,11 @@ static void processConnection()
store = boost::shared_ptr<StoreAPI>(new LocalStore());
stopWork();
to.flush();
} catch (Error & e) {
stopWork(false, e.msg());
to.flush();
return;
}
@ -648,9 +650,19 @@ static void processConnection()
try {
performOp(clientVersion, from, to, op);
} catch (Error & e) {
/* If we're not in a state were we can send replies, then
something went wrong processing the input of the
client. This can happen especially if I/O errors occur
during addTextToStore() / importPath(). If that
happens, just send the error message and exit. */
bool errorAllowed = canSendStderr;
if (!errorAllowed) printMsg(lvlError, format("error processing client input: %1%") % e.msg());
stopWork(false, e.msg(), GET_PROTOCOL_MINOR(clientVersion) >= 8 ? e.status : 0);
if (!errorAllowed) break;
}
to.flush();
assert(!canSendStderr);
};