Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:dreveman:openSUSE:11.1:cr
cr
cr-unix-domain-sockets.diff
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File cr-unix-domain-sockets.diff of Package cr
commit e8198be2f4cac87dcae8324c66ce07366f08814b Author: David Reveman <davidr@novell.com> Date: Tue Mar 31 16:45:43 2009 -0400 Unix Domain Socket support. diff --git a/include/cr_net.h b/include/cr_net.h index 32b0222..3ae3210 100644 --- a/include/cr_net.h +++ b/include/cr_net.h @@ -42,6 +42,7 @@ typedef struct CRConnection CRConnection; typedef enum { CR_NO_CONNECTION, + CR_UNIX, CR_SDP, CR_TCPIP, CR_UDPTCPIP, @@ -170,6 +171,9 @@ struct CRConnection { CRSocket sdp_socket; + /* UNIX */ + CRSocket unix_socket; + /* UDP/IP */ CRSocket udp_socket; #ifndef ADDRINFO diff --git a/mothership/server/mothership.py b/mothership/server/mothership.py index 010700c..7fb003c 100644 --- a/mothership/server/mothership.py +++ b/mothership/server/mothership.py @@ -271,7 +271,7 @@ class SPU: Associates a server with an SPU and tells it how to connect to it. The SPU will typically be a pack SPU or tilesort SPU. """ - if (protocol.startswith('file') or protocol.startswith('swapfile')): + if (protocol.startswith('file') or protocol.startswith('swapfile') or protocol.startswith('unix')): self.__add_server( node, "%s" % protocol ) # Don't tell the server "node" about this. else: @@ -616,6 +616,8 @@ class SockWrapper: self.teac_connect_wait = [] self.tcscomm_accept_wait = [] self.tcscomm_connect_wait = [] + self.unix_accept_wait = None + self.unix_connect_wait = None def readline( self ): return string.strip(self.file.readline()) @@ -954,6 +956,23 @@ class CR: self.wrappers[self.mother.sock] = self.mother else: CRInfo("This is Chromium, Version " + Version) + + try: + us = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + try: + os.remove("/tmp/.CR-unix/C%u" % os.getpid()) + except OSError: + pass + if not os.path.exists("/tmp/.CR-unix"): + mask = os.umask(0) + os.makedirs("/tmp/.CR-unix") + os.umask(mask) + us.bind("/tmp/.CR-unix/C%u" % os.getpid()) + us.listen(100) + self.all_sockets.append(us) + except: + Fatal( "Couldn't create local Unix Domain Socket") + try: if PORT == -1: # Port was not specified. Get it from @@ -1052,9 +1071,9 @@ class CR: ready = select.select( self.all_sockets, [], [], 0.1 )[0] for sock in ready: - if sock == s: + if sock == s or sock == us: # accept a new connection - (conn, addr) = s.accept() + (conn, addr) = sock.accept() self.wrappers[conn] = SockWrapper(conn) self.all_sockets.append( conn ) else: @@ -1235,6 +1254,26 @@ class CR: sock.tcscomm_connect_wait.append( (my_hostname, my_rank, my_endianness, remote_hostname, remote_rank) ) return + def ConnectUNIX( self, sock, connect_info ): + """Connect routine for Unix Domain Sockets (see do_connectrequest())""" + (p, filename, port_str, endianness_str) = connect_info + endianness = int(endianness_str) + for server_sock in self.wrappers.values(): + if server_sock.unix_accept_wait != None: + (server_filename, server_endianness) = server_sock.unix_accept_wait + if server_filename == filename: + sock.Success("%d %d" % (self.conn_id, server_endianness)) + server_sock.Success( "%d" % self.conn_id ) + # we don't want to re-use this info!! + server_sock.unix_accept_wait = None + self.conn_id += 1 + return + else: + CRDebug( "not connecting to \"%s\" (!= \"%s\")" + % (server_filename, filename) ) + sock.unix_connect_wait = (filename, endianness) + return + def do_connectrequest( self, sock, args ): """ This function is called when the mothership receives a "connectrequest" @@ -1257,6 +1296,8 @@ class CR: self.ConnectQuadrics(sock, connect_info) elif protocol == 'quadrics-tcscomm': self.ConnectTcscomm(sock, connect_info) + elif protocol == 'unix': + self.ConnectUNIX(sock, connect_info) else: sock.Failure(SockWrapper.UNKNOWNPROTOCOL, "Never heard of protocol %s" % protocol) @@ -1405,6 +1446,27 @@ class CR: sock.tcscomm_accept_wait.append( (hostname, rank, endianness) ) return + def AcceptUNIX( self, sock, accept_info ): + """Accept routine for Unix Domain Sockets (see do_acceptrequest())""" + (p, filename, port_str, endianness_str) = accept_info + endianness = int(endianness_str) + for client_sock in self.wrappers.values(): + if client_sock.unix_connect_wait != None: + (client_filename, client_endianness) = client_sock.unix_connect_wait + if client_filename == filename: + sock.Success( "%d" % self.conn_id ) + client_sock.Success("%d %d" % (self.conn_id, endianness)) + # we don't want to re-use this info!! + client_sock.unix_connect_wait = None + self.conn_id += 1 + return + else: + CRDebug( "not accepting from \"%s\" (!= \"%s\")" % (client_filename, filename ) ) + else: + CRDebug( "unix_connect_wait" ) + sock.unix_accept_wait = (filename, endianness) + return + def do_acceptrequest( self, sock, args ): """ This function is called when the mothership receives a "acceptrequest" @@ -1427,6 +1489,8 @@ class CR: self.AcceptQuadrics(sock, accept_info) elif protocol == 'quadrics-tcscomm': self.AcceptTcscomm(sock, accept_info) + elif protocol == 'unix': + self.AcceptUNIX(sock, accept_info) else: sock.Failure(SockWrapper.UNKNOWNPROTOCOL, "Never heard of protocol %s" % protocol) @@ -1956,7 +2020,10 @@ class CR: for i in allSPUs.keys(): spu = allSPUs[i] if spu.name == "replicate": - sock.Success("1 tcpip %d" % spu.ID); + if args[0] == '/': + sock.Success("1 unix://%s %d" % (args, spu.ID)); + else: + sock.Success("1 tcpip %d" % spu.ID); return sock.Failure(SockWrapper.NOTHINGTOSAY, "getvncclient: Didn't find VNC ApplicationNode and SPU") diff --git a/options.mk b/options.mk index e2c112e..ba84781 100644 --- a/options.mk +++ b/options.mk @@ -65,6 +65,9 @@ IB_SUPPORT=0 # Set to 1 to enable SDP Support SDP_SUPPORT=0 +# Set to 0 to disable UNIX domain socket support +UNIX_SUPPORT=1 + # Set to 1 to enable DOXYGEN generation DOXYGEN=0 diff --git a/util/Makefile b/util/Makefile index 3ddaf5f..1bf797a 100644 --- a/util/Makefile +++ b/util/Makefile @@ -105,6 +105,11 @@ ifeq ($(SDP_SUPPORT), 1) FILES += sdp endif +ifeq ($(UNIX_SUPPORT), 1) +CFLAGS += -DUNIX_SUPPORT +FILES += unix +endif + include ${TOP}/cr.mk debug_opcodes.c: debug_opcodes.py $(TOP)/glapi_parser/APIspec.txt diff --git a/util/net.c b/util/net.c index ccf6b81..212ea6e 100644 --- a/util/net.c +++ b/util/net.c @@ -54,6 +54,7 @@ static struct { int use_sdp; int use_teac; int use_tcscomm; + int use_unix; int num_clients; /* total number of clients (unused?) */ @@ -154,6 +155,15 @@ InitConnection(CRConnection *conn, const char *protocol, unsigned int mtu) crHPMCConnection(conn); } #endif +#ifdef UNIX_SUPPORT + else if (!crStrcmp(protocol, "unix")) + { + cr_net.use_unix++; + crUNIXInit(cr_net.recv_list, cr_net.close_list, mtu); + crUNIXConnection(conn); + } +#endif + else { crError("Unknown protocol: \"%s\"", protocol); @@ -339,7 +349,8 @@ crNetAcceptClient( const char *protocol, const char *hostname, /* special case */ if ( !crStrncmp( protocol, "file", crStrlen( "file" ) ) || - !crStrncmp( protocol, "swapfile", crStrlen( "swapfile" ) ) ) + !crStrncmp( protocol, "swapfile", crStrlen( "swapfile" ) ) || + !crStrncmp( protocol, "unix", crStrlen( "unix" ) )) { char filename[4096]; char protocol_only[4096]; @@ -487,6 +498,10 @@ CRConnection** crNetDump( int* num ) c = crSDPDump( num ); if ( c ) return c; #endif +#ifdef UNIX_SUPPORT + c = crUNIXDump( num ); + if ( c ) return c; +#endif *num = 0; return NULL; @@ -807,15 +822,16 @@ void crNetAccept( CRConnection *conn, const char *hostname, unsigned short port /** * Do a blocking receive on a particular connection. This only - * really works for TCPIP, but it's really only used (right now) by + * really works for TCPIP and UNIX, but it's really only used (right now) by * the mothership client library. * Read exactly the number of bytes specified (no headers/prefixes). */ void crNetSingleRecv( CRConnection *conn, void *buf, unsigned int len ) { - if (conn->type != CR_TCPIP) + if (conn->type != CR_TCPIP && + conn->type != CR_UNIX) { - crError( "Can't do a crNetSingleReceive on anything other than TCPIP." ); + crError( "Can't do a crNetSingleReceive on anything other than TCPIP or UNIX." ); } conn->Recv( conn, buf, len ); } @@ -1137,9 +1153,10 @@ void crNetReadline( CRConnection *conn, void *buf ) if (!conn || conn->type == CR_NO_CONNECTION) return; - if (conn->type != CR_TCPIP) + if (conn->type != CR_TCPIP && + conn->type != CR_UNIX) { - crError( "Can't do a crNetReadline on anything other than TCPIP (%d).",conn->type ); + crError( "Can't do a crNetReadline on anything other than TCPIP or UNIX (%d).",conn->type ); } temp = (char*)buf; for (;;) @@ -1195,6 +1212,12 @@ int crNetRecv( void ) found_work += crTcscommRecv(); #endif +#ifdef UNIX_SUPPORT + if ( cr_net.use_unix ) + found_work += crUNIXRecv(); +#endif + + return found_work; } diff --git a/util/net_internals.h b/util/net_internals.h index cbf4fd5..ad04a27 100644 --- a/util/net_internals.h +++ b/util/net_internals.h @@ -169,6 +169,16 @@ extern unsigned int crGmNodeId( void ); extern unsigned int crGmPortNum( void ); #endif /* GM_SUPPORT */ +/* + * UNIX network interface + */ +#ifdef UNIX_SUPPORT +extern void crUNIXInit( CRNetReceiveFuncList *rfl, CRNetCloseFuncList *cfl, unsigned int mtu ); +extern void crUNIXConnection( CRConnection *conn ); +extern int crUNIXRecv( void ); +extern CRConnection** crUNIXDump( int *num ); +#endif /* UNIX_SUPPORT */ + extern CRConnection** crNetDump( int *num ); diff --git a/util/unix.c b/util/unix.c new file mode 100644 index 0000000..bba01f6 --- /dev/null +++ b/util/unix.c @@ -0,0 +1,972 @@ +/* Copyright (c) 2001, Stanford University + * All rights reserved + * + * See the file LICENSE.txt for information on redistributing this software. + */ + +#include <sys/types.h> +#include <sys/wait.h> +#ifdef OSF1 +typedef int socklen_t; +#endif +#include <sys/un.h> +#include <sys/socket.h> +#include <sys/time.h> +#include <sys/stat.h> +#include <netdb.h> +#include <unistd.h> + +#include <limits.h> +#include <stdlib.h> +#include <stdio.h> +#include <errno.h> +#include <signal.h> +#include <string.h> +#ifdef AIX +#include <strings.h> +#endif + +#ifdef LINUX +#include <sys/ioctl.h> +#endif + +#include "cr_error.h" +#include "cr_mem.h" +#include "cr_string.h" +#include "cr_bufpool.h" +#include "cr_net.h" +#include "cr_endian.h" +#include "cr_threads.h" +#include "cr_environment.h" +#include "net_internals.h" + +/* + * UNIX network interface + */ +typedef enum { + CRUNIXMemory, + CRUNIXMemoryBig +} CRUNIXBufferKind; + +#define CR_UNIX_BUFFER_MAGIC 0x89134532 + +typedef struct CRUNIXBuffer { + unsigned int magic; + CRUNIXBufferKind kind; + unsigned int len; + unsigned int allocated; + unsigned int pad; /* may be clobbered by crUNIXSend() */ +} CRUNIXBuffer; + +typedef struct { + int initialized; + int num_conns; + CRConnection **conns; + CRBufferPool *bufpool; +#ifdef CHROMIUM_THREADSAFE + CRmutex mutex; + CRmutex recvmutex; +#endif + CRNetReceiveFuncList *recv_list; + CRNetCloseFuncList *close_list; + CRSocket server_sock; +} cr_unix_data; + +/* XXX these could be removed by reordering the functions below */ +static void +__unix_dead_connection( CRConnection *conn ); + +static void +crUNIXDoDisconnect( CRConnection *conn ); + + +static int crUNIXErrno( void ) +{ + int err = errno; + errno = 0; + return err; +} + +static char *crUNIXErrorString( int err ) +{ + static char buf[512], *temp; + + temp = strerror( err ); + if ( temp ) + { + crStrncpy( buf, temp, sizeof(buf)-1 ); + buf[sizeof(buf)-1] = 0; + } + else + { + sprintf( buf, "err=%d", err ); + } + + return buf; +} + +cr_unix_data cr_unix; + +/** + * Read len bytes from socket, and store in buffer. + * \return 1 if success, -1 if error, 0 if sender exited. + */ +static int +__unix_read_exact( CRSocket sock, void *buf, unsigned int len ) +{ + char *dst = (char *) buf; + /* + * Shouldn't write to a non-existent socket, ie when + * crUNIXDoDisconnect has removed it from the pool + */ + if ( sock <= 0 ) + return 1; + + while (len > 0) { + const int num_read = recv( sock, dst, (int) len, 0 ); + if (num_read < 0) { + const int error = crUNIXErrno(); + switch (error) { + case EINTR: + crWarning("__unix_read_exact() got EINTR, looping"); + continue; + case EAGAIN: + continue; + case EFAULT: + /* fallthrough */ + case EINVAL: + /* fallthrough */ + default: + crWarning("__unix_read_exact() error: %s", crUNIXErrorString(error)); + return -1; + } + } + else if (num_read == 0) { + /* client exited gracefully */ + return 0; + } + + dst += num_read; + len -= num_read; + } + + return 1; +} + +static void +crUNIXReadExact( CRConnection *conn, void *buf, unsigned int len ) +{ + if ( __unix_read_exact( conn->unix_socket, buf, len ) <= 0 ) + { + __unix_dead_connection( conn ); + } +} + +/** + * Write the given buffer of len bytes on the socket. + * \return 1 if OK, negative value if error. + */ +static int +__unix_write_exact( CRSocket sock, const void *buf, unsigned int len ) +{ + const char *src = (const char *) buf; + + /* + * Shouldn't write to a non-existent socket, ie when + * crUNIXDoDisconnect has removed it from the pool + */ + if ( sock <= 0 ) + return 1; + + while ( len > 0 ) + { + const int num_written = send( sock, src, len, 0 ); + if ( num_written <= 0 ) + { + int err; + if ( (err = crUNIXErrno( )) == EINTR ) + { + crWarning("__unix_write_exact(UNIX): caught an EINTR, continuing"); + continue; + } + + return -err; + } + + len -= num_written; + src += num_written; + } + + return 1; +} + +static void +crUNIXWriteExact( CRConnection *conn, const void *buf, unsigned int len ) +{ + if ( __unix_write_exact( conn->unix_socket, buf, len) <= 0 ) + { + __unix_dead_connection( conn ); + } +} + +#if defined( IRIX ) || defined( IRIX64 ) +typedef int socklen_t; +#endif + + +/** + * Create a listening socket using the given port. + * Caller can then pass the socket to accept(). + * If the port is one that's been seen before, we'll reuse/return the + * previously create socket. + */ +static int +CreateListeningSocket(int port) +{ + static int id = 0; + static int sock = 0; + + if (sock) + { + if (port != id) + crError("Fatal error in unix layer: only one listening socket allowed"); + + return sock; + } + + id = port; + + /* new port so create new socket */ + { + int err; + struct sockaddr_un servaddr; + + /* with the new OOB stuff, we can have multiple ports being + * accepted on, so we need to redo the server socket every time. + */ + sock = socket(AF_UNIX, SOCK_STREAM, 0); + if ( sock == -1 ) + { + err = crUNIXErrno( ); + crError( "Couldn't create socket: %s", crUNIXErrorString( err ) ); + } + + servaddr.sun_family = AF_UNIX; + snprintf(servaddr.sun_path, + sizeof (servaddr.sun_path), "/tmp/.CR-unix/C%u", + port); + unlink(servaddr.sun_path); + if ( (access( "/tmp/.CR-unix", F_OK )) == -1 ) + { + int mask; + + mask = umask (0); + mkdir ("/tmp/.CR-unix", 0777); + umask (mask); + } + if ( bind( sock, (struct sockaddr *) &servaddr, + strlen(servaddr.sun_path) + sizeof(servaddr.sun_family) ) ) + { + err = crUNIXErrno( ); + crError( "Couldn't bind to socket (port=%d): %s", + port, crUNIXErrorString( err ) ); + } + + if ( listen( sock, 100 /* max pending connections */ ) ) + { + err = crUNIXErrno( ); + crError( "Couldn't listen on socket: %s", crUNIXErrorString( err ) ); + } + } + + return sock; +} + + + + +static void +crUNIXAccept( CRConnection *conn, const char *hostname, unsigned short port ) +{ + int err; + socklen_t addr_length; + struct sockaddr_un addr; + + cr_unix.server_sock = CreateListeningSocket(getpid()); + + /* If brokered, we'll contact the mothership to broker the network + * connection. We'll send the mothership our hostname, the port and + * our endianness and will get in return a connection ID number. + */ + if (conn->broker) { + CRConnection *mother; + char response[8096]; + + mother = __copy_of_crMothershipConnect( ); + + /* We'll block on this call until the corresponding client-side + * connectrequest is received by the mothership. + */ + if (!__copy_of_crMothershipSendString( mother, response, "acceptrequest unix %s %d %d", conn->filename, 0, conn->endianness ) ) + { + crError( "Mothership didn't like my accept request" ); + } + + __copy_of_crMothershipDisconnect( mother ); + + sscanf( response, "%u", &(conn->id) ); + } + + addr_length = sizeof( addr ); + conn->unix_socket = accept( cr_unix.server_sock, (struct sockaddr *) &addr, &addr_length ); + if (conn->unix_socket == -1) + { + err = crUNIXErrno( ); + crError( "Couldn't accept client: %s", crUNIXErrorString( err ) ); + } + + crDebug( "Accepted connection from \"%s\".", conn->hostname ); +} + + +static void * +crUNIXAlloc( CRConnection *conn ) +{ + CRUNIXBuffer *buf; + +#ifdef CHROMIUM_THREADSAFE + crLockMutex(&cr_unix.mutex); +#endif + + buf = (CRUNIXBuffer *) crBufferPoolPop( cr_unix.bufpool, conn->buffer_size ); + + if ( buf == NULL ) + { + crDebug("Buffer pool %p was empty; allocated new %d byte buffer.", + cr_unix.bufpool, + (unsigned int)sizeof(CRUNIXBuffer) + conn->buffer_size); + buf = (CRUNIXBuffer *) + crAlloc( sizeof(CRUNIXBuffer) + conn->buffer_size ); + buf->magic = CR_UNIX_BUFFER_MAGIC; + buf->kind = CRUNIXMemory; + buf->pad = 0; + buf->allocated = conn->buffer_size; + } + +#ifdef CHROMIUM_THREADSAFE + crUnlockMutex(&cr_unix.mutex); +#endif + + return (void *)( buf + 1 ); +} + + +static void +crUNIXSingleRecv( CRConnection *conn, void *buf, unsigned int len ) +{ + crUNIXReadExact( conn, buf, len ); +} + + +static void +crUNIXSend( CRConnection *conn, void **bufp, + const void *start, unsigned int len ) +{ + if ( !conn || conn->type == CR_NO_CONNECTION ) + return; + + if (!bufp) { + /* We're sending a user-allocated buffer. + * Simply write the length & the payload and return. + */ + const int sendable_len = conn->swap ? SWAP32(len) : len; + crUNIXWriteExact( conn, &sendable_len, sizeof(len) ); + if (!conn || conn->type == CR_NO_CONNECTION) + return; + crUNIXWriteExact( conn, start, len ); + } + else { + /* The region [start .. start + len + 1] lies within a buffer that + * was allocated with crUNIXAlloc() and can be put into the free + * buffer pool when we're done sending it. + */ + CRUNIXBuffer *unix_buffer; + unsigned int *lenp; + + unix_buffer = (CRUNIXBuffer *)(*bufp) - 1; + + CRASSERT( unix_buffer->magic == CR_UNIX_BUFFER_MAGIC ); + + /* All of the buffers passed to the send function were allocated + * with crUNIXAlloc(), which includes a header with a 4 byte + * pad field, to insure that we always have a place to write + * the length field, even when start == *bufp. + */ + lenp = (unsigned int *) start - 1; + *lenp = conn->swap ? SWAP32(len) : len; + + crUNIXWriteExact(conn, lenp, len + sizeof(unsigned int)); + + /* Reclaim this pointer for reuse */ +#ifdef CHROMIUM_THREADSAFE + crLockMutex(&cr_unix.mutex); +#endif + crBufferPoolPush(cr_unix.bufpool, unix_buffer, unix_buffer->allocated); +#ifdef CHROMIUM_THREADSAFE + crUnlockMutex(&cr_unix.mutex); +#endif + /* Since the buffer's now in the 'free' buffer pool, the caller can't + * use it any more. Setting bufp to NULL will make sure the caller + * doesn't try to re-use the buffer. + */ + *bufp = NULL; + } +} + + +static void +__unix_dead_connection( CRConnection *conn ) +{ + crDebug("Dead unix domain sockket connection (sock=%d, host=%s)", + conn->unix_socket, conn->hostname); + /* remove from connection pool */ + crUNIXDoDisconnect( conn ); +} + + +static void +crUNIXFree( CRConnection *conn, void *buf ) +{ + CRUNIXBuffer *unix_buffer = (CRUNIXBuffer *) buf - 1; + + CRASSERT( unix_buffer->magic == CR_UNIX_BUFFER_MAGIC ); + conn->recv_credits += unix_buffer->len; + + switch ( unix_buffer->kind ) + { + case CRUNIXMemory: +#ifdef CHROMIUM_THREADSAFE + crLockMutex(&cr_unix.mutex); +#endif + if (cr_unix.bufpool) { + /* pool may have been deallocated just a bit earlier in response + * to a SIGPIPE (Broken Pipe) signal. + */ + crBufferPoolPush( cr_unix.bufpool, unix_buffer, unix_buffer->allocated ); + } +#ifdef CHROMIUM_THREADSAFE + crUnlockMutex(&cr_unix.mutex); +#endif + break; + + case CRUNIXMemoryBig: + crFree( unix_buffer ); + break; + + default: + crError( "Weird buffer kind trying to free in crUNIXFree: %d", unix_buffer->kind ); + } +} + + +/** + * Check if message type is GATHER. If so, process it specially. + * \return number of bytes which were consumed + */ +static int +crUNIXUserbufRecv(CRConnection *conn, CRMessage *msg) +{ + if (msg->header.type == CR_MESSAGE_GATHER) { + /* grab the offset and the length */ + const int len = 2 * sizeof(unsigned int); /* was unsigned long!!!! */ + unsigned int buf[2]; + + if (__unix_read_exact(conn->unix_socket, buf, len) <= 0) + { + __unix_dead_connection( conn ); + } + msg->gather.offset = buf[0]; + msg->gather.len = buf[1]; + + /* read the rest into the userbuf */ + if (buf[0] + buf[1] > (unsigned int) conn->userbuf_len) + { + crDebug("userbuf for Gather Message is too small!"); + return len; + } + + if (__unix_read_exact(conn->unix_socket, + conn->userbuf + buf[0], buf[1]) <= 0) + { + __unix_dead_connection( conn ); + } + return len + buf[1]; + } + else { + return 0; + } +} + + +/** + * Receive the next message on the given connection. + * If we're being called by crUNIXRecv(), we already know there's + * something to receive. + */ +static void +crUNIXReceiveMessage(CRConnection *conn) +{ + CRMessage *msg; + CRMessageType cached_type; + CRUNIXBuffer *unix_buffer; + unsigned int len, total, leftover; + const int sock = conn->unix_socket; + + if (conn->type == CR_NO_CONNECTION || !sock) { + /* this might happen during app shut-down */ + return; + } + + /* this reads the length of the message */ + if ( __unix_read_exact( sock, &len, sizeof(len)) <= 0 ) + { + __unix_dead_connection( conn ); + return; + } + + if (conn->swap) + len = SWAP32(len); + + CRASSERT( len > 0 ); + + if ( len <= conn->buffer_size ) + { + /* put in pre-allocated buffer */ + unix_buffer = (CRUNIXBuffer *) crUNIXAlloc( conn ) - 1; + } + else + { + /* allocate new buffer */ + unix_buffer = (CRUNIXBuffer *) crAlloc( sizeof(*unix_buffer) + len ); + unix_buffer->magic = CR_UNIX_BUFFER_MAGIC; + unix_buffer->kind = CRUNIXMemoryBig; + unix_buffer->pad = 0; + } + + unix_buffer->len = len; + + /* if we have set a userbuf, and there is room in it, we probably + * want to stick the message into that, instead of our allocated + * buffer. + */ + leftover = 0; + total = len; + if ((conn->userbuf != NULL) + && (conn->userbuf_len >= (int) sizeof(CRMessageHeader))) + { + leftover = len - sizeof(CRMessageHeader); + total = sizeof(CRMessageHeader); + } + + if ( __unix_read_exact( sock, unix_buffer + 1, total) <= 0 ) + { + crWarning( "Bad juju: %d %d on socket 0x%x", unix_buffer->allocated, + total, sock ); + crFree( unix_buffer ); + __unix_dead_connection( conn ); + return; + } + + conn->recv_credits -= total; + conn->total_bytes_recv += total; + + msg = (CRMessage *) (unix_buffer + 1); + cached_type = msg->header.type; + if (conn->swap) + { + msg->header.type = (CRMessageType) SWAP32( msg->header.type ); + msg->header.conn_id = (CRMessageType) SWAP32( msg->header.conn_id ); + } + + /* if there is still data pending, it should go into the user buffer */ + if (leftover) + { + const unsigned int handled = crUNIXUserbufRecv(conn, msg); + + /* if there is anything left, plop it into the recv_buffer */ + if (leftover - handled) + { + if ( __unix_read_exact( sock, unix_buffer + 1 + total, leftover-handled) <= 0 ) + { + crWarning( "Bad juju: %d %d", unix_buffer->allocated, leftover-handled); + crFree( unix_buffer ); + __unix_dead_connection( conn ); + return; + } + } + + conn->recv_credits -= handled; + conn->total_bytes_recv += handled; + } + + crNetDispatchMessage( cr_unix.recv_list, conn, msg, len ); +#if 0 + crLogRead( len ); +#endif + + /* CR_MESSAGE_OPCODES is freed in crserverlib/server_stream.c with crNetFree. + * OOB messages are the programmer's problem. -- Humper 12/17/01 + */ + if (cached_type != CR_MESSAGE_OPCODES + && cached_type != CR_MESSAGE_OOB + && cached_type != CR_MESSAGE_GATHER) + { + crUNIXFree( conn, unix_buffer + 1 ); + } +} + + +/** + * Loop over all connections, reading incoming data on those + * that are ready. + */ +int +crUNIXRecv( void ) +{ + /* ensure we don't get caught with a new thread connecting */ + const int num_conns = cr_unix.num_conns; + int num_ready, max_fd, i; + fd_set read_fds; + int msock = -1; /* assumed mothership socket */ + +#ifdef CHROMIUM_THREADSAFE + crLockMutex(&cr_unix.recvmutex); +#endif + + /* + * Loop over all connections and determine which are connections + * that are ready to be read. + */ + max_fd = 0; + FD_ZERO( &read_fds ); + for ( i = 0; i < num_conns; i++ ) + { + CRConnection *conn = cr_unix.conns[i]; + if ( !conn || conn->type == CR_NO_CONNECTION ) + continue; + + if ( conn->recv_credits > 0 || conn->type != CR_UNIX ) + { + /* + * NOTE: may want to always put the FD in the descriptor + * set so we'll notice broken connections. Down in the + * loop that iterates over the ready sockets only peek + * (MSG_PEEK flag to recv()?) if the connection isn't + * enabled. + */ + fd_set only_fd; /* testing single fd */ + CRSocket sock = conn->unix_socket; + + if ( (int) sock + 1 > max_fd ) + max_fd = (int) sock + 1; + FD_SET( sock, &read_fds ); + + /* KLUDGE CITY...... + * + * With threads there's a race condition between + * UNIXRecv and UNIXSingleRecv when new + * clients are connecting, thus new mothership + * connections are also being established. + * This code below is to check that we're not + * in a state of accepting the socket without + * connecting to it otherwise we fail with + * ENOTCONN later. But, this is really a side + * effect of this routine catching a motherships + * socket connection and reading data that wasn't + * really meant for us. It was really meant for + * UNIXSingleRecv. So, if we detect an + * in-progress connection we set the msock id + * so that we can assume the motherships socket + * and skip over them. + */ + + FD_ZERO(&only_fd); + FD_SET( sock, &only_fd ); + + /* + * Nope, that last socket we've just caught in + * the connecting phase. We've probably found + * a mothership connection here, and we shouldn't + * process it + */ + if ((int)sock == msock+1) + FD_CLR(sock, &read_fds); + } + } + + if (!max_fd) { +#ifdef CHROMIUM_THREADSAFE + crUnlockMutex(&cr_unix.recvmutex); +#endif + return 0; + } + + if ( num_conns ) { + num_ready = __crSelect( max_fd, &read_fds, 0, 500 ); + } + else { + crWarning( "Waiting for first connection..." ); + num_ready = __crSelect( max_fd, &read_fds, 0, 0 ); + } + + if ( num_ready == 0 ) { +#ifdef CHROMIUM_THREADSAFE + crUnlockMutex(&cr_unix.recvmutex); +#endif + return 0; + } + + /* + * Loop over connections, receive data on the connections that + * we determined are ready above. + */ + for ( i = 0; i < num_conns; i++ ) + { + CRConnection *conn = cr_unix.conns[i]; + CRSocket sock; + + if ( !conn || conn->type == CR_NO_CONNECTION ) + continue; + + /* Added by Samuel Thibault during TCP/IP / UDP code factorization */ + if ( conn->type != CR_UNIX ) + continue; + + sock = conn->unix_socket; + if ( !FD_ISSET( sock, &read_fds ) ) + continue; + + if (conn->threaded) + continue; + + crUNIXReceiveMessage(conn); + } + +#ifdef CHROMIUM_THREADSAFE + crUnlockMutex(&cr_unix.recvmutex); +#endif + + return 1; +} + + +static void +crUNIXHandleNewMessage( CRConnection *conn, CRMessage *msg, unsigned int len ) +{ + CRUNIXBuffer *buf = ((CRUNIXBuffer *) msg) - 1; + + /* build a header so we can delete the message later */ + buf->magic = CR_UNIX_BUFFER_MAGIC; + buf->kind = CRUNIXMemory; + buf->len = len; + buf->pad = 0; + + crNetDispatchMessage( cr_unix.recv_list, conn, msg, len ); +} + + +static void +crUNIXInstantReclaim( CRConnection *conn, CRMessage *mess ) +{ + crUNIXFree( conn, mess ); +} + + +void +crUNIXInit( CRNetReceiveFuncList *rfl, CRNetCloseFuncList *cfl, + unsigned int mtu ) +{ + (void) mtu; + + cr_unix.recv_list = rfl; + cr_unix.close_list = cfl; + if ( cr_unix.initialized ) + { + return; + } + + cr_unix.initialized = 1; + + cr_unix.num_conns = 0; + cr_unix.conns = NULL; + + cr_unix.server_sock = -1; + +#ifdef CHROMIUM_THREADSAFE + crInitMutex(&cr_unix.mutex); + crInitMutex(&cr_unix.recvmutex); +#endif + cr_unix.bufpool = crBufferPoolInit(16); +} + + +/** + * The function that actually connects. This should only be called by clients + * Servers have another way to set up the socket. + */ +static int +crUNIXDoConnect( CRConnection *conn ) +{ + int err; + struct sockaddr_un servaddr; + + conn->unix_socket = socket( AF_UNIX, SOCK_STREAM, 0 ); + if ( conn->unix_socket < 0 ) + { + int err = crUNIXErrno( ); + crWarning( "socket error: %s", crUNIXErrorString( err ) ); + cr_unix.conns[conn->index] = NULL; /* remove from table */ + return 0; + } + + crStrncpy(servaddr.sun_path, conn->filename, + sizeof (servaddr.sun_path)); + servaddr.sun_family = AF_UNIX; + + /* If brokered, we'll contact the mothership to broker the network + * connection. We'll send the mothership our hostname, the port and + * our endianness and will get in return a connection ID number. + */ + if (conn->broker) + { + CRConnection *mother; + char response[8096]; + int remote_endianness; + mother = __copy_of_crMothershipConnect( ); + + /* We'll block on this call until the corresponding server-side + * acceptrequest is received by the mothership. + */ + if (!__copy_of_crMothershipSendString( mother, response, "connectrequest unix %s %d %d", + conn->filename, 0, conn->endianness) ) + { + crError( "Mothership didn't like my connect request" ); + } + + __copy_of_crMothershipDisconnect( mother ); + + sscanf( response, "%u %d", &(conn->id), &(remote_endianness) ); + + if (conn->endianness != remote_endianness) { + conn->swap = 1; + } + } + + if ( !connect( conn->unix_socket, (struct sockaddr *) &servaddr, + sizeof(servaddr) ) ) + return 1; + + err = crUNIXErrno( ); + if ( err == EADDRINUSE || err == ECONNREFUSED ) + crWarning( "Connection refused to %s:%d, %s", + conn->hostname, conn->port, crUNIXErrorString( err ) ); + + else if ( err == EINTR ) + { + crWarning( "connection to %s:%d " + "interruped, trying again", conn->hostname, conn->port ); + } + else + crWarning( "Couldn't connect to %s:%d, %s", + conn->hostname, conn->port, crUNIXErrorString( err ) ); + crCloseSocket( conn->unix_socket ); + cr_unix.conns[conn->index] = NULL; /* remove from table */ + return 0; +} + + +/** + * Disconnect this connection, but don't free(conn). + */ +static void +crUNIXDoDisconnect( CRConnection *conn ) +{ + /* If this connection has already been disconnected (e.g. + * if the connection has been lost and disabled through + * a call to __unix_dead_connection(), which will then + * call this routine), don't disconnect it again; if we + * do, and if a new valid connection appears in the same + * slot (conn->index), we'll effectively disable the + * valid connection by mistake, leaving us unable to + * receive inbound data on that connection. + */ + if (conn->type == CR_NO_CONNECTION) + return; + + crCloseSocket( conn->unix_socket ); + if (conn->hostname) { + crFree(conn->hostname); + conn->hostname = NULL; + } + conn->unix_socket = 0; + conn->type = CR_NO_CONNECTION; + cr_unix.conns[conn->index] = NULL; +} + + +/** + * Initialize a CRConnection. This is called via the + * InitConnection() function (and from the UDP module). + */ +void +crUNIXConnection( CRConnection *conn ) +{ + int i, found = 0; + int n_bytes; + + CRASSERT( cr_unix.initialized ); + + conn->type = CR_UNIX; + conn->Alloc = crUNIXAlloc; + conn->Send = crUNIXSend; + conn->SendExact = crUNIXWriteExact; + conn->Recv = crUNIXSingleRecv; + conn->RecvMsg = crUNIXReceiveMessage; + conn->Free = crUNIXFree; + conn->Accept = crUNIXAccept; + conn->Connect = crUNIXDoConnect; + conn->Disconnect = crUNIXDoDisconnect; + conn->InstantReclaim = crUNIXInstantReclaim; + conn->HandleNewMessage = crUNIXHandleNewMessage; + conn->index = cr_unix.num_conns; + conn->sizeof_buffer_header = sizeof( CRUNIXBuffer ); + conn->actual_network = 1; + + conn->filename = crStrdup( conn->hostname ); + + conn->krecv_buf_size = 0; + + /* Find a free slot */ + for (i = 0; i < cr_unix.num_conns; i++) { + if (cr_unix.conns[i] == NULL) { + conn->index = i; + cr_unix.conns[i] = conn; + found = 1; + break; + } + } + + /* Realloc connection stack if we couldn't find a free slot */ + if (found == 0) { + n_bytes = ( cr_unix.num_conns + 1 ) * sizeof(*cr_unix.conns); + crRealloc( (void **) &cr_unix.conns, n_bytes ); + cr_unix.conns[cr_unix.num_conns++] = conn; + } +} + +CRConnection** crUNIXDump( int *num ) +{ + *num = cr_unix.num_conns; + + return cr_unix.conns; +}
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor