001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.converter.stream;
018
019 import java.io.BufferedOutputStream;
020 import java.io.ByteArrayInputStream;
021 import java.io.ByteArrayOutputStream;
022 import java.io.File;
023 import java.io.FileInputStream;
024 import java.io.FileNotFoundException;
025 import java.io.FileOutputStream;
026 import java.io.IOException;
027 import java.io.InputStream;
028 import java.io.OutputStream;
029 import java.io.Serializable;
030 import java.util.ArrayList;
031 import java.util.List;
032 import java.util.Map;
033
034 import org.apache.camel.StreamCache;
035 import org.apache.camel.converter.IOConverter;
036 import org.apache.camel.util.FileUtil;
037 import org.apache.camel.util.IOHelper;
038
039 /**
040 * This output stream will store the content into a File if the stream context size is exceed the
041 * THRESHOLD which's default value is 64K. The temp file will store in the temp directory, you
042 * can configure it by setting the TEMP_DIR property. If you don't set the TEMP_DIR property,
043 * it will choice the directory which is set by the system property of "java.io.tmpdir".
044 * You can get a cached input stream of this stream. The temp file which is created with this
045 * output stream will be deleted when you close this output stream or the cached inputStream.
046 */
047 public class CachedOutputStream extends OutputStream {
048 public static final String THRESHOLD = "CamelCachedOutputStreamThreshold";
049 public static final String TEMP_DIR = "CamelCachedOutputStreamOutputDirectory";
050
051 protected boolean outputLocked;
052 protected OutputStream currentStream;
053
054 private final List<Object> streamList = new ArrayList<Object>();
055 private long threshold = 64 * 1024;
056 private int totalLength;
057 private boolean inMemory;
058 private File tempFile;
059 private File outputDir;
060
061 public CachedOutputStream() {
062 currentStream = new ByteArrayOutputStream(2048);
063 inMemory = true;
064 }
065
066 public CachedOutputStream(long threshold) {
067 this();
068 this.threshold = threshold;
069 }
070
071 public CachedOutputStream(Map<String, String> properties) {
072 this();
073 String value = properties.get(THRESHOLD);
074 if (value != null) {
075 int i = Integer.parseInt(value);
076 if (i > 0) {
077 threshold = i;
078 }
079 }
080 value = properties.get(TEMP_DIR);
081 if (value != null) {
082 File f = new File(value);
083 if (f.exists() && f.isDirectory()) {
084 outputDir = f;
085 } else {
086 outputDir = null;
087 }
088 } else {
089 outputDir = null;
090 }
091 }
092
093 /**
094 * Perform any actions required on stream flush (freeze headers, reset
095 * output stream ... etc.)
096 */
097 protected void doFlush() throws IOException {
098 }
099
100 public void flush() throws IOException {
101 currentStream.flush();
102 doFlush();
103 }
104
105 /**
106 * Perform any actions required on stream closure (handle response etc.)
107 */
108 protected void doClose() throws IOException {
109 }
110
111 /**
112 * Perform any actions required after stream closure (close the other related stream etc.)
113 */
114 protected void postClose() throws IOException {
115 }
116
117 /**
118 * Locks the output stream to prevent additional writes, but maintains
119 * a pointer to it so an InputStream can be obtained
120 * @throws IOException
121 */
122 public void lockOutputStream() throws IOException {
123 currentStream.flush();
124 outputLocked = true;
125 streamList.remove(currentStream);
126 }
127
128 public void close() throws IOException {
129 currentStream.flush();
130 doClose();
131 currentStream.close();
132 maybeDeleteTempFile(currentStream);
133 postClose();
134 }
135
136 public boolean equals(Object obj) {
137 return currentStream.equals(obj);
138 }
139
140 /**
141 * Replace the original stream with the new one, optionally copying the content of the old one
142 * into the new one.
143 * When with Attachment, needs to replace the xml writer stream with the stream used by
144 * AttachmentSerializer or copy the cached output stream to the "real"
145 * output stream, i.e. onto the wire.
146 *
147 * @param out the new output stream
148 * @param copyOldContent flag indicating if the old content should be copied
149 * @throws IOException
150 */
151 public void resetOut(OutputStream out, boolean copyOldContent) throws IOException {
152 if (out == null) {
153 out = new ByteArrayOutputStream();
154 }
155
156 if (currentStream instanceof CachedOutputStream) {
157 CachedOutputStream ac = (CachedOutputStream) currentStream;
158 InputStream in = ac.getInputStream();
159 IOHelper.copyAndCloseInput(in, out);
160 } else {
161 if (inMemory) {
162 if (currentStream instanceof ByteArrayOutputStream) {
163 ByteArrayOutputStream byteOut = (ByteArrayOutputStream) currentStream;
164 if (copyOldContent && byteOut.size() > 0) {
165 byteOut.writeTo(out);
166 }
167 } else {
168 throw new IOException("Unknown format of currentStream: " + currentStream);
169 }
170 } else {
171 // read the file
172 currentStream.close();
173 FileInputStream fin = new FileInputStream(tempFile);
174 if (copyOldContent) {
175 IOHelper.copyAndCloseInput(fin, out);
176 }
177 streamList.remove(currentStream);
178 tempFile.delete();
179 tempFile = null;
180 inMemory = true;
181 }
182 }
183 currentStream = out;
184 outputLocked = false;
185 }
186
187 public static void copyStream(InputStream in, OutputStream out, int bufferSize) throws IOException {
188 IOHelper.copyAndCloseInput(in, out, bufferSize);
189 }
190
191 public int size() {
192 return totalLength;
193 }
194
195 public byte[] getBytes() throws IOException {
196 flush();
197 if (inMemory) {
198 if (currentStream instanceof ByteArrayOutputStream) {
199 return ((ByteArrayOutputStream)currentStream).toByteArray();
200 } else {
201 throw new IOException("Unknown format of currentStream");
202 }
203 } else {
204 // read the file
205 FileInputStream fin = new FileInputStream(tempFile);
206 return IOConverter.toBytes(fin);
207 }
208 }
209
210 public void writeCacheTo(OutputStream out) throws IOException {
211 flush();
212 if (inMemory) {
213 if (currentStream instanceof ByteArrayOutputStream) {
214 ((ByteArrayOutputStream)currentStream).writeTo(out);
215 } else {
216 throw new IOException("Unknown format of currentStream");
217 }
218 } else {
219 // read the file
220 FileInputStream fin = new FileInputStream(tempFile);
221 IOHelper.copyAndCloseInput(fin, out);
222 }
223 }
224
225
226 public void writeCacheTo(StringBuilder out, int limit) throws IOException {
227 flush();
228 if (totalLength < limit
229 || limit == -1) {
230 writeCacheTo(out);
231 return;
232 }
233
234 int count = 0;
235 if (inMemory) {
236 if (currentStream instanceof ByteArrayOutputStream) {
237 byte bytes[] = ((ByteArrayOutputStream)currentStream).toByteArray();
238 out.append(IOHelper.newStringFromBytes(bytes, 0, limit));
239 } else {
240 throw new IOException("Unknown format of currentStream: " + currentStream);
241 }
242 } else {
243 // read the file
244 FileInputStream fin = new FileInputStream(tempFile);
245 byte bytes[] = new byte[1024];
246 int x = fin.read(bytes);
247 while (x != -1) {
248 if ((count + x) > limit) {
249 x = limit - count;
250 }
251 out.append(IOHelper.newStringFromBytes(bytes, 0, x));
252 count += x;
253
254 if (count >= limit) {
255 x = -1;
256 } else {
257 x = fin.read(bytes);
258 }
259 }
260 fin.close();
261 }
262 }
263 public void writeCacheTo(StringBuilder out) throws IOException {
264 flush();
265 if (inMemory) {
266 if (currentStream instanceof ByteArrayOutputStream) {
267 byte[] bytes = ((ByteArrayOutputStream)currentStream).toByteArray();
268 out.append(IOHelper.newStringFromBytes(bytes));
269 } else {
270 throw new IOException("Unknown format of currentStream: " + currentStream);
271 }
272 } else {
273 // read the file
274 FileInputStream fin = new FileInputStream(tempFile);
275 byte bytes[] = new byte[1024];
276 int x = fin.read(bytes);
277 while (x != -1) {
278 out.append(IOHelper.newStringFromBytes(bytes, 0, x));
279 x = fin.read(bytes);
280 }
281 fin.close();
282 }
283 }
284
285
286 /**
287 * @return the underlying output stream
288 */
289 public OutputStream getOut() {
290 return currentStream;
291 }
292
293 public int hashCode() {
294 return currentStream.hashCode();
295 }
296
297 public String toString() {
298 StringBuilder builder = new StringBuilder().append("[")
299 .append(CachedOutputStream.class.getName())
300 .append(" Content: ");
301 try {
302 writeCacheTo(builder);
303 } catch (IOException e) {
304 //ignore
305 }
306 return builder.append("]").toString();
307 }
308
309 protected void onWrite() throws IOException {
310 }
311
312 public void write(byte[] b, int off, int len) throws IOException {
313 if (!outputLocked) {
314 onWrite();
315 this.totalLength += len;
316 if (inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
317 createFileOutputStream();
318 }
319 currentStream.write(b, off, len);
320 }
321 }
322
323 public void write(byte[] b) throws IOException {
324 if (!outputLocked) {
325 onWrite();
326 this.totalLength += b.length;
327 if (inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
328 createFileOutputStream();
329 }
330 currentStream.write(b);
331 }
332 }
333
334 public void write(int b) throws IOException {
335 if (!outputLocked) {
336 onWrite();
337 this.totalLength++;
338 if (inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
339 createFileOutputStream();
340 }
341 currentStream.write(b);
342 }
343 }
344
345 private void createFileOutputStream() throws IOException {
346 ByteArrayOutputStream bout = (ByteArrayOutputStream)currentStream;
347 if (outputDir == null) {
348 tempFile = FileUtil.createTempFile("cos", "tmp");
349 } else {
350 tempFile = FileUtil.createTempFile("cos", "tmp", outputDir, false);
351 }
352
353 currentStream = new BufferedOutputStream(new FileOutputStream(tempFile));
354 bout.writeTo(currentStream);
355 inMemory = false;
356 streamList.add(currentStream);
357 }
358
359 public File getTempFile() {
360 return tempFile != null && tempFile.exists() ? tempFile : null;
361 }
362
363 public InputStream getInputStream() throws IOException {
364 flush();
365 if (inMemory) {
366 if (currentStream instanceof ByteArrayOutputStream) {
367 return new ByteArrayInputStream(((ByteArrayOutputStream) currentStream).toByteArray());
368 } else {
369 return null;
370 }
371 } else {
372 try {
373 FileInputStream fileInputStream = new FileInputStream(tempFile) {
374 public void close() throws IOException {
375 super.close();
376 maybeDeleteTempFile(this);
377 }
378 };
379 streamList.add(fileInputStream);
380 return fileInputStream;
381 } catch (FileNotFoundException e) {
382 throw IOHelper.createIOException("Cached file was already deleted", e);
383 }
384 }
385 }
386
387 public StreamCache getStreamCache() throws IOException {
388 flush();
389 if (inMemory) {
390 if (currentStream instanceof ByteArrayOutputStream) {
391 return new InputStreamCache(((ByteArrayOutputStream) currentStream).toByteArray());
392 } else {
393 return null;
394 }
395 } else {
396 try {
397 return new FileInputStreamCache(tempFile, this);
398 } catch (FileNotFoundException e) {
399 throw IOHelper.createIOException("Cached file was already deleted", e);
400 }
401 }
402 }
403
404 private void maybeDeleteTempFile(Object stream) {
405 streamList.remove(stream);
406 if (!inMemory && tempFile != null && streamList.isEmpty()) {
407 tempFile.delete();
408 tempFile = null;
409 currentStream = new ByteArrayOutputStream(1024);
410 inMemory = true;
411 }
412 }
413
414 public void setOutputDir(File outputDir) throws IOException {
415 this.outputDir = outputDir;
416 }
417
418 public void setThreshold(long threshold) {
419 this.threshold = threshold;
420 }
421
422 }