
Receiving Data

Handling Decoding Yourself

If we just want to receive some data, we can call the receiveMessage and deleteMessage methods ourselves:

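import cats.effect.IO
import cats.implicits._ // provides traverse and *>
import scala.concurrent.duration._

def printAction(s: String) = IO { println(s) }

// Assumes `queue` was created earlier.
queue.receiveMessage(max = 5, wait = Some(10.seconds)).flatMap { response =>
  response.messages.traverse { message =>
    // Print the raw body, then delete the message so it is not delivered again.
    printAction(message.body) *> queue.deleteMessage(message.receiptHandle)
  }
}
// res0: IO[Vector[DeleteMessageResponse]] = IO$461021903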

Delegating Decoding

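Again, there are helpers that decode for you: receiveAs decodes each message body into the requested type, and ack deletes a message once you have handled it: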

import com.meltwater.fawn.codec.circe.CirceCodec._
import io.circe.Json

def printAction(j: Json) = IO { println(j.noSpaces) }

queue.receiveAs[Json](max = 10, wait = Some(1.second)).flatMap { messages =>
  messages.traverse { message =>
    printAction(message.body) *> queue.ack(message)
  }
}
// res1: IO[Vector[DeleteMessageResponse]] = IO$491079427

Using a Consumer

This library provides an SQSConsumer, which handles most of the control flow for you. It can be used like this:

val consumer: SQSConsumer[IO] = SQSConsumer(queue)
// consumer: SQSConsumer[IO] = com.meltwater.fawn.sqs.SQSConsumer$$anon$1@51da5a28

consumer.process[Json] { message =>
  printAction(message.body)
}
// res2: fs2.Stream[IO, Unit] = Stream(..)

This gives you one message at a time, along with its headers, and acks each message for you if your action succeeds. The result is an fs2 Stream, so nothing is consumed until the stream is run.
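To make the "ack only if the action succeeded" behaviour concrete, here is a minimal sketch that models it with the Scala standard library alone. It is not how SQSConsumer is implemented: `Msg`, `InMemoryQueue`, and `AckOnSuccess.processAll` are hypothetical names invented for this illustration, and the model is synchronous where the real consumer works in IO and fs2.

```scala
import scala.collection.mutable
import scala.util.{Failure, Success, Try}

// Hypothetical in-memory stand-in for a queue; not part of this library.
final case class Msg(receiptHandle: String, body: String)

final class InMemoryQueue(initial: Seq[Msg]) {
  private val pending =
    mutable.LinkedHashMap.from(initial.map(m => m.receiptHandle -> m))

  // Return up to `max` messages without removing them, like a receive call.
  def receive(max: Int): Vector[Msg] = pending.values.take(max).toVector

  // Remove a message once it has been handled, like a delete (ack) call.
  def ack(m: Msg): Unit = { pending.remove(m.receiptHandle); () }

  // Bodies of the messages that have not been acked yet.
  def remaining: Vector[String] = pending.values.map(_.body).toVector
}

object AckOnSuccess {
  // Hand messages to `action` one at a time and ack only those that succeed.
  def processAll(queue: InMemoryQueue, max: Int)(action: Msg => Unit): Unit =
    queue.receive(max).foreach { m =>
      Try(action(m)) match {
        case Success(_) => queue.ack(m)
        case Failure(_) => () // left unacked, so it stays available for a retry
      }
    }

  def main(args: Array[String]): Unit = {
    val q = new InMemoryQueue(Seq(Msg("r1", "ok"), Msg("r2", "boom"), Msg("r3", "ok")))
    processAll(q, max = 10) { m =>
      if (m.body == "boom") throw new RuntimeException("handler failed")
    }
    println(q.remaining) // prints Vector(boom)
  }
}
```

In this model the message whose handler throws is the only one left in the queue, which mirrors the idea that a failed action leaves the message unacked.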