File gvfs-dav-recursive-directory-ops.patch of Package gvfs

commit 25cb6b442cc37b8e6056cc837233523d7572736b
Author: Hans Petter Jansson <hpj@cl.no>
Date:   Thu Jan 13 18:04:58 2011 +0100

    Patch 8: gvfs-dav-recursive-directory-ops.patch

diff --git a/daemon/Makefile.am b/daemon/Makefile.am
index 39369b4..767eeb6 100644
--- a/daemon/Makefile.am
+++ b/daemon/Makefile.am
@@ -430,6 +430,7 @@ gvfsd_gphoto2_LDADD = $(libraries) $(GPHOTO2_LIBS) $(HAL_LIBS)
 endif
 
 gvfsd_http_SOURCES = \
+	gmempipe.c gmempipe.h \
 	soup-input-stream.c soup-input-stream.h \
 	soup-output-stream.c soup-output-stream.h \
 	gvfsbackendhttp.c gvfsbackendhttp.h \
@@ -474,6 +475,7 @@ gvfsd_nvvfs_LDADD = $(libraries)
 
 
 gvfsd_dav_SOURCES = \
+	gmempipe.c gmempipe.h \
 	soup-input-stream.c soup-input-stream.h \
 	soup-output-stream.c soup-output-stream.h \
 	gvfsbackendhttp.c gvfsbackendhttp.h \
diff --git a/daemon/gmempipe.c b/daemon/gmempipe.c
new file mode 100644
index 0000000..3ceebd9
--- /dev/null
+++ b/daemon/gmempipe.c
@@ -0,0 +1,716 @@
+/* GIO - GLib Input, Output and Streaming Library
+ *
+ * Copyright (C) Christian Kellner <gicmo@gnome.org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ *
+ * Author: Christian Kellner <gicmo@gnome.org>
+ */
+
+#include "gmempipe.h"
+
+#include <string.h>
+
+
+
+G_DEFINE_TYPE (GMemPipe, g_mem_pipe, G_TYPE_OBJECT)
+
+/* TODO: Real P_() */
+#define P_(_x) (_x)
+
+static void   g_mem_pipe_get_property     (GObject    *object,
+                                           guint       prop_id,
+                                           GValue     *value,
+                                           GParamSpec *pspec);
+
+static void   g_mem_pipe_set_property     (GObject      *object,
+                                           guint         prop_id,
+                                           const GValue *value,
+                                           GParamSpec   *pspec);
+
+typedef enum {
+
+  GPD_NONE  = 0x00,
+  GPD_READ  = 0x01, /* reading from the pipe  */
+  GPD_WRITE = 0x02  /* writing to the pipe */
+
+} GPipeDirection;
+
+enum
+{
+  PROP_0,
+  PROP_BUFFER_SIZE
+};
+
+struct _GMemPipePrivate
+{
+
+  struct {
+    gsize  size;
+    gsize  dlen;
+
+    char   *data;
+
+  } buffer;
+
+  GMutex *lock;
+  GCond  *cond;
+
+  GPipeDirection  closed;
+  GInputStream   *input_stream;
+  GOutputStream  *output_stream;
+};
+
+/* streaming classes declarations */
+typedef struct _GMemPipeInputStream GMemPipeInputStream;
+typedef struct _GMemPipeOutputStream GMemPipeOutputStream;
+
+static GOutputStream *g_mem_pipe_output_stream_new (GMemPipe *mem_pipe);
+static GInputStream  *g_mem_pipe_input_stream_new  (GMemPipe *mem_pipe);
+
+/* GMemPipe imlementation starts here*/
+static void
+g_mem_pipe_finalize (GObject *object)
+{
+  GMemPipe        *mem_pipe;
+  GMemPipePrivate *priv;
+
+  mem_pipe = G_MEM_PIPE (object);
+
+  priv = mem_pipe->priv;
+
+  g_mutex_free (priv->lock);
+  g_cond_free (priv->cond);
+
+  if (priv->buffer.size)
+    g_free (priv->buffer.data);
+
+  /* FIXME: move those to dispose? */
+  g_object_unref (priv->input_stream);
+  g_object_unref (priv->output_stream);
+
+  if (G_OBJECT_CLASS (g_mem_pipe_parent_class)->finalize)
+    (*G_OBJECT_CLASS (g_mem_pipe_parent_class)->finalize) (object);
+}
+
+static void
+g_mem_pipe_class_init (GMemPipeClass *klass)
+{
+  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+
+  g_type_class_add_private (klass, sizeof (GMemPipePrivate));
+
+  gobject_class->finalize     = g_mem_pipe_finalize;
+  gobject_class->set_property = g_mem_pipe_set_property;
+  gobject_class->get_property = g_mem_pipe_get_property;
+  
+  g_object_class_install_property (gobject_class, PROP_BUFFER_SIZE,
+                                   g_param_spec_uint ("buffer-size",
+                                                      P_("Buffer size"),
+                                                      P_("The size of the internal buffer"),
+                                                      0,
+                                                      G_MAXUINT,
+                                                      0,
+                                                      G_PARAM_CONSTRUCT_ONLY |
+                                                      G_PARAM_READWRITE |
+                                                      G_PARAM_STATIC_STRINGS));
+  
+}
+
+static void
+g_mem_pipe_init (GMemPipe *mem_pipe)
+{
+  GMemPipePrivate *priv;
+  mem_pipe->priv = priv = G_TYPE_INSTANCE_GET_PRIVATE (mem_pipe,
+						       G_TYPE_MEM_PIPE,
+						       GMemPipePrivate);
+
+  priv->cond = g_cond_new ();
+  priv->lock = g_mutex_new ();
+  priv->input_stream  = g_mem_pipe_input_stream_new (mem_pipe);
+  priv->output_stream = g_mem_pipe_output_stream_new (mem_pipe);
+}
+
+static void
+g_mem_pipe_get_property (GObject    *object,
+                         guint       prop_id,
+                         GValue     *value,
+                         GParamSpec *pspec)
+{
+  GMemPipe *mem_pipe = G_MEM_PIPE (object);
+
+  switch (prop_id)
+    {
+      case PROP_BUFFER_SIZE:
+	g_warn_if_fail (mem_pipe->priv->buffer.dlen == 0);
+        g_value_set_uint (value, mem_pipe->priv->buffer.size);
+        break;
+    
+      default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+  }
+}
+
+static void
+g_mem_pipe_set_property (GObject      *object,
+                         guint         prop_id,
+                         const GValue *value,
+                         GParamSpec   *pspec)
+{
+  GMemPipe *mem_pipe = G_MEM_PIPE (object);
+  
+  switch (prop_id)
+    {
+      case PROP_BUFFER_SIZE:
+        mem_pipe->priv->buffer.size = g_value_get_uint (value);
+        mem_pipe->priv->buffer.data = g_realloc (mem_pipe->priv->buffer.data,
+                                                 mem_pipe->priv->buffer.size);
+        break;
+      
+      default:
+        G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+    } 
+}
+
+static void
+signal_cancellation (GCancellable *cancellable,
+                     gpointer user_data)
+{
+  GMemPipe        *mem_pipe;
+  GMemPipePrivate *priv;
+
+  mem_pipe = G_MEM_PIPE (user_data);
+  priv = mem_pipe->priv;
+
+  g_mutex_lock (priv->lock);
+  g_cond_signal (priv->cond);
+  g_mutex_unlock (priv->lock);
+}
+
+static guint
+_g_mem_pipe_cancellable_connect (GMemPipe     *mem_pipe,
+                                 GCancellable *cancellable)
+{
+  guint handler_id;
+
+  if (cancellable == NULL)
+    return 0;
+
+  handler_id = g_cancellable_connect (cancellable,
+                                      G_CALLBACK (signal_cancellation),
+                                      mem_pipe,
+                                      NULL);
+  
+  return handler_id;
+}
+
+/* half open means write-end closed and read-end still open;
+ * NB there is no need to special case the inverse of it
+ * since writing to a pipe where the read is already closed
+ * makes no sense at all (this is a SIGPIPE in unix land)
+ */
+#define _g_mem_pipe_is_half_open(_pipe)  _pipe->priv->closed & GPD_WRITE && \
+                                        ~_pipe->priv->closed & GPD_READ
+static gboolean
+_g_mem_pipe_check_is_open (GMemPipe       *mem_pipe,
+                           GPipeDirection  direction,
+                           GError         **error)
+{
+  gboolean res;
+  GMemPipePrivate *priv = mem_pipe->priv;
+
+  res = priv->closed == 0;
+
+  if (res == FALSE && direction & GPD_READ)
+    res = _g_mem_pipe_is_half_open (mem_pipe);
+
+  if (res == FALSE)
+    g_set_error_literal (error, 
+			 G_IO_ERROR,
+			 G_IO_ERROR_CLOSED,
+			 "Pipe closed");
+  return res;
+}
+
+/* LOCKED: Must be called with the LOCK held! 
+ *
+ * Checks if the pipe is ready for IO. Will wait
+ * if not.
+ * 
+ * NB: Can only check for GPD_WIRTE *or* GPD_READ!
+ *
+ * Returns: The number of data one can read/write
+ * to the pipe; -1 in case of an error and 0 iff
+ * the pipe write end got closed and there is no
+ * data left in the pipe to read from.
+ */
+static gssize
+_g_mem_pipe_poll (GMemPipe        *mem_pipe,
+                  GPipeDirection   direction,
+                  GCancellable    *cancellable,
+                  GError         **error)
+{
+  GMemPipePrivate *priv = mem_pipe->priv;
+  gssize           res  = -1;
+
+  while (! g_cancellable_set_error_if_cancelled (cancellable, error))
+    {      
+      if (! _g_mem_pipe_check_is_open (mem_pipe, direction, error))
+	break;
+	
+      /* First check if we have to wait at all */
+      if (direction & GPD_WRITE)
+	{
+	  res = priv->buffer.size - priv->buffer.dlen;
+	  g_assert (res >= 0);
+	}
+      else /* ergo: direction & GPD_READ */
+	{
+	  res = priv->buffer.dlen;
+
+	  /* if the pipe is half open we must 
+             never wait even in the case that
+             res == 0; so we directly return */
+	  if (_g_mem_pipe_is_half_open (mem_pipe))
+	    return res;
+	}
+      /* if we got some data (or error) there is no need to wait! */
+      if (res != 0)
+	break;
+      
+      g_cond_wait (priv->cond, priv->lock);
+
+      res = -1; /* reset */
+    }
+
+  return res;
+}
+
+gssize
+g_mem_pipe_read (GMemPipe      *mem_pipe,
+                 void          *buffer,
+                 gsize          count,
+                 GCancellable  *cancellable,
+                 GError       **error)
+{
+  GMemPipePrivate *priv;
+  gssize   n;
+  gssize   nread;
+  gboolean is_buffered;
+  gulong   handler_id;
+
+  g_return_val_if_fail (G_IS_MEM_PIPE (mem_pipe), -1);
+  priv = mem_pipe->priv;
+  
+  if (count == 0)
+    return 0;
+
+  handler_id = _g_mem_pipe_cancellable_connect (mem_pipe, cancellable);
+  g_mutex_lock (priv->lock);
+  
+  is_buffered = (priv->buffer.size != 0);
+  if (is_buffered == FALSE)
+    {
+      priv->buffer.data = buffer;
+      priv->buffer.size = count;
+
+      /* if the writer thread is waiting for
+         a place to write to, signal him that
+         now there is this very place */
+      g_cond_signal (priv->cond);
+    }
+
+  n = _g_mem_pipe_poll (mem_pipe, GPD_READ, cancellable, error);
+
+  if (n > 0)
+    {
+      nread = MIN (count, n);
+    
+      if (is_buffered)
+        {
+          memcpy (buffer, priv->buffer.data, nread);
+          g_memmove (priv->buffer.data, priv->buffer.data + nread, nread);
+          priv->buffer.dlen -= nread;
+        
+          /* There should be room now in the buffer,
+             maybe get more data right now */
+          g_cond_signal (priv->cond);
+        }
+      else
+        priv->buffer.dlen = priv->buffer.size = 0;
+    }
+  else
+    nread = n;
+
+  g_mutex_unlock (priv->lock);
+  g_cancellable_disconnect (cancellable, handler_id);
+  
+  return nread;
+}
+
+gssize
+g_mem_pipe_write (GMemPipe      *mem_pipe,
+                  const void    *buffer,
+                  gsize          count,
+                  GCancellable  *cancellable,
+                  GError       **error)
+{
+  GMemPipePrivate *priv;
+  gssize           n;
+  gssize           nwritten;
+  gulong           handler_id;
+
+  g_return_val_if_fail (G_IS_MEM_PIPE (mem_pipe), -1);
+
+  if (count == 0)
+    return 0;
+
+  priv = mem_pipe->priv;
+  handler_id = _g_mem_pipe_cancellable_connect (mem_pipe, cancellable);
+  
+  g_mutex_lock (priv->lock);
+  
+  n = _g_mem_pipe_poll (mem_pipe, GPD_WRITE, cancellable, error);
+
+  if (n > 0)
+    {
+      nwritten = MIN (count, n);
+      memcpy (priv->buffer.data + priv->buffer.dlen, buffer, nwritten);
+      priv->buffer.dlen += nwritten;
+    
+      /* data is now available, wake up any sleeping thread */
+      g_cond_signal (priv->cond);
+    }
+  else
+    nwritten = n;
+
+  g_mutex_unlock (priv->lock);
+  g_cancellable_disconnect (cancellable, handler_id);
+  
+  return nwritten;
+}
+
+gboolean
+g_mem_pipe_write_all  (GMemPipe      *mem_pipe,
+                       const void    *buffer,
+                       gsize          count,
+                       gsize         *bytes_written,
+                       GCancellable  *cancellable,
+                       GError       **error)
+{
+  gssize    n;
+  gsize     nwritten;
+  gboolean  res;
+  
+  g_return_val_if_fail (G_IS_MEM_PIPE (mem_pipe), FALSE);
+  
+  nwritten = 0;
+  res = TRUE;
+  
+  do
+    {
+      n = g_mem_pipe_write (mem_pipe,
+			    buffer + nwritten,
+			    count - nwritten,
+			    cancellable,
+			    error);
+      
+      if (n < 0)
+	break;
+      
+      nwritten += n;
+      
+    }
+  while (nwritten != count);
+  
+  if (bytes_written != NULL)
+    *bytes_written = nwritten;
+  
+  return nwritten == count;
+}
+
+gboolean
+g_mem_pipe_close_write (GMemPipe      *mem_pipe,
+			GCancellable  *cancellable,
+			GError       **error)
+{
+  GMemPipePrivate *priv;
+
+  g_return_val_if_fail (G_IS_MEM_PIPE (mem_pipe), FALSE);
+
+  priv = mem_pipe->priv;
+  g_mutex_lock (priv->lock);
+  priv->closed |= GPD_WRITE;
+  g_cond_signal (priv->cond);
+  g_mutex_unlock (priv->lock);
+
+  return TRUE;
+}
+
+gboolean
+g_mem_pipe_close_read (GMemPipe      *mem_pipe,
+		       GCancellable  *cancellable,
+                       GError       **error)
+{
+  GMemPipePrivate *priv;
+
+  g_return_val_if_fail (G_IS_MEM_PIPE (mem_pipe), FALSE);
+
+  priv = mem_pipe->priv;
+
+  g_mutex_lock (priv->lock);
+  priv->closed |= GPD_READ;
+  g_cond_signal (priv->cond);
+  g_mutex_unlock (priv->lock);
+
+  return TRUE;
+}
+
+GMemPipe *
+g_mem_pipe_new ()
+{
+    GMemPipe *mem_pipe;
+
+    mem_pipe = g_object_new (G_TYPE_MEM_PIPE, NULL);
+
+    return mem_pipe;
+}
+
+GMemPipe *
+g_mem_pipe_buffered_new (gsize buffer_size)
+{
+  GMemPipe *mem_pipe;
+  
+  mem_pipe = g_object_new (G_TYPE_MEM_PIPE, 
+			   "buffer-size", buffer_size,
+			   NULL);
+  return mem_pipe;
+}
+
+GInputStream *
+g_mem_pipe_get_input_stream (GMemPipe *mem_pipe)
+{
+  GMemPipePrivate *priv;
+  
+  g_return_val_if_fail (G_IS_MEM_PIPE (mem_pipe), FALSE);
+  
+  priv = mem_pipe->priv;
+  return g_object_ref (priv->input_stream);
+}
+
+GOutputStream *
+g_mem_pipe_get_output_stream (GMemPipe *mem_pipe)
+{
+  GMemPipePrivate *priv;
+  
+  g_return_val_if_fail (G_IS_MEM_PIPE (mem_pipe), FALSE);
+  
+  priv = mem_pipe->priv;
+  return g_object_ref (priv->output_stream);
+}
+
+
+/* *********************************************************************** */
+/* streams */
+
+/* ********************************* */
+/* input stream */
+typedef GInputStreamClass GMemPipeInputStreamClass;
+struct _GMemPipeInputStream
+{
+  GInputStream parent_instance;
+  GMemPipe *mem_pipe;
+};
+
+#define G_TYPE_MEM_PIPE_INPUT_STREAM  (g_mem_pipe_input_stream_get_type ())
+#define G_MEM_PIPE_INPUT_STREAM(o)    (G_TYPE_CHECK_INSTANCE_CAST ((o), \
+                                       G_TYPE_MEM_PIPE_INPUT_STREAM,    \
+				       GMemPipeInputStream))
+#define G_IS_MEM_PIPE_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), \
+				       G_TYPE_MEM_PIPE_INPUT_STREAM))
+
+GType   g_mem_pipe_input_stream_get_type (void) G_GNUC_CONST;
+
+G_DEFINE_TYPE (GMemPipeInputStream,
+	       g_mem_pipe_input_stream,
+	       G_TYPE_INPUT_STREAM)
+
+static GInputStream *
+g_mem_pipe_input_stream_new (GMemPipe *mem_pipe)
+{
+  GMemPipeInputStream *istream;
+  istream = g_object_new (G_TYPE_MEM_PIPE_INPUT_STREAM, NULL);
+
+  istream->mem_pipe = g_object_ref (mem_pipe);
+  return G_INPUT_STREAM (istream);
+}
+
+static gssize
+g_mem_pipe_input_stream_read (GInputStream  *stream,
+			      void          *buffer,
+			      gsize          length,
+			      GCancellable  *cancellable,
+			      GError       **error)
+{
+  GMemPipeInputStream *istream = (GMemPipeInputStream *) stream;
+
+  return g_mem_pipe_read (istream->mem_pipe,
+			  buffer,
+			  length,
+			  cancellable,
+			  error);
+}
+
+static gboolean
+g_mem_pipe_input_stream_close (GInputStream  *stream,
+			       GCancellable  *cancellable,
+			       GError       **error)
+{
+  GMemPipeInputStream *istream = (GMemPipeInputStream *) stream;
+  
+  return g_mem_pipe_close_read (istream->mem_pipe,
+			       cancellable,
+			       error);
+}
+
+static void
+g_mem_pipe_input_stream_init (GMemPipeInputStream *self)
+{
+  
+}
+
+static void
+g_mem_pipe_input_stream_dispose (GObject *object)
+{
+  GMemPipeInputStream *istream;
+
+  istream = G_MEM_PIPE_INPUT_STREAM (object);
+
+  if (istream->mem_pipe != NULL)
+    {
+      g_object_unref (istream->mem_pipe);
+      istream->mem_pipe = NULL;
+    }
+
+  G_OBJECT_CLASS (g_mem_pipe_input_stream_parent_class)->dispose (object);
+}
+
+static void
+g_mem_pipe_input_stream_class_init (GMemPipeInputStreamClass *klass)
+{
+  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+  GInputStreamClass *stream_class = G_INPUT_STREAM_CLASS (klass);
+
+  gobject_class->dispose  = g_mem_pipe_input_stream_dispose;
+  stream_class->read_fn   = g_mem_pipe_input_stream_read;
+  stream_class->close_fn  = g_mem_pipe_input_stream_close;
+}
+
+
+/* ********************************* */
+/* output stream */
+typedef GOutputStreamClass GMemPipeOutputStreamClass;
+struct _GMemPipeOutputStream
+{
+  GOutputStream parent_instance;
+  GMemPipe *mem_pipe;
+};
+
+#define G_TYPE_MEM_PIPE_OUTPUT_STREAM  (g_mem_pipe_output_stream_get_type ())
+#define G_MEM_PIPE_OUTPUT_STREAM(o)    (G_TYPE_CHECK_INSTANCE_CAST ((o), \
+                                        G_TYPE_MEM_PIPE_OUTPUT_STREAM,   \
+				        GMemPipeOutputStream))
+#define G_IS_MEM_PIPE_OUTPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), \
+				        G_TYPE_MEM_PIPE_OUTPUT_STREAM))
+
+GType   g_mem_pipe_output_stream_get_type (void) G_GNUC_CONST;
+
+G_DEFINE_TYPE (GMemPipeOutputStream,
+	       g_mem_pipe_output_stream,
+	       G_TYPE_OUTPUT_STREAM)
+
+static GOutputStream *
+g_mem_pipe_output_stream_new (GMemPipe *mem_pipe)
+{
+  GMemPipeOutputStream *ostream;
+  ostream = g_object_new (G_TYPE_MEM_PIPE_OUTPUT_STREAM, NULL);
+
+  ostream->mem_pipe = g_object_ref (mem_pipe);
+  return G_OUTPUT_STREAM (ostream);
+}
+
+static gssize
+g_mem_pipe_output_stream_write (GOutputStream  *stream,
+				const void     *buffer,
+				gsize           count,
+				GCancellable   *cancellable,
+				GError        **error)
+{
+  GMemPipeOutputStream *ostream = (GMemPipeOutputStream *) stream;
+
+  return g_mem_pipe_write (ostream->mem_pipe,
+			   buffer,
+			   count,
+			   cancellable,
+			   error);
+}
+
+static gboolean
+g_mem_pipe_output_stream_close (GOutputStream  *stream,
+				GCancellable   *cancellable,
+				GError        **error)
+{
+  GMemPipeOutputStream *ostream = (GMemPipeOutputStream *) stream;
+
+  g_return_val_if_fail (G_IS_MEMORY_OUTPUT_STREAM (stream), FALSE);
+
+  return g_mem_pipe_close_write (ostream->mem_pipe,
+				 cancellable,
+				 error);
+}
+
+static void
+g_mem_pipe_output_stream_init (GMemPipeOutputStream *self)
+{
+  
+}
+
+static void
+g_mem_pipe_output_stream_dispose (GObject *object)
+{
+  GMemPipeOutputStream *ostream;
+
+  ostream = G_MEM_PIPE_OUTPUT_STREAM (object);
+
+  if (ostream->mem_pipe != NULL)
+    {
+      g_object_unref (ostream->mem_pipe);
+      ostream->mem_pipe = NULL;
+    }
+
+  G_OBJECT_CLASS (g_mem_pipe_output_stream_parent_class)->dispose (object);
+}
+
+static void
+g_mem_pipe_output_stream_class_init (GMemPipeOutputStreamClass *klass)
+{
+  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+  GOutputStreamClass *stream_class = G_OUTPUT_STREAM_CLASS (klass);
+
+  gobject_class->dispose = g_mem_pipe_output_stream_dispose;
+  stream_class->write_fn = g_mem_pipe_output_stream_write;
+  stream_class->close_fn = g_mem_pipe_output_stream_close;
+}
diff --git a/daemon/gmempipe.h b/daemon/gmempipe.h
new file mode 100644
index 0000000..c9b1cd3
--- /dev/null
+++ b/daemon/gmempipe.h
@@ -0,0 +1,86 @@
+/* GIO - GLib Input, Output and Streaming Library
+ * 
+ * Copyright (C) Christian Kellner <gicmo@gnome.org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ *
+ * Author: Christian Kellner <gicmo@gnome.org>
+ */
+
+#ifndef __G_MEM_PIPE_H__
+#define __G_MEM_PIPE_H__
+
+#include <glib-object.h>
+#include <gio/gio.h>
+
+G_BEGIN_DECLS
+
+#define G_TYPE_MEM_PIPE         (g_mem_pipe_get_type ())
+#define G_MEM_PIPE(o)           (G_TYPE_CHECK_INSTANCE_CAST ((o), G_TYPE_MEM_PIPE, GMemPipe))
+#define G_MEM_PIPE_CLASS(k)     (G_TYPE_CHECK_CLASS_CAST((k), G_TYPE_MEM_PIPE, GMemPipeClass))
+#define G_IS_MEM_PIPE(o)        (G_TYPE_CHECK_INSTANCE_TYPE ((o), G_TYPE_MEM_PIPE))
+#define G_IS_MEM_PIPE_CLASS(k)  (G_TYPE_CHECK_CLASS_TYPE ((k), G_TPE_MEM_PIPE))
+#define G_MEM_PIPE_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), G_TYPE_MEM_PIPE, GMemPipeClass))
+
+typedef struct _GMemPipe        GMemPipe;
+typedef struct _GMemPipePrivate GMemPipePrivate;
+typedef struct _GMemPipeClass   GMemPipeClass;
+
+struct _GMemPipe
+{
+  GObject parent_instance;
+
+  GMemPipePrivate *priv;
+};
+
+struct _GMemPipeClass
+{
+  GObjectClass parent_class;
+
+};
+
+GType          g_mem_pipe_get_type          (void) G_GNUC_CONST;
+GMemPipe *     g_mem_pipe_new               (void);
+GMemPipe *     g_mem_pipe_buffered_new      (gsize          buffer_size);
+gssize         g_mem_pipe_write             (GMemPipe      *mem_pipe,
+					     const void    *buffer,
+					     gsize          count,
+					     GCancellable  *cancellable,
+					     GError       **error);
+gboolean       g_mem_pipe_write_all         (GMemPipe      *mem_pipe,
+					     const void    *buffer,
+					     gsize          count,
+					     gsize         *bytes_written,
+					     GCancellable  *cancellable,
+					     GError       **error);
+gssize         g_mem_pipe_read              (GMemPipe      *mem_pipe,
+					     void          *buffer,
+					     gsize          count,
+					     GCancellable  *cancellable,
+					     GError       **error);
+gboolean       g_mem_pipe_close_write       (GMemPipe      *mem_pipe,
+					     GCancellable  *cancellable,
+					     GError       **error);
+gboolean       g_mem_pipe_close_read        (GMemPipe      *mem_pipe,
+					     GCancellable  *cancellable,
+					     GError       **error);
+GInputStream  *g_mem_pipe_get_input_stream  (GMemPipe      *mem_pipe);
+GOutputStream *g_mem_pipe_get_output_stream (GMemPipe      *mem_pipe);
+
+
+G_END_DECLS
+
+#endif /* __G_MEM_PIPE_H__ */
diff --git a/daemon/gvfsbackenddav.c b/daemon/gvfsbackenddav.c
index 95dc428..0862249 100644
--- a/daemon/gvfsbackenddav.c
+++ b/daemon/gvfsbackenddav.c
@@ -2313,6 +2313,47 @@ try_unmount (GVfsBackend    *backend,
   _exit (0);
 }
 
+
+/* *** overwrite http backend method *** */
+static void
+do_open_for_read (GVfsBackend        *backend,
+		  GVfsJobOpenForRead *job,
+		  const char         *filename)
+{
+  SoupURI     *uri;
+  GFileType    file_type;
+  gboolean     res;
+  guint        num_children;
+  GError      *error;
+
+  error = NULL;
+
+  uri = http_backend_uri_for_filename (backend, filename, FALSE);
+  res = stat_location (backend, uri, &file_type, &num_children, &error);
+  soup_uri_free (uri);
+
+  if (res == FALSE)
+    {
+      g_vfs_job_failed_from_error (G_VFS_JOB (job), error);
+      g_error_free (error);
+      return;
+    }
+
+  if (file_type == G_FILE_TYPE_DIRECTORY)
+    {
+      g_vfs_job_failed (G_VFS_JOB (job),
+                        G_IO_ERROR, G_IO_ERROR_IS_DIRECTORY,
+                        _("Directory not empty"));
+      return;
+    }
+
+  /* the real work happens in the parent class */
+  G_VFS_BACKEND_CLASS (g_vfs_backend_dav_parent_class)->open_for_read (backend,
+								       job,
+								       filename);
+}
+
+
 /* ************************************************************************* */
 /*  */
 static void
@@ -2326,6 +2367,8 @@ g_vfs_backend_dav_class_init (GVfsBackendDavClass *klass)
 
   backend_class = G_VFS_BACKEND_CLASS (klass); 
 
+  backend_class->open_for_read     = do_open_for_read;
+
   backend_class->try_mount         = NULL;
   backend_class->mount             = do_mount;
   backend_class->try_query_info    = NULL;
diff --git a/daemon/gvfsbackendhttp.c b/daemon/gvfsbackendhttp.c
index 976e0ed..8534d83 100644
--- a/daemon/gvfsbackendhttp.c
+++ b/daemon/gvfsbackendhttp.c
@@ -239,7 +239,6 @@ http_error_code_from_status (guint status)
   return G_IO_ERROR_FAILED;
 }
 
-
 static void
 g_vfs_job_failed_from_http_status (GVfsJob *job, guint status_code, const char *message)
 {
@@ -262,6 +261,16 @@ g_vfs_job_failed_from_http_status (GVfsJob *job, guint status_code, const char *
   }
 }
 
+static void
+g_vfs_job_failed_from_soup_error (GVfsJob *job, GError *error)
+{
+  if (error->domain == SOUP_HTTP_ERROR)
+    g_vfs_job_failed_from_http_status (job, error->code, error->message);
+  else
+    g_vfs_job_failed_literal (job, error->domain, error->code, error->message);
+}
+
+
 guint
 http_backend_send_message (GVfsBackend *backend,
                            SoupMessage *msg)
@@ -335,53 +344,17 @@ try_mount (GVfsBackend  *backend,
   return TRUE;
 }
 
-/* *** open_read () *** */
-static void
-open_for_read_ready (GObject      *source_object,
-                     GAsyncResult *result,
-                     gpointer      user_data)
-{
-  GInputStream *stream;
-  GVfsJob      *job;
-  gboolean      res;
-  gboolean      can_seek;
-  GError       *error;
-
-  stream = G_INPUT_STREAM (source_object); 
-  error  = NULL;
-  job    = G_VFS_JOB (user_data);
-
-  res = soup_input_stream_send_finish (stream,
-                                       result,
-                                       &error);
-  if (res == FALSE)
-    {
-      g_vfs_job_failed_literal (G_VFS_JOB (job),
-                                error->domain,
-                                error->code,
-                                error->message);
-
-      g_error_free (error);
-      g_object_unref (stream);
-      return;
-    }
-
-  can_seek = G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream));
-
-  g_vfs_job_open_for_read_set_can_seek (G_VFS_JOB_OPEN_FOR_READ (job), can_seek);
-  g_vfs_job_open_for_read_set_handle (G_VFS_JOB_OPEN_FOR_READ (job), stream);
-  g_vfs_job_succeeded (job);
-}
-
-static gboolean 
-try_open_for_read (GVfsBackend        *backend,
-                   GVfsJobOpenForRead *job,
-                   const char         *filename)
+static void 
+do_open_for_read (GVfsBackend        *backend,
+		  GVfsJobOpenForRead *job,
+		  const char         *filename)
 {
   GVfsBackendHttp *op_backend;
   GInputStream    *stream;
   SoupMessage     *msg;
   SoupURI         *uri;
+  GError          *error;
+  gboolean         res;
 
   op_backend = G_VFS_BACKEND_HTTP (backend);
   uri = http_backend_uri_for_filename (backend, filename, FALSE);
@@ -390,125 +363,119 @@ try_open_for_read (GVfsBackend        *backend,
 
   soup_message_body_set_accumulate (msg->response_body, FALSE);
 
-  stream = soup_input_stream_new (op_backend->session_async, msg);
+  stream = soup_input_stream_new (op_backend->session, msg);
   g_object_unref (msg);
 
-  soup_input_stream_send_async (stream,
-                                G_PRIORITY_DEFAULT,
-                                G_VFS_JOB (job)->cancellable,
-                                open_for_read_ready,
-                                job);
-  return TRUE;
+  error = NULL;
+  res = soup_input_stream_send (stream,
+				G_VFS_JOB (job)->cancellable,
+				&error);
+
+  if (res == FALSE)
+    {
+      g_vfs_job_failed_from_soup_error (G_VFS_JOB (job), error);
+      g_error_free (error);
+      g_object_unref (stream);
+    }
+  else
+    {
+      gboolean can_seek;
+      can_seek = G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream));
+      
+      g_vfs_job_open_for_read_set_can_seek (G_VFS_JOB_OPEN_FOR_READ (job), can_seek);
+      g_vfs_job_open_for_read_set_handle (G_VFS_JOB_OPEN_FOR_READ (job), stream);
+
+      g_vfs_job_succeeded (G_VFS_JOB (job));
+    }
 }
 
-/* *** read () *** */
 static void
-read_ready (GObject      *source_object,
-            GAsyncResult *result,
-            gpointer      user_data)
+do_read (GVfsBackend *     backend,
+         GVfsJobRead *     job,
+         GVfsBackendHandle handle,
+         char *            buffer,
+         gsize             bytes_requested)
 {
   GInputStream *stream;
-  GVfsJob      *job;
   GError       *error;
-  gssize        nread;
-
-  stream = G_INPUT_STREAM (source_object); 
-  error  = NULL;
-  job    = G_VFS_JOB (user_data);
-
-  nread = g_input_stream_read_finish (stream, result, &error);
-
-  if (nread < 0)
-   {
-     g_vfs_job_failed_literal (G_VFS_JOB (job),
-                               error->domain,
-                               error->code,
-                               error->message);
+  gssize        n_bytes;
 
-     g_error_free (error);
-     return;
-   }
+  stream = G_INPUT_STREAM (handle);
 
-  g_vfs_job_read_set_size (G_VFS_JOB_READ (job), nread);
-  g_vfs_job_succeeded (job);
+  error = NULL;
+  n_bytes = g_input_stream_read (stream,
+				 buffer,
+				 bytes_requested,
+				 G_VFS_JOB (job)->cancellable,
+				 &error);
 
+  if (n_bytes >= 0)
+    {
+      g_vfs_job_read_set_size (job, n_bytes);
+      g_vfs_job_succeeded (G_VFS_JOB (job));
+    }
+  else
+    {
+      g_vfs_job_failed_from_soup_error (G_VFS_JOB (job), error);
+      g_error_free (error);
+    }
 }
 
-static gboolean
-try_read (GVfsBackend        *backend,
-          GVfsJobRead        *job,
-          GVfsBackendHandle   handle,
-          char               *buffer,
-          gsize               bytes_requested)
-{
-  GInputStream    *stream;
 
-  stream = G_INPUT_STREAM (handle);
-
-  g_input_stream_read_async (stream,
-                             buffer,
-                             bytes_requested,
-                             G_PRIORITY_DEFAULT,
-                             G_VFS_JOB (job)->cancellable,
-                             read_ready,
-                             job);
-  return TRUE;
-}
-
-static gboolean
-try_seek_on_read (GVfsBackend *backend,
-                  GVfsJobSeekRead *job,
-                  GVfsBackendHandle handle,
-                  goffset    offset,
-                  GSeekType  type)
+static void
+do_seek_on_read (GVfsBackend *backend,
+		 GVfsJobSeekRead *job,
+		 GVfsBackendHandle handle,
+		 goffset    offset,
+		 GSeekType  type)
 {
-  GInputStream    *stream;
-  GError          *error = NULL;
+  GInputStream *stream;
+  GError       *error = NULL;
+  gboolean      res;
 
   stream = G_INPUT_STREAM (handle);
 
-  if (!g_seekable_seek (G_SEEKABLE (stream), offset, type,
-                        G_VFS_JOB (job)->cancellable, &error))
+
+  res = g_seekable_seek (G_SEEKABLE (stream),
+			 offset,
+			 type,
+			 G_VFS_JOB (job)->cancellable,
+			 &error);
+  
+  if (res == FALSE)
     {
-      g_vfs_job_failed_literal (G_VFS_JOB (job),
-                                error->domain,
-                                error->code,
-                                error->message);
+      g_vfs_job_failed_from_soup_error (G_VFS_JOB (job), error);
       g_error_free (error);
-      return FALSE;
     }
   else
     {
       g_vfs_job_seek_read_set_offset (job, g_seekable_tell (G_SEEKABLE (stream)));
       g_vfs_job_succeeded (G_VFS_JOB (job));
     }
-
-  return TRUE;
 }
 
 /* *** read_close () *** */
 static void
-close_read_ready (GObject      *source_object,
-                  GAsyncResult *result,
-                  gpointer      user_data)
+do_close_read (GVfsBackend       *backend,
+	       GVfsJobCloseRead  *read_job,
+               GVfsBackendHandle  handle)
 {
   GInputStream *stream;
   GVfsJob      *job;
   GError       *error;
   gboolean      res;
 
-  job = G_VFS_JOB (user_data);
-  stream = G_INPUT_STREAM (source_object);
-  res = g_input_stream_close_finish (stream,
-                                     result,
-                                     &error);
+  error = NULL;
+  job = G_VFS_JOB (read_job);
+  stream = G_INPUT_STREAM (handle);
+
+
+  res = g_input_stream_close (stream,
+			      job->cancellable,
+			      &error);
   if (res == FALSE)
     {
-      g_vfs_job_failed_literal (G_VFS_JOB (job),
-                                error->domain,
-                                error->code,
-                                error->message);
-
+      g_vfs_job_failed_from_soup_error (G_VFS_JOB (job), error);
       g_error_free (error);
     }
   else
@@ -517,24 +484,6 @@ close_read_ready (GObject      *source_object,
   g_object_unref (stream);
 }
 
-static gboolean 
-try_close_read (GVfsBackend       *backend,
-                GVfsJobCloseRead  *job,
-                GVfsBackendHandle  handle)
-{
-  GInputStream    *stream;
-
-  stream = G_INPUT_STREAM (handle);
-
-  g_input_stream_close_async (stream,
-                              G_PRIORITY_DEFAULT,
-                              G_VFS_JOB (job)->cancellable,
-                              close_read_ready,
-                              job);
-  return TRUE;
-}
-
-
 /* *** query_info () *** */
 
 static void
@@ -688,10 +637,12 @@ g_vfs_backend_http_class_init (GVfsBackendHttpClass *klass)
   backend_class = G_VFS_BACKEND_CLASS (klass); 
 
   backend_class->try_mount              = try_mount;
-  backend_class->try_open_for_read      = try_open_for_read;
-  backend_class->try_read               = try_read;
-  backend_class->try_seek_on_read       = try_seek_on_read;
-  backend_class->try_close_read         = try_close_read;
+
+  backend_class->open_for_read          = do_open_for_read;
+  backend_class->close_read             = do_close_read;
+  backend_class->read                   = do_read;
+  backend_class->seek_on_read        = do_seek_on_read;
+
   backend_class->try_query_info         = try_query_info;
   backend_class->try_query_info_on_read = try_query_info_on_read;
 }
diff --git a/daemon/soup-input-stream.c b/daemon/soup-input-stream.c
index e1928af..10af4dd 100644
--- a/daemon/soup-input-stream.c
+++ b/daemon/soup-input-stream.c
@@ -18,7 +18,7 @@
  * Boston, MA 02111-1307, USA.
  */
 
-#include <config.h>
+//#include <config.h>
 
 #include <string.h>
 
@@ -28,6 +28,7 @@
 #include <libsoup/soup.h>
 
 #include "soup-input-stream.h"
+#include "gmempipe.h"
 
 static void soup_input_stream_seekable_iface_init (GSeekableIface *seekable_iface);
 
@@ -37,29 +38,23 @@ G_DEFINE_TYPE_WITH_CODE (SoupInputStream, soup_input_stream, G_TYPE_INPUT_STREAM
 
 typedef void (*SoupInputStreamCallback) (GInputStream *);
 
-typedef struct {
+struct SoupInputStreamPrivate {
   SoupSession *session;
-  GMainContext *async_context;
   SoupMessage *msg;
+  
   gboolean got_headers, finished;
   goffset offset;
 
-  GCancellable *cancellable;
-  GSource *cancel_watch;
-  SoupInputStreamCallback got_headers_cb;
-  SoupInputStreamCallback got_chunk_cb;
-  SoupInputStreamCallback finished_cb;
-  SoupInputStreamCallback cancelled_cb;
+  GMutex *lock;
+  GCond  *cond;
+  
+  GMemPipe *mem_pipe;
 
-  guchar *leftover_buffer;
-  gsize leftover_bufsize, leftover_offset;
+  GInputStream  *in;
+  GOutputStream *out;
 
-  guchar *caller_buffer;
-  gsize caller_bufsize, caller_nread;
-  GAsyncReadyCallback outstanding_callback;
-  GSimpleAsyncResult *result;
+};
 
-} SoupInputStreamPrivate;
 #define SOUP_INPUT_STREAM_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), SOUP_TYPE_INPUT_STREAM, SoupInputStreamPrivate))
 
 
@@ -71,24 +66,6 @@ static gssize   soup_input_stream_read         (GInputStream         *stream,
 static gboolean soup_input_stream_close        (GInputStream         *stream,
 						GCancellable         *cancellable,
 						GError              **error);
-static void     soup_input_stream_read_async   (GInputStream         *stream,
-						void                 *buffer,
-						gsize                 count,
-						int                   io_priority,
-						GCancellable         *cancellable,
-						GAsyncReadyCallback   callback,
-						gpointer              data);
-static gssize   soup_input_stream_read_finish  (GInputStream         *stream,
-						GAsyncResult         *result,
-						GError              **error);
-static void     soup_input_stream_close_async  (GInputStream         *stream,
-						int                   io_priority,
-						GCancellable         *cancellable,
-						GAsyncReadyCallback   callback,
-						gpointer              data);
-static gboolean soup_input_stream_close_finish (GInputStream         *stream,
-						GAsyncResult         *result,
-						GError              **error);
 
 static goffset  soup_input_stream_tell         (GSeekable            *seekable);
   
@@ -115,13 +92,25 @@ soup_input_stream_finalize (GObject *object)
   SoupInputStream *stream = SOUP_INPUT_STREAM (object);
   SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);
 
-  g_object_unref (priv->session);
+  g_print ("Finalize\n");
 
-  g_signal_handlers_disconnect_by_func (priv->msg, G_CALLBACK (soup_input_stream_got_headers), stream);
-  g_signal_handlers_disconnect_by_func (priv->msg, G_CALLBACK (soup_input_stream_got_chunk), stream);
-  g_signal_handlers_disconnect_by_func (priv->msg, G_CALLBACK (soup_input_stream_finished), stream);
-  g_object_unref (priv->msg);
-  g_free (priv->leftover_buffer);
+  if (priv->session)
+    g_object_unref (priv->session);
+
+  if (priv->msg)
+    g_object_unref (priv->msg);
+
+  if (priv->mem_pipe)
+    {
+      g_object_unref (priv->mem_pipe);
+      g_object_unref (priv->in);
+      g_object_unref (priv->out);
+    }
+
+  g_mutex_free (priv->lock);
+  g_cond_free (priv->cond);
+
+  g_print ("Done!\n");
 
   if (G_OBJECT_CLASS (soup_input_stream_parent_class)->finalize)
     (*G_OBJECT_CLASS (soup_input_stream_parent_class)->finalize) (object);
@@ -136,41 +125,77 @@ soup_input_stream_class_init (SoupInputStreamClass *klass)
   g_type_class_add_private (klass, sizeof (SoupInputStreamPrivate));
   
   gobject_class->finalize = soup_input_stream_finalize;
-
-  stream_class->read_fn = soup_input_stream_read;
+  stream_class->read_fn  = soup_input_stream_read;
   stream_class->close_fn = soup_input_stream_close;
-  stream_class->read_async = soup_input_stream_read_async;
-  stream_class->read_finish = soup_input_stream_read_finish;
-  stream_class->close_async = soup_input_stream_close_async;
-  stream_class->close_finish = soup_input_stream_close_finish;
 }
 
 static void
 soup_input_stream_seekable_iface_init (GSeekableIface *seekable_iface)
 {
-  seekable_iface->tell = soup_input_stream_tell;
-  seekable_iface->can_seek = soup_input_stream_can_seek;
-  seekable_iface->seek = soup_input_stream_seek;
+  seekable_iface->tell         = soup_input_stream_tell;
+  seekable_iface->can_seek     = soup_input_stream_can_seek;
+  seekable_iface->seek         = soup_input_stream_seek;
   seekable_iface->can_truncate = soup_input_stream_can_truncate;
-  seekable_iface->truncate_fn = soup_input_stream_truncate;
+  seekable_iface->truncate_fn  = soup_input_stream_truncate;
 }
 
 static void
 soup_input_stream_init (SoupInputStream *stream)
 {
-  ;
+  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);
+  stream->priv = priv;
+
+  priv->lock = g_mutex_new ();
+  priv->cond = g_cond_new ();
 }
 
 static void
-soup_input_stream_queue_message (SoupInputStream *stream)
+_soup_input_stream_queue_msg_and_wait (SoupInputStream *stream)
 {
-  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);
+  SoupInputStreamPrivate *priv = stream->priv;
+
+  g_mutex_lock (priv->lock);
+
+  if (priv->mem_pipe)
+    {
+      g_object_unref (priv->mem_pipe);
+      g_object_unref (priv->in);
+      g_object_unref (priv->out);
+    }
+
+  priv->mem_pipe = g_mem_pipe_new ();
+  priv->in       = g_mem_pipe_get_input_stream (priv->mem_pipe);
+  priv->out      = g_mem_pipe_get_output_stream (priv->mem_pipe);
 
   priv->got_headers = priv->finished = FALSE;
 
   /* Add an extra ref since soup_session_queue_message steals one */
   g_object_ref (priv->msg);
   soup_session_queue_message (priv->session, priv->msg, NULL, NULL);
+
+  /* wait until we got the headers (or fail?) */
+  while (!priv->got_headers && !priv->finished)
+    g_cond_wait (priv->cond, priv->lock);
+
+  g_mutex_unlock (priv->lock);
+}
+
+extern void soup_message_io_cleanup (SoupMessage *msg);
+
+static void
+_soup_input_stream_cancel_msg_and_wait (SoupInputStream *stream)
+{
+  SoupInputStreamPrivate *priv = stream->priv;
+
+  g_mutex_lock (priv->lock);
+  soup_session_cancel_message (priv->session, priv->msg, SOUP_STATUS_CANCELLED);
+  soup_message_io_cleanup (priv->msg);
+
+  /* FIXME: is this at all necessary? */
+  while (!priv->got_headers && !priv->finished)
+     g_cond_wait (priv->cond, priv->lock);
+  
+  g_mutex_unlock (priv->lock);
 }
 
 /**
@@ -181,21 +206,8 @@ soup_input_stream_queue_message (SoupInputStream *stream)
  * Prepares to send @msg over @session, and returns a #GInputStream
  * that can be used to read the response.
  *
- * @msg may not be sent until the first read call; if you need to look
- * at the status code or response headers before reading the body, you
- * can use soup_input_stream_send() or soup_input_stream_send_async()
- * to force the message to be sent and the response headers read.
+ * FIXME
  *
- * If @msg gets a non-2xx result, the first read (or send) will return
- * an error with type %SOUP_INPUT_STREAM_HTTP_ERROR.
- *
- * Internally, #SoupInputStream is implemented using asynchronous I/O,
- * so if you are using the synchronous API (eg,
- * g_input_stream_read()), you should create a new #GMainContext and
- * set it as the %SOUP_SESSION_ASYNC_CONTEXT property on @session. (If
- * you don't, then synchronous #GInputStream calls will cause the main
- * loop to be run recursively.) The async #GInputStream API works fine
- * with %SOUP_SESSION_ASYNC_CONTEXT either set or unset.
  *
  * Returns: a new #GInputStream.
  **/
@@ -211,17 +223,15 @@ soup_input_stream_new (SoupSession *session, SoupMessage *msg)
   priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);
 
   priv->session = g_object_ref (session);
-  priv->async_context = soup_session_get_async_context (session);
   priv->msg = g_object_ref (msg);
-
+  
   g_signal_connect (msg, "got_headers",
-		    G_CALLBACK (soup_input_stream_got_headers), stream);
+                    G_CALLBACK (soup_input_stream_got_headers), stream);
   g_signal_connect (msg, "got_chunk",
-		    G_CALLBACK (soup_input_stream_got_chunk), stream);
+                    G_CALLBACK (soup_input_stream_got_chunk), stream);
   g_signal_connect (msg, "finished",
-		    G_CALLBACK (soup_input_stream_finished), stream);
+                    G_CALLBACK (soup_input_stream_finished), stream);
 
-  soup_input_stream_queue_message (stream);
   return G_INPUT_STREAM (stream);
 }
 
@@ -230,6 +240,8 @@ soup_input_stream_got_headers (SoupMessage *msg, gpointer stream)
 {
   SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);
 
+  g_mutex_lock (priv->lock);
+
   /* If the status is unsuccessful, we just ignore the signal and let
    * libsoup keep going (eventually either it will requeue the request
    * (after handling authentication/redirection), or else the
@@ -239,19 +251,14 @@ soup_input_stream_got_headers (SoupMessage *msg, gpointer stream)
     return;
 
   priv->got_headers = TRUE;
-  if (!priv->caller_buffer)
-    {
-      /* Not ready to read the body yet */
-      soup_session_pause_message (priv->session, msg);
-    }
-
-  if (priv->got_headers_cb)
-    priv->got_headers_cb (stream);
+  g_cond_signal (priv->cond);
+  g_mutex_unlock (priv->lock);
 }
 
 static void
-soup_input_stream_got_chunk (SoupMessage *msg, SoupBuffer *chunk_buffer,
-			     gpointer stream)
+soup_input_stream_got_chunk (SoupMessage *msg,
+                             SoupBuffer  *chunk_buffer,
+                             gpointer     stream)
 {
   SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);
   const gchar *chunk = chunk_buffer->data;
@@ -263,46 +270,14 @@ soup_input_stream_got_chunk (SoupMessage *msg, SoupBuffer *chunk_buffer,
   if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code))
     return;
 
-  /* Sanity check */
-  if (priv->caller_bufsize == 0 || priv->leftover_bufsize != 0)
-    g_warning ("soup_input_stream_got_chunk called again before previous chunk was processed");
-
-  /* Copy what we can into priv->caller_buffer */
-  if (priv->caller_bufsize - priv->caller_nread > 0)
-    {
-      gsize nread = MIN (chunk_size, priv->caller_bufsize - priv->caller_nread);
-
-      memcpy (priv->caller_buffer + priv->caller_nread, chunk, nread);
-      priv->caller_nread += nread;
-      priv->offset += nread;
-      chunk += nread;
-      chunk_size -= nread;
-    }
-
-  if (chunk_size > 0)
-    {
-      /* Copy the rest into priv->leftover_buffer. If there's already
-       * some data there, realloc and append. Otherwise just copy.
-       */
-      if (priv->leftover_bufsize)
-	{
-	  priv->leftover_buffer = g_realloc (priv->leftover_buffer,
-					     priv->leftover_bufsize + chunk_size);
-	  memcpy (priv->leftover_buffer + priv->leftover_bufsize,
-		  chunk, chunk_size);
-	  priv->leftover_bufsize += chunk_size;
-	}
-      else
-	{
-	  priv->leftover_bufsize = chunk_size;
-	  priv->leftover_buffer = g_memdup (chunk, chunk_size);
-	  priv->leftover_offset = 0;
-	}
-    }
+  g_output_stream_write_all (priv->out,
+			     chunk,
+			     chunk_size,
+			     NULL,
+			     NULL,
+			     NULL);
 
-  soup_session_pause_message (priv->session, msg);
-  if (priv->got_chunk_cb)
-    priv->got_chunk_cb (stream);
+  /* FIXME: handle errors */
 }
 
 static void
@@ -310,71 +285,12 @@ soup_input_stream_finished (SoupMessage *msg, gpointer stream)
 {
   SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);
 
+  g_mutex_lock (priv->lock);
   priv->finished = TRUE;
-
-  if (priv->finished_cb)
-    priv->finished_cb (stream);
-}
-
-static gboolean
-soup_input_stream_cancelled (GIOChannel *chan, GIOCondition condition,
-			     gpointer stream)
-{
-  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);
-
-  priv->cancel_watch = NULL;
-
-  soup_session_pause_message (priv->session, priv->msg);
-  if (priv->cancelled_cb)
-    priv->cancelled_cb (stream);
-
-  return FALSE;
-}  
-
-static void
-soup_input_stream_prepare_for_io (GInputStream *stream,
-				  GCancellable *cancellable,
-				  guchar       *buffer,
-				  gsize         count)
-{
-  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);
-  int cancel_fd;
-
-  priv->cancellable = cancellable;
-  cancel_fd = g_cancellable_get_fd (cancellable);
-  if (cancel_fd != -1)
-    {
-      GIOChannel *chan = g_io_channel_unix_new (cancel_fd);
-      priv->cancel_watch = soup_add_io_watch (priv->async_context, chan,
-					      G_IO_IN | G_IO_ERR | G_IO_HUP,
-					      soup_input_stream_cancelled,
-					      stream);
-      g_io_channel_unref (chan);
-    }
-
-  priv->caller_buffer = buffer;
-  priv->caller_bufsize = count;
-  priv->caller_nread = 0;
-
-  if (priv->got_headers)
-    soup_session_unpause_message (priv->session, priv->msg);
-}
-
-static void
-soup_input_stream_done_io (GInputStream *stream)
-{
-  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);
-
-  if (priv->cancel_watch)
-    {
-      g_source_destroy (priv->cancel_watch);
-      priv->cancel_watch = NULL;
-      g_cancellable_release_fd (priv->cancellable);
-    }
-  priv->cancellable = NULL;
-
-  priv->caller_buffer = NULL;
-  priv->caller_bufsize = 0;
+  //g_print ("Finished!\n");
+  g_mem_pipe_close_write (priv->mem_pipe, NULL, NULL);
+  g_cond_signal (priv->cond);
+  g_mutex_unlock (priv->lock);
 }
 
 static gboolean
@@ -386,58 +302,10 @@ set_error_if_http_failed (SoupMessage *msg, GError **error)
 			   msg->status_code, msg->reason_phrase);
       return TRUE;
     }
+  
   return FALSE;
 }
 
-static gsize
-read_from_leftover (SoupInputStreamPrivate *priv,
-		    gpointer buffer, gsize bufsize)
-{
-  gsize nread;
-
-  if (priv->leftover_bufsize - priv->leftover_offset <= bufsize)
-    {
-      nread = priv->leftover_bufsize - priv->leftover_offset;
-      memcpy (buffer, priv->leftover_buffer + priv->leftover_offset, nread);
-
-      g_free (priv->leftover_buffer);
-      priv->leftover_buffer = NULL;
-      priv->leftover_bufsize = priv->leftover_offset = 0;
-    }
-  else
-    {
-      nread = bufsize;
-      memcpy (buffer, priv->leftover_buffer + priv->leftover_offset, nread);
-      priv->leftover_offset += nread;
-    }
-
-  priv->offset += nread;
-  return nread;
-}
-
-/* This does the work of soup_input_stream_send(), assuming that the
- * GInputStream pending flag has already been set. It is also used by
- * soup_input_stream_send_async() in some circumstances.
- */
-static gboolean
-soup_input_stream_send_internal (GInputStream  *stream,
-				 GCancellable  *cancellable,
-				 GError       **error)
-{
-  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);
-
-  soup_input_stream_prepare_for_io (stream, cancellable, NULL, 0);
-  while (!priv->finished && !priv->got_headers &&
-	 !g_cancellable_is_cancelled (cancellable))
-    g_main_context_iteration (priv->async_context, TRUE);
-  soup_input_stream_done_io (stream);
-
-  if (g_cancellable_set_error_if_cancelled (cancellable, error))
-    return FALSE;
-  else if (set_error_if_http_failed (priv->msg, error))
-    return FALSE;
-  return TRUE;
-}
 
 /**
  * soup_input_stream_send:
@@ -454,54 +322,59 @@ soup_input_stream_send_internal (GInputStream  *stream,
  * not.
  **/
 gboolean
-soup_input_stream_send (GInputStream *stream,
-			GCancellable *cancellable,
-			GError      **error)
+soup_input_stream_send (GInputStream *input_stream,
+                        GCancellable *cancellable,
+                        GError      **error)
 {
+  SoupInputStream        *stream;
+  SoupInputStreamPrivate *priv;
   gboolean result;
 
-  g_return_val_if_fail (SOUP_IS_INPUT_STREAM (stream), FALSE);
+  g_return_val_if_fail (SOUP_IS_INPUT_STREAM (input_stream), FALSE);
+  
+  stream = SOUP_INPUT_STREAM (input_stream);
+  priv = stream->priv;
 
-  if (!g_input_stream_set_pending (stream, error))
+  if (!g_input_stream_set_pending (input_stream, error))
       return FALSE;
-  result = soup_input_stream_send_internal (stream, cancellable, error);
-  g_input_stream_clear_pending (stream);
+  
+  _soup_input_stream_queue_msg_and_wait (stream);
+
+  g_input_stream_clear_pending (input_stream);
+  
+  if (g_cancellable_set_error_if_cancelled (cancellable, error))
+    return FALSE;
+
+  result = ! set_error_if_http_failed (priv->msg, error);
 
   return result;
 }
 
 static gssize
-soup_input_stream_read (GInputStream *stream,
-			void         *buffer,
-			gsize         count,
-			GCancellable *cancellable,
-			GError      **error)
+soup_input_stream_read (GInputStream  *stream,
+                        void          *buffer,
+                        gsize          count,
+                        GCancellable  *cancellable,
+                        GError       **error)
 {
-  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);
-
-  if (priv->finished)
-    return 0;
+  SoupInputStream        *istream;
+  SoupInputStreamPrivate *priv;
+  gssize nread;
 
-  /* If there is data leftover from a previous read, return it. */
-  if (priv->leftover_bufsize)
-    return read_from_leftover (priv, buffer, count);
+  g_return_val_if_fail (SOUP_IS_INPUT_STREAM (stream), -1);
+  
+  istream = SOUP_INPUT_STREAM (stream);
+  priv = istream->priv;
 
-  /* No leftover data, accept one chunk from the network */
-  soup_input_stream_prepare_for_io (stream, cancellable, buffer, count);
-  while (!priv->finished && priv->caller_nread == 0 &&
-	 !g_cancellable_is_cancelled (cancellable))
-    g_main_context_iteration (priv->async_context, TRUE);
-  soup_input_stream_done_io (stream);
+  nread = g_input_stream_read (priv->in,
+			       buffer,
+			       count,
+			       cancellable,
+			       error);
 
-  if (priv->caller_nread > 0)
-    return priv->caller_nread;
+  priv->offset += nread;
 
-  if (g_cancellable_set_error_if_cancelled (cancellable, error))
-    return -1;
-  else if (set_error_if_http_failed (priv->msg, error))
-    return -1;
-  else
-    return 0;
+  return nread;
 }
 
 static gboolean
@@ -509,314 +382,25 @@ soup_input_stream_close (GInputStream *stream,
 			 GCancellable *cancellable,
 			 GError      **error)
 {
-  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);
-
-  if (!priv->finished)
-    soup_session_cancel_message (priv->session, priv->msg, SOUP_STATUS_CANCELLED);
-
-  return TRUE;
-}
-
-static void
-wrapper_callback (GObject *source_object, GAsyncResult *res,
-		  gpointer user_data)
-{
-  GInputStream *stream = G_INPUT_STREAM (source_object);
-  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);
-
-  g_input_stream_clear_pending (stream);
-  if (priv->outstanding_callback)
-    (*priv->outstanding_callback) (source_object, res, user_data);
-  priv->outstanding_callback = NULL;
-  g_object_unref (stream);
-}
-
-static void
-send_async_thread (GSimpleAsyncResult *res,
-		   GObject *object,
-		   GCancellable *cancellable)
-{
-  GError *error = NULL;
-  gboolean success;
-
-  success = soup_input_stream_send_internal (G_INPUT_STREAM (object),
-					     cancellable, &error);
-  g_simple_async_result_set_op_res_gboolean (res, success);
-  if (error)
-    {
-      g_simple_async_result_set_from_error (res, error);
-      g_error_free (error);
-    }
-}
-
-static void
-soup_input_stream_send_async_in_thread (GInputStream        *stream,
-					int                  io_priority,
-					GCancellable        *cancellable,
-					GAsyncReadyCallback  callback,
-					gpointer             user_data)
-{
-  GSimpleAsyncResult *res;
-
-  res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data,
-				   soup_input_stream_send_async_in_thread);
-  g_simple_async_result_run_in_thread (res, send_async_thread,
-				       io_priority, cancellable);
-  g_object_unref (res);
-}
-
-static void
-send_async_finished (GInputStream *stream)
-{
-  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);
-  GSimpleAsyncResult *result;
-  GError *error = NULL;
-
-  if (!g_cancellable_set_error_if_cancelled (priv->cancellable, &error))
-    set_error_if_http_failed (priv->msg, &error);
-
-  priv->got_headers_cb = NULL;
-  priv->finished_cb = NULL;
-  soup_input_stream_done_io (stream);
-
-  result = priv->result;
-  priv->result = NULL;
-
-  g_simple_async_result_set_op_res_gboolean (result, error == NULL);
-  if (error)
-    {
-      g_simple_async_result_set_from_error (result, error);
-      g_error_free (error);
-    }
-  g_simple_async_result_complete (result);
-}
-
-static void
-soup_input_stream_send_async_internal (GInputStream        *stream,
-				       int                  io_priority,
-				       GCancellable        *cancellable,
-				       GAsyncReadyCallback  callback,
-				       gpointer             user_data)
-{
-  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);
-
-  g_object_ref (stream);
-  priv->outstanding_callback = callback;
-
-  /* If the session uses the default GMainContext, then we can do
-   * async I/O directly. But if it has its own main context, it's
-   * easier to just run it in another thread.
-   */
-  if (soup_session_get_async_context (priv->session))
-    {
-      soup_input_stream_send_async_in_thread (stream, io_priority, cancellable,
-					      wrapper_callback, user_data);
-      return;
-    }
-
-  priv->got_headers_cb = send_async_finished;
-  priv->finished_cb = send_async_finished;
-
-  soup_input_stream_prepare_for_io (stream, cancellable, NULL, 0);
-  priv->result = g_simple_async_result_new (G_OBJECT (stream),
-					    wrapper_callback, user_data,
-					    soup_input_stream_send_async);
-}
-
-/**
- * soup_input_stream_send_async:
- * @stream: a #SoupInputStream
- * @io_priority: the io priority of the request.
- * @cancellable: optional #GCancellable object, %NULL to ignore.
- * @callback: callback to call when the request is satisfied
- * @user_data: the data to pass to callback function
- *
- * Asynchronously sends the HTTP request associated with @stream, and
- * reads the response headers. Call this after soup_input_stream_new()
- * and before the first g_input_stream_read_async() if you want to
- * check the HTTP status code before you start reading.
- **/
-void
-soup_input_stream_send_async (GInputStream        *stream,
-			      int                  io_priority,
-			      GCancellable        *cancellable,
-			      GAsyncReadyCallback  callback,
-			      gpointer             user_data)
-{
-  GError *error = NULL;
-
-  g_return_if_fail (SOUP_IS_INPUT_STREAM (stream));
-
-  if (!g_input_stream_set_pending (stream, &error))
-    {
-      g_simple_async_report_gerror_in_idle (G_OBJECT (stream),
-					    callback,
-					    user_data,
-					    error);
-      g_error_free (error);
-      return;
-    }
-  soup_input_stream_send_async_internal (stream, io_priority, cancellable,
-					 callback, user_data);
-}
-
-/**
- * soup_input_stream_send_finish:
- * @stream: a #SoupInputStream
- * @result: a #GAsyncResult.
- * @error: a #GError location to store the error occuring, or %NULL to 
- * ignore.
- *
- * Finishes a soup_input_stream_send_async() operation.
- *
- * Return value: %TRUE if the message was sent successfully and
- * received a successful status code, %FALSE if not.
- **/
-gboolean
-soup_input_stream_send_finish (GInputStream  *stream,
-			       GAsyncResult  *result,
-			       GError       **error)
-{
-  GSimpleAsyncResult *simple;
-
-  g_return_val_if_fail (G_IS_SIMPLE_ASYNC_RESULT (result), FALSE);
-  simple = G_SIMPLE_ASYNC_RESULT (result);
-
-  g_return_val_if_fail (g_simple_async_result_get_source_tag (simple) == soup_input_stream_send_async, FALSE);
-
-  if (g_simple_async_result_propagate_error (simple, error))
-    return FALSE;
-
-  return g_simple_async_result_get_op_res_gboolean (simple);
-}
-
-static void
-read_async_done (GInputStream *stream)
-{
-  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);
-  GSimpleAsyncResult *result;
-  GError *error = NULL;
-
-  result = priv->result;
-  priv->result = NULL;
-
-  if (g_cancellable_set_error_if_cancelled (priv->cancellable, &error) ||
-      set_error_if_http_failed (priv->msg, &error))
-    {
-      g_simple_async_result_set_from_error (result, error);
-      g_error_free (error);
-    }
-  else
-    g_simple_async_result_set_op_res_gssize (result, priv->caller_nread);
-
-  priv->got_chunk_cb = NULL;
-  priv->finished_cb = NULL;
-  priv->cancelled_cb = NULL;
-  soup_input_stream_done_io (stream);
-
-  g_simple_async_result_complete (result);
-  g_object_unref (result);
-}
-
-static void
-soup_input_stream_read_async (GInputStream        *stream,
-			      void                *buffer,
-			      gsize                count,
-			      int                  io_priority,
-			      GCancellable        *cancellable,
-			      GAsyncReadyCallback  callback,
-			      gpointer             user_data)
-{
-  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);
-  GSimpleAsyncResult *result;
-
-  /* If the session uses the default GMainContext, then we can do
-   * async I/O directly. But if it has its own main context, we fall
-   * back to the async-via-sync-in-another-thread implementation.
-   */
-  if (soup_session_get_async_context (priv->session))
-    {
-      G_INPUT_STREAM_CLASS (soup_input_stream_parent_class)->
-	read_async (stream, buffer, count, io_priority,
-		    cancellable, callback, user_data);
-      return;
-    }
-
-  result = g_simple_async_result_new (G_OBJECT (stream),
-				      callback, user_data,
-				      soup_input_stream_read_async);
-
-  if (priv->finished)
-    {
-      g_simple_async_result_set_op_res_gssize (result, 0);
-      g_simple_async_result_complete_in_idle (result);
-      g_object_unref (result);
-      return;
-    }
-
-  if (priv->leftover_bufsize)
-    {
-      gsize nread = read_from_leftover (priv, buffer, count);
-      g_simple_async_result_set_op_res_gssize (result, nread);
-      g_simple_async_result_complete_in_idle (result);
-      g_object_unref (result);
-      return;
-    }
-
-  priv->result = result;
-
-  priv->got_chunk_cb = read_async_done;
-  priv->finished_cb = read_async_done;
-  priv->cancelled_cb = read_async_done;
-  soup_input_stream_prepare_for_io (stream, cancellable, buffer, count);
-}
-
-static gssize
-soup_input_stream_read_finish (GInputStream  *stream,
-			       GAsyncResult  *result,
-			       GError       **error)
-{
-  GSimpleAsyncResult *simple;
+  SoupInputStream        *istream;
+  SoupInputStreamPrivate *priv;
 
-  g_return_val_if_fail (G_IS_SIMPLE_ASYNC_RESULT (result), -1);
-  simple = G_SIMPLE_ASYNC_RESULT (result);
-  g_return_val_if_fail (g_simple_async_result_get_source_tag (simple) == soup_input_stream_read_async, -1);
+  g_return_val_if_fail (SOUP_IS_INPUT_STREAM (stream), -1);
 
-  return g_simple_async_result_get_op_res_gssize (simple);
-}
+  istream = SOUP_INPUT_STREAM (stream);
+  priv = istream->priv;
 
-static void
-soup_input_stream_close_async (GInputStream       *stream,
-			       int                 io_priority,
-			       GCancellable       *cancellable,
-			       GAsyncReadyCallback callback,
-			       gpointer            user_data)
-{
-  GSimpleAsyncResult *result;
-  gboolean success;
-  GError *error = NULL;
-
-  result = g_simple_async_result_new (G_OBJECT (stream),
-				      callback, user_data,
-				      soup_input_stream_close_async);
-  success = soup_input_stream_close (stream, cancellable, &error);
-  g_simple_async_result_set_op_res_gboolean (result, success);
-  if (error)
+  g_mutex_lock (priv->lock);
+  if (!priv->finished)
     {
-      g_simple_async_result_set_from_error (result, error);
-      g_error_free (error);
+      soup_session_cancel_message (priv->session,
+				   priv->msg,
+				   SOUP_STATUS_CANCELLED);
+      g_print ("Cancelling message\n");
     }
 
-  g_simple_async_result_complete_in_idle (result);
-  g_object_unref (result);
-}
-
-static gboolean
-soup_input_stream_close_finish (GInputStream  *stream,
-				GAsyncResult  *result,
-				GError       **error)
-{
-  /* Failures handled in generic close_finish code */
+  g_mem_pipe_close_read (priv->mem_pipe, cancellable, error);
+  g_mutex_unlock (priv->lock);
   return TRUE;
 }
 
@@ -834,17 +418,17 @@ soup_input_stream_can_seek (GSeekable *seekable)
   return TRUE;
 }
 
-extern void soup_message_io_cleanup (SoupMessage *msg);
 
 static gboolean
 soup_input_stream_seek (GSeekable     *seekable,
-			goffset        offset,
-			GSeekType      type,
-			GCancellable  *cancellable,
-			GError       **error)
+                        goffset        offset,
+                        GSeekType      type,
+                        GCancellable  *cancellable,
+                        GError       **error)
 {
   GInputStream *stream = G_INPUT_STREAM (seekable);
-  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (seekable);
+  SoupInputStream *istream = SOUP_INPUT_STREAM (stream);
+  SoupInputStreamPrivate *priv = istream->priv;
   char *range;
 
   if (type == G_SEEK_END)
@@ -862,8 +446,7 @@ soup_input_stream_seek (GSeekable     *seekable,
   if (!g_input_stream_set_pending (stream, error))
       return FALSE;
 
-  soup_session_cancel_message (priv->session, priv->msg, SOUP_STATUS_CANCELLED);
-  soup_message_io_cleanup (priv->msg);
+  _soup_input_stream_cancel_msg_and_wait (istream);
 
   switch (type)
     {
@@ -889,7 +472,7 @@ soup_input_stream_seek (GSeekable     *seekable,
   soup_message_headers_append (priv->msg->request_headers, "Range", range);
   g_free (range);
 
-  soup_input_stream_queue_message (SOUP_INPUT_STREAM (stream));
+  _soup_input_stream_queue_msg_and_wait (istream);
 
   g_input_stream_clear_pending (stream);
   return TRUE;
@@ -922,8 +505,8 @@ soup_input_stream_get_message (GInputStream *stream)
 GQuark
 soup_http_error_quark (void)
 {
-	static GQuark error;
-	if (!error)
-		error = g_quark_from_static_string ("soup_http_error_quark");
-	return error;
+  static GQuark error;
+  if (!error)
+    error = g_quark_from_static_string ("soup_http_error_quark");
+  return error;
 }
diff --git a/daemon/soup-input-stream.h b/daemon/soup-input-stream.h
index f425291..2d47c7c 100644
--- a/daemon/soup-input-stream.h
+++ b/daemon/soup-input-stream.h
@@ -33,17 +33,22 @@ G_BEGIN_DECLS
 
 typedef struct SoupInputStream         SoupInputStream;
 typedef struct SoupInputStreamClass    SoupInputStreamClass;
+typedef struct SoupInputStreamPrivate  SoupInputStreamPrivate;
 
 struct SoupInputStream
 {
   GInputStream parent;
 
+  SoupInputStreamPrivate *priv;
+  
 };
 
 struct SoupInputStreamClass
 {
   GInputStreamClass parent_class;
 
+
+
   /* Padding for future expansion */
   void (*_g_reserved1) (void);
   void (*_g_reserved2) (void);
@@ -61,15 +66,6 @@ gboolean      soup_input_stream_send        (GInputStream        *stream,
 					     GCancellable        *cancellable,
 					     GError             **error);
 
-void          soup_input_stream_send_async  (GInputStream        *stream,
-					     int                  io_priority,
-					     GCancellable        *cancellable,
-					     GAsyncReadyCallback  callback,
-					     gpointer             user_data);
-gboolean      soup_input_stream_send_finish (GInputStream        *stream,
-					     GAsyncResult        *result,
-					     GError             **error);
-
 SoupMessage  *soup_input_stream_get_message (GInputStream         *stream);
 
 #define SOUP_HTTP_ERROR soup_http_error_quark()
openSUSE Build Service is sponsored by