wibble 1.1
pipe.h
Go to the documentation of this file.
1// -*- C++ -*- (c) 2008 Petr Rockai <me@mornfall.net>
2
3#include <wibble/sys/macros.h>
4
5#ifdef POSIX
6#include <fcntl.h>
7#include <sys/select.h>
8#endif
9#include <unistd.h>
10
11#include <deque>
12#include <cerrno>
13
14#include <wibble/exception.h>
15#include <wibble/sys/thread.h>
16#include <wibble/sys/mutex.h>
17#include <wibble/sys/exec.h>
18
19#ifndef WIBBLE_SYS_PIPE_H
20#define WIBBLE_SYS_PIPE_H
21
22namespace wibble {
23namespace sys {
24
25namespace wexcept = wibble::exception;
26
27struct Pipe {
28
30 int fd;
31 bool close;
32 std::string data;
33 bool running;
34 bool closed;
36
37 Writer() : fd( -1 ), close( false ), running( false ) {}
38
39 void *main() {
40 do {
41 int wrote = 0;
42
43 {
45 wrote = ::write( fd, data.c_str(), data.length() );
46 if ( wrote > 0 )
47 data.erase( data.begin(), data.begin() + wrote );
48 }
49
50 if ( wrote == -1 ) {
51 if ( blocking( errno ) )
52#ifdef POSIX
54#else
55 ;
56#endif
57 else
58 throw wexcept::System( "writing to pipe" );
59 }
60 } while ( !done() );
61
63 running = false;
64 if ( close )
65 ::close( fd );
66
67 return 0;
68 }
69
70 bool done() {
72 if ( data.empty() )
73 running = false;
74 return !running;
75 }
76
77 void run( int _fd, std::string what ) {
79
80 if ( running )
81 assert_eq( _fd, fd );
82 fd = _fd;
83 assert_neq( fd, -1 );
84
85 data += what;
86 if ( running )
87 return;
88 running = true;
89 start();
90 }
91 };
92
93 typedef std::deque< char > Buffer;
95 int fd;
96 bool _eof;
98
99 Pipe( int p ) : fd( p ), _eof( false )
100 {
101 if ( p == -1 )
102 return;
103#ifdef POSIX
104 if ( fcntl( fd, F_SETFL, O_NONBLOCK ) == -1 )
105 throw wexcept::System( "fcntl on a pipe" );
106#endif
107 }
108 Pipe() : fd( -1 ), _eof( false ) {}
109
110 /* Writes data to the pipe, asynchronously. */
111 void write( std::string what ) {
112 writer.run( fd, what );
113 }
114
115 void close() {
117 writer.close = true;
118 if ( !writer.running )
119 ::close( fd );
120 }
121
122 bool valid() {
123 return fd != -1;
124 }
125
126 bool active() {
127 return valid() && !eof();
128 }
129
130 bool eof() {
131 return _eof;
132 }
133
/* Tells whether errno value `err` means "the operation would block".
   EAGAIN always counts; POSIX builds also accept EWOULDBLOCK. */
static bool blocking( int err ) {
    if ( err == EAGAIN )
        return true;
#ifdef POSIX
    if ( err == EWOULDBLOCK )
        return true;
#endif
    return false;
}
141
142 int readMore() {
143 assert( valid() );
144 char _buffer[1024];
145 int r = ::read( fd, _buffer, 1023 );
146 if ( r == -1 && !blocking( errno ) )
147 throw wexcept::System( "reading from pipe" );
148 else if ( r == -1 )
149 return 0;
150 if ( r == 0 )
151 _eof = true;
152 else
153 std::copy( _buffer, _buffer + r, std::back_inserter( buffer ) );
154 return r;
155 }
156
157 std::string nextChunk() {
158 std::string line( buffer.begin(), buffer.end() );
159 buffer.clear();
160 return line;
161 }
162
163 std::string nextLine() {
164 assert( valid() );
165 Buffer::iterator nl =
166 std::find( buffer.begin(), buffer.end(), '\n' );
167 while ( nl == buffer.end() ) {
168 if ( !readMore() )
169 return ""; // would block, so give up
170 nl = std::find( buffer.begin(), buffer.end(), '\n' );
171 }
172 std::string line( buffer.begin(), nl );
173
174 if ( nl != buffer.end() )
175 ++ nl;
176 buffer.erase( buffer.begin(), nl );
177
178 return line;
179 }
180
181 /* Only returns on eof() or when data is buffered. */
182 void wait() {
183 assert( valid() );
184#ifdef POSIX
185 fd_set fds;
186 FD_ZERO( &fds );
187#endif
188 while ( buffer.empty() && !eof() ) {
189 if ( readMore() )
190 return;
191 if ( eof() )
192 return;
193#ifdef POSIX
194#pragma GCC diagnostic push
195#pragma GCC diagnostic ignored "-Wold-style-cast"
196 FD_SET( fd, &fds );
197 select( fd + 1, &fds, 0, 0, 0 );
198#pragma GCC diagnostic pop
199#else
200 sleep( 1 );
201#endif
202 }
203 }
204 std::string nextLineBlocking() {
205 assert( valid() );
206 std::string l;
207 while ( !eof() ) {
208 l = nextLine();
209 if ( !l.empty() )
210 return l;
211 if ( eof() )
212 return std::string( buffer.begin(), buffer.end() );
213 wait();
214 }
215 return l;
216 }
217
218};
219
221{
222 std::string cmd;
223
224 PipeThrough( const std::string& _cmd ) : cmd( _cmd ) {}
225
226 std::string run( std::string data ) {
227 int _in, _out;
228
229#ifdef _WIN32
230 Exec exec(cmd);
231#elif defined POSIX
232 ShellCommand exec(cmd);
233#endif
234
235 exec.setupRedirects( &_in, &_out, 0 );
236 exec.fork();
237
238 Pipe in( _in ), out( _out );
239
240 in.write( data );
241 in.close();
242 std::string ret;
243 while ( !out.eof() ) {
244 out.wait();
245 ret += out.nextChunk();
246 }
247 return ret;
248 }
249};
250
251}
252}
253#endif
Base class for system exceptions.
Definition exception.h:397
pid_t fork()
Fork a subprocess to run proc.
void setupRedirects(int *stdinfd=0, int *stdoutfd=0, int *stderrfd=0)
Definition childprocess.cpp:145
Execute external commands, either forked as a ChildProcess or directly using exec().
Definition exec.h:34
pthread mutex wrapper; WARNING: the class allows copying and assignment, but this is not always safe.
Definition mutex.h:48
Execute a shell command using /bin/sh -c.
Definition exec.h:98
Encapsulates a thread.
Definition thread.h:84
void start()
Start the thread.
Definition thread.cpp:70
Definition core.h:11
void sleep(int secs)
Portable version of sleep.
Definition thread.cpp:31
Definition amorph.h:17
Definition amorph.h:30
Definition pipe.h:221
std::string cmd
Definition pipe.h:222
std::string run(std::string data)
Definition pipe.h:226
PipeThrough(const std::string &_cmd)
Definition pipe.h:224
Definition pipe.h:29
void * main()
Main thread function, executed in the new thread after creation.
Definition pipe.h:39
Writer()
Definition pipe.h:37
wibble::sys::Mutex mutex
Definition pipe.h:35
bool done()
Definition pipe.h:70
void run(int _fd, std::string what)
Definition pipe.h:77
bool running
Definition pipe.h:33
std::string data
Definition pipe.h:32
bool close
Definition pipe.h:31
int fd
Definition pipe.h:30
bool closed
Definition pipe.h:34
Definition pipe.h:27
void write(std::string what)
Definition pipe.h:111
static bool blocking(int err)
Definition pipe.h:134
Buffer buffer
Definition pipe.h:94
std::string nextLineBlocking()
Definition pipe.h:204
bool _eof
Definition pipe.h:96
void close()
Definition pipe.h:115
bool eof()
Definition pipe.h:130
int readMore()
Definition pipe.h:142
std::string nextLine()
Definition pipe.h:163
int fd
Definition pipe.h:95
bool valid()
Definition pipe.h:122
Writer writer
Definition pipe.h:97
bool active()
Definition pipe.h:126
Pipe()
Definition pipe.h:108
Pipe(int p)
Definition pipe.h:99
void wait()
Definition pipe.h:182
std::string nextChunk()
Definition pipe.h:157
std::deque< char > Buffer
Definition pipe.h:93
#define assert_eq(x, y)
Definition test.h:33
#define assert(x)
Definition test.h:30
#define assert_neq(x, y)
Definition test.h:36