00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #include <BufStream.H>
00028 #include "SRPC.H"
00029 #include "MultiSRPC.H"
00030 #include "VestaSource.H"
00031 #include "VestaSourceAtomic.H"
00032 #include "VestaSourceImpl.H"
00033 #include "VestaSourceSRPC.H"
00034 #include "VRConcurrency.H"
00035 #include "logging.H"
00036 #include "Mastership.H"
00037
00038 #include "lock_timing.H"
00039
00040 using Basics::OBufStream;
00041
00042 class VSAStep;
00043
00044 struct VSAProgramState {
00045 int pc;
00046 VSAStep* first;
00047 VSAStep* last;
00048 VestaSource::errorCode target1, target2;
00049 VestaSource::errorCode okreplace;
00050 Sequence<VestaSource*> vss;
00051 ReadersWritersLock* lock;
00052 AccessControl::Identity who;
00053 time_t now;
00054 SRPC* srpc;
00055 int intf_ver;
00056 };
00057
00058 class VSAStep {
00059 public:
00060 VSAStep* next;
00061
00062 virtual VestaSource::errorCode
00063 execute(VSAProgramState* state) = 0;
00064
00065 virtual ~VSAStep() { }
00066 };
00067
00068 class SetTargetStep : public VSAStep {
00069 VestaSource::errorCode target1;
00070 VestaSource::errorCode target2;
00071 VestaSource::errorCode okreplace;
00072 public:
00073 SetTargetStep(VSAProgramState* state) {
00074 next = NULL;
00075 target1 = (VestaSource::errorCode) state->srpc->recv_int();
00076 target2 = (VestaSource::errorCode) state->srpc->recv_int();
00077 okreplace = (VestaSource::errorCode) state->srpc->recv_int();
00078 };
00079 VestaSource::errorCode execute(VSAProgramState* state) {
00080 state->target1 = target1;
00081 state->target2 = target2;
00082 state->okreplace = okreplace;
00083 return target1;
00084 };
00085 };
00086
00087 class DeclStep : public VSAStep {
00088 LongId longid;
00089 public:
00090 DeclStep(VSAProgramState* state) {
00091 next = NULL;
00092 int len = sizeof(longid.value);
00093 state->srpc->recv_bytes_here((char *) &longid.value, len);
00094 };
00095 VestaSource::errorCode execute(VSAProgramState* state) {
00096 VestaSource* vs = longid.lookup(LongId::checkLock, &state->lock);
00097 state->vss.addhi(vs);
00098 if (vs == NULL) {
00099 return VestaSource::notFound;
00100 } else {
00101 return VestaSource::ok;
00102 }
00103 };
00104 };
00105
00106 class ResyncStep : public VSAStep {
00107 VestaSourceAtomic::VSIndex vsi;
00108 public:
00109 ResyncStep(VSAProgramState* state) {
00110 next = NULL;
00111 vsi = state->srpc->recv_int();
00112 };
00113 VestaSource::errorCode execute(VSAProgramState* state) {
00114 if (vsi < 0 || vsi >= state->vss.size()) return VestaSource::invalidArgs;
00115 VestaSource* vs = state->vss.get(vsi);
00116 if (vs == NULL) return VestaSource::invalidArgs;
00117 vs->resync();
00118 return VestaSource::ok;
00119 };
00120 };
00121
00122 class SetTimestampStep : public VSAStep {
00123 VestaSourceAtomic::VSIndex vsi;
00124 time_t ts;
00125 public:
00126 SetTimestampStep(VSAProgramState* state) {
00127 next = NULL;
00128 vsi = state->srpc->recv_int();
00129 ts = (time_t) state->srpc->recv_int();
00130 };
00131 VestaSource::errorCode execute(VSAProgramState* state) {
00132 if (vsi < 0 || vsi >= state->vss.size()) return VestaSource::invalidArgs;
00133 VestaSource* vs = state->vss.get(vsi);
00134 if (vs == NULL) return VestaSource::invalidArgs;
00135 return vs->setTimestamp(ts, state->who);
00136 };
00137 };
00138
00139 class LookupStep : public VSAStep {
00140 VestaSourceAtomic::VSIndex vsi;
00141 char arc[MAX_ARC_LEN+1];
00142 public:
00143 LookupStep(VSAProgramState* state) {
00144 next = NULL;
00145 vsi = state->srpc->recv_int();
00146 int len = MAX_ARC_LEN+1;
00147 state->srpc->recv_chars_here(arc, len);
00148 };
00149 VestaSource::errorCode execute(VSAProgramState* state) {
00150 if (vsi < 0 || vsi >= state->vss.size()) {
00151 state->vss.addhi(NULL);
00152 return VestaSource::invalidArgs;
00153 }
00154 VestaSource* vs = state->vss.get(vsi);
00155 if (vs == NULL) return VestaSource::invalidArgs;
00156 VestaSource* result;
00157 VestaSource::errorCode ret = vs->lookup(arc, result, state->who);
00158 state->vss.addhi(result);
00159 return ret;
00160 };
00161 };
00162
00163 class LookupPathnameStep : public VSAStep {
00164 VestaSourceAtomic::VSIndex vsi;
00165 char* pathname;
00166 char pathnameSep;
00167 public:
00168 LookupPathnameStep(VSAProgramState* state) {
00169 next = NULL;
00170 vsi = state->srpc->recv_int();
00171 pathname = state->srpc->recv_chars();
00172 pathnameSep = (char) state->srpc->recv_int();
00173 };
00174 VestaSource::errorCode execute(VSAProgramState* state) {
00175 if (vsi < 0 || vsi >= state->vss.size()) {
00176 state->vss.addhi(NULL);
00177 return VestaSource::invalidArgs;
00178 }
00179 VestaSource* vs = state->vss.get(vsi);
00180 if (vs == NULL) return VestaSource::invalidArgs;
00181 VestaSource* result;
00182 VestaSource::errorCode ret =
00183 vs->lookupPathname(pathname, result, state->who);
00184 state->vss.addhi(result);
00185 return ret;
00186 };
00187 ~LookupPathnameStep() {
00188 delete[] pathname;
00189 };
00190 };
00191
00192 class LookupIndexStep : public VSAStep {
00193 VestaSourceAtomic::VSIndex vsi;
00194 unsigned int index;
00195 public:
00196 LookupIndexStep(VSAProgramState* state) {
00197 next = NULL;
00198 vsi = state->srpc->recv_int();
00199 index = state->srpc->recv_int();
00200 };
00201 VestaSource::errorCode execute(VSAProgramState* state) {
00202 if (vsi < 0 || vsi >= state->vss.size()) {
00203 state->vss.addhi(NULL);
00204 return VestaSource::invalidArgs;
00205 }
00206 VestaSource* vs = state->vss.get(vsi);
00207 if (vs == NULL) return VestaSource::invalidArgs;
00208 VestaSource* result;
00209 VestaSource::errorCode ret =
00210 vs->lookupIndex(index, result);
00211 state->vss.addhi(result);
00212 return ret;
00213 };
00214 };
00215
00216 class ReallyDeleteStep : public VSAStep {
00217 VestaSourceAtomic::VSIndex vsi;
00218 char arc[MAX_ARC_LEN+1];
00219 bool existCheck;
00220 time_t timestamp;
00221 public:
00222 ReallyDeleteStep(VSAProgramState* state) {
00223 next = NULL;
00224 vsi = state->srpc->recv_int();
00225 int len = MAX_ARC_LEN+1;
00226 state->srpc->recv_chars_here(arc, len);
00227 existCheck = (bool) state->srpc->recv_int();
00228 timestamp = (time_t) state->srpc->recv_int();
00229 };
00230 VestaSource::errorCode execute(VSAProgramState* state) {
00231 if (vsi < 0 || vsi >= state->vss.size()) return VestaSource::invalidArgs;
00232 VestaSource* vs = state->vss.get(vsi);
00233 if (vs == NULL) return VestaSource::invalidArgs;
00234 VestaSource::errorCode ret =
00235 vs->reallyDelete(arc, state->who, existCheck,
00236 timestamp ? timestamp : state->now);
00237 return ret;
00238 };
00239 };
00240
00241 class InsertFileStep : public VSAStep {
00242 VestaSourceAtomic::VSIndex vsi;
00243 char arc[MAX_ARC_LEN+1];
00244 ShortId sid;
00245 bool master;
00246 VestaSource::dupeCheck chk;
00247 time_t timestamp;
00248 FP::Tag fptag;
00249 bool fptagPresent;
00250 public:
00251 InsertFileStep(VSAProgramState* state) {
00252 next = NULL;
00253 vsi = state->srpc->recv_int();
00254 int len = MAX_ARC_LEN+1;
00255 state->srpc->recv_chars_here(arc, len);
00256 sid = (ShortId) state->srpc->recv_int();
00257 master = (bool) state->srpc->recv_int();
00258 chk = (VestaSource::dupeCheck) state->srpc->recv_int();
00259 timestamp = (time_t) state->srpc->recv_int();
00260 len = FP::ByteCnt;
00261 unsigned char fpbytes[FP::ByteCnt];
00262 state->srpc->recv_bytes_here((char*)fpbytes, len);
00263 fptagPresent = (len != 0);
00264 if(fptagPresent)
00265 {
00266 fptag.FromBytes(fpbytes);
00267 }
00268 };
00269 VestaSource::errorCode execute(VSAProgramState* state) {
00270 if (vsi < 0 || vsi >= state->vss.size()) {
00271 state->vss.addhi(NULL);
00272 return VestaSource::invalidArgs;
00273 }
00274 VestaSource* vs = state->vss.get(vsi);
00275 if (vs == NULL) return VestaSource::invalidArgs;
00276 VestaSource* result;
00277 VestaSource::errorCode ret =
00278 vs->insertFile(arc, sid, master, state->who,
00279 chk, &result, timestamp ? timestamp : state->now,
00280 fptagPresent ? &fptag : NULL);
00281 state->vss.addhi(result);
00282 return ret;
00283 };
00284 };
00285
00286 class InsertMutableFileStep : public VSAStep {
00287 VestaSourceAtomic::VSIndex vsi;
00288 char arc[MAX_ARC_LEN+1];
00289 ShortId sid;
00290 bool master;
00291 VestaSource::dupeCheck chk;
00292 time_t timestamp;
00293 public:
00294 InsertMutableFileStep(VSAProgramState* state) {
00295 next = NULL;
00296 vsi = state->srpc->recv_int();
00297 int len = MAX_ARC_LEN+1;
00298 state->srpc->recv_chars_here(arc, len);
00299 sid = (ShortId) state->srpc->recv_int();
00300 master = (bool) state->srpc->recv_int();
00301 chk = (VestaSource::dupeCheck) state->srpc->recv_int();
00302 timestamp = (time_t) state->srpc->recv_int();
00303 };
00304 VestaSource::errorCode execute(VSAProgramState* state) {
00305 if (vsi < 0 || vsi >= state->vss.size()) {
00306 state->vss.addhi(NULL);
00307 return VestaSource::invalidArgs;
00308 }
00309 VestaSource* vs = state->vss.get(vsi);
00310 if (vs == NULL) return VestaSource::invalidArgs;
00311 VestaSource* result;
00312 VestaSource::errorCode ret =
00313 vs->insertMutableFile(arc, sid, master, state->who, chk,
00314 &result, timestamp ? timestamp : state->now);
00315 state->vss.addhi(result);
00316 return ret;
00317 };
00318 };
00319
00320 class InsertImmutableDirectoryStep : public VSAStep {
00321 VestaSourceAtomic::VSIndex parenti;
00322 char arc[MAX_ARC_LEN+1];
00323 VestaSourceAtomic::VSIndex diri;
00324 bool master;
00325 VestaSource::dupeCheck chk;
00326 time_t timestamp;
00327 FP::Tag fptag;
00328 bool fptagPresent;
00329 public:
00330 InsertImmutableDirectoryStep(VSAProgramState* state) {
00331 next = NULL;
00332 parenti = state->srpc->recv_int();
00333 int len = MAX_ARC_LEN+1;
00334 state->srpc->recv_chars_here(arc, len);
00335 diri = state->srpc->recv_int();
00336 master = (bool) state->srpc->recv_int();
00337 chk = (VestaSource::dupeCheck) state->srpc->recv_int();
00338 timestamp = (time_t) state->srpc->recv_int();
00339 len = FP::ByteCnt;
00340 unsigned char fpbytes[FP::ByteCnt];
00341 state->srpc->recv_bytes_here((char*)fpbytes, len);
00342 fptagPresent = (len != 0);
00343 if(fptagPresent)
00344 {
00345 fptag.FromBytes(fpbytes);
00346 }
00347 };
00348 VestaSource::errorCode execute(VSAProgramState* state) {
00349 if (parenti < 0 || parenti >= state->vss.size() ||
00350 diri < -1 || diri >= state->vss.size()) {
00351 state->vss.addhi(NULL);
00352 return VestaSource::invalidArgs;
00353 }
00354 VestaSource* parent = state->vss.get(parenti);
00355 if (parent == NULL) return VestaSource::invalidArgs;
00356 VestaSource* dir;
00357 if (diri == -1) {
00358 dir = NULL;
00359 } else {
00360 dir = state->vss.get(diri);
00361 }
00362 VestaSource* result;
00363 VestaSource::errorCode ret =
00364 parent->insertImmutableDirectory(arc, dir, master, state->who,
00365 chk, &result,
00366 timestamp ? timestamp : state->now,
00367 fptagPresent ? &fptag : NULL);
00368 state->vss.addhi(result);
00369 return ret;
00370 };
00371 };
00372
00373 class InsertAppendableDirectoryStep : public VSAStep {
00374 VestaSourceAtomic::VSIndex vsi;
00375 char arc[MAX_ARC_LEN+1];
00376 bool master;
00377 VestaSource::dupeCheck chk;
00378 time_t timestamp;
00379 public:
00380 InsertAppendableDirectoryStep(VSAProgramState* state) {
00381 next = NULL;
00382 vsi = state->srpc->recv_int();
00383 int len = MAX_ARC_LEN+1;
00384 state->srpc->recv_chars_here(arc, len);
00385 master = (bool) state->srpc->recv_int();
00386 chk = (VestaSource::dupeCheck) state->srpc->recv_int();
00387 timestamp = (time_t) state->srpc->recv_int();
00388 };
00389 VestaSource::errorCode execute(VSAProgramState* state) {
00390 if (vsi < 0 || vsi >= state->vss.size()) {
00391 state->vss.addhi(NULL);
00392 return VestaSource::invalidArgs;
00393 }
00394 VestaSource* vs = state->vss.get(vsi);
00395 if (vs == NULL) return VestaSource::invalidArgs;
00396 VestaSource* result;
00397 VestaSource::errorCode ret =
00398 vs->insertAppendableDirectory(arc, master, state->who,
00399 chk, &result,
00400 timestamp ? timestamp : state->now);
00401 if((ret == VestaSource::ok) && master && !vs->master &&
00402 !myMasterHint.Empty())
00403 {
00404
00405
00406
00407
00408 result->setAttrib("master-repository", myMasterHint.cchars(), NULL);
00409 }
00410 state->vss.addhi(result);
00411 return ret;
00412 };
00413 };
00414
00415 class InsertMutableDirectoryStep : public VSAStep {
00416 VestaSourceAtomic::VSIndex parenti;
00417 char arc[MAX_ARC_LEN+1];
00418 VestaSourceAtomic::VSIndex diri;
00419 bool master;
00420 VestaSource::dupeCheck chk;
00421 time_t timestamp;
00422 public:
00423 InsertMutableDirectoryStep(VSAProgramState* state) {
00424 next = NULL;
00425 parenti = state->srpc->recv_int();
00426 int len = MAX_ARC_LEN+1;
00427 state->srpc->recv_chars_here(arc, len);
00428 diri = state->srpc->recv_int();
00429 master = (bool) state->srpc->recv_int();
00430 chk = (VestaSource::dupeCheck) state->srpc->recv_int();
00431 timestamp = (time_t) state->srpc->recv_int();
00432 };
00433 VestaSource::errorCode execute(VSAProgramState* state) {
00434 if (parenti < 0 || parenti >= state->vss.size() ||
00435 diri < -1 || diri >= state->vss.size()) {
00436 state->vss.addhi(NULL);
00437 return VestaSource::invalidArgs;
00438 }
00439 VestaSource* parent = state->vss.get(parenti);
00440 if (parent == NULL) return VestaSource::invalidArgs;
00441 VestaSource* dir;
00442 if (diri == -1) {
00443 dir = NULL;
00444 } else {
00445 dir = state->vss.get(diri);
00446 }
00447 VestaSource* result;
00448 VestaSource::errorCode ret =
00449 parent->insertMutableDirectory(arc, dir, master, state->who,
00450 chk, &result,
00451 timestamp ? timestamp : state->now);
00452 state->vss.addhi(result);
00453 return ret;
00454 };
00455 };
00456
00457 class InsertGhostStep : public VSAStep {
00458 VestaSourceAtomic::VSIndex vsi;
00459 char arc[MAX_ARC_LEN+1];
00460 bool master;
00461 VestaSource::dupeCheck chk;
00462 time_t timestamp;
00463 public:
00464 InsertGhostStep(VSAProgramState* state) {
00465 next = NULL;
00466 vsi = state->srpc->recv_int();
00467 int len = MAX_ARC_LEN+1;
00468 state->srpc->recv_chars_here(arc, len);
00469 master = (bool) state->srpc->recv_int();
00470 chk = (VestaSource::dupeCheck) state->srpc->recv_int();
00471 timestamp = (time_t) state->srpc->recv_int();
00472 };
00473 VestaSource::errorCode execute(VSAProgramState* state) {
00474 if (vsi < 0 || vsi >= state->vss.size()) {
00475 state->vss.addhi(NULL);
00476 return VestaSource::invalidArgs;
00477 }
00478 VestaSource* vs = state->vss.get(vsi);
00479 if (vs == NULL) return VestaSource::invalidArgs;
00480 VestaSource* result;
00481 VestaSource::errorCode ret =
00482 vs->insertGhost(arc, master, state->who, chk, &result,
00483 timestamp ? timestamp : state->now);
00484 state->vss.addhi(result);
00485 return ret;
00486 };
00487 };
00488
00489 class InsertStubStep : public VSAStep {
00490 VestaSourceAtomic::VSIndex vsi;
00491 char arc[MAX_ARC_LEN+1];
00492 bool master;
00493 VestaSource::dupeCheck chk;
00494 time_t timestamp;
00495 public:
00496 InsertStubStep(VSAProgramState* state) {
00497 next = NULL;
00498 vsi = state->srpc->recv_int();
00499 int len = MAX_ARC_LEN+1;
00500 state->srpc->recv_chars_here(arc, len);
00501 master = (bool) state->srpc->recv_int();
00502 chk = (VestaSource::dupeCheck) state->srpc->recv_int();
00503 timestamp = (time_t) state->srpc->recv_int();
00504 };
00505 VestaSource::errorCode execute(VSAProgramState* state) {
00506 if (vsi < 0 || vsi >= state->vss.size()) {
00507 state->vss.addhi(NULL);
00508 return VestaSource::invalidArgs;
00509 }
00510 VestaSource* vs = state->vss.get(vsi);
00511 if (vs == NULL) return VestaSource::invalidArgs;
00512 VestaSource* result;
00513 VestaSource::errorCode ret =
00514 vs->insertStub(arc, master, state->who, chk, &result,
00515 timestamp ? timestamp : state->now);
00516 state->vss.addhi(result);
00517 return ret;
00518 };
00519 };
00520
00521 class RenameToStep : public VSAStep {
00522 VestaSourceAtomic::VSIndex vsi;
00523 char arc[MAX_ARC_LEN+1];
00524 VestaSourceAtomic::VSIndex fromDirI;
00525 char fromArc[MAX_ARC_LEN+1];
00526 VestaSource::dupeCheck chk;
00527 time_t timestamp;
00528 public:
00529 RenameToStep(VSAProgramState* state) {
00530 next = NULL;
00531 vsi = state->srpc->recv_int();
00532 int len = MAX_ARC_LEN+1;
00533 state->srpc->recv_chars_here(arc, len);
00534 fromDirI = state->srpc->recv_int();
00535 len = MAX_ARC_LEN+1;
00536 state->srpc->recv_chars_here(fromArc, len);
00537 chk = (VestaSource::dupeCheck) state->srpc->recv_int();
00538 timestamp = (time_t) state->srpc->recv_int();
00539 };
00540 VestaSource::errorCode execute(VSAProgramState* state) {
00541 if (vsi < 0 || vsi >= state->vss.size() ||
00542 fromDirI < 0 || fromDirI >= state->vss.size()) {
00543 return VestaSource::invalidArgs;
00544 }
00545 VestaSource* vs = state->vss.get(vsi);
00546 if (vs == NULL) return VestaSource::invalidArgs;
00547 VestaSource* fromDir = state->vss.get(fromDirI);
00548 if (fromDir == NULL) return VestaSource::invalidArgs;
00549 VestaSource::errorCode ret =
00550 vs->renameTo(arc, fromDir, fromArc, state->who, chk,
00551 timestamp ? timestamp : state->now);
00552 return ret;
00553 };
00554 };
00555
00556 class MakeFilesImmutableStep : public VSAStep {
00557 VestaSourceAtomic::VSIndex vsi;
00558 unsigned int threshold;
00559 public:
00560 MakeFilesImmutableStep(VSAProgramState* state) {
00561 next = NULL;
00562 vsi = state->srpc->recv_int();
00563 threshold = (unsigned int) state->srpc->recv_int();
00564 };
00565 VestaSource::errorCode execute(VSAProgramState* state) {
00566 if (vsi < 0 || vsi >= state->vss.size()) return VestaSource::invalidArgs;
00567 VestaSource* vs = state->vss.get(vsi);
00568 if (vs == NULL) return VestaSource::invalidArgs;
00569 return vs->makeFilesImmutable(threshold, state->who);
00570 };
00571 };
00572
00573 class TestMasterStep : public VSAStep {
00574 VestaSourceAtomic::VSIndex vsi;
00575 bool master;
00576 VestaSource::errorCode err;
00577 public:
00578 TestMasterStep(VSAProgramState* state) {
00579 next = NULL;
00580 vsi = state->srpc->recv_int();
00581 master = (bool) state->srpc->recv_int();
00582 err = (VestaSource::errorCode) state->srpc->recv_int();
00583 };
00584 VestaSource::errorCode execute(VSAProgramState* state) {
00585 if (vsi < 0 || vsi >= state->vss.size()) return VestaSource::invalidArgs;
00586 VestaSource* vs = state->vss.get(vsi);
00587 if (vs == NULL) return VestaSource::invalidArgs;
00588 if (vs->master == master) {
00589 return VestaSource::ok;
00590 } else {
00591 return err;
00592 }
00593 };
00594 };
00595
00596 class SetMasterStep : public VSAStep {
00597 VestaSourceAtomic::VSIndex vsi;
00598 bool master;
00599 public:
00600 SetMasterStep(VSAProgramState* state) {
00601 next = NULL;
00602 vsi = state->srpc->recv_int();
00603 master = (bool) state->srpc->recv_int();
00604 };
00605 VestaSource::errorCode execute(VSAProgramState* state) {
00606 if (vsi < 0 || vsi >= state->vss.size()) return VestaSource::invalidArgs;
00607 VestaSource* vs = state->vss.get(vsi);
00608 if (vs == NULL) return VestaSource::invalidArgs;
00609 return vs->setMaster(master, state->who);
00610 };
00611 };
00612
00613 class InAttribsStep : public VSAStep {
00614 VestaSourceAtomic::VSIndex vsi;
00615 char* name;
00616 char* value;
00617 bool expected;
00618 VestaSource::errorCode err;
00619 public:
00620 InAttribsStep(VSAProgramState* state) {
00621 next = NULL;
00622 vsi = state->srpc->recv_int();
00623 name = state->srpc->recv_chars();
00624 value = state->srpc->recv_chars();
00625 expected = (bool) state->srpc->recv_int();
00626 err = (VestaSource::errorCode) state->srpc->recv_int();
00627 };
00628 VestaSource::errorCode execute(VSAProgramState* state) {
00629 if (vsi < 0 || vsi >= state->vss.size()) return VestaSource::invalidArgs;
00630 VestaSource* vs = state->vss.get(vsi);
00631 if (vs == NULL) return VestaSource::invalidArgs;
00632 if (vs->inAttribs(name, value) == expected) {
00633 return VestaSource::ok;
00634 } else {
00635 return err;
00636 };
00637 };
00638 ~InAttribsStep() {
00639 delete[] name;
00640 delete[] value;
00641 };
00642 };
00643
00644 class WriteAttribStep : public VSAStep {
00645 VestaSourceAtomic::VSIndex vsi;
00646 VestaSource::attribOp op;
00647 char* name;
00648 char* value;
00649 time_t timestamp;
00650 public:
00651 WriteAttribStep(VSAProgramState* state) {
00652 next = NULL;
00653 vsi = state->srpc->recv_int();
00654 op = (VestaSource::attribOp) state->srpc->recv_int();
00655 name = state->srpc->recv_chars();
00656 value = state->srpc->recv_chars();
00657 timestamp = (time_t) state->srpc->recv_int();
00658 };
00659 VestaSource::errorCode execute(VSAProgramState* state) {
00660 if (vsi < 0 || vsi >= state->vss.size()) return VestaSource::invalidArgs;
00661 VestaSource* vs = state->vss.get(vsi);
00662 if (vs == NULL) return VestaSource::invalidArgs;
00663
00664
00665
00666
00667
00668
00669 if(!vs->hasAttribs() && MutableRootLongId.isAncestorOf(vs->longid))
00670 {
00671 VestaSource *new_vs = 0;
00672 VestaSource::errorCode err = vs->copyToMutable(new_vs, state->who);
00673 if(err != VestaSource::ok) return err;
00674 assert(new_vs != 0);
00675 delete vs;
00676 vs = new_vs;
00677 state->vss.put(vsi, vs);
00678
00679
00680
00681 for(int i = 0; i < state->vss.size(); i++)
00682 {
00683 if(i == vsi) continue;
00684 VestaSource *other_vs = state->vss.get(i);
00685 if(other_vs &&
00686 (other_vs->type == VestaSource::immutableDirectory) &&
00687 (other_vs->longid.isAncestorOf(vs->longid)))
00688 {
00689 VestaSource *new_other_vs = other_vs->longid.lookup();
00690 assert(new_other_vs != 0);
00691 delete other_vs;
00692 other_vs = new_other_vs;
00693 state->vss.put(i, other_vs);
00694 }
00695 }
00696 }
00697 return vs->writeAttrib(op, name, value, state->who,
00698 timestamp ? timestamp : state->now);
00699 };
00700 ~WriteAttribStep() {
00701 delete[] name;
00702 delete[] value;
00703 };
00704 };
00705
00706
00707 class AccessCheckStep : public VSAStep {
00708 VestaSourceAtomic::VSIndex vsi;
00709 AccessControl::Class cls;
00710 bool expected;
00711 VestaSource::errorCode err;
00712 public:
00713 AccessCheckStep(VSAProgramState* state) {
00714 next = NULL;
00715 vsi = state->srpc->recv_int();
00716 cls = (AccessControl::Class) state->srpc->recv_int();
00717 expected = (bool) state->srpc->recv_int();
00718 err = (VestaSource::errorCode) state->srpc->recv_int();
00719 };
00720 VestaSource::errorCode execute(VSAProgramState* state) {
00721 if (vsi < 0 || vsi >= state->vss.size()) return VestaSource::invalidArgs;
00722 VestaSource* vs = state->vss.get(vsi);
00723 if (vs == NULL) return VestaSource::invalidArgs;
00724 if (vs->ac.check(state->who, cls) == expected) {
00725 return VestaSource::ok;
00726 } else {
00727 return err;
00728 };
00729 };
00730 };
00731
00732 class TypeCheckStep : public VSAStep {
00733 VestaSourceAtomic::VSIndex vsi;
00734 unsigned int allowed;
00735 VestaSource::errorCode err;
00736 public:
00737 TypeCheckStep(VSAProgramState* state) {
00738 next = NULL;
00739 vsi = state->srpc->recv_int();
00740 allowed = (unsigned int) state->srpc->recv_int();
00741 err = (VestaSource::errorCode) state->srpc->recv_int();
00742 };
00743 VestaSource::errorCode execute(VSAProgramState* state) {
00744 if (vsi < 0 || vsi >= state->vss.size()) return VestaSource::invalidArgs;
00745 VestaSource* vs = state->vss.get(vsi);
00746 if (vs == NULL) return VestaSource::invalidArgs;
00747 if (VestaSourceAtomic::typebit(vs->type) & allowed) {
00748 return VestaSource::ok;
00749 } else {
00750 return err;
00751 };
00752 };
00753 };
00754
00755 static bool mergeAttribCallback(void* closure, const char* name);
00756 static bool mergeValueCallback(void* closure, const char* value);
00757
00758 class MergeAttribStep : public VSAStep {
00759 VestaSourceAtomic::VSIndex fromvsi;
00760 VestaSourceAtomic::VSIndex tovsi;
00761 char* name;
00762 time_t timestamp;
00763
00764 VSAProgramState* state;
00765 VestaSource* fromvs;
00766 VestaSource* tovs;
00767 const char* curname;
00768 VestaSource::errorCode err;
00769 friend bool mergeAttribCallback(void* closure, const char* name);
00770 friend bool mergeValueCallback(void* closure, const char* value);
00771 public:
00772 MergeAttribStep(VSAProgramState* state) {
00773 next = NULL;
00774 fromvsi = state->srpc->recv_int();
00775 tovsi = state->srpc->recv_int();
00776 name = state->srpc->recv_chars();
00777 timestamp = (time_t) state->srpc->recv_int();
00778 };
00779 VestaSource::errorCode execute(VSAProgramState* state) {
00780 if (fromvsi < 0 || fromvsi >= state->vss.size()) {
00781 return VestaSource::invalidArgs;
00782 }
00783 fromvs = state->vss.get(fromvsi);
00784 if (fromvs == NULL) return VestaSource::invalidArgs;
00785 if (tovsi < 0 || tovsi >= state->vss.size()) {
00786 return VestaSource::invalidArgs;
00787 }
00788 tovs = state->vss.get(tovsi);
00789 if (tovs == NULL) return VestaSource::invalidArgs;
00790 err = VestaSource::ok;
00791 this->state = state;
00792 if (*name == '\0') {
00793 fromvs->listAttribs(mergeAttribCallback, (void*) this);
00794 } else {
00795 mergeAttribCallback((void*) this, name);
00796 }
00797 return err;
00798 };
00799 ~MergeAttribStep() {
00800 delete[] name;
00801 };
00802 };
00803
00804 static bool
00805 mergeValueCallback(void* closure, const char* value)
00806 {
00807 MergeAttribStep* step = (MergeAttribStep*) closure;
00808 step->err = step->tovs->addAttrib(step->curname, value,
00809 step->state->who, step->timestamp ?
00810 step->timestamp : step->state->now);
00811 return (step->err == VestaSource::ok);
00812 }
00813
00814 static bool
00815 mergeAttribCallback(void* closure, const char* name)
00816 {
00817 MergeAttribStep* step = (MergeAttribStep*) closure;
00818 step->curname = name;
00819 step->fromvs->getAttrib(name, mergeValueCallback, closure);
00820 return (step->err == VestaSource::ok);
00821 }
00822
00823
00824 void
00825 DeleteProgram(VSAProgramState* state)
00826 {
00827 while (state->first) {
00828 VSAStep *vsac;
00829 vsac = state->first;
00830 state->first = state->first->next;
00831 delete vsac;
00832 }
00833 int i, n;
00834 n = state->vss.size();
00835 for (i=0; i<n; i++) {
00836 VestaSource* vs = state->vss.get(i);
00837 if (vs) delete vs;
00838 }
00839 delete state->who;
00840 }
00841
00842 void
00843 VSAtomic(SRPC* srpc, int intf_ver)
00844 {
00845 VSAProgramState state;
00846 VestaSource::errorCode err;
00847
00848
00849 state.pc = 0;
00850 state.first = state.last = NULL;
00851 state.target1 = VestaSource::ok;
00852 state.target2 = VestaSource::ok;
00853 state.okreplace = VestaSource::ok;
00854 state.lock = &StableLock;
00855 state.who = srpc_recv_identity(srpc, intf_ver);
00856 state.srpc = srpc;
00857 state.intf_ver = intf_ver;
00858
00859 #if defined(REPOS_PERF_DEBUG)
00860 OBufStream l_locked_reason;
00861 bool first_action = true;
00862 l_locked_reason << "VSAtomic:{";
00863 #endif
00864
00865
00866 try {
00867 for (;;) {
00868
00869 int opcode = srpc->recv_int();
00870 VSAStep *vsac;
00871
00872 switch (opcode) {
00873 case VestaSourceSRPC::AtomicRun:
00874
00875 goto done;
00876 case VestaSourceSRPC::AtomicCancel:
00877 srpc->recv_end();
00878 srpc->send_end();
00879 DeleteProgram(&state);
00880 return;
00881 case VestaSourceSRPC::AtomicTarget:
00882 vsac = NEW_CONSTR(SetTargetStep, (&state));
00883 #if defined(REPOS_PERF_DEBUG)
00884 if(first_action)
00885 first_action = false;
00886 else
00887 l_locked_reason << ",";
00888 l_locked_reason << "SetTarget";
00889 #endif
00890 break;
00891 case VestaSourceSRPC::AtomicDeclare:
00892 vsac = NEW_CONSTR(DeclStep, (&state));
00893 #if defined(REPOS_PERF_DEBUG)
00894 if(first_action)
00895 first_action = false;
00896 else
00897 l_locked_reason << ",";
00898 l_locked_reason << "Decl";
00899 #endif
00900 break;
00901 case VestaSourceSRPC::AtomicResync:
00902 vsac = NEW_CONSTR(ResyncStep, (&state));
00903 #if defined(REPOS_PERF_DEBUG)
00904 if(first_action)
00905 first_action = false;
00906 else
00907 l_locked_reason << ",";
00908 l_locked_reason << "Resync";
00909 #endif
00910 break;
00911 case VestaSourceSRPC::SetTimestamp:
00912 vsac = NEW_CONSTR(SetTimestampStep, (&state));
00913 #if defined(REPOS_PERF_DEBUG)
00914 if(first_action)
00915 first_action = false;
00916 else
00917 l_locked_reason << ",";
00918 l_locked_reason << "SetTime";
00919 #endif
00920 break;
00921 case VestaSourceSRPC::Lookup:
00922 vsac = NEW_CONSTR(LookupStep, (&state));
00923 #if defined(REPOS_PERF_DEBUG)
00924 if(first_action)
00925 first_action = false;
00926 else
00927 l_locked_reason << ",";
00928 l_locked_reason << "Lookup";
00929 #endif
00930 break;
00931 case VestaSourceSRPC::LookupPathname:
00932 vsac = NEW_CONSTR(LookupPathnameStep, (&state));
00933 #if defined(REPOS_PERF_DEBUG)
00934 if(first_action)
00935 first_action = false;
00936 else
00937 l_locked_reason << ",";
00938 l_locked_reason << "LookupPath";
00939 #endif
00940 break;
00941 case VestaSourceSRPC::LookupIndex:
00942 vsac = NEW_CONSTR(LookupIndexStep, (&state));
00943 #if defined(REPOS_PERF_DEBUG)
00944 if(first_action)
00945 first_action = false;
00946 else
00947 l_locked_reason << ",";
00948 l_locked_reason << "LookupIdx";
00949 #endif
00950 break;
00951 case VestaSourceSRPC::ReallyDelete:
00952 vsac = NEW_CONSTR(ReallyDeleteStep, (&state));
00953 #if defined(REPOS_PERF_DEBUG)
00954 if(first_action)
00955 first_action = false;
00956 else
00957 l_locked_reason << ",";
00958 l_locked_reason << "ReallyDel";
00959 #endif
00960 break;
00961 case VestaSourceSRPC::InsertFile:
00962 vsac = NEW_CONSTR(InsertFileStep, (&state));
00963 #if defined(REPOS_PERF_DEBUG)
00964 if(first_action)
00965 first_action = false;
00966 else
00967 l_locked_reason << ",";
00968 l_locked_reason << "InsertFile";
00969 #endif
00970 break;
00971 case VestaSourceSRPC::InsertMutableFile:
00972 vsac = NEW_CONSTR(InsertMutableFileStep, (&state));
00973 #if defined(REPOS_PERF_DEBUG)
00974 if(first_action)
00975 first_action = false;
00976 else
00977 l_locked_reason << ",";
00978 l_locked_reason << "InsertMutFile";
00979 #endif
00980 break;
00981 case VestaSourceSRPC::InsertImmutableDirectory:
00982 vsac = NEW_CONSTR(InsertImmutableDirectoryStep, (&state));
00983 #if defined(REPOS_PERF_DEBUG)
00984 if(first_action)
00985 first_action = false;
00986 else
00987 l_locked_reason << ",";
00988 l_locked_reason << "InsertImmutDir";
00989 #endif
00990 break;
00991 case VestaSourceSRPC::InsertAppendableDirectory:
00992 vsac = NEW_CONSTR(InsertAppendableDirectoryStep, (&state));
00993 #if defined(REPOS_PERF_DEBUG)
00994 if(first_action)
00995 first_action = false;
00996 else
00997 l_locked_reason << ",";
00998 l_locked_reason << "InsertAppDir";
00999 #endif
01000 break;
01001 case VestaSourceSRPC::InsertMutableDirectory:
01002 vsac = NEW_CONSTR(InsertMutableDirectoryStep, (&state));
01003 #if defined(REPOS_PERF_DEBUG)
01004 if(first_action)
01005 first_action = false;
01006 else
01007 l_locked_reason << ",";
01008 l_locked_reason << "InsertMutDir";
01009 #endif
01010 break;
01011 case VestaSourceSRPC::InsertGhost:
01012 vsac = NEW_CONSTR(InsertGhostStep, (&state));
01013 #if defined(REPOS_PERF_DEBUG)
01014 if(first_action)
01015 first_action = false;
01016 else
01017 l_locked_reason << ",";
01018 l_locked_reason << "InsertGhost";
01019 #endif
01020 break;
01021 case VestaSourceSRPC::InsertStub:
01022 vsac = NEW_CONSTR(InsertStubStep, (&state));
01023 #if defined(REPOS_PERF_DEBUG)
01024 if(first_action)
01025 first_action = false;
01026 else
01027 l_locked_reason << ",";
01028 l_locked_reason << "InsertStub";
01029 #endif
01030 break;
01031 case VestaSourceSRPC::RenameTo:
01032 vsac = NEW_CONSTR(RenameToStep, (&state));
01033 #if defined(REPOS_PERF_DEBUG)
01034 if(first_action)
01035 first_action = false;
01036 else
01037 l_locked_reason << ",";
01038 l_locked_reason << "RenameTo";
01039 #endif
01040 break;
01041 case VestaSourceSRPC::MakeFilesImmutable:
01042 vsac = NEW_CONSTR(MakeFilesImmutableStep, (&state));
01043 #if defined(REPOS_PERF_DEBUG)
01044 if(first_action)
01045 first_action = false;
01046 else
01047 l_locked_reason << ",";
01048 l_locked_reason << "MakeFilesImmut";
01049 #endif
01050 break;
01051 case VestaSourceSRPC::AtomicTestMaster:
01052 vsac = NEW_CONSTR(TestMasterStep, (&state));
01053 #if defined(REPOS_PERF_DEBUG)
01054 if(first_action)
01055 first_action = false;
01056 else
01057 l_locked_reason << ",";
01058 l_locked_reason << "TestMaster";
01059 #endif
01060 break;
01061 case VestaSourceSRPC::AtomicSetMaster:
01062 vsac = NEW_CONSTR(SetMasterStep, (&state));
01063 #if defined(REPOS_PERF_DEBUG)
01064 if(first_action)
01065 first_action = false;
01066 else
01067 l_locked_reason << ",";
01068 l_locked_reason << "SetMaster";
01069 #endif
01070 break;
01071 case VestaSourceSRPC::InAttribs:
01072 vsac = NEW_CONSTR(InAttribsStep, (&state));
01073 #if defined(REPOS_PERF_DEBUG)
01074 if(first_action)
01075 first_action = false;
01076 else
01077 l_locked_reason << ",";
01078 l_locked_reason << "InAttribs";
01079 #endif
01080 break;
01081 case VestaSourceSRPC::WriteAttrib:
01082 vsac = NEW_CONSTR(WriteAttribStep, (&state));
01083 #if defined(REPOS_PERF_DEBUG)
01084 if(first_action)
01085 first_action = false;
01086 else
01087 l_locked_reason << ",";
01088 l_locked_reason << "WriteAttrib";
01089 #endif
01090 break;
01091 case VestaSourceSRPC::AtomicAccessCheck:
01092 vsac = NEW_CONSTR(AccessCheckStep, (&state));
01093 #if defined(REPOS_PERF_DEBUG)
01094 if(first_action)
01095 first_action = false;
01096 else
01097 l_locked_reason << ",";
01098 l_locked_reason << "AccessCheck";
01099 #endif
01100 break;
01101 case VestaSourceSRPC::AtomicTypeCheck:
01102 vsac = NEW_CONSTR(TypeCheckStep, (&state));
01103 #if defined(REPOS_PERF_DEBUG)
01104 if(first_action)
01105 first_action = false;
01106 else
01107 l_locked_reason << ",";
01108 l_locked_reason << "TypeCheck";
01109 #endif
01110 break;
01111 case VestaSourceSRPC::AtomicMergeAttrib:
01112 vsac = NEW_CONSTR(MergeAttribStep, (&state));
01113 #if defined(REPOS_PERF_DEBUG)
01114 if(first_action)
01115 first_action = false;
01116 else
01117 l_locked_reason << ",";
01118 l_locked_reason << "MergeAttrib";
01119 #endif
01120 break;
01121 default:
01122 Repos::dprintf(DBG_ALWAYS,
01123 "VestaSourceReceptionist got misplaced Atomic opcode %d; "
01124 "client %s\n", (int) opcode, srpc->remote_socket().cchars());
01125
01126 srpc->send_failure(SRPC::version_skew,
01127 "VestaSourceSRPC: Unknown Atomic opcode", false);
01128 break;
01129 }
01130
01131 if (state.first == NULL) {
01132 state.first = vsac;
01133 } else {
01134 state.last->next = vsac;
01135 }
01136 state.last = vsac;
01137 }
01138 done:
01139 srpc->recv_end();
01140
01141 } catch (SRPC::failure f) {
01142 DeleteProgram(&state);
01143 throw;
01144 }
01145
01146
01147 state.lock->acquireWrite();
01148 RWLOCK_LOCKED_REASON(state.lock, l_locked_reason.str());
01149 VRLog.start();
01150 state.now = time(NULL);
01151 err = VestaSource::ok;
01152 try {
01153 VSAStep* vsac = state.first;
01154 while (vsac) {
01155 err = vsac->execute(&state);
01156 if (err != state.target1 && err != state.target2) {
01157 break;
01158 }
01159 state.pc++;
01160 vsac = vsac->next;
01161 }
01162
01163 VRLog.commit();
01164 state.lock->releaseWrite();
01165 state.lock = NULL;
01166
01167
01168 srpc->send_int(state.pc);
01169 srpc->send_int((int) err);
01170 srpc->send_int((int) state.okreplace);
01171 srpc->send_int(vsac == NULL);
01172 srpc->send_end();
01173
01174 } catch (SRPC::failure f) {
01175 if (state.lock) {
01176 VRLog.commit();
01177 state.lock->releaseWrite();
01178 }
01179 DeleteProgram(&state);
01180 throw;
01181 }
01182
01183
01184 DeleteProgram(&state);
01185 }