Using Apache Camel with Scala and the Camel Scala DSL

February 13th, 2013 by

Whenever I encounter a situation where I have to mix a blend of different services and endpoints and apply one or more of the traditional enterprise integration patterns then Apache Camel often is my weapon of choice.

I simply love how easy it is to set up some datasources, add some routing magic, data transformers, load balancers, content enrichers and enjoy the result.

Another thing that I’m beginning to love is Scala and so this is the perfect time to write an article about using Scala and Apache Camel together.

In the following tutorial we’re setting up our environment using SBT and Scala we’ll take a look at several interesting use cases for camel.

 

Dependencies / Project Setup

I’m using SBT to build the project here. You just need to add some dependencies here to your  build.sbt.

name := "camel-scala-tutorial"
 
organization := "com.hascode.tutorial"
 
version := "1.0.0"
 
libraryDependencies ++= Seq(
 "org.apache.camel" % "camel-scala" % "2.10.1",
 "org.apache.activemq" % "activemq-core" % "5.6.0",
 "org.apache.camel" % "camel-jms" % "2.10.1",
 "ch.qos.logback" % "logback-core" % "1.0.9",
 "ch.qos.logback" % "logback-classic" % "1.0.9",
 "javax.mail" % "mail" % "1.4.5",
 "com.icegreen" % "greenmail" % "1.3",
 "org.apache.camel" % "camel-mail" % "2.10.3",
 "org.apache.camel" % "camel-velocity" % "2.10.3"
)

A short explanation for the dependencies declared above:

  • camel-scala: Adds the Apache Camel API, this is the minimum requirement for us here..
  • activemq-core and camel-jsm: The first one allows us to start an embedded instance of an ActiveMQ server and the second one allows us to use JMS in our routes
  • logback-core/logback-classic: Just some logging API, you may use your favourite logging framework here ..
  • greenmail: Allows us to create an embedded mail server that may handle all kinds of e-mail transfer. Some more detailed information can be found in my article “Integration Testing IMAP, SMTP and POP3 with GreenMail“.
  • camel-mail: Adds support for handling e-mail messages in a route.
  • camel-velocity: As you might have guessed is this the library to add velocity template support

Now that we’ve got every library we need let’s start to create some routes and play around with camel.

Camel Scala Enhancements

First of all, camel-scala offers a custom implementation of the route builder, org.apache.camel.scala.builder.RouteBuilder that offers us three features compared with the Java DSL:

  • there is no configure() method to override
  • a route starts directly with a URI instead of from(uri)
  • -> is just an alias for to

In addition there’s a trait that you may use, org.apache.camel.scala.dsl.builder.RouteBuilderSupport that adds an implicit conversion from the scala to the java builder:

implicit def scalaToJavaBuilder(scalaBuilder: RouteBuilder) = scalaBuilder.builder

The good thing is: You won’t encounter any problem using scala because the DSLs are 100% compatibel. In addition there are open source projects that take this goal further and provide additional syntactic sugar when using scala and camel together.

One example is Camel Extras for Scala that adds some goodies like additional enhancements for the route builder or additional type converters for different scala types and collections.

If you’re interested, please feel free to take a look at the Github project at: https://github.com/osinka/camel-scala-extra

Now that we’ve covered some basics it is time to do some programming here…

Simple File Endpoint

The first example is an easy one .. we’ve got two directories: data/inbox and data/outbox. Everything we’re putting into data/inbox is fetched by camel and put into data/outbox.

Simple directory-to-directory route

Simple directory-to-directory route

package com.hascode.tutorial
import org.apache.camel.scala.dsl.builder.RouteBuilder
import org.apache.camel.CamelContext
import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.scala.dsl.builder.RouteBuilderSupport
 
object SimpleFileEndpointExample extends App {
 val context: CamelContext = new DefaultCamelContext
 val routeBuilder = new RouteBuilder {
 from("file:data/inbox") --> ("file:data/outbox")
 }
 context.addRoutes(routeBuilder)
 context.start
 while (true) {}
}

Running the example above yields the following output when we put a file into the directory data/inbox

$ date >> data/inbox/test.txt
16:41:15.156 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.component.file.FileConsumer - Took 0.000 seconds to poll: data/inbox
16:41:15.159 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.component.file.FileConsumer - Total 1 files to consume
16:41:15.160 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.component.file.FileConsumer - About to process file: GenericFile[test.txt] using exchange: Exchange[test.txt]
16:41:15.166 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[file://data/outbox] Exchange[test.txt]
16:41:15.169 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.component.file.FileOperations - Using FileChannel to write file: data/outbox/test.txt
16:41:15.174 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.c.file.GenericFileProducer - Wrote [data/outbox/test.txt] to [Endpoint[file://data/outbox]]
16:41:15.176 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.c.file.GenericFileOnCompletion - Done processing file: GenericFile[test.txt] using exchange: Exchange[test.txt]
16:41:15.176 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG org.apache.camel.util.FileUtil - Retrying attempt 0 to delete file: /data/project/camel-scala-tutorial/data/inbox/test.txt.camelLock
16:41:15.176 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG org.apache.camel.util.FileUtil - Tried 1 to delete file: /data/project/camel-scala-tutorial/data/inbox/test.txt.camelLock with result: true
16:41:15.177 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.c.f.s.GenericFileRenameProcessStrategy - Renaming file: GenericFile[test.txt] to: GenericFile[.camel/test.txt]
16:41:15.178 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG org.apache.camel.util.FileUtil - Tried 1 to rename file: /data/project/camel-scala-tutorial/data/inbox/test.txt to: data/inbox/.camel/test.txt with result: true

JMS / ActiveMQ Example

In the next example we’re going to do some JMS messaging using an embedded instance of ActiveMQ.

The following route scans the directory data/inbox for incoming files.

If there is a file it is put in a specific queue, filtered by the file extension. In addition we’re fetching the messages from alle JMS queues and print the processed file’s name.

package com.hascode.tutorial
import org.apache.camel.scala.dsl.builder.RouteBuilder
import org.apache.camel.CamelContext
import org.apache.camel.impl.DefaultCamelContext
import javax.jms.ConnectionFactory
import org.apache.camel.component.jms.JmsComponent
import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.camel.Processor
import org.apache.camel.scala.dsl.builder.RouteBuilderSupport
import org.apache.camel.Exchange
 
object RoutingAfterCBRExample extends App with RouteBuilderSupport {
  val context: CamelContext = new DefaultCamelContext
  val connectionFactory: ConnectionFactory = new ActiveMQConnectionFactory("vm://localhost")
  context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory))
  val routeBuilder = new RouteBuilder {
    from("file:data/inbox")
      .choice {
        when(fileEndsWith(_, "xml")).to("jms:xmlOrders")
        when(fileEndsWith(_, "csv")).to("jms:csvOrders")
        otherwise().to("jms:unknownOrders")
      }
 
    from("jms:xmlOrders")
      .process(exchange => println("XML type order received: " + exchange.getIn().getHeader("CamelFileName")))
    from("jms:csvOrders")
      .process(exchange => println("CSV type order received: " + exchange.getIn().getHeader("CamelFileName")))
    from("jms:unknownOrders")
      .process(exchange => println("Unknown type order received: " + exchange.getIn().getHeader("CamelFileName")))
  }
  context.addRoutes(routeBuilder)
  context.start
  while (true) {}
 
  def fileEndsWith(ex: Exchange, fileExt: String): Boolean = {
    ex.getIn().getHeader("CamelFileName") match {
      case x: String => return x.endsWith(fileExt)
      case _ => false
    }
  }
}

Running the application above and adding the following files to the data/inbox directory should give you a similar output (shortened)

$ date >> data/inbox/order1.csv && date >> data/inbox/someorder.xml && date >> data/inbox/strangeorder.foo && date >> data/inbox/bang
[..]
CSV type order received: order1.csv
[..]
Unknown type order received: strangeorder.foo
[..]
Unknown type order received: bang
[..]
XML type order received: someorder.xml

Resequencer Example

The resequencer allows us to bring our messages back in a predefined order .. e.g. you might want to resort incoming messages using the JMSPriority header or something like that.

In our example, we have two queues .. wip and wip-inorder. What we’re doing now is to put some messages with a priority header into the wip queue and the resequencer orders messages from this queue and puts the messages in the correct order into the wip-inorder queue.

Some more detailed information can be found at the Camel website: Resequencer.

Resequencing messages from one queue into another queue

Resequencing messages from one queue into another queue

package com.hascode.tutorial
import org.apache.camel.scala.dsl.builder.RouteBuilder
import org.apache.camel.CamelContext
import org.apache.camel.impl.DefaultCamelContext
import javax.jms.ConnectionFactory
import org.apache.camel.component.jms.JmsComponent
import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.camel.Processor
import org.apache.camel.scala.dsl.builder.RouteBuilderSupport
 
object ResequencerExample extends App with RouteBuilderSupport {
 val context: CamelContext = new DefaultCamelContext
 val connectionFactory: ConnectionFactory = new ActiveMQConnectionFactory("vm://localhost")
 context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory))
 
 val routeBuilder = new RouteBuilder {
 from("jms:wip").resequence(header("custom-priority")).batch().timeout(10000).reverse().to("jms:wip-inorder")
 
 from("jms:wip-inorder")
 .process(exchange => println("Message received with priority: " + exchange.getIn().getHeader("custom-priority")))
 }
 context.addRoutes(routeBuilder)
 context.start
 
 Thread.sleep(2000)
 println("sending some messages..")
 val tpl = context.createProducerTemplate
 tpl.setDefaultEndpointUri("jms:wip")
 tpl.sendBodyAndHeader("foo1", "custom-priority", 1)
 tpl.sendBodyAndHeader("foo2", "custom-priority", 5)
 tpl.sendBodyAndHeader("foo3", "custom-priority", 6)
 tpl.sendBodyAndHeader("foo4", "custom-priority", 3)
 tpl.sendBodyAndHeader("foo5", "custom-priority", 4)
 tpl.sendBodyAndHeader("foo6", "custom-priority", 2)
 while (true) {}
}

Running the application above should give a similar output. Now the messages are ordered by their header “custom-priority”:

[..]
16:10:02.253 [main] DEBUG o.a.camel.impl.DefaultCamelContext - Adding routes from builder: com.hascode.tutorial.ResequencerExample$$anon$1@7776cad3
16:10:02.452 [main] DEBUG o.apache.camel.impl.DefaultComponent - Creating endpoint uri=[jms://wip], path=[wip], parameters=[{}]
16:10:02.457 [main] DEBUG o.a.camel.impl.DefaultCamelContext - jms://wip converted to endpoint: Endpoint[jms://wip] by component: org.apache.camel.component.jms.JmsComponent@6b552b76
16:10:02.530 [main] DEBUG o.apache.camel.impl.DefaultComponent - Creating endpoint uri=[jms://wip-inorder], path=[wip-inorder], parameters=[{}]
16:10:02.530 [main] DEBUG o.a.camel.impl.DefaultCamelContext - jms://wip-inorder converted to endpoint: Endpoint[jms://wip-inorder] by component: org.apache.camel.component.jms.JmsComponent@6b552b76
16:10:02.555 [main] DEBUG o.a.c.m.DefaultManagementAgent - Registered MBean with ObjectName: org.apache.camel:context=styx/camel-1,type=endpoints,name="jms://wip-inorder"
16:10:02.557 [main] DEBUG o.a.c.p.interceptor.DefaultChannel - Initialize channel for target: 'To[jms:wip-inorder]'
16:10:02.589 [main] DEBUG o.a.c.p.interceptor.DefaultChannel - Initialize channel for target: 'Resequencer[{org.apache.camel.scala.ScalaExpression@6da0d866} -> [To[jms:wip-inorder]]]'
[..]
16:10:02.628 [main] DEBUG o.a.camel.component.jms.JmsProducer - Starting producer: Producer[jms://wip-inorder]
[..]
16:10:05.394 [main] DEBUG o.a.c.component.jms.JmsConfiguration - Sending JMS message to: queue://wip with message: ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {custom_HYPHEN_priority=1, breadcrumbId=ID-styx-41187-1360509002058-0-1}, readOnlyProperties = false, readOnlyBody = false, droppable = false, text = foo1}
[..]
16:10:05.494 [Camel (camel-1) thread #1 - JmsConsumer[wip]] DEBUG o.a.camel.processor.BatchProcessor - Received exchange to be batched: Exchange[JmsMessage[JmsMessageID: ID:styx-33805-1360509003088-3:8:1:1:1]]
16:10:15.495 [Camel (camel-1) thread #0 - Batch Sender] DEBUG o.a.camel.processor.BatchProcessor - Sending aggregated exchange: Exchange[JmsMessage[JmsMessageID: ID:styx-33805-1360509003088-3:5:1:1:1]]
16:10:15.496 [Camel (camel-1) thread #0 - Batch Sender] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[jms://wip-inorder] Exchange[JmsMessage[JmsMessageID: ID:styx-33805-1360509003088-3:5:1:1:1]]
[..]
16:10:15.499 [ActiveMQ Task-1] DEBUG o.a.a.broker.region.AbstractRegion - localhost adding consumer: ID:styx-33805-1360509003088-3:9:-1:1 for destination: ActiveMQ.Advisory.TempQueue,ActiveMQ.Advisory.TempTopic
16:10:15.502 [Camel (camel-1) thread #0 - Batch Sender] DEBUG o.a.c.component.jms.JmsConfiguration - Sending JMS message to: queue://wip-inorder with message: ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {custom_HYPHEN_priority=6, breadcrumbId=ID-styx-41187-1360509002058-0-7, CamelJmsDeliveryMode=2}, readOnlyProperties = false, readOnlyBody = false, droppable = false, text = foo3}
Message received with priority: 6
[..]
Message received with priority: 5
[..]
Message received with priority: 4
[..]
Message received with priority: 3
[..]
Message received with priority: 2
[..]
Message received with priority: 1

Load Balancer

Camel offers you an easy way for load balancing using different strategies. We’re using an easy round-robin strategy here without any special configuration.

There’s a lot of useful information on possible configuration and load balancing strategies on the Camel documentation.

Simple Round Robin Load Balancer

Simple Round Robin Load Balancer

package com.hascode.tutorial
import org.apache.camel.scala.dsl.builder.RouteBuilder
import org.apache.camel.CamelContext
import org.apache.camel.impl.DefaultCamelContext
import javax.jms.ConnectionFactory
import org.apache.camel.component.jms.JmsComponent
import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.camel.Processor
import org.apache.camel.scala.dsl.builder.RouteBuilderSupport
 
object LoadBalancerExample extends App with RouteBuilderSupport {
 val context: CamelContext = new DefaultCamelContext
 val connectionFactory: ConnectionFactory = new ActiveMQConnectionFactory("vm://localhost")
 context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory))
 
 val routeBuilder = new RouteBuilder {
 from("jms:incoming").loadBalance().
 roundRobin().to("jms:worker-queue-1", "jms:worker-queue-2", "jms:worker-queue-3");
 
 from("jms:worker-queue-1")
 .process(exchange => println("Handler 1 recevied a message with number flag: " + exchange.getIn().getHeader("my-number")))
 from("jms:worker-queue-2")
 .process(exchange => println("Handler 2 recevied a message with number flag: " + exchange.getIn().getHeader("my-number")))
 from("jms:worker-queue-3")
 .process(exchange => println("Handler 3 recevied a message with number flag: " + exchange.getIn().getHeader("my-number")))
 }
 context.addRoutes(routeBuilder)
 context.start
 
 Thread.sleep(2000)
 val tpl = context.createProducerTemplate
 tpl.setDefaultEndpointUri("jms:incoming")
 tpl.sendBodyAndHeader("foo1", "my-number", 1)
 tpl.sendBodyAndHeader("foo2", "my-number", 2)
 tpl.sendBodyAndHeader("foo3", "my-number", 3)
 tpl.sendBodyAndHeader("foo4", "my-number", 4)
 tpl.sendBodyAndHeader("foo5", "my-number", 5)
 tpl.sendBodyAndHeader("foo6", "my-number", 6)
 while (true) {}
}

Running the example above could produce a similar output – but it is not guaranteed..

Handler 1 recevied a message with number flag: 1
Handler 2 recevied a message with number flag: 2
Handler 3 recevied a message with number flag: 3
Handler 1 recevied a message with number flag: 4
Handler 2 recevied a message with number flag: 5
Handler 3 recevied a message with number flag: 6

IMAP Component

In the next example we’re setting up an email server using the greenmail library – if you’re interested, please feel free to take a look at my tutorial “Integration Testing IMAP, SMTP and POP3 with GreenMail“.

Afterwards we’re setting up a camel route that fetches e-mails from this IMAP server and stores them in the directory data/outbox.

package com.hascode.tutorial
import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.scala.dsl.builder.RouteBuilder
import org.apache.camel.scala.dsl.builder.RouteBuilderSupport
import org.apache.camel.CamelContext
import com.icegreen.greenmail.user.GreenMailUser
import com.icegreen.greenmail.util.GreenMail
import com.icegreen.greenmail.util.ServerSetupTest
import javax.mail.internet.MimeMessage
import javax.mail.Session
import javax.mail.internet.InternetAddress
import javax.mail.Message
 
object ImapExample extends App {
 val mailServer = new GreenMail(ServerSetupTest.IMAP)
 mailServer.start
 val user = mailServer.setUser("test@hascode.com", "joe", "XXXX")
 
 val context: CamelContext = new DefaultCamelContext
 val routeBuilder = new RouteBuilder {
 from("imap://joe@0.0.0.0:3143?password=XXXX") --> ("file:data/outbox")
 }
 context.addRoutes(routeBuilder)
 context.start
 Thread.sleep(3000)
 for (i <- 0 until 3) {
 var message = new MimeMessage(null: Session)
 message.setFrom(new InternetAddress("test@hascode.com"))
 message.addRecipient(Message.RecipientType.TO, new InternetAddress(
 "foo@hascode.com"))
 message.setSubject("Test E-Mail #" + i)
 message.setText("This is a fine test e-mail. It is the " + i + " message of 3.")
 println("sending new email: #" + i)
 user.deliver(message)
 Thread.sleep(2000)
 }
 
 while (true) {}
}

Running the application above produces a similar output

22:01:42.794 [main] DEBUG o.apache.camel.impl.DefaultComponent - Creating endpoint uri=[imap://joe@0.0.0.0:3143?password=******], path=[joe@0.0.0.0:3143], parameters=[{password=XXXX}]
sending new email: #0
sending new email: #1
sending new email: #2
22:02:44.214 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.component.mail.MailConsumer - Polling mailbox folder: imap://0.0.0.0:3143, folder=INBOX
22:02:44.298 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.component.mail.MailConsumer - Fetching 3 messages. Total 3 messages.
22:02:44.480 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.component.mail.MailConsumer - Processing message: messageNumber=[1], from=[test@hascode.com], to=[foo@hascode.com], subject=[Test E-Mail #0], sentDate=[Feb 12, 2013 10:02:44 PM], receivedDate=[Feb 12, 2013 10:02:44 AM]
22:02:44.648 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[file://data/outbox] Exchange[MailMessage: messageNumber=[1], from=[test@hascode.com], to=[foo@hascode.com], subject=[Test E-Mail #0], sentDate=[Feb 12, 2013 10:02:44 PM], receivedDate=[Feb 12, 2013 10:02:44 AM]]
22:02:44.654 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.component.file.FileOperations - Using InputStream to write file: data/outbox/ID-styx-40497-1360702902385-0-1
22:02:44.660 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.c.file.GenericFileProducer - Wrote [data/outbox/ID-styx-40497-1360702902385-0-1] to [Endpoint[file://data/outbox]]
22:02:44.743 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.component.mail.MailConsumer - Processing message: messageNumber=[2], from=[test@hascode.com], to=[foo@hascode.com], subject=[Test E-Mail #1], sentDate=[Feb 12, 2013 10:02:44 PM], receivedDate=[Feb 12, 2013 10:02:44 AM]
22:02:44.904 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[file://data/outbox] Exchange[MailMessage: messageNumber=[2], from=[test@hascode.com], to=[foo@hascode.com], subject=[Test E-Mail #1], sentDate=[Feb 12, 2013 10:02:44 PM], receivedDate=[Feb 12, 2013 10:02:44 AM]]
22:02:44.904 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.component.file.FileOperations - Using InputStream to write file: data/outbox/ID-styx-40497-1360702902385-0-3
22:02:44.908 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.c.file.GenericFileProducer - Wrote [data/outbox/ID-styx-40497-1360702902385-0-3] to [Endpoint[file://data/outbox]]
22:02:44.987 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.component.mail.MailConsumer - Processing message: messageNumber=[3], from=[test@hascode.com], to=[foo@hascode.com], subject=[Test E-Mail #2], sentDate=[Feb 12, 2013 10:02:44 PM], receivedDate=[Feb 12, 2013 10:02:44 AM]
22:02:45.148 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[file://data/outbox] Exchange[MailMessage: messageNumber=[3], from=[test@hascode.com], to=[foo@hascode.com], subject=[Test E-Mail #2], sentDate=[Feb 12, 2013 10:02:44 PM], receivedDate=[Feb 12, 2013 10:02:44 AM]]
22:02:45.148 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.component.file.FileOperations - Using InputStream to write file: data/outbox/ID-styx-40497-1360702902385-0-5
22:02:45.152 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.c.file.GenericFileProducer - Wrote [data/outbox/ID-styx-40497-1360702902385-0-5] to [Endpoint[file://data/outbox]]

If you take a look now in the directory data/outbox you can see the messages .. each file contains the e-mail body:

$ tree data/outbox/
data/outbox/
├── ID-localhost-40497-1360702902385-0-1
├── ID-localhost-40497-1360702902385-0-3
└── ID-localhost-40497-1360702902385-0-5

Velocity Templates

In the following simple example we’re just scanning the directory data/inbox for incoming files. When a file is added to this directory, a velocity template is applied to create a new file.

First of all this is our velocity template in com/hascode/tutorial/ named transform.vm

This is the transformed file
 
Let's iterate over the available headers:
 
#foreach ($key in $headers.keySet())
- key: $key has value: $headers.get($key)
#end
 
The body:
${body}

This is the application and the route binding.

package com.hascode.tutorial
import org.apache.camel.scala.dsl.builder.RouteBuilder
import org.apache.camel.CamelContext
import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.scala.dsl.builder.RouteBuilderSupport
 
object VelocityExample extends App {
 val context: CamelContext = new DefaultCamelContext
 val routeBuilder = new RouteBuilder {
 from("file:data/inbox") --> ("velocity:com/hascode/tutorial/transform.vm") --> ("file:data/outbox")
 }
 context.addRoutes(routeBuilder)
 context.start
 while (true) {}
}

Now add some new file to data/inbox and run the application you should be able to spot a new file in data/outbox that could like the following one:

echo "This is just a test" > data/inbox/foo.txt

data/outbox/foo.txt:

This is the transformed file
 
Let's iterate over the available headers:
 
- key: camelfilenameonly has value: foo.txt
- key: camelfileparent has value: data/inbox
- key: camelfilename has value: foo.txt
- key: breadcrumbid has value: ID-styx-59735-1360704546890-0-1
- key: camelfileabsolute has value: false
- key: camelfilelength has value: 20
- key: camelfilerelativepath has value: foo.txt
- key: camelfilepath has value: data/inbox/foo.txt
- key: camelfilelastmodified has value: Tue Feb 12 22:29:11 CET 2013
- key: camelfileabsolutepath has value: /data/project/camel-scala-tutorial/data/inbox/foo.txt
 
The body:
GenericFile[foo.txt]

Bean Binding / Annotated Beans

Camel allows us to use POJOs to specify and endpoint or handle further processing instructions.

This is our recipient bean .. depending on the file name extension it returns two possible JMS queues used for further processing.

package com.hascode.tutorial
import org.apache.camel.RecipientList
import org.apache.camel.Header
 
class RecipientsBean {
 
 @RecipientList def getRecipients(@Header("CamelFileName") fileName: String): Array[String] = {
 println("detecting recipients for given filename: " + fileName)
 if (fileName.endsWith("csv")) {
 return Array("jms:csvQueue")
 }
 return Array("jms:defaultQueue")
 }
}

And this is our application that scans the directory data/inbox for files and delegates the flow to the recipient bean.

In addition we’re scanning both JMS queues for new messages and we’re printing the file names received from their messages.

package com.hascode.tutorial
import org.apache.camel.CamelContext
import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.scala.dsl.builder.RouteBuilder
import javax.jms.ConnectionFactory
import org.apache.camel.component.jms.JmsComponent
import org.apache.activemq.ActiveMQConnectionFactory
 
object AnnotatedBeanExample extends App {
 val context: CamelContext = new DefaultCamelContext
 val connectionFactory: ConnectionFactory = new ActiveMQConnectionFactory("vm://localhost")
 context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
 
 val routeBuilder = new RouteBuilder {
 from("file:data/inbox").bean(classOf[RecipientsBean])
 
 from("jms:csvQueue").process(exchange => println("csv queue: file received: " + exchange.getIn().getHeader("CamelFileName")))
 from("jms:defaultQueue").process(exchange => println("default queue: file received: " + exchange.getIn().getHeader("CamelFileName")))
 }
 context.addRoutes(routeBuilder)
 context.start
 while (true) {}
}

When we’re now adding some files to the directory ..

$ date > data/inbox/test.csv && date > /data/inbox/foo.txt

.. and run the application above we should get a similar output (shortened)

21:04:06.195 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.component.file.FileConsumer - Total 1 files to consume
21:04:06.196 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.component.file.FileConsumer - About to process file: GenericFile[test.csv] using exchange: Exchange[test.csv]
detecting recipients for given filename: test.csv
21:04:06.222 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.camel.component.jms.JmsProducer - Starting producer: Producer[jms://csvQueue]
[..]
csv queue: file received: test.csv
21:04:06.264 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG org.apache.camel.util.FileUtil - Retrying attempt 0 to delete file: /data/project/camel-scala-tutorial/data/inbox/test.csv.camelLock
21:04:06.264 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG org.apache.camel.util.FileUtil - Tried 1 to delete file: /data/project/camel-scala-tutorial/data/inbox/test.csv.camelLock with result: true
21:04:06.265 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG org.apache.camel.util.FileUtil - Retrying attempt 0 to delete file: data/inbox/.camel/test.csv
21:04:06.265 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG org.apache.camel.util.FileUtil - Tried 1 to delete file: data/inbox/.camel/test.csv with result: true
21:04:06.265 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.c.f.s.GenericFileRenameProcessStrategy - Renaming file: GenericFile[test.csv] to: GenericFile[.camel/test.csv]
21:04:06.265 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG org.apache.camel.util.FileUtil - Tried 1 to rename file: /data/project/camel-scala-tutorial/data/inbox/test.csv to: data/inbox/.camel/test.csv with result: true
21:04:06.741 [ActiveMQ Journal Checkpoint Worker] DEBUG o.a.a.store.kahadb.MessageDatabase - Checkpoint started.
21:04:06.753 [ActiveMQ Journal Checkpoint Worker] DEBUG o.a.a.store.kahadb.MessageDatabase - Checkpoint done.
21:04:06.766 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.component.file.FileConsumer - Took 0.000 seconds to poll: data/inbox
[..]
21:16:37.146 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.component.file.FileConsumer - About to process file: GenericFile[foo.txt] using exchange: Exchange[foo.txt]
[..]
detecting recipients for given filename: foo.txt
[..]
default queue: file received: test.csv
[..]

Tutorial Sources

Please feel free to download the tutorial sources from my Bitbucket repository, fork it there or clone it using Mercurial:

hg clone https://bitbucket.org/hascode/camel-scala-dsl-tutorial

Resources

Tags: , , , , , , , , , , , ,

8 Responses to “Using Apache Camel with Scala and the Camel Scala DSL”

  1. Claus Ibsen Says:

    Hi

    I enjoyed reading this tutorial on the Camel DSL and how you build an application using the EIP patterns.

    I took the liberty of adding a link to this article from the Camel link collection at (takes a bit to sync)
    http://camel.apache.org/articles

    Keep up the good writing

    Claus Ibsen

  2. micha kops Says:

    Thanks, I feel honored!

  3. Tai Siew Joon Says:

    Excellent article! Your examples are concise and straight to the point. I hope to read more from you.

  4. Stefan Says:

    The context-based route example looks very much like the example from “Camel in action” :-) I have a question regarding this: In the book, another endpoint is chained after the CBR by adding .to(“jms:furtherProcessing”).
    When using the Java DSL, this works nicely; but I haven’t managed to get it working with the Scala DSL, the furtherProcessing queue never receives any messages! Any idea how to get this right?

  5. micha kops Says:

    Hi, thanks for your post! It is indeed no coincidence since I simply love “Camel in Action” – what reminds me of adding Claus Ibsens excellent book to the list of resources here :)
    Did using “end” on the choice not work for you? I’ve written this one up for you – is that what you were searching for?

    Formatted on GitHub:

    https://gist.github.com/hascode/5059494


    package com.hascode.tutorial
    import org.apache.camel.scala.dsl.builder.RouteBuilder
    import org.apache.camel.CamelContext
    import org.apache.camel.impl.DefaultCamelContext
    import javax.jms.ConnectionFactory
    import org.apache.camel.component.jms.JmsComponent
    import org.apache.activemq.ActiveMQConnectionFactory
    import org.apache.camel.Processor
    import org.apache.camel.scala.dsl.builder.RouteBuilderSupport
    import org.apache.camel.Exchange

    object RoutingAfterCBRExample extends App with RouteBuilderSupport {
    val context: CamelContext = new DefaultCamelContext
    val connectionFactory: ConnectionFactory = new ActiveMQConnectionFactory("vm://localhost")
    context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory))
    val routeBuilder = new RouteBuilder {
    from("file:data/inbox")
    .choice {
    when(fileEndsWith(_, "xml")).to("jms:xmlOrders")
    when(fileEndsWith(_, "csv")).to("jms:csvOrders")
    otherwise().to("jms:unknownOrders")
    }.end.to("jms:continuedProcessing")

    from("jms:xmlOrders")
    .process(exchange => println("XML type order received: " + exchange.getIn().getHeader("CamelFileName")))
    from("jms:csvOrders")
    .process(exchange => println("CSV type order received: " + exchange.getIn().getHeader("CamelFileName")))
    from("jms:unknownOrders")
    .process(exchange => println("Unknown type order received: " + exchange.getIn().getHeader("CamelFileName")))
    from("jms:continuedProcessing")
    .process(exchange => println("Continued processing queue received a message: " + exchange.getIn().getHeader("CamelFileName")))
    }
    context.addRoutes(routeBuilder)
    context.start
    while (true) {}

    def fileEndsWith(ex: Exchange, fileExt: String): Boolean = {
    ex.getIn().getHeader("CamelFileName") match {
    case x: String => return x.endsWith(fileExt)
    case _ => false
    }
    }
    }

  6. Stefan Says:

    Hi micha, thanks for your reply!

    end? *smackshead* I assumed that would be implicitly done by the closing curly brace of the choice statement.

    I tried it in my code (which almost exactly resembles your gist), but unfortunately the end statement is onknown to my Scala DSL. Which version are you using? I’m on Camel 2.10.3., which is according to the website the latet version…

  7. Stefan Says:

    Ok, found the difference: I didn’t explicitly use from(..), but only the string representation of the endpoint. This works for simple from..to-expressions, but didn’t give the “end” method.
    Confusing.

  8. micha kops Says:

    That is indeed confusing.. but thanks for keeping me up to date! :)

Leave a Reply

Please leave these two fields as-is:

Protected by Invisible Defender. Showed 403 to 81,008 bad guys.

Search
Tags
Categories