Using Apache Camel with Scala and the Camel Scala DSL
February 13th, 2013 by Micha KopsWhenever I encounter a situation where I have to mix a blend of different services and endpoints and apply one or more of the traditional enterprise integration patterns then Apache Camel often is my weapon of choice.
I simply love how easy it is to set up some datasources, add some routing magic, data transformers, load balancers, content enrichers and enjoy the result.
Another thing that I’m beginning to love is Scala and so this is the perfect time to write an article about using Scala and Apache Camel together.
In the following tutorial we’re setting up our environment using SBT and Scala we’ll take a look at several interesting use cases for camel.
Contents
Dependencies / Project Setup
I’m using SBT to build the project here. You just need to add some dependencies here to your build.sbt.
name := "camel-scala-tutorial" organization := "com.hascode.tutorial" version := "1.0.0" libraryDependencies ++= Seq( "org.apache.camel" % "camel-scala" % "2.10.1", "org.apache.activemq" % "activemq-core" % "5.6.0", "org.apache.camel" % "camel-jms" % "2.10.1", "ch.qos.logback" % "logback-core" % "1.0.9", "ch.qos.logback" % "logback-classic" % "1.0.9", "javax.mail" % "mail" % "1.4.5", "com.icegreen" % "greenmail" % "1.3", "org.apache.camel" % "camel-mail" % "2.10.3", "org.apache.camel" % "camel-velocity" % "2.10.3" )
A short explanation for the dependencies declared above:
- camel-scala: Adds the Apache Camel API, this is the minimum requirement for us here..
- activemq-core and camel-jsm: The first one allows us to start an embedded instance of an ActiveMQ server and the second one allows us to use JMS in our routes
- logback-core/logback-classic: Just some logging API, you may use your favourite logging framework here ..
- greenmail: Allows us to create an embedded mail server that may handle all kinds of e-mail transfer. Some more detailed information can be found in my article “Integration Testing IMAP, SMTP and POP3 with GreenMail“.
- camel-mail: Adds support for handling e-mail messages in a route.
- camel-velocity: As you might have guessed is this the library to add velocity template support
Now that we’ve got every library we need let’s start to create some routes and play around with camel.
Camel Scala Enhancements
First of all, camel-scala offers a custom implementation of the route builder, org.apache.camel.scala.builder.RouteBuilder that offers us three features compared with the Java DSL:
- there is no configure() method to override
- a route starts directly with a URI instead of from(uri)
- -> is just an alias for to
In addition there’s a trait that you may use, org.apache.camel.scala.dsl.builder.RouteBuilderSupport that adds an implicit conversion from the scala to the java builder:
implicit def scalaToJavaBuilder(scalaBuilder: RouteBuilder) = scalaBuilder.builder
The good thing is: You won’t encounter any problem using scala because the DSLs are 100% compatibel. In addition there are open source projects that take this goal further and provide additional syntactic sugar when using scala and camel together.
One example is Camel Extras for Scala that adds some goodies like additional enhancements for the route builder or additional type converters for different scala types and collections.
If you’re interested, please feel free to take a look at the Github project at: https://github.com/osinka/camel-scala-extra
Now that we’ve covered some basics it is time to do some programming here…
Simple File Endpoint
The first example is an easy one .. we’ve got two directories: data/inbox and data/outbox. Everything we’re putting into data/inbox is fetched by camel and put into data/outbox.
package com.hascode.tutorial import org.apache.camel.scala.dsl.builder.RouteBuilder import org.apache.camel.CamelContext import org.apache.camel.impl.DefaultCamelContext import org.apache.camel.scala.dsl.builder.RouteBuilderSupport object SimpleFileEndpointExample extends App { val context: CamelContext = new DefaultCamelContext val routeBuilder = new RouteBuilder { from("file:data/inbox") --> ("file:data/outbox") } context.addRoutes(routeBuilder) context.start while (true) {} }
Running the example above yields the following output when we put a file into the directory data/inbox
$ date >> data/inbox/test.txt
16:41:15.156 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.component.file.FileConsumer - Took 0.000 seconds to poll: data/inbox 16:41:15.159 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.component.file.FileConsumer - Total 1 files to consume 16:41:15.160 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.component.file.FileConsumer - About to process file: GenericFile[test.txt] using exchange: Exchange[test.txt] 16:41:15.166 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[file://data/outbox] Exchange[test.txt] 16:41:15.169 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.component.file.FileOperations - Using FileChannel to write file: data/outbox/test.txt 16:41:15.174 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.c.file.GenericFileProducer - Wrote [data/outbox/test.txt] to [Endpoint[file://data/outbox]] 16:41:15.176 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.c.file.GenericFileOnCompletion - Done processing file: GenericFile[test.txt] using exchange: Exchange[test.txt] 16:41:15.176 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG org.apache.camel.util.FileUtil - Retrying attempt 0 to delete file: /data/project/camel-scala-tutorial/data/inbox/test.txt.camelLock 16:41:15.176 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG org.apache.camel.util.FileUtil - Tried 1 to delete file: /data/project/camel-scala-tutorial/data/inbox/test.txt.camelLock with result: true 16:41:15.177 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.c.f.s.GenericFileRenameProcessStrategy - Renaming file: GenericFile[test.txt] to: GenericFile[.camel/test.txt] 16:41:15.178 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG org.apache.camel.util.FileUtil - Tried 1 to rename file: /data/project/camel-scala-tutorial/data/inbox/test.txt to: data/inbox/.camel/test.txt with result: true
JMS / ActiveMQ Example
In the next example we’re going to do some JMS messaging using an embedded instance of ActiveMQ.
The following route scans the directory data/inbox for incoming files.
If there is a file it is put in a specific queue, filtered by the file extension. In addition we’re fetching the messages from alle JMS queues and print the processed file’s name.
package com.hascode.tutorial import org.apache.camel.scala.dsl.builder.RouteBuilder import org.apache.camel.CamelContext import org.apache.camel.impl.DefaultCamelContext import javax.jms.ConnectionFactory import org.apache.camel.component.jms.JmsComponent import org.apache.activemq.ActiveMQConnectionFactory import org.apache.camel.Processor import org.apache.camel.scala.dsl.builder.RouteBuilderSupport import org.apache.camel.Exchange object RoutingAfterCBRExample extends App with RouteBuilderSupport { val context: CamelContext = new DefaultCamelContext val connectionFactory: ConnectionFactory = new ActiveMQConnectionFactory("vm://localhost") context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)) val routeBuilder = new RouteBuilder { from("file:data/inbox") .choice { when(fileEndsWith(_, "xml")).to("jms:xmlOrders") when(fileEndsWith(_, "csv")).to("jms:csvOrders") otherwise().to("jms:unknownOrders") } from("jms:xmlOrders") .process(exchange => println("XML type order received: " + exchange.getIn().getHeader("CamelFileName"))) from("jms:csvOrders") .process(exchange => println("CSV type order received: " + exchange.getIn().getHeader("CamelFileName"))) from("jms:unknownOrders") .process(exchange => println("Unknown type order received: " + exchange.getIn().getHeader("CamelFileName"))) } context.addRoutes(routeBuilder) context.start while (true) {} def fileEndsWith(ex: Exchange, fileExt: String): Boolean = { ex.getIn().getHeader("CamelFileName") match { case x: String => return x.endsWith(fileExt) case _ => false } } }
Running the application above and adding the following files to the data/inbox directory should give you a similar output (shortened)
$ date >> data/inbox/order1.csv && date >> data/inbox/someorder.xml && date >> data/inbox/strangeorder.foo && date >> data/inbox/bang
[..] CSV type order received: order1.csv [..] Unknown type order received: strangeorder.foo [..] Unknown type order received: bang [..] XML type order received: someorder.xml
Resequencer Example
The resequencer allows us to bring our messages back in a predefined order .. e.g. you might want to resort incoming messages using the JMSPriority header or something like that.
In our example, we have two queues .. wip and wip-inorder. What we’re doing now is to put some messages with a priority header into the wip queue and the resequencer orders messages from this queue and puts the messages in the correct order into the wip-inorder queue.
Some more detailed information can be found at the Camel website: Resequencer.
package com.hascode.tutorial import org.apache.camel.scala.dsl.builder.RouteBuilder import org.apache.camel.CamelContext import org.apache.camel.impl.DefaultCamelContext import javax.jms.ConnectionFactory import org.apache.camel.component.jms.JmsComponent import org.apache.activemq.ActiveMQConnectionFactory import org.apache.camel.Processor import org.apache.camel.scala.dsl.builder.RouteBuilderSupport object ResequencerExample extends App with RouteBuilderSupport { val context: CamelContext = new DefaultCamelContext val connectionFactory: ConnectionFactory = new ActiveMQConnectionFactory("vm://localhost") context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)) val routeBuilder = new RouteBuilder { from("jms:wip").resequence(header("custom-priority")).batch().timeout(10000).reverse().to("jms:wip-inorder") from("jms:wip-inorder") .process(exchange => println("Message received with priority: " + exchange.getIn().getHeader("custom-priority"))) } context.addRoutes(routeBuilder) context.start Thread.sleep(2000) println("sending some messages..") val tpl = context.createProducerTemplate tpl.setDefaultEndpointUri("jms:wip") tpl.sendBodyAndHeader("foo1", "custom-priority", 1) tpl.sendBodyAndHeader("foo2", "custom-priority", 5) tpl.sendBodyAndHeader("foo3", "custom-priority", 6) tpl.sendBodyAndHeader("foo4", "custom-priority", 3) tpl.sendBodyAndHeader("foo5", "custom-priority", 4) tpl.sendBodyAndHeader("foo6", "custom-priority", 2) while (true) {} }
Running the application above should give a similar output. Now the messages are ordered by their header “custom-priority”:
[..] 16:10:02.253 [main] DEBUG o.a.camel.impl.DefaultCamelContext - Adding routes from builder: com.hascode.tutorial.ResequencerExample$$anon$1@7776cad3 16:10:02.452 [main] DEBUG o.apache.camel.impl.DefaultComponent - Creating endpoint uri=[jms://wip], path=[wip], parameters=[{}] 16:10:02.457 [main] DEBUG o.a.camel.impl.DefaultCamelContext - jms://wip converted to endpoint: Endpoint[jms://wip] by component: org.apache.camel.component.jms.JmsComponent@6b552b76 16:10:02.530 [main] DEBUG o.apache.camel.impl.DefaultComponent - Creating endpoint uri=[jms://wip-inorder], path=[wip-inorder], parameters=[{}] 16:10:02.530 [main] DEBUG o.a.camel.impl.DefaultCamelContext - jms://wip-inorder converted to endpoint: Endpoint[jms://wip-inorder] by component: org.apache.camel.component.jms.JmsComponent@6b552b76 16:10:02.555 [main] DEBUG o.a.c.m.DefaultManagementAgent - Registered MBean with ObjectName: org.apache.camel:context=styx/camel-1,type=endpoints,name="jms://wip-inorder" 16:10:02.557 [main] DEBUG o.a.c.p.interceptor.DefaultChannel - Initialize channel for target: 'To[jms:wip-inorder]' 16:10:02.589 [main] DEBUG o.a.c.p.interceptor.DefaultChannel - Initialize channel for target: 'Resequencer[{org.apache.camel.scala.ScalaExpression@6da0d866} -> [To[jms:wip-inorder]]]' [..] 16:10:02.628 [main] DEBUG o.a.camel.component.jms.JmsProducer - Starting producer: Producer[jms://wip-inorder] [..] 16:10:05.394 [main] DEBUG o.a.c.component.jms.JmsConfiguration - Sending JMS message to: queue://wip with message: ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {custom_HYPHEN_priority=1, breadcrumbId=ID-styx-41187-1360509002058-0-1}, readOnlyProperties = false, readOnlyBody = false, droppable = false, text = foo1} [..] 16:10:05.494 [Camel (camel-1) thread #1 - JmsConsumer[wip]] DEBUG o.a.camel.processor.BatchProcessor - Received exchange to be batched: Exchange[JmsMessage[JmsMessageID: ID:styx-33805-1360509003088-3:8:1:1:1]] 16:10:15.495 [Camel (camel-1) thread #0 - Batch Sender] DEBUG o.a.camel.processor.BatchProcessor - Sending aggregated exchange: Exchange[JmsMessage[JmsMessageID: ID:styx-33805-1360509003088-3:5:1:1:1]] 16:10:15.496 [Camel (camel-1) thread #0 - Batch Sender] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[jms://wip-inorder] Exchange[JmsMessage[JmsMessageID: ID:styx-33805-1360509003088-3:5:1:1:1]] [..] 16:10:15.499 [ActiveMQ Task-1] DEBUG o.a.a.broker.region.AbstractRegion - localhost adding consumer: ID:styx-33805-1360509003088-3:9:-1:1 for destination: ActiveMQ.Advisory.TempQueue,ActiveMQ.Advisory.TempTopic 16:10:15.502 [Camel (camel-1) thread #0 - Batch Sender] DEBUG o.a.c.component.jms.JmsConfiguration - Sending JMS message to: queue://wip-inorder with message: ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {custom_HYPHEN_priority=6, breadcrumbId=ID-styx-41187-1360509002058-0-7, CamelJmsDeliveryMode=2}, readOnlyProperties = false, readOnlyBody = false, droppable = false, text = foo3} Message received with priority: 6 [..] Message received with priority: 5 [..] Message received with priority: 4 [..] Message received with priority: 3 [..] Message received with priority: 2 [..] Message received with priority: 1
Load Balancer
Camel offers you an easy way for load balancing using different strategies. We’re using an easy round-robin strategy here without any special configuration.
There’s a lot of useful information on possible configuration and load balancing strategies on the Camel documentation.
package com.hascode.tutorial import org.apache.camel.scala.dsl.builder.RouteBuilder import org.apache.camel.CamelContext import org.apache.camel.impl.DefaultCamelContext import javax.jms.ConnectionFactory import org.apache.camel.component.jms.JmsComponent import org.apache.activemq.ActiveMQConnectionFactory import org.apache.camel.Processor import org.apache.camel.scala.dsl.builder.RouteBuilderSupport object LoadBalancerExample extends App with RouteBuilderSupport { val context: CamelContext = new DefaultCamelContext val connectionFactory: ConnectionFactory = new ActiveMQConnectionFactory("vm://localhost") context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)) val routeBuilder = new RouteBuilder { from("jms:incoming").loadBalance(). roundRobin().to("jms:worker-queue-1", "jms:worker-queue-2", "jms:worker-queue-3"); from("jms:worker-queue-1") .process(exchange => println("Handler 1 recevied a message with number flag: " + exchange.getIn().getHeader("my-number"))) from("jms:worker-queue-2") .process(exchange => println("Handler 2 recevied a message with number flag: " + exchange.getIn().getHeader("my-number"))) from("jms:worker-queue-3") .process(exchange => println("Handler 3 recevied a message with number flag: " + exchange.getIn().getHeader("my-number"))) } context.addRoutes(routeBuilder) context.start Thread.sleep(2000) val tpl = context.createProducerTemplate tpl.setDefaultEndpointUri("jms:incoming") tpl.sendBodyAndHeader("foo1", "my-number", 1) tpl.sendBodyAndHeader("foo2", "my-number", 2) tpl.sendBodyAndHeader("foo3", "my-number", 3) tpl.sendBodyAndHeader("foo4", "my-number", 4) tpl.sendBodyAndHeader("foo5", "my-number", 5) tpl.sendBodyAndHeader("foo6", "my-number", 6) while (true) {} }
Running the example above could produce a similar output – but it is not guaranteed..
Handler 1 recevied a message with number flag: 1 Handler 2 recevied a message with number flag: 2 Handler 3 recevied a message with number flag: 3 Handler 1 recevied a message with number flag: 4 Handler 2 recevied a message with number flag: 5 Handler 3 recevied a message with number flag: 6
IMAP Component
In the next example we’re setting up an email server using the greenmail library – if you’re interested, please feel free to take a look at my tutorial “Integration Testing IMAP, SMTP and POP3 with GreenMail“.
Afterwards we’re setting up a camel route that fetches e-mails from this IMAP server and stores them in the directory data/outbox.
package com.hascode.tutorial import org.apache.camel.impl.DefaultCamelContext import org.apache.camel.scala.dsl.builder.RouteBuilder import org.apache.camel.scala.dsl.builder.RouteBuilderSupport import org.apache.camel.CamelContext import com.icegreen.greenmail.user.GreenMailUser import com.icegreen.greenmail.util.GreenMail import com.icegreen.greenmail.util.ServerSetupTest import javax.mail.internet.MimeMessage import javax.mail.Session import javax.mail.internet.InternetAddress import javax.mail.Message object ImapExample extends App { val mailServer = new GreenMail(ServerSetupTest.IMAP) mailServer.start val user = mailServer.setUser("test@hascode.com", "joe", "XXXX") val context: CamelContext = new DefaultCamelContext val routeBuilder = new RouteBuilder { from("imap://joe@0.0.0.0:3143?password=XXXX") --> ("file:data/outbox") } context.addRoutes(routeBuilder) context.start Thread.sleep(3000) for (i <- 0 until 3) { var message = new MimeMessage(null: Session) message.setFrom(new InternetAddress("test@hascode.com")) message.addRecipient(Message.RecipientType.TO, new InternetAddress( "foo@hascode.com")) message.setSubject("Test E-Mail #" + i) message.setText("This is a fine test e-mail. It is the " + i + " message of 3.") println("sending new email: #" + i) user.deliver(message) Thread.sleep(2000) } while (true) {} }
Running the application above produces a similar output
22:01:42.794 [main] DEBUG o.apache.camel.impl.DefaultComponent - Creating endpoint uri=[imap://joe@0.0.0.0:3143?password=******], path=[joe@0.0.0.0:3143], parameters=[{password=XXXX}] sending new email: #0 sending new email: #1 sending new email: #2 22:02:44.214 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.component.mail.MailConsumer - Polling mailbox folder: imap://0.0.0.0:3143, folder=INBOX 22:02:44.298 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.component.mail.MailConsumer - Fetching 3 messages. Total 3 messages. 22:02:44.480 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.component.mail.MailConsumer - Processing message: messageNumber=[1], from=[test@hascode.com], to=[foo@hascode.com], subject=[Test E-Mail #0], sentDate=[Feb 12, 2013 10:02:44 PM], receivedDate=[Feb 12, 2013 10:02:44 AM] 22:02:44.648 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[file://data/outbox] Exchange[MailMessage: messageNumber=[1], from=[test@hascode.com], to=[foo@hascode.com], subject=[Test E-Mail #0], sentDate=[Feb 12, 2013 10:02:44 PM], receivedDate=[Feb 12, 2013 10:02:44 AM]] 22:02:44.654 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.component.file.FileOperations - Using InputStream to write file: data/outbox/ID-styx-40497-1360702902385-0-1 22:02:44.660 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.c.file.GenericFileProducer - Wrote [data/outbox/ID-styx-40497-1360702902385-0-1] to [Endpoint[file://data/outbox]] 22:02:44.743 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.component.mail.MailConsumer - Processing message: messageNumber=[2], from=[test@hascode.com], to=[foo@hascode.com], subject=[Test E-Mail #1], sentDate=[Feb 12, 2013 10:02:44 PM], receivedDate=[Feb 12, 2013 10:02:44 AM] 22:02:44.904 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[file://data/outbox] Exchange[MailMessage: messageNumber=[2], from=[test@hascode.com], to=[foo@hascode.com], subject=[Test E-Mail #1], sentDate=[Feb 12, 2013 10:02:44 PM], receivedDate=[Feb 12, 2013 10:02:44 AM]] 22:02:44.904 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.component.file.FileOperations - Using InputStream to write file: data/outbox/ID-styx-40497-1360702902385-0-3 22:02:44.908 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.c.file.GenericFileProducer - Wrote [data/outbox/ID-styx-40497-1360702902385-0-3] to [Endpoint[file://data/outbox]] 22:02:44.987 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.component.mail.MailConsumer - Processing message: messageNumber=[3], from=[test@hascode.com], to=[foo@hascode.com], subject=[Test E-Mail #2], sentDate=[Feb 12, 2013 10:02:44 PM], receivedDate=[Feb 12, 2013 10:02:44 AM] 22:02:45.148 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[file://data/outbox] Exchange[MailMessage: messageNumber=[3], from=[test@hascode.com], to=[foo@hascode.com], subject=[Test E-Mail #2], sentDate=[Feb 12, 2013 10:02:44 PM], receivedDate=[Feb 12, 2013 10:02:44 AM]] 22:02:45.148 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.component.file.FileOperations - Using InputStream to write file: data/outbox/ID-styx-40497-1360702902385-0-5 22:02:45.152 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.c.file.GenericFileProducer - Wrote [data/outbox/ID-styx-40497-1360702902385-0-5] to [Endpoint[file://data/outbox]]
If you take a look now in the directory data/outbox you can see the messages .. each file contains the e-mail body:
$ tree data/outbox/ data/outbox/ ├── ID-localhost-40497-1360702902385-0-1 ├── ID-localhost-40497-1360702902385-0-3 └── ID-localhost-40497-1360702902385-0-5
Velocity Templates
In the following simple example we’re just scanning the directory data/inbox for incoming files. When a file is added to this directory, a velocity template is applied to create a new file.
First of all this is our velocity template in com/hascode/tutorial/ named transform.vm
This is the transformed file Let's iterate over the available headers: #foreach ($key in $headers.keySet()) - key: $key has value: $headers.get($key) #end The body: ${body}
This is the application and the route binding.
package com.hascode.tutorial import org.apache.camel.scala.dsl.builder.RouteBuilder import org.apache.camel.CamelContext import org.apache.camel.impl.DefaultCamelContext import org.apache.camel.scala.dsl.builder.RouteBuilderSupport object VelocityExample extends App { val context: CamelContext = new DefaultCamelContext val routeBuilder = new RouteBuilder { from("file:data/inbox") --> ("velocity:com/hascode/tutorial/transform.vm") --> ("file:data/outbox") } context.addRoutes(routeBuilder) context.start while (true) {} }
Now add some new file to data/inbox and run the application you should be able to spot a new file in data/outbox that could like the following one:
echo "This is just a test" > data/inbox/foo.txt
data/outbox/foo.txt:
This is the transformed file Let's iterate over the available headers: - key: camelfilenameonly has value: foo.txt - key: camelfileparent has value: data/inbox - key: camelfilename has value: foo.txt - key: breadcrumbid has value: ID-styx-59735-1360704546890-0-1 - key: camelfileabsolute has value: false - key: camelfilelength has value: 20 - key: camelfilerelativepath has value: foo.txt - key: camelfilepath has value: data/inbox/foo.txt - key: camelfilelastmodified has value: Tue Feb 12 22:29:11 CET 2013 - key: camelfileabsolutepath has value: /data/project/camel-scala-tutorial/data/inbox/foo.txt The body: GenericFile[foo.txt]
Bean Binding / Annotated Beans
Camel allows us to use POJOs to specify and endpoint or handle further processing instructions.
This is our recipient bean .. depending on the file name extension it returns two possible JMS queues used for further processing.
package com.hascode.tutorial import org.apache.camel.RecipientList import org.apache.camel.Header class RecipientsBean { @RecipientList def getRecipients(@Header("CamelFileName") fileName: String): Array[String] = { println("detecting recipients for given filename: " + fileName) if (fileName.endsWith("csv")) { return Array("jms:csvQueue") } return Array("jms:defaultQueue") } }
And this is our application that scans the directory data/inbox for files and delegates the flow to the recipient bean.
In addition we’re scanning both JMS queues for new messages and we’re printing the file names received from their messages.
package com.hascode.tutorial import org.apache.camel.CamelContext import org.apache.camel.impl.DefaultCamelContext import org.apache.camel.scala.dsl.builder.RouteBuilder import javax.jms.ConnectionFactory import org.apache.camel.component.jms.JmsComponent import org.apache.activemq.ActiveMQConnectionFactory object AnnotatedBeanExample extends App { val context: CamelContext = new DefaultCamelContext val connectionFactory: ConnectionFactory = new ActiveMQConnectionFactory("vm://localhost") context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)); val routeBuilder = new RouteBuilder { from("file:data/inbox").bean(classOf[RecipientsBean]) from("jms:csvQueue").process(exchange => println("csv queue: file received: " + exchange.getIn().getHeader("CamelFileName"))) from("jms:defaultQueue").process(exchange => println("default queue: file received: " + exchange.getIn().getHeader("CamelFileName"))) } context.addRoutes(routeBuilder) context.start while (true) {} }
When we’re now adding some files to the directory ..
$ date > data/inbox/test.csv && date > /data/inbox/foo.txt
.. and run the application above we should get a similar output (shortened)
21:04:06.195 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.component.file.FileConsumer - Total 1 files to consume 21:04:06.196 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.component.file.FileConsumer - About to process file: GenericFile[test.csv] using exchange: Exchange[test.csv] detecting recipients for given filename: test.csv 21:04:06.222 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.camel.component.jms.JmsProducer - Starting producer: Producer[jms://csvQueue] [..] csv queue: file received: test.csv 21:04:06.264 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG org.apache.camel.util.FileUtil - Retrying attempt 0 to delete file: /data/project/camel-scala-tutorial/data/inbox/test.csv.camelLock 21:04:06.264 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG org.apache.camel.util.FileUtil - Tried 1 to delete file: /data/project/camel-scala-tutorial/data/inbox/test.csv.camelLock with result: true 21:04:06.265 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG org.apache.camel.util.FileUtil - Retrying attempt 0 to delete file: data/inbox/.camel/test.csv 21:04:06.265 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG org.apache.camel.util.FileUtil - Tried 1 to delete file: data/inbox/.camel/test.csv with result: true 21:04:06.265 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.c.f.s.GenericFileRenameProcessStrategy - Renaming file: GenericFile[test.csv] to: GenericFile[.camel/test.csv] 21:04:06.265 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG org.apache.camel.util.FileUtil - Tried 1 to rename file: /data/project/camel-scala-tutorial/data/inbox/test.csv to: data/inbox/.camel/test.csv with result: true 21:04:06.741 [ActiveMQ Journal Checkpoint Worker] DEBUG o.a.a.store.kahadb.MessageDatabase - Checkpoint started. 21:04:06.753 [ActiveMQ Journal Checkpoint Worker] DEBUG o.a.a.store.kahadb.MessageDatabase - Checkpoint done. 21:04:06.766 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.component.file.FileConsumer - Took 0.000 seconds to poll: data/inbox [..] 21:16:37.146 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.component.file.FileConsumer - About to process file: GenericFile[foo.txt] using exchange: Exchange[foo.txt] [..] detecting recipients for given filename: foo.txt [..] default queue: file received: test.csv [..]
Tutorial Sources
Please feel free to download the tutorial sources from my Bitbucket repository, fork it there or clone it using Mercurial:
hg clone https://bitbucket.org/hascode/camel-scala-dsl-tutorial
Resources
- Apache Camel Website
- Apache Camel: Enterprise Integration Pattern Overview
- Apache Camel: Scala DSL
- Apache Camel: Scala DSL Enterprise Integration Patterns
- osinka @Github: Camel Extras for Scala
- Claus Ibsen: A little Scala DSL Example
- Claus Ibsen, Jonathan Anstey, Hadrian Zbarcea: Camel in Action
Tags: activemq, Apache, camel, dsl, eip, imap, integration pattern, jms, load balancer, messaging, sbt, scala, velocity
February 14th, 2013 at 2:50 pm
Hi
I enjoyed reading this tutorial on the Camel DSL and how you build an application using the EIP patterns.
I took the liberty of adding a link to this article from the Camel link collection at (takes a bit to sync)
http://camel.apache.org/articles
Keep up the good writing
Claus Ibsen
February 17th, 2013 at 12:16 pm
Thanks, I feel honored!
February 17th, 2013 at 4:08 pm
Excellent article! Your examples are concise and straight to the point. I hope to read more from you.
February 27th, 2013 at 3:37 pm
The context-based route example looks very much like the example from “Camel in action” :-) I have a question regarding this: In the book, another endpoint is chained after the CBR by adding .to(“jms:furtherProcessing”).
When using the Java DSL, this works nicely; but I haven’t managed to get it working with the Scala DSL, the furtherProcessing queue never receives any messages! Any idea how to get this right?
February 28th, 2013 at 7:52 pm
Hi, thanks for your post! It is indeed no coincidence since I simply love “Camel in Action” – what reminds me of adding Claus Ibsens excellent book to the list of resources here :)
Did using “end” on the choice not work for you? I’ve written this one up for you – is that what you were searching for?
Formatted on GitHub:
https://gist.github.com/hascode/5059494
package com.hascode.tutorial
import org.apache.camel.scala.dsl.builder.RouteBuilder
import org.apache.camel.CamelContext
import org.apache.camel.impl.DefaultCamelContext
import javax.jms.ConnectionFactory
import org.apache.camel.component.jms.JmsComponent
import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.camel.Processor
import org.apache.camel.scala.dsl.builder.RouteBuilderSupport
import org.apache.camel.Exchange
object RoutingAfterCBRExample extends App with RouteBuilderSupport {
val context: CamelContext = new DefaultCamelContext
val connectionFactory: ConnectionFactory = new ActiveMQConnectionFactory("vm://localhost")
context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory))
val routeBuilder = new RouteBuilder {
from("file:data/inbox")
.choice {
when(fileEndsWith(_, "xml")).to("jms:xmlOrders")
when(fileEndsWith(_, "csv")).to("jms:csvOrders")
otherwise().to("jms:unknownOrders")
}.end.to("jms:continuedProcessing")
from("jms:xmlOrders")
.process(exchange => println("XML type order received: " + exchange.getIn().getHeader("CamelFileName")))
from("jms:csvOrders")
.process(exchange => println("CSV type order received: " + exchange.getIn().getHeader("CamelFileName")))
from("jms:unknownOrders")
.process(exchange => println("Unknown type order received: " + exchange.getIn().getHeader("CamelFileName")))
from("jms:continuedProcessing")
.process(exchange => println("Continued processing queue received a message: " + exchange.getIn().getHeader("CamelFileName")))
}
context.addRoutes(routeBuilder)
context.start
while (true) {}
def fileEndsWith(ex: Exchange, fileExt: String): Boolean = {
ex.getIn().getHeader("CamelFileName") match {
case x: String => return x.endsWith(fileExt)
case _ => false
}
}
}
March 1st, 2013 at 7:49 am
Hi micha, thanks for your reply!
end? *smackshead* I assumed that would be implicitly done by the closing curly brace of the choice statement.
I tried it in my code (which almost exactly resembles your gist), but unfortunately the end statement is onknown to my Scala DSL. Which version are you using? I’m on Camel 2.10.3., which is according to the website the latet version…
March 4th, 2013 at 8:38 am
Ok, found the difference: I didn’t explicitly use from(..), but only the string representation of the endpoint. This works for simple from..to-expressions, but didn’t give the “end” method.
Confusing.
March 4th, 2013 at 11:47 am
That is indeed confusing.. but thanks for keeping me up to date! :)