Răsfoiți Sursa

fully asynchronous IMAP operation

- asynchronous sockets using an event loop
  - connect & starttls have completion callback parameters
  - callbacks for notification about filled input buffer and emptied
    output buffer
- unsent imap command queue
  - used when
    - socket output buffer is non-empty
    - number of commands in flight exceeds limit
    - last sent command requires round-trip
    - command has a dependency on completion of previous command
  - trashnc is tri-state so only a single "scout" trash APPEND/COPY is
    sent at first. a possibly resulting CREATE is injected in front of
    the remaining trash commands, so they can succeed (or be cancel()d
    if it fails).
  - queue's presence necessitates imap_cancel implementation
Oswald Buddenhagen 13 ani în urmă
părinte
comite
bd93d689db
5 a modificat fișierele cu 506 adăugiri și 215 ștergeri
  1. 1 1
      configure.in
  2. 170 119
      src/drv_imap.c
  3. 30 4
      src/isync.h
  4. 1 3
      src/mbsync.1
  5. 304 88
      src/socket.c

+ 1 - 1
configure.in

@@ -9,7 +9,7 @@ if test "$GCC" = yes; then
     CFLAGS="$CFLAGS -pipe -W -Wall -Wshadow -Wstrict-prototypes"
 fi
 
-AC_CHECK_HEADERS(sys/filio.h sys/poll.h sys/select.h)
+AC_CHECK_HEADERS(sys/poll.h sys/select.h)
 AC_CHECK_FUNCS(vasprintf)
 
 AC_CHECK_LIB(socket, socket, [SOCK_LIBS="-lsocket"])

+ 170 - 119
src/drv_imap.c

@@ -81,7 +81,8 @@ typedef struct imap_store {
 	const char *prefix;
 	int ref_count;
 	int uidnext; /* from SELECT responses */
-	unsigned trashnc:1; /* trash folder's existence is not confirmed yet */
+	/* trash folder's existence is not confirmed yet */
+	enum { TrashUnknown, TrashChecking, TrashKnown } trashnc;
 	unsigned got_namespace:1;
 	list_t *ns_personal, *ns_other, *ns_shared; /* NAMESPACE info */
 	message_t **msgapp; /* FETCH results */
@@ -89,12 +90,15 @@ typedef struct imap_store {
 	parse_list_state_t parse_list_sts;
 	/* command queue */
 	int nexttag, num_in_progress, literal_pending;
+	struct imap_cmd *pending, **pending_append;
 	struct imap_cmd *in_progress, **in_progress_append;
 
 	/* Used during sequential operations like connect */
 	enum { GreetingPending = 0, GreetingBad, GreetingOk, GreetingPreauth } greeting;
+	int canceling; /* imap_cancel() is in progress */
 	union {
 		void (*imap_open)( store_t *srv, void *aux );
+		void (*imap_cancel)( void *aux );
 	} callbacks;
 	void *callback_aux;
 
@@ -115,6 +119,7 @@ struct imap_cmd {
 		int data_len;
 		int uid; /* to identify fetch responses */
 		unsigned
+			high_prio:1, /* if command is queued, put it at the front of the queue. */
 			to_trash:1, /* we are storing to trash, not current. */
 			create:1, /* create the mailbox if we get an error ... */
 			trycreate:1; /* ... but only if this is true or the server says so. */
@@ -179,8 +184,6 @@ static const char *cap_list[] = {
 #define RESP_NO       1
 #define RESP_CANCEL   2
 
-static int get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd );
-
 static INLINE void imap_ref( imap_store_t *ctx ) { ++ctx->ref_count; }
 static int imap_deref( imap_store_t *ctx );
 
@@ -221,30 +224,18 @@ done_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd, int response )
 	free( cmd );
 }
 
-static struct imap_cmd *
-v_submit_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd,
-                   const char *fmt, va_list ap )
+static int
+send_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd )
 {
 	int bufl, litplus;
 	const char *buffmt;
 	char buf[1024];
 
-	assert( ctx );
-	assert( ctx->gen.bad_callback );
-	assert( cmd );
-	assert( cmd->param.done );
-
-	while (ctx->literal_pending)
-		if (get_cmd_result( ctx, 0 ) == RESP_CANCEL)
-			goto bail;
-
 	cmd->tag = ++ctx->nexttag;
-	if (fmt)
-		nfvasprintf( &cmd->cmd, fmt, ap );
 	if (!cmd->param.data) {
 		buffmt = "%d %s\r\n";
 		litplus = 0;
-	} else if ((cmd->param.to_trash && ctx->trashnc) || !CAP(LITERALPLUS)) {
+	} else if ((cmd->param.to_trash && ctx->trashnc == TrashUnknown) || !CAP(LITERALPLUS)) {
 		buffmt = "%d %s{%d}\r\n";
 		litplus = 0;
 	} else {
@@ -272,27 +263,52 @@ v_submit_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd,
 	} else if (cmd->param.cont || cmd->param.data) {
 		ctx->literal_pending = 1;
 	}
+	if (cmd->param.to_trash && ctx->trashnc == TrashUnknown)
+		ctx->trashnc = TrashChecking;
 	cmd->next = 0;
 	*ctx->in_progress_append = cmd;
 	ctx->in_progress_append = &cmd->next;
 	ctx->num_in_progress++;
-	return cmd;
+	return 0;
 
   bail:
 	done_imap_cmd( ctx, cmd, RESP_CANCEL );
-	return NULL;
+	return -1;
 }
 
-static struct imap_cmd *
-submit_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd, const char *fmt, ... )
+static int
+cmd_submittable( imap_store_t *ctx, struct imap_cmd *cmd )
 {
-	struct imap_cmd *ret;
-	va_list ap;
+	return !ctx->conn.write_buf &&
+	       !ctx->literal_pending &&
+	       !(cmd->param.to_trash && ctx->trashnc == TrashChecking) &&
+	       ctx->num_in_progress < ((imap_store_conf_t *)ctx->gen.conf)->server->max_in_progress;
+}
 
-	va_start( ap, fmt );
-	ret = v_submit_imap_cmd( ctx, cmd, fmt, ap );
-	va_end( ap );
-	return ret;
+static int
+flush_imap_cmds( imap_store_t *ctx )
+{
+	struct imap_cmd *cmd;
+
+	while ((cmd = ctx->pending) && cmd_submittable( ctx, cmd )) {
+		if (!(ctx->pending = cmd->next))
+			ctx->pending_append = &ctx->pending;
+		if (send_imap_cmd( ctx, cmd ) < 0)
+			return -1;
+	}
+	return 0;
+}
+
+static void
+cancel_pending_imap_cmds( imap_store_t *ctx )
+{
+	struct imap_cmd *cmd;
+
+	while ((cmd = ctx->pending)) {
+		if (!(ctx->pending = cmd->next))
+			ctx->pending_append = &ctx->pending;
+		done_imap_cmd( ctx, cmd, RESP_CANCEL );
+	}
 }
 
 static void
@@ -307,6 +323,29 @@ cancel_submitted_imap_cmds( imap_store_t *ctx )
 	}
 }
 
+static int
+submit_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd )
+{
+	assert( ctx );
+	assert( ctx->gen.bad_callback );
+	assert( cmd );
+	assert( cmd->param.done );
+
+	if ((ctx->pending && !cmd->param.high_prio) || !cmd_submittable( ctx, cmd )) {
+		if (ctx->pending && cmd->param.high_prio) {
+			cmd->next = ctx->pending;
+			ctx->pending = cmd;
+		} else {
+			cmd->next = 0;
+			*ctx->pending_append = cmd;
+			ctx->pending_append = &cmd->next;
+		}
+		return 0;
+	}
+
+	return send_imap_cmd( ctx, cmd );
+}
+
 static int
 imap_exec( imap_store_t *ctx, struct imap_cmd *cmdp,
            void (*done)( imap_store_t *ctx, struct imap_cmd *cmd, int response ),
@@ -318,12 +357,9 @@ imap_exec( imap_store_t *ctx, struct imap_cmd *cmdp,
 		cmdp = new_imap_cmd( sizeof(*cmdp) );
 	cmdp->param.done = done;
 	va_start( ap, fmt );
-	cmdp = v_submit_imap_cmd( ctx, cmdp, fmt, ap );
+	nfvasprintf( &cmdp->cmd, fmt, ap );
 	va_end( ap );
-	if (!cmdp)
-		return RESP_CANCEL;
-
-	return get_cmd_result( ctx, cmdp );
+	return submit_imap_cmd( ctx, cmdp );
 }
 
 static void
@@ -393,25 +429,6 @@ imap_refcounted_done( struct imap_cmd_refcounted_state *sts )
 	free( sts );
 }
 
-/*
-static void
-drain_imap_replies( imap_store_t *ctx )
-{
-	while (ctx->num_in_progress)
-		get_cmd_result( ctx, 0 );
-}
-*/
-
-static int
-process_imap_replies( imap_store_t *ctx )
-{
-	while (ctx->num_in_progress > ((imap_store_conf_t *)ctx->gen.conf)->server->max_in_progress ||
-	       socket_pending( &ctx->conn ))
-		if (get_cmd_result( ctx, 0 ) == RESP_CANCEL)
-			return RESP_CANCEL;
-	return RESP_OK;
-}
-
 static int
 is_atom( list_t *list )
 {
@@ -798,20 +815,22 @@ struct imap_cmd_trycreate {
 static void imap_open_store_greeted( imap_store_t * );
 static void get_cmd_result_p2( imap_store_t *, struct imap_cmd *, int );
 
-static int
-get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd )
+static void
+imap_socket_read( void *aux )
 {
+	imap_store_t *ctx = (imap_store_t *)aux;
 	struct imap_cmd *cmdp, **pcmdp;
 	char *cmd, *arg, *arg1, *p;
 	int resp, resp2, tag, greeted;
 
 	greeted = ctx->greeting;
+	if (ctx->parse_list_sts.level) {
+		cmd = 0;
+		goto do_fetch;
+	}
 	for (;;) {
-		if (!(cmd = socket_read_line( &ctx->conn ))) {
-			if (socket_fill( &ctx->conn ) < 0)
-				return RESP_CANCEL;
-			continue;
-		}
+		if (!(cmd = socket_read_line( &ctx->conn )))
+			return;
 
 		arg = next_arg( &cmd );
 		if (*arg == '*') {
@@ -850,11 +869,8 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd )
 				  do_fetch:
 					if ((resp = parse_imap_list( ctx, &cmd, &ctx->parse_list_sts )) == LIST_BAD)
 						break; /* stream is likely to be useless now */
-					if (resp == LIST_PARTIAL) {
-						if (socket_fill( &ctx->conn ) < 0)
-							return RESP_CANCEL;
-						goto do_fetch;
-					}
+					if (resp == LIST_PARTIAL)
+						return;
 					if (parse_fetch( ctx, ctx->parse_list_sts.head ) < 0)
 						break; /* this may mean anything, so prefer not to spam the log */
 				}
@@ -865,8 +881,10 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd )
 			if (greeted == GreetingPending) {
 				imap_ref( ctx );
 				imap_open_store_greeted( ctx );
-				return imap_deref( ctx ) ? RESP_CANCEL : RESP_OK;
+				if (imap_deref( ctx ))
+					return;
 			}
+			continue;
 		} else if (!ctx->in_progress) {
 			error( "IMAP error: unexpected reply: %s %s\n", arg, cmd ? cmd : "" );
 			break; /* this may mean anything, so prefer not to spam the log */
@@ -876,24 +894,22 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd )
 			cmdp = ctx->in_progress;
 			if (cmdp->param.data) {
 				if (cmdp->param.to_trash)
-					ctx->trashnc = 0; /* Can't get NO [TRYCREATE] any more. */
+					ctx->trashnc = TrashKnown; /* Can't get NO [TRYCREATE] any more. */
 				p = cmdp->param.data;
 				cmdp->param.data = 0;
 				if (socket_write( &ctx->conn, p, cmdp->param.data_len, GiveOwn ) < 0)
-					return RESP_CANCEL;
+					return;
 			} else if (cmdp->param.cont) {
 				if (cmdp->param.cont( ctx, cmdp, cmd ))
-					return RESP_CANCEL;
+					return;
 			} else {
 				error( "IMAP error: unexpected command continuation request\n" );
 				break;
 			}
 			if (socket_write( &ctx->conn, "\r\n", 2, KeepOwn ) < 0)
-				return RESP_CANCEL;
+				return;
 			if (!cmdp->param.cont)
 				ctx->literal_pending = 0;
-			if (!tcmd)
-				return RESP_OK;
 		} else {
 			tag = atoi( arg );
 			for (pcmdp = &ctx->in_progress; (cmdp = *pcmdp); pcmdp = &cmdp->next)
@@ -910,7 +926,7 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd )
 			arg = next_arg( &cmd );
 			if (!strcmp( "OK", arg )) {
 				if (cmdp->param.to_trash)
-					ctx->trashnc = 0; /* Can't get NO [TRYCREATE] any more. */
+					ctx->trashnc = TrashKnown; /* Can't get NO [TRYCREATE] any more. */
 				resp = RESP_OK;
 			} else {
 				if (!strcmp( "NO", arg )) {
@@ -921,10 +937,11 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd )
 						struct imap_cmd_trycreate *cmd2 =
 							(struct imap_cmd_trycreate *)new_imap_cmd( sizeof(*cmd2) );
 						cmd2->orig_cmd = cmdp;
-						cmd2->gen.param.done = get_cmd_result_p2;
+						cmd2->gen.param.high_prio = 1;
 						p = strchr( cmdp->cmd, '"' );
-						if (!submit_imap_cmd( ctx, &cmd2->gen, "CREATE %.*s", strchr( p + 1, '"' ) - p + 1, p ))
-							return RESP_CANCEL;
+						if (imap_exec( ctx, &cmd2->gen, get_cmd_result_p2,
+						               "CREATE %.*s", strchr( p + 1, '"' ) - p + 1, p ) < 0)
+							return;
 						continue;
 					}
 					resp = RESP_NO;
@@ -941,13 +958,17 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd )
 				imap_invoke_bad_callback( ctx );
 			done_imap_cmd( ctx, cmdp, resp );
 			if (imap_deref( ctx ))
-				resp = RESP_CANCEL;
-			if (resp == RESP_CANCEL || !tcmd || tcmd == cmdp)
-				return resp;
+				return;
+			if (ctx->canceling && !ctx->in_progress) {
+				ctx->canceling = 0;
+				ctx->callbacks.imap_cancel( ctx->callback_aux );
+				return;
+			}
 		}
+		if (flush_imap_cmds( ctx ) < 0)
+			return;
 	}
 	imap_invoke_bad_callback( ctx );
-	return RESP_CANCEL;
 }
 
 static void
@@ -960,8 +981,11 @@ get_cmd_result_p2( imap_store_t *ctx, struct imap_cmd *cmd, int response )
 		done_imap_cmd( ctx, ocmd, response );
 	} else {
 		ctx->uidnext = 0;
+		if (ocmd->param.to_trash)
+			ctx->trashnc = TrashKnown;
 		ocmd->param.create = 0;
-		submit_imap_cmd( ctx, ocmd, 0 );
+		ocmd->param.high_prio = 1;
+		submit_imap_cmd( ctx, ocmd );
 	}
 }
 
@@ -974,6 +998,7 @@ imap_cancel_store( store_t *gctx )
 
 	socket_close( &ctx->conn );
 	cancel_submitted_imap_cmds( ctx );
+	cancel_pending_imap_cmds( ctx );
 	free_generic_messages( ctx->gen.msgs );
 	free_string_list( ctx->gen.boxes );
 	free_list( ctx->ns_personal );
@@ -1082,10 +1107,15 @@ do_cram_auth( imap_store_t *ctx, struct imap_cmd *cmdp, const char *prompt )
 }
 #endif
 
+static void imap_open_store_connected( int, void * );
+#ifdef HAVE_LIBSSL
+static void imap_open_store_tlsstarted1( int, void * );
+#endif
 static void imap_open_store_p2( imap_store_t *, struct imap_cmd *, int );
 static void imap_open_store_authenticate( imap_store_t * );
 #ifdef HAVE_LIBSSL
 static void imap_open_store_authenticate_p2( imap_store_t *, struct imap_cmd *, int );
+static void imap_open_store_tlsstarted2( int, void * );
 static void imap_open_store_authenticate_p3( imap_store_t *, struct imap_cmd *, int );
 #endif
 static void imap_open_store_authenticate2( imap_store_t * );
@@ -1131,26 +1161,41 @@ imap_open_store( store_conf_t *conf,
 	ctx->callback_aux = aux;
 	set_bad_callback( &ctx->gen, (void (*)(void *))imap_open_store_bail, ctx );
 	ctx->in_progress_append = &ctx->in_progress;
+	ctx->pending_append = &ctx->pending;
 
-	socket_init( &ctx->conn, (void (*)( void * ))imap_invoke_bad_callback, ctx );
+	socket_init( &ctx->conn, &srvc->sconf,
+	             (void (*)( void * ))imap_invoke_bad_callback,
+	             imap_socket_read, (int (*)(void *))flush_imap_cmds, ctx );
+	socket_connect( &ctx->conn, imap_open_store_connected );
+}
 
-	if (!socket_connect( &srvc->sconf, &ctx->conn ))
-		goto bail;
+static void
+imap_open_store_connected( int ok, void *aux )
+{
+	imap_store_t *ctx = (imap_store_t *)aux;
+#ifdef HAVE_LIBSSL
+	imap_store_conf_t *cfg = (imap_store_conf_t *)ctx->gen.conf;
+	imap_server_conf_t *srvc = cfg->server;
+#endif
 
+	if (!ok)
+		imap_open_store_bail( ctx );
 #ifdef HAVE_LIBSSL
-	if (srvc->sconf.use_imaps) {
-		if (socket_start_tls( &srvc->sconf, &ctx->conn )) {
-			imap_open_store_ssl_bail( ctx );
-			return;
-		}
-	}
+	else if (srvc->sconf.use_imaps)
+		socket_start_tls( &ctx->conn, imap_open_store_tlsstarted1 );
 #endif
-	get_cmd_result( ctx, 0 );
-	return;
+}
 
-  bail:
-	imap_open_store_bail( ctx );
+#ifdef HAVE_LIBSSL
+static void
+imap_open_store_tlsstarted1( int ok, void *aux )
+{
+	imap_store_t *ctx = (imap_store_t *)aux;
+
+	if (!ok)
+		imap_open_store_ssl_bail( ctx );
 }
+#endif
 
 static void
 imap_open_store_greeted( imap_store_t *ctx )
@@ -1213,7 +1258,16 @@ imap_open_store_authenticate_p2( imap_store_t *ctx, struct imap_cmd *cmd ATTR_UN
 {
 	if (response != RESP_OK)
 		imap_open_store_bail( ctx );
-	else if (socket_start_tls( &((imap_server_conf_t *)ctx->gen.conf)->sconf, &ctx->conn ))
+	else
+		socket_start_tls( &ctx->conn, imap_open_store_tlsstarted2 );
+}
+
+static void
+imap_open_store_tlsstarted2( int ok, void *aux )
+{
+	imap_store_t *ctx = (imap_store_t *)aux;
+
+	if (!ok)
 		imap_open_store_ssl_bail( ctx );
 	else
 		imap_exec( ctx, 0, imap_open_store_authenticate_p3, "CAPABILITY" );
@@ -1343,7 +1397,7 @@ static void
 imap_open_store_finalize( imap_store_t *ctx )
 {
 	set_bad_callback( &ctx->gen, 0, 0 );
-	ctx->trashnc = 1;
+	ctx->trashnc = TrashUnknown;
 	ctx->callbacks.imap_open( &ctx->gen, ctx->callback_aux );
 }
 
@@ -1404,8 +1458,7 @@ imap_select( store_t *gctx, int create,
 
 /******************* imap_load *******************/
 
-static int imap_submit_load( imap_store_t *, const char *, struct imap_cmd_refcounted_state *,
-                             struct imap_cmd ** );
+static int imap_submit_load( imap_store_t *, const char *, struct imap_cmd_refcounted_state * );
 static void imap_load_p2( imap_store_t *, struct imap_cmd *, int );
 
 static void
@@ -1420,7 +1473,6 @@ imap_load( store_t *gctx, int minuid, int maxuid, int *excs, int nexcs,
 		free( excs );
 		cb( DRV_OK, aux );
 	} else {
-		struct imap_cmd *cmd2 = 0;
 		struct imap_cmd_refcounted_state *sts = imap_refcounted_new_state( cb, aux );
 
 		ctx->msgapp = &ctx->gen.msgs;
@@ -1435,35 +1487,29 @@ imap_load( store_t *gctx, int minuid, int maxuid, int *excs, int nexcs,
 				if (i != j)
 					bl += sprintf( buf + bl, ":%d", excs[i] );
 			}
-			if (imap_submit_load( ctx, buf, sts, &cmd2 ) < 0)
+			if (imap_submit_load( ctx, buf, sts ) < 0)
 				goto done;
 		}
 		if (maxuid == INT_MAX)
 			maxuid = ctx->uidnext >= 0 ? ctx->uidnext - 1 : 1000000000;
 		if (maxuid >= minuid) {
 			sprintf( buf, "%d:%d", minuid, maxuid );
-			imap_submit_load( ctx, buf, sts, &cmd2 );
+			imap_submit_load( ctx, buf, sts );
 		}
 	  done:
 		free( excs );
 		if (!--sts->ref_count)
 			imap_refcounted_done( sts );
-		else
-			get_cmd_result( ctx, cmd2 );
 	}
 }
 
 static int
-imap_submit_load( imap_store_t *ctx, const char *buf, struct imap_cmd_refcounted_state *sts,
-                  struct imap_cmd **cmdp )
+imap_submit_load( imap_store_t *ctx, const char *buf, struct imap_cmd_refcounted_state *sts )
 {
-	struct imap_cmd *cmd = imap_refcounted_new_cmd( sts );
-	cmd->param.done = imap_load_p2;
-	*cmdp = cmd;
-	return submit_imap_cmd( ctx, cmd,
-	                        "UID FETCH %s (UID%s%s)", buf,
-	                        (ctx->gen.opts & OPEN_FLAGS) ? " FLAGS" : "",
-	                        (ctx->gen.opts & OPEN_SIZE) ? " RFC822.SIZE" : "" ) ? 0 : -1;
+	return imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_load_p2,
+	                  "UID FETCH %s (UID%s%s)", buf,
+	                  (ctx->gen.opts & OPEN_FLAGS) ? " FLAGS" : "",
+	                  (ctx->gen.opts & OPEN_SIZE) ? " RFC822.SIZE" : "" );
 }
 
 static void
@@ -1528,12 +1574,9 @@ imap_flags_helper( imap_store_t *ctx, int uid, char what, int flags,
 {
 	char buf[256];
 
-	struct imap_cmd *cmd = imap_refcounted_new_cmd( sts );
-	cmd->param.done = imap_set_flags_p2;
 	buf[imap_make_flags( flags, buf )] = 0;
-	if (!submit_imap_cmd( ctx, cmd, "UID STORE %d %cFLAGS.SILENT %s", uid, what, buf ))
-		return -1;
-	return process_imap_replies( ctx ) == RESP_CANCEL ? -1 : 0;
+	return imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_set_flags_p2,
+	                  "UID STORE %d %cFLAGS.SILENT %s", uid, what, buf );
 }
 
 static void
@@ -1705,8 +1748,16 @@ static void
 imap_cancel( store_t *gctx,
              void (*cb)( void *aux ), void *aux )
 {
-	(void)gctx;
-	cb( aux );
+	imap_store_t *ctx = (imap_store_t *)gctx;
+
+	cancel_pending_imap_cmds( ctx );
+	if (ctx->in_progress) {
+		ctx->canceling = 1;
+		ctx->callbacks.imap_cancel = cb;
+		ctx->callback_aux = aux;
+	} else {
+		cb( aux );
+	}
 }
 
 /******************* imap_commit *******************/
@@ -1753,7 +1804,7 @@ imap_parse_store( conffile_t *cfg, store_conf_t **storep, int *err )
 	server->require_ssl = 1;
 	server->sconf.use_tlsv1 = 1;
 #endif
-	server->max_in_progress = 50;
+	server->max_in_progress = INT_MAX;
 
 	while (getcline( cfg ) && cfg->cmd) {
 		if (!strcasecmp( "Host", cfg->cmd )) {

+ 30 - 4
src/isync.h

@@ -73,15 +73,36 @@ typedef struct server_conf {
 #endif
 } server_conf_t;
 
+typedef struct buff_chunk {
+	struct buff_chunk *next;
+	char *data;
+	int len;
+	char buf[1];
+} buff_chunk_t;
+
 typedef struct {
+	/* connection */
 	int fd;
+	int state;
+	const server_conf_t *conf; /* needed during connect */
 #ifdef HAVE_LIBSSL
 	SSL *ssl;
 #endif
 
 	void (*bad_callback)( void *aux ); /* async fail while sending or listening */
+	void (*read_callback)( void *aux ); /* data available for reading */
+	int (*write_callback)( void *aux ); /* all *queued* data was sent */
+	union {
+		void (*connect)( int ok, void *aux );
+		void (*starttls)( int ok, void *aux );
+	} callbacks;
 	void *callback_aux;
 
+	/* writing */
+	buff_chunk_t *write_buf, **write_buf_append; /* buffer head & tail */
+	int write_offset; /* offset into buffer head */
+
+	/* reading */
 	int offset; /* start of filled bytes in buffer */
 	int bytes; /* number of filled bytes in buffer */
 	int scanoff; /* offset to continue scanning for newline at, relative to 'offset' */
@@ -335,22 +356,27 @@ extern const char *Home;
 
 /* call this before doing anything with the socket */
 static INLINE void socket_init( conn_t *conn,
+                                const server_conf_t *conf,
                                 void (*bad_callback)( void *aux ),
+                                void (*read_callback)( void *aux ),
+                                int (*write_callback)( void *aux ),
                                 void *aux )
 {
+	conn->conf = conf;
 	conn->bad_callback = bad_callback;
+	conn->read_callback = read_callback;
+	conn->write_callback = write_callback;
 	conn->callback_aux = aux;
 	conn->fd = -1;
+	conn->write_buf_append = &conn->write_buf;
 }
-int socket_connect( const server_conf_t *conf, conn_t *sock );
-int socket_start_tls( const server_conf_t *conf, conn_t *sock );
+void socket_connect( conn_t *conn, void (*cb)( int ok, void *aux ) );
+void socket_start_tls(conn_t *conn, void (*cb)( int ok, void *aux ) );
 void socket_close( conn_t *sock );
-int socket_fill( conn_t *sock );
 int socket_read( conn_t *sock, char *buf, int len ); /* never waits */
 char *socket_read_line( conn_t *sock ); /* don't free return value; never waits */
 typedef enum { KeepOwn = 0, GiveOwn } ownership_t;
 int socket_write( conn_t *sock, char *buf, int len, ownership_t takeOwn );
-int socket_pending( conn_t *sock );
 
 void cram( const char *challenge, const char *user, const char *pass,
            char **_final, int *_finallen );

+ 1 - 3
src/mbsync.1

@@ -281,10 +281,8 @@ Use TLSv1 for communication with the IMAP server over SSL?
 \fBPipelineDepth\fR \fIdepth\fR
 Maximum number of IMAP commands which can be simultaneously in flight.
 Setting this to \fI1\fR disables pipelining.
-Setting it to a too big value may deadlock isync.
-Currently, this affects only a few commands.
 This is mostly a debugging only option.
-(Default: \fI50\fR)
+(Default: \fIunlimited\fR)
 ..
 .SS IMAP Stores
 The reference point for relative \fBPath\fRs is whatever the server likes it

+ 304 - 88
src/socket.c

@@ -36,56 +36,67 @@
 #include <assert.h>
 #include <unistd.h>
 #include <stdlib.h>
+#include <stddef.h>
 #include <errno.h>
 #include <string.h>
+#include <fcntl.h>
 #include <sys/socket.h>
 #include <sys/ioctl.h>
-#ifdef HAVE_SYS_FILIO_H
-# include <sys/filio.h>
-#endif
 #include <netinet/in.h>
 #include <netinet/tcp.h>
 #include <arpa/inet.h>
 #include <netdb.h>
 
+enum {
+	SCK_CONNECTING,
+#ifdef HAVE_LIBSSL
+	SCK_STARTTLS,
+#endif
+	SCK_READY
+};
+
 static void
 socket_fail( conn_t *conn )
 {
 	conn->bad_callback( conn->callback_aux );
 }
 
-static void
-socket_perror( const char *func, conn_t *sock, int ret )
-{
 #ifdef HAVE_LIBSSL
+static int
+ssl_return( const char *func, conn_t *conn, int ret )
+{
 	int err;
 
-	if (sock->ssl) {
-		switch ((err = SSL_get_error( sock->ssl, ret ))) {
-		case SSL_ERROR_SYSCALL:
-		case SSL_ERROR_SSL:
-			if ((err = ERR_get_error()) == 0) {
-				if (ret == 0)
-					error( "SSL_%s: got EOF\n", func );
-				else
-					error( "SSL_%s: %s\n", func, strerror(errno) );
-			} else
-				error( "SSL_%s: %s\n", func, ERR_error_string( err, 0 ) );
-			break;
-		default:
-			error( "SSL_%s: unhandled SSL error %d\n", func, err );
-			break;
+	switch ((err = SSL_get_error( conn->ssl, ret ))) {
+	case SSL_ERROR_NONE:
+		return ret;
+	case SSL_ERROR_WANT_WRITE:
+		conf_fd( conn->fd, POLLIN, POLLOUT );
+		/* fallthrough */
+	case SSL_ERROR_WANT_READ:
+		return 0;
+	case SSL_ERROR_SYSCALL:
+	case SSL_ERROR_SSL:
+		if (!(err = ERR_get_error())) {
+			if (ret == 0)
+				error( "SSL_%s: unexpected EOF\n", func );
+			else
+				error( "SSL_%s: %s\n", func, strerror( errno ) );
+		} else {
+			error( "SSL_%s: %s\n", func, ERR_error_string( err, 0 ) );
 		}
-	} else
-#endif
-	if (ret < 0)
-		perror( func );
+		break;
+	default:
+		error( "SSL_%s: unhandled SSL error %d\n", func, err );
+		break;
+	}
+	if (conn->state == SCK_STARTTLS)
+		conn->callbacks.starttls( 0, conn->callback_aux );
 	else
-		error( "%s: unexpected EOF\n", func );
-	socket_fail( sock );
+		socket_fail( conn );
+	return -1;
 }
 
-#ifdef HAVE_LIBSSL
 /* Some of this code is inspired by / lifted from mutt. */
 
 static int
@@ -245,45 +256,85 @@ init_ssl_ctx( const server_conf_t *conf )
 	return 0;
 }
 
-int
-socket_start_tls( const server_conf_t *conf, conn_t *sock )
+static void start_tls_p2( conn_t * );
+static void start_tls_p3( conn_t *, int );
+
+void
+socket_start_tls( conn_t *conn, void (*cb)( int ok, void *aux ) )
 {
-	int ret;
 	static int ssl_inited;
 
+	conn->callbacks.starttls = cb;
+
 	if (!ssl_inited) {
 		SSL_library_init();
 		SSL_load_error_strings();
 		ssl_inited = 1;
 	}
 
-	if (!conf->SSLContext && init_ssl_ctx( conf ))
-		return 1;
-
-	sock->ssl = SSL_new( ((server_conf_t *)conf)->SSLContext );
-	SSL_set_fd( sock->ssl, sock->fd );
-	if ((ret = SSL_connect( sock->ssl )) <= 0) {
-		socket_perror( "connect", sock, ret );
-		return 1;
+	if (!conn->conf->SSLContext && init_ssl_ctx( conn->conf )) {
+		start_tls_p3( conn, 0 );
+		return;
 	}
 
-	/* verify the server certificate */
-	if (verify_cert( conf, sock ))
-		return 1;
+	conn->ssl = SSL_new( ((server_conf_t *)conn->conf)->SSLContext );
+	SSL_set_fd( conn->ssl, conn->fd );
+	SSL_set_mode( conn->ssl, SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER );
+	start_tls_p2( conn );
+}
 
-	info( "Connection is now encrypted\n" );
-	return 0;
+static void
+start_tls_p2( conn_t *conn )
+{
+	switch (ssl_return( "connect", conn, SSL_connect( conn->ssl ) )) {
+	case -1:
+		start_tls_p3( conn, 0 );
+		break;
+	case 0:
+		break;
+	default:
+		/* verify the server certificate */
+		if (verify_cert( conn->conf, conn )) {
+			start_tls_p3( conn, 0 );
+		} else {
+			info( "Connection is now encrypted\n" );
+			start_tls_p3( conn, 1 );
+		}
+		break;
+	}
+}
+
+static void start_tls_p3( conn_t *conn, int ok )
+{
+	conn->state = SCK_READY;
+	conn->callbacks.starttls( ok, conn->callback_aux );
 }
 
 #endif /* HAVE_LIBSSL */
 
-int
-socket_connect( const server_conf_t *conf, conn_t *sock )
+static void socket_fd_cb( int, void * );
+
+static void socket_connected2( conn_t * );
+static void socket_connect_bail( conn_t * );
+
+static void
+socket_close_internal( conn_t *sock )
+{
+	del_fd( sock->fd );
+	close( sock->fd );
+	sock->fd = -1;
+}
+
+void
+socket_connect( conn_t *sock, void (*cb)( int ok, void *aux ) )
 {
+	const server_conf_t *conf = sock->conf;
 	struct hostent *he;
 	struct sockaddr_in addr;
 	int s, a[2];
 
+	sock->callbacks.connect = cb;
+
 	/* open connection to IMAP server */
 	if (conf->tunnel) {
 		infon( "Starting tunnel '%s'... ", conf->tunnel );
@@ -304,6 +355,10 @@ socket_connect( const server_conf_t *conf, conn_t *sock )
 
 		close( a[0] );
 		sock->fd = a[1];
+
+		fcntl( a[1], F_SETFL, O_NONBLOCK );
+		add_fd( a[1], socket_fd_cb, sock );
+
 	} else {
 		memset( &addr, 0, sizeof(addr) );
 		addr.sin_port = conf->port ? htons( conf->port ) :
@@ -317,7 +372,7 @@ socket_connect( const server_conf_t *conf, conn_t *sock )
 		he = gethostbyname( conf->host );
 		if (!he) {
 			error( "IMAP error: Cannot resolve server '%s'\n", conf->host );
-			return -1;
+			goto bail;
 		}
 		info( "ok\n" );
 
@@ -328,36 +383,87 @@ socket_connect( const server_conf_t *conf, conn_t *sock )
 			perror( "socket" );
 			exit( 1 );
 		}
+		sock->fd = s;
+		fcntl( s, F_SETFL, O_NONBLOCK );
+		add_fd( s, socket_fd_cb, sock );
 
-		infon( "Connecting to %s:%hu... ", inet_ntoa( addr.sin_addr ), ntohs( addr.sin_port ) );
+		infon( "Connecting to %s (%s:%hu) ... ",
+		       conf->host, inet_ntoa( addr.sin_addr ), ntohs( addr.sin_port ) );
 		if (connect( s, (struct sockaddr *)&addr, sizeof(addr) )) {
-			close( s );
-			perror( "connect" );
-			return -1;
+			if (errno != EINPROGRESS) {
+				perror( "connect" );
+				socket_close_internal( sock );
+				goto bail;
+			}
+			conf_fd( s, 0, POLLOUT );
+			sock->state = SCK_CONNECTING;
+			info( "\n" );
+			return;
 		}
 
-		sock->fd = s;
 	}
 	info( "ok\n" );
-	return 0;
+	socket_connected2( sock );
+	return;
+
+  bail:
+	socket_connect_bail( sock );
+}
+
+static void
+socket_connected( conn_t *conn )
+{
+	int soerr;
+	socklen_t selen = sizeof(soerr);
+
+	infon( "Connecting to %s: ", conn->conf->host );
+	if (getsockopt( conn->fd, SOL_SOCKET, SO_ERROR, &soerr, &selen )) {
+		perror( "getsockopt" );
+		exit( 1 );
+	}
+	if (soerr) {
+		errno = soerr;
+		perror( "connect" );
+		socket_close_internal( conn );
+		socket_connect_bail( conn );
+		return;
+	}
+	info( "ok\n" );
+	socket_connected2( conn );
+}
+
+static void
+socket_connected2( conn_t *conn )
+{
+	conf_fd( conn->fd, 0, POLLIN );
+	conn->state = SCK_READY;
+	conn->callbacks.connect( 1, conn->callback_aux );
 }
 
+static void
+socket_connect_bail( conn_t *conn )
+{
+	conn->callbacks.connect( 0, conn->callback_aux );
+}
+
+static void dispose_chunk( conn_t *conn );
+
 void
 socket_close( conn_t *sock )
 {
-	if (sock->fd >= 0) {
-		close( sock->fd );
-		sock->fd = -1;
-	}
+	if (sock->fd >= 0)
+		socket_close_internal( sock );
 #ifdef HAVE_LIBSSL
 	if (sock->ssl) {
 		SSL_free( sock->ssl );
 		sock->ssl = 0;
 	}
 #endif
+	while (sock->write_buf)
+		dispose_chunk( sock );
 }
 
-int
+static void
 socket_fill( conn_t *sock )
 {
 	char *buf;
@@ -366,22 +472,31 @@ socket_fill( conn_t *sock )
 	if (!len) {
 		error( "Socket error: receive buffer full. Probably protocol error.\n" );
 		socket_fail( sock );
-		return -1;
+		return;
 	}
 	assert( sock->fd >= 0 );
 	buf = sock->buf + n;
-	n =
 #ifdef HAVE_LIBSSL
-		sock->ssl ? SSL_read( sock->ssl, buf, len ) :
+	if (sock->ssl) {
+		if ((n = ssl_return( "read", sock, SSL_read( sock->ssl, buf, len ) )) <= 0)
+			return;
+		if (n == len && SSL_pending( sock->ssl ))
+			fake_fd( sock->fd, POLLIN );
+	} else
 #endif
-		read( sock->fd, buf, len );
-	if (n <= 0) {
-		socket_perror( "read", sock, n );
-		return -1;
-	} else {
-		sock->bytes += n;
-		return 0;
+	{
+		if ((n = read( sock->fd, buf, len )) < 0) {
+			perror( "read" );
+			socket_fail( sock );
+			return;
+		} else if (!n) {
+			error( "read: unexpected EOF\n" );
+			socket_fail( sock );
+			return;
+		}
 	}
+	sock->bytes += n;
+	sock->read_callback( sock->callback_aux );
 }
 
 int
@@ -426,40 +541,141 @@ socket_read_line( conn_t *b )
 	return s;
 }
 
-int
-socket_write( conn_t *sock, char *buf, int len, ownership_t takeOwn )
+static int
+do_write( conn_t *sock, char *buf, int len )
 {
 	int n;
 
 	assert( sock->fd >= 0 );
-	n =
 #ifdef HAVE_LIBSSL
-		sock->ssl ? SSL_write( sock->ssl, buf, len ) :
+	if (sock->ssl)
+		return ssl_return( "write", sock, SSL_write( sock->ssl, buf, len ) );
 #endif
-		write( sock->fd, buf, len );
-	if (takeOwn == GiveOwn)
-		free( buf );
-	if (n != len) {
-		socket_perror( "write", sock, n );
-		return -1;
+	n = write( sock->fd, buf, len );
+	if (n < 0) {
+		if (errno != EAGAIN && errno != EWOULDBLOCK) {
+			perror( "write" );
+			socket_fail( sock );
+		} else {
+			n = 0;
+			conf_fd( sock->fd, POLLIN, POLLOUT );
+		}
+	} else if (n != len) {
+		conf_fd( sock->fd, POLLIN, POLLOUT );
 	}
-	return 0;
+	return n;
+}
+
+static void
+dispose_chunk( conn_t *conn )
+{
+	buff_chunk_t *bc = conn->write_buf;
+	if (!(conn->write_buf = bc->next))
+		conn->write_buf_append = &conn->write_buf;
+	if (bc->data != bc->buf)
+		free( bc->data );
+	free( bc );
+}
+
+static int
+do_queued_write( conn_t *conn )
+{
+	buff_chunk_t *bc;
+
+	if (!conn->write_buf)
+		return 0;
+
+	while ((bc = conn->write_buf)) {
+		int n, len = bc->len - conn->write_offset;
+		if ((n = do_write( conn, bc->data + conn->write_offset, len )) < 0)
+			return -1;
+		if (n != len) {
+			conn->write_offset += n;
+			return 0;
+		}
+		conn->write_offset = 0;
+		dispose_chunk( conn );
+	}
+#ifdef HAVE_LIBSSL
+	if (conn->ssl && SSL_pending( conn->ssl ))
+		fake_fd( conn->fd, POLLIN );
+#endif
+	return conn->write_callback( conn->callback_aux );
+}
+
+static void
+do_append( conn_t *conn, char *buf, int len, ownership_t takeOwn )
+{
+	buff_chunk_t *bc;
+
+	if (takeOwn == GiveOwn) {
+		bc = nfmalloc( offsetof(buff_chunk_t, buf) );
+		bc->data = buf;
+	} else {
+		bc = nfmalloc( offsetof(buff_chunk_t, buf) + len );
+		bc->data = bc->buf;
+		memcpy( bc->data, buf, len );
+	}
+	bc->len = len;
+	bc->next = 0;
+	*conn->write_buf_append = bc;
+	conn->write_buf_append = &bc->next;
 }
 
 int
-socket_pending( conn_t *sock )
+socket_write( conn_t *conn, char *buf, int len, ownership_t takeOwn )
+{
+	if (conn->write_buf) {
+		do_append( conn, buf, len, takeOwn );
+		return len;
+	} else {
+		int n = do_write( conn, buf, len );
+		if (n != len && n >= 0) {
+			conn->write_offset = n;
+			do_append( conn, buf, len, takeOwn );
+		} else if (takeOwn) {
+			free( buf );
+		}
+		return n;
+	}
+}
+
+static void
+socket_fd_cb( int events, void *aux )
 {
-	int num = -1;
+	conn_t *conn = (conn_t *)aux;
+
+	if (events & POLLERR) {
+		error( "Unidentified socket error.\n" );
+		socket_fail( conn );
+		return;
+	}
+
+	if (conn->state == SCK_CONNECTING) {
+		socket_connected( conn );
+		return;
+	}
+
+	if (events & POLLOUT)
+		conf_fd( conn->fd, POLLIN, 0 );
 
-	if (ioctl( sock->fd, FIONREAD, &num ) < 0)
-		return -1;
-	if (num > 0)
-		return num;
 #ifdef HAVE_LIBSSL
-	if (sock->ssl)
-		return SSL_pending( sock->ssl );
+	if (conn->state == SCK_STARTTLS) {
+		start_tls_p2( conn );
+		return;
+	}
+	if (conn->ssl) {
+		if (do_queued_write( conn ) < 0)
+			return;
+		socket_fill( conn );
+		return;
+	}
 #endif
-	return 0;
+
+	if ((events & POLLOUT) && do_queued_write( conn ) < 0)
+		return;
+	if (events & POLLIN)
+		socket_fill( conn );
 }
 
 #ifdef HAVE_LIBSSL