|
@@ -155,8 +155,10 @@ typedef struct {
|
|
int flags_total[2], flags_done[2];
|
|
int flags_total[2], flags_done[2];
|
|
int trash_total[2], trash_done[2];
|
|
int trash_total[2], trash_done[2];
|
|
int maxuid[2]; /* highest UID that was already propagated */
|
|
int maxuid[2]; /* highest UID that was already propagated */
|
|
|
|
+ int newmaxuid[2]; /* highest UID that is currently being propagated */
|
|
int uidval[2]; /* UID validity value */
|
|
int uidval[2]; /* UID validity value */
|
|
int newuid[2]; /* TUID lookup makes sense only for UIDs >= this */
|
|
int newuid[2]; /* TUID lookup makes sense only for UIDs >= this */
|
|
|
|
+ int mmaxxuid; /* highest expired UID on master during new message propagation */
|
|
int smaxxuid; /* highest expired UID on slave */
|
|
int smaxxuid; /* highest expired UID on slave */
|
|
} sync_vars_t;
|
|
} sync_vars_t;
|
|
|
|
|
|
@@ -806,6 +808,9 @@ box_selected( int sts, void *aux )
|
|
goto bail;
|
|
goto bail;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ svars->newmaxuid[M] = svars->maxuid[M];
|
|
|
|
+ svars->newmaxuid[S] = svars->maxuid[S];
|
|
|
|
+ svars->mmaxxuid = INT_MAX;
|
|
line = 0;
|
|
line = 0;
|
|
if ((jfp = fopen( svars->jname, "r" ))) {
|
|
if ((jfp = fopen( svars->jname, "r" ))) {
|
|
if (!stat( svars->nname, &st ) && fgets( buf, sizeof(buf), jfp )) {
|
|
if (!stat( svars->nname, &st ) && fgets( buf, sizeof(buf), jfp )) {
|
|
@@ -829,7 +834,7 @@ box_selected( int sts, void *aux )
|
|
}
|
|
}
|
|
if (buf[0] == '#' ?
|
|
if (buf[0] == '#' ?
|
|
(t3 = 0, (sscanf( buf + 2, "%d %d %n", &t1, &t2, &t3 ) < 2) || !t3 || (t - t3 != TUIDL + 3)) :
|
|
(t3 = 0, (sscanf( buf + 2, "%d %d %n", &t1, &t2, &t3 ) < 2) || !t3 || (t - t3 != TUIDL + 3)) :
|
|
- buf[0] == '(' || buf[0] == ')' || buf[0] == '{' || buf[0] == '}' ?
|
|
|
|
|
|
+ buf[0] == '(' || buf[0] == ')' || buf[0] == '{' || buf[0] == '}' || buf[0] == '!' ?
|
|
(sscanf( buf + 2, "%d", &t1 ) != 1) :
|
|
(sscanf( buf + 2, "%d", &t1 ) != 1) :
|
|
buf[0] == '+' || buf[0] == '&' || buf[0] == '-' || buf[0] == '|' || buf[0] == '/' || buf[0] == '\\' ?
|
|
buf[0] == '+' || buf[0] == '&' || buf[0] == '-' || buf[0] == '|' || buf[0] == '/' || buf[0] == '\\' ?
|
|
(sscanf( buf + 2, "%d %d", &t1, &t2 ) != 2) :
|
|
(sscanf( buf + 2, "%d %d", &t1, &t2 ) != 2) :
|
|
@@ -846,6 +851,8 @@ box_selected( int sts, void *aux )
|
|
svars->newuid[M] = t1;
|
|
svars->newuid[M] = t1;
|
|
else if (buf[0] == '}')
|
|
else if (buf[0] == '}')
|
|
svars->newuid[S] = t1;
|
|
svars->newuid[S] = t1;
|
|
|
|
+ else if (buf[0] == '!')
|
|
|
|
+ svars->smaxxuid = t1;
|
|
else if (buf[0] == '|') {
|
|
else if (buf[0] == '|') {
|
|
svars->uidval[M] = t1;
|
|
svars->uidval[M] = t1;
|
|
svars->uidval[S] = t2;
|
|
svars->uidval[S] = t2;
|
|
@@ -853,6 +860,10 @@ box_selected( int sts, void *aux )
|
|
srec = nfmalloc( sizeof(*srec) );
|
|
srec = nfmalloc( sizeof(*srec) );
|
|
srec->uid[M] = t1;
|
|
srec->uid[M] = t1;
|
|
srec->uid[S] = t2;
|
|
srec->uid[S] = t2;
|
|
|
|
+ if (svars->newmaxuid[M] < t1)
|
|
|
|
+ svars->newmaxuid[M] = t1;
|
|
|
|
+ if (svars->newmaxuid[S] < t2)
|
|
|
|
+ svars->newmaxuid[S] = t2;
|
|
debug( " new entry(%d,%d)\n", t1, t2 );
|
|
debug( " new entry(%d,%d)\n", t1, t2 );
|
|
srec->msg[M] = srec->msg[S] = 0;
|
|
srec->msg[M] = srec->msg[S] = 0;
|
|
srec->status = 0;
|
|
srec->status = 0;
|
|
@@ -876,6 +887,8 @@ box_selected( int sts, void *aux )
|
|
switch (buf[0]) {
|
|
switch (buf[0]) {
|
|
case '-':
|
|
case '-':
|
|
debug( "killed\n" );
|
|
debug( "killed\n" );
|
|
|
|
+ if (srec->msg[M])
|
|
|
|
+ srec->msg[M]->srec = 0;
|
|
srec->status = S_DEAD;
|
|
srec->status = S_DEAD;
|
|
break;
|
|
break;
|
|
case '#':
|
|
case '#':
|
|
@@ -1014,7 +1027,7 @@ box_selected( int sts, void *aux )
|
|
mexcs = 0;
|
|
mexcs = 0;
|
|
nmexcs = rmexcs = 0;
|
|
nmexcs = rmexcs = 0;
|
|
if (svars->ctx[M]->opts & OPEN_OLD) {
|
|
if (svars->ctx[M]->opts & OPEN_OLD) {
|
|
- if (svars->smaxxuid) {
|
|
|
|
|
|
+ if (chan->max_messages) {
|
|
/* When messages have been expired on the slave, the master fetch is split into
|
|
/* When messages have been expired on the slave, the master fetch is split into
|
|
* two ranges: The bulk fetch which corresponds with the most recent messages, and an
|
|
* two ranges: The bulk fetch which corresponds with the most recent messages, and an
|
|
* exception list of messages which would have been expired if they weren't important. */
|
|
* exception list of messages which would have been expired if they weren't important. */
|
|
@@ -1198,10 +1211,14 @@ box_loaded( int sts, void *aux )
|
|
* - message is old (> 0) or expired (0) => ignore
|
|
* - message is old (> 0) or expired (0) => ignore
|
|
* - message was skipped (-1) => ReNew
|
|
* - message was skipped (-1) => ReNew
|
|
* - message was attempted, but failed (-2) => New
|
|
* - message was attempted, but failed (-2) => New
|
|
- * If new have no srec, the message is always New. */
|
|
|
|
|
|
+ * If new have no srec, the message is always New. If messages were previously ignored
|
|
|
|
+ * due to being excessive, they would now appear to be newer than the messages that
|
|
|
|
+ * got actually synced, so make sure to look only at the newest ones. As some messages
|
|
|
|
+ * may be already propagated before an interruption, and maxuid logging is delayed,
|
|
|
|
+ * we need to track the newmaxuid separately. */
|
|
srec = tmsg->srec;
|
|
srec = tmsg->srec;
|
|
if (srec ? srec->uid[t] < 0 && (svars->chan->ops[t] & (srec->uid[t] == -1 ? OP_RENEW : OP_NEW))
|
|
if (srec ? srec->uid[t] < 0 && (svars->chan->ops[t] & (srec->uid[t] == -1 ? OP_RENEW : OP_NEW))
|
|
- : (svars->chan->ops[t] & OP_NEW)) {
|
|
|
|
|
|
+ : svars->newmaxuid[1-t] < tmsg->uid && (svars->chan->ops[t] & OP_NEW)) {
|
|
debug( "new message %d on %s\n", tmsg->uid, str_ms[1-t] );
|
|
debug( "new message %d on %s\n", tmsg->uid, str_ms[1-t] );
|
|
if ((svars->chan->ops[t] & OP_EXPUNGE) && (tmsg->flags & F_DELETED))
|
|
if ((svars->chan->ops[t] & OP_EXPUNGE) && (tmsg->flags & F_DELETED))
|
|
debug( " -> not %sing - would be expunged anyway\n", str_hl[t] );
|
|
debug( " -> not %sing - would be expunged anyway\n", str_hl[t] );
|
|
@@ -1220,7 +1237,11 @@ box_loaded( int sts, void *aux )
|
|
srec->tuid[0] = 0;
|
|
srec->tuid[0] = 0;
|
|
srec->uid[1-t] = tmsg->uid;
|
|
srec->uid[1-t] = tmsg->uid;
|
|
srec->uid[t] = -2;
|
|
srec->uid[t] = -2;
|
|
|
|
+ srec->msg[1-t] = tmsg;
|
|
|
|
+ srec->msg[t] = 0;
|
|
tmsg->srec = srec;
|
|
tmsg->srec = srec;
|
|
|
|
+ if (svars->newmaxuid[1-t] < tmsg->uid)
|
|
|
|
+ svars->newmaxuid[1-t] = tmsg->uid;
|
|
Fprintf( svars->jfp, "+ %d %d\n", srec->uid[M], srec->uid[S] );
|
|
Fprintf( svars->jfp, "+ %d %d\n", srec->uid[M], srec->uid[S] );
|
|
debug( " -> pair(%d,%d) created\n", srec->uid[M], srec->uid[S] );
|
|
debug( " -> pair(%d,%d) created\n", srec->uid[M], srec->uid[S] );
|
|
}
|
|
}
|
|
@@ -1346,6 +1367,10 @@ box_loaded( int sts, void *aux )
|
|
alive++;
|
|
alive++;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ for (tmsg = svars->ctx[M]->msgs; tmsg; tmsg = tmsg->next) {
|
|
|
|
+ if ((srec = tmsg->srec) && srec->tuid[0] && !(tmsg->flags & F_DELETED))
|
|
|
|
+ alive++;
|
|
|
|
+ }
|
|
todel = alive - svars->chan->max_messages;
|
|
todel = alive - svars->chan->max_messages;
|
|
debug( "%d alive messages, %d excess - expiring\n", alive, todel );
|
|
debug( "%d alive messages, %d excess - expiring\n", alive, todel );
|
|
for (tmsg = svars->ctx[S]->msgs; tmsg; tmsg = tmsg->next) {
|
|
for (tmsg = svars->ctx[S]->msgs; tmsg; tmsg = tmsg->next) {
|
|
@@ -1353,6 +1378,7 @@ box_loaded( int sts, void *aux )
|
|
continue;
|
|
continue;
|
|
if (!(srec = tmsg->srec) || srec->uid[M] <= 0) {
|
|
if (!(srec = tmsg->srec) || srec->uid[M] <= 0) {
|
|
/* We did not push the message, so it must be kept. */
|
|
/* We did not push the message, so it must be kept. */
|
|
|
|
+ debug( " old pair(%d,%d) unpropagated\n", srec->uid[M], srec->uid[S] );
|
|
todel--;
|
|
todel--;
|
|
} else {
|
|
} else {
|
|
nflags = (tmsg->flags | srec->aflags[S]) & ~srec->dflags[S];
|
|
nflags = (tmsg->flags | srec->aflags[S]) & ~srec->dflags[S];
|
|
@@ -1360,13 +1386,32 @@ box_loaded( int sts, void *aux )
|
|
/* The message is not deleted, or is already (being) expired. */
|
|
/* The message is not deleted, or is already (being) expired. */
|
|
if ((nflags & F_FLAGGED) || !(nflags & F_SEEN)) {
|
|
if ((nflags & F_FLAGGED) || !(nflags & F_SEEN)) {
|
|
/* Important messages are always kept. */
|
|
/* Important messages are always kept. */
|
|
|
|
+ debug( " old pair(%d,%d) important\n", srec->uid[M], srec->uid[S] );
|
|
todel--;
|
|
todel--;
|
|
} else if (todel > 0 ||
|
|
} else if (todel > 0 ||
|
|
((srec->status & (S_EXPIRE|S_EXPIRED)) == (S_EXPIRE|S_EXPIRED)) ||
|
|
((srec->status & (S_EXPIRE|S_EXPIRED)) == (S_EXPIRE|S_EXPIRED)) ||
|
|
((srec->status & (S_EXPIRE|S_EXPIRED)) && (tmsg->flags & F_DELETED))) {
|
|
((srec->status & (S_EXPIRE|S_EXPIRED)) && (tmsg->flags & F_DELETED))) {
|
|
/* The message is excess or was already (being) expired. */
|
|
/* The message is excess or was already (being) expired. */
|
|
srec->status |= S_NEXPIRE;
|
|
srec->status |= S_NEXPIRE;
|
|
- debug( " pair(%d,%d)\n", srec->uid[M], srec->uid[S] );
|
|
|
|
|
|
+ debug( " old pair(%d,%d) expired\n", srec->uid[M], srec->uid[S] );
|
|
|
|
+ todel--;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ for (tmsg = svars->ctx[M]->msgs; tmsg; tmsg = tmsg->next) {
|
|
|
|
+ if ((srec = tmsg->srec) && srec->tuid[0]) {
|
|
|
|
+ nflags = tmsg->flags;
|
|
|
|
+ if (!(nflags & F_DELETED)) {
|
|
|
|
+ if ((nflags & F_FLAGGED) || !(nflags & F_SEEN)) {
|
|
|
|
+ /* Important messages are always fetched. */
|
|
|
|
+ debug( " new pair(%d,%d) important\n", srec->uid[M], srec->uid[S] );
|
|
|
|
+ todel--;
|
|
|
|
+ } else if (todel > 0) {
|
|
|
|
+ /* The message is excess. */
|
|
|
|
+ srec->status |= S_NEXPIRE;
|
|
|
|
+ debug( " new pair(%d,%d) expired\n", srec->uid[M], srec->uid[S] );
|
|
|
|
+ svars->mmaxxuid = srec->uid[M];
|
|
todel--;
|
|
todel--;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -1374,23 +1419,34 @@ box_loaded( int sts, void *aux )
|
|
}
|
|
}
|
|
debug( "%d excess messages remain\n", todel );
|
|
debug( "%d excess messages remain\n", todel );
|
|
for (srec = svars->srecs; srec; srec = srec->next) {
|
|
for (srec = svars->srecs; srec; srec = srec->next) {
|
|
- if ((srec->status & (S_DEAD|S_DONE)) || !srec->msg[S])
|
|
|
|
|
|
+ if (srec->status & S_DEAD)
|
|
continue;
|
|
continue;
|
|
- nex = (srec->status / S_NEXPIRE) & 1;
|
|
|
|
- if (nex != ((srec->status / S_EXPIRED) & 1)) {
|
|
|
|
- /* The record needs a state change ... */
|
|
|
|
- if (nex != ((srec->status / S_EXPIRE) & 1)) {
|
|
|
|
- /* ... and we need to start a transaction. */
|
|
|
|
- Fprintf( svars->jfp, "~ %d %d %d\n", srec->uid[M], srec->uid[S], nex );
|
|
|
|
- debug( " pair(%d,%d): %d (pre)\n", srec->uid[M], srec->uid[S], nex );
|
|
|
|
- srec->status = (srec->status & ~S_EXPIRE) | (nex * S_EXPIRE);
|
|
|
|
|
|
+ if (!srec->tuid[0]) {
|
|
|
|
+ if (!srec->msg[S])
|
|
|
|
+ continue;
|
|
|
|
+ nex = (srec->status / S_NEXPIRE) & 1;
|
|
|
|
+ if (nex != ((srec->status / S_EXPIRED) & 1)) {
|
|
|
|
+ /* The record needs a state change ... */
|
|
|
|
+ if (nex != ((srec->status / S_EXPIRE) & 1)) {
|
|
|
|
+ /* ... and we need to start a transaction. */
|
|
|
|
+ Fprintf( svars->jfp, "~ %d %d %d\n", srec->uid[M], srec->uid[S], nex );
|
|
|
|
+ debug( " pair(%d,%d): %d (pre)\n", srec->uid[M], srec->uid[S], nex );
|
|
|
|
+ srec->status = (srec->status & ~S_EXPIRE) | (nex * S_EXPIRE);
|
|
|
|
+ } else {
|
|
|
|
+ /* ... but the "right" transaction is already pending. */
|
|
|
|
+ debug( " pair(%d,%d): %d (pending)\n", srec->uid[M], srec->uid[S], nex );
|
|
|
|
+ }
|
|
} else {
|
|
} else {
|
|
- /* ... but the "right" transaction is already pending. */
|
|
|
|
- debug( " pair(%d,%d): %d (pending)\n", srec->uid[M], srec->uid[S], nex );
|
|
|
|
|
|
+ /* Note: the "wrong" transaction may be pending here,
|
|
|
|
+ * e.g.: S_NEXPIRE = 0, S_EXPIRE = 1, S_EXPIRED = 0. */
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
- /* Note: the "wrong" transaction may be pending here,
|
|
|
|
- * e.g.: S_NEXPIRE = 0, S_EXPIRE = 1, S_EXPIRED = 0. */
|
|
|
|
|
|
+ if (srec->status & S_NEXPIRE) {
|
|
|
|
+ Fprintf( svars->jfp, "- %d %d\n", srec->uid[M], srec->uid[S] );
|
|
|
|
+ debug( " pair(%d,%d): 1 (abort)\n", srec->uid[M], srec->uid[S] );
|
|
|
|
+ srec->msg[M]->srec = 0;
|
|
|
|
+ srec->status = S_DEAD;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -1515,6 +1571,16 @@ msg_copied_p2( sync_vars_t *svars, sync_rec_t *srec, int t, int uid )
|
|
srec->uid[t] = uid;
|
|
srec->uid[t] = uid;
|
|
srec->tuid[0] = 0;
|
|
srec->tuid[0] = 0;
|
|
}
|
|
}
|
|
|
|
+ if (t == S && svars->mmaxxuid < srec->uid[M]) {
|
|
|
|
+ /* If we have so many new messages that some of them are instantly expired,
|
|
|
|
+ * but some are still propagated because they are important, we need to
|
|
|
|
+ * ensure explicitly that the bulk fetch limit is upped. */
|
|
|
|
+ svars->mmaxxuid = INT_MAX;
|
|
|
|
+ if (svars->smaxxuid < srec->uid[S] - 1) {
|
|
|
|
+ svars->smaxxuid = srec->uid[S] - 1;
|
|
|
|
+ Fprintf( svars->jfp, "! %d\n", svars->smaxxuid );
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
static void msgs_found_new( int sts, void *aux );
|
|
static void msgs_found_new( int sts, void *aux );
|
|
@@ -1735,13 +1801,14 @@ box_closed_p2( sync_vars_t *svars, int t )
|
|
if (!(svars->state[1-t] & ST_CLOSED))
|
|
if (!(svars->state[1-t] & ST_CLOSED))
|
|
return;
|
|
return;
|
|
|
|
|
|
- if (((svars->state[M] | svars->state[S]) & ST_DID_EXPUNGE) || svars->smaxxuid) {
|
|
|
|
|
|
+ if (((svars->state[M] | svars->state[S]) & ST_DID_EXPUNGE) || svars->chan->max_messages) {
|
|
/* This cleanup is not strictly necessary, as the next full sync
|
|
/* This cleanup is not strictly necessary, as the next full sync
|
|
would throw out the dead entries anyway. But ... */
|
|
would throw out the dead entries anyway. But ... */
|
|
|
|
+ debug( "purging obsolete entries\n" );
|
|
|
|
|
|
minwuid = INT_MAX;
|
|
minwuid = INT_MAX;
|
|
- if (svars->smaxxuid) {
|
|
|
|
- debug( "preparing entry purge - max expired slave uid is %d\n", svars->smaxxuid );
|
|
|
|
|
|
+ if (svars->chan->max_messages) {
|
|
|
|
+ debug( " max expired slave uid is %d\n", svars->smaxxuid );
|
|
for (srec = svars->srecs; srec; srec = srec->next) {
|
|
for (srec = svars->srecs; srec; srec = srec->next) {
|
|
if (srec->status & S_DEAD)
|
|
if (srec->status & S_DEAD)
|
|
continue;
|
|
continue;
|