001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.converter.stream;
018
019 import java.io.BufferedOutputStream;
020 import java.io.ByteArrayInputStream;
021 import java.io.ByteArrayOutputStream;
022 import java.io.File;
023 import java.io.FileInputStream;
024 import java.io.FileNotFoundException;
025 import java.io.FileOutputStream;
026 import java.io.IOException;
027 import java.io.InputStream;
028 import java.io.OutputStream;
029 import java.util.ArrayList;
030 import java.util.List;
031 import java.util.Map;
032
033 import org.apache.camel.converter.IOConverter;
034 import org.apache.camel.util.FileUtil;
035 import org.apache.camel.util.IOHelper;
036
037 /**
038 * This output stream will store the content into a File if the stream context size is exceed the
039 * THRESHOLD which's default value is 64K. The temp file will store in the temp directory, you
040 * can configure it by setting the TEMP_DIR property. If you don't set the TEMP_DIR property,
041 * it will choice the directory which is set by the system property of "java.io.tmpdir".
042 * You can get a cached input stream of this stream. The temp file which is created with this
043 * output stream will be deleted when you close this output stream or the cached inputStream.
044 */
045 public class CachedOutputStream extends OutputStream {
046 public static final String THRESHOLD = "CamelCachedOutputStreamThreshold";
047 public static final String TEMP_DIR = "CamelCachedOutputStreamOutputDirectory";
048
049 protected boolean outputLocked;
050 protected OutputStream currentStream;
051
052 private long threshold = 64 * 1024;
053
054 private int totalLength;
055
056 private boolean inmem;
057
058 private File tempFile;
059
060 private File outputDir;
061
062 private List<Object> streamList = new ArrayList<Object>();
063
064
065 public CachedOutputStream() {
066 currentStream = new ByteArrayOutputStream(2048);
067 inmem = true;
068 }
069
070 public CachedOutputStream(long threshold) {
071 this();
072 this.threshold = threshold;
073 }
074
075 public CachedOutputStream(Map<String, String> properties) {
076 this();
077 String value = properties.get(THRESHOLD);
078 if (value != null) {
079 int i = Integer.parseInt(value);
080 if (i > 0) {
081 threshold = i;
082 }
083 }
084 value = properties.get(TEMP_DIR);
085 if (value != null) {
086 File f = new File(value);
087 if (f.exists() && f.isDirectory()) {
088 outputDir = f;
089 } else {
090 outputDir = null;
091 }
092 } else {
093 outputDir = null;
094 }
095 }
096
097 /**
098 * Perform any actions required on stream flush (freeze headers, reset
099 * output stream ... etc.)
100 */
101 protected void doFlush() throws IOException {
102
103 }
104
105 public void flush() throws IOException {
106 currentStream.flush();
107 doFlush();
108 }
109
110 /**
111 * Perform any actions required on stream closure (handle response etc.)
112 */
113 protected void doClose() throws IOException {
114
115 }
116
117 /**
118 * Perform any actions required after stream closure (close the other related stream etc.)
119 */
120 protected void postClose() throws IOException {
121
122 }
123
124 /**
125 * Locks the output stream to prevent additional writes, but maintains
126 * a pointer to it so an InputStream can be obtained
127 * @throws IOException
128 */
129 public void lockOutputStream() throws IOException {
130 currentStream.flush();
131 outputLocked = true;
132 streamList.remove(currentStream);
133 }
134
135 public void close() throws IOException {
136 currentStream.flush();
137 doClose();
138 currentStream.close();
139 maybeDeleteTempFile(currentStream);
140 postClose();
141 }
142
143 public boolean equals(Object obj) {
144 return currentStream.equals(obj);
145 }
146
147 /**
148 * Replace the original stream with the new one, optionally copying the content of the old one
149 * into the new one.
150 * When with Attachment, needs to replace the xml writer stream with the stream used by
151 * AttachmentSerializer or copy the cached output stream to the "real"
152 * output stream, i.e. onto the wire.
153 *
154 * @param out the new output stream
155 * @param copyOldContent flag indicating if the old content should be copied
156 * @throws IOException
157 */
158 public void resetOut(OutputStream out, boolean copyOldContent) throws IOException {
159 if (out == null) {
160 out = new ByteArrayOutputStream();
161 }
162
163 if (currentStream instanceof CachedOutputStream) {
164 CachedOutputStream ac = (CachedOutputStream) currentStream;
165 InputStream in = ac.getInputStream();
166 IOHelper.copyAndCloseInput(in, out);
167 } else {
168 if (inmem) {
169 if (currentStream instanceof ByteArrayOutputStream) {
170 ByteArrayOutputStream byteOut = (ByteArrayOutputStream) currentStream;
171 if (copyOldContent && byteOut.size() > 0) {
172 byteOut.writeTo(out);
173 }
174 } else {
175 throw new IOException("Unknown format of currentStream");
176 }
177 } else {
178 // read the file
179 currentStream.close();
180 FileInputStream fin = new FileInputStream(tempFile);
181 if (copyOldContent) {
182 IOHelper.copyAndCloseInput(fin, out);
183 }
184 streamList.remove(currentStream);
185 tempFile.delete();
186 tempFile = null;
187 inmem = true;
188 }
189 }
190 currentStream = out;
191 outputLocked = false;
192 }
193
194 public static void copyStream(InputStream in, OutputStream out, int bufferSize) throws IOException {
195 IOHelper.copyAndCloseInput(in, out, bufferSize);
196 }
197
198 public int size() {
199 return totalLength;
200 }
201
202 public byte[] getBytes() throws IOException {
203 flush();
204 if (inmem) {
205 if (currentStream instanceof ByteArrayOutputStream) {
206 return ((ByteArrayOutputStream)currentStream).toByteArray();
207 } else {
208 throw new IOException("Unknown format of currentStream");
209 }
210 } else {
211 // read the file
212 FileInputStream fin = new FileInputStream(tempFile);
213 return IOConverter.toBytes(fin);
214 }
215 }
216
217 public void writeCacheTo(OutputStream out) throws IOException {
218 flush();
219 if (inmem) {
220 if (currentStream instanceof ByteArrayOutputStream) {
221 ((ByteArrayOutputStream)currentStream).writeTo(out);
222 } else {
223 throw new IOException("Unknown format of currentStream");
224 }
225 } else {
226 // read the file
227 FileInputStream fin = new FileInputStream(tempFile);
228 IOHelper.copyAndCloseInput(fin, out);
229 }
230 }
231
232
233 public void writeCacheTo(StringBuilder out, int limit) throws IOException {
234 flush();
235 if (totalLength < limit
236 || limit == -1) {
237 writeCacheTo(out);
238 return;
239 }
240
241 int count = 0;
242 if (inmem) {
243 if (currentStream instanceof ByteArrayOutputStream) {
244 byte bytes[] = ((ByteArrayOutputStream)currentStream).toByteArray();
245 out.append(IOHelper.newStringFromBytes(bytes, 0, limit));
246 } else {
247 throw new IOException("Unknown format of currentStream");
248 }
249 } else {
250 // read the file
251 FileInputStream fin = new FileInputStream(tempFile);
252 byte bytes[] = new byte[1024];
253 int x = fin.read(bytes);
254 while (x != -1) {
255 if ((count + x) > limit) {
256 x = limit - count;
257 }
258 out.append(IOHelper.newStringFromBytes(bytes, 0, x));
259 count += x;
260
261 if (count >= limit) {
262 x = -1;
263 } else {
264 x = fin.read(bytes);
265 }
266 }
267 fin.close();
268 }
269 }
270 public void writeCacheTo(StringBuilder out) throws IOException {
271 flush();
272 if (inmem) {
273 if (currentStream instanceof ByteArrayOutputStream) {
274 byte[] bytes = ((ByteArrayOutputStream)currentStream).toByteArray();
275 out.append(IOHelper.newStringFromBytes(bytes));
276 } else {
277 throw new IOException("Unknown format of currentStream");
278 }
279 } else {
280 // read the file
281 FileInputStream fin = new FileInputStream(tempFile);
282 byte bytes[] = new byte[1024];
283 int x = fin.read(bytes);
284 while (x != -1) {
285 out.append(IOHelper.newStringFromBytes(bytes, 0, x));
286 x = fin.read(bytes);
287 }
288 fin.close();
289 }
290 }
291
292
293 /**
294 * @return the underlying output stream
295 */
296 public OutputStream getOut() {
297 return currentStream;
298 }
299
300 public int hashCode() {
301 return currentStream.hashCode();
302 }
303
304 public String toString() {
305 StringBuilder builder = new StringBuilder().append("[")
306 .append(CachedOutputStream.class.getName())
307 .append(" Content: ");
308 try {
309 writeCacheTo(builder);
310 } catch (IOException e) {
311 //ignore
312 }
313 return builder.append("]").toString();
314 }
315
316 protected void onWrite() throws IOException {
317
318 }
319
320 public void write(byte[] b, int off, int len) throws IOException {
321 if (!outputLocked) {
322 onWrite();
323 this.totalLength += len;
324 if (inmem && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
325 createFileOutputStream();
326 }
327 currentStream.write(b, off, len);
328 }
329 }
330
331 public void write(byte[] b) throws IOException {
332 if (!outputLocked) {
333 onWrite();
334 this.totalLength += b.length;
335 if (inmem && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
336 createFileOutputStream();
337 }
338 currentStream.write(b);
339 }
340 }
341
342 public void write(int b) throws IOException {
343 if (!outputLocked) {
344 onWrite();
345 this.totalLength++;
346 if (inmem && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
347 createFileOutputStream();
348 }
349 currentStream.write(b);
350 }
351 }
352
353 private void createFileOutputStream() throws IOException {
354 ByteArrayOutputStream bout = (ByteArrayOutputStream)currentStream;
355 if (outputDir == null) {
356 tempFile = FileUtil.createTempFile("cos", "tmp");
357 } else {
358 tempFile = FileUtil.createTempFile("cos", "tmp", outputDir, false);
359 }
360
361 currentStream = new BufferedOutputStream(new FileOutputStream(tempFile));
362 bout.writeTo(currentStream);
363 inmem = false;
364 streamList.add(currentStream);
365 }
366
367 public File getTempFile() {
368 return tempFile != null && tempFile.exists() ? tempFile : null;
369 }
370
371 public InputStream getInputStream() throws IOException {
372 flush();
373 if (inmem) {
374 if (currentStream instanceof ByteArrayOutputStream) {
375 return new ByteArrayInputStream(((ByteArrayOutputStream) currentStream).toByteArray());
376 } else {
377 return null;
378 }
379 } else {
380 try {
381 FileInputStream fileInputStream = new FileInputStream(tempFile) {
382 public void close() throws IOException {
383 super.close();
384 maybeDeleteTempFile(this);
385 }
386 };
387 streamList.add(fileInputStream);
388 return fileInputStream;
389 } catch (FileNotFoundException e) {
390 throw new IOException("Cached file was deleted, " + e.toString());
391 }
392 }
393 }
394
395 public StreamCache getStreamCache() throws IOException {
396 flush();
397 if (inmem) {
398 if (currentStream instanceof ByteArrayOutputStream) {
399 return new InputStreamCache(((ByteArrayOutputStream) currentStream).toByteArray());
400 } else {
401 return null;
402 }
403 } else {
404 try {
405 FileInputStreamCache fileInputStream = new FileInputStreamCache(tempFile, this);
406 return fileInputStream;
407 } catch (FileNotFoundException e) {
408 throw new IOException("Cached file was deleted, " + e.toString());
409 }
410 }
411 }
412
413 private void maybeDeleteTempFile(Object stream) {
414 streamList.remove(stream);
415 if (!inmem && tempFile != null && streamList.isEmpty()) {
416 tempFile.delete();
417 tempFile = null;
418 currentStream = new ByteArrayOutputStream(1024);
419 inmem = true;
420 }
421 }
422
423 public void setOutputDir(File outputDir) throws IOException {
424 this.outputDir = outputDir;
425 }
426 public void setThreshold(long threshold) {
427 this.threshold = threshold;
428 }
429
430 }