Receiving Data
Handling Decoding Yourself
If we just wanted to receive some data, we could call the receiveMessage and deleteMessage methods ourselves:
import scala.concurrent.duration._
def printAction(s: String) = IO { println(s) }
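// Ask for up to 5 messages, waiting up to 10 seconds for them to arrive
queue.receiveMessage(max = 5, wait = Some(10.seconds)).flatMap { response =>
  response.messages.traverse { message =>
    // Handle the raw body first, then delete the message using its receipt handle
    printAction(message.body) *> queue.deleteMessage(message.receiptHandle)
  }
}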
// res0: IO[Vector[DeleteMessageResponse]] = IO$461021903
Delegating Decoding
Again, there are helpers that can decode the message body for you:
import com.meltwater.fawn.codec.circe.CirceCodec._
import io.circe.Json
def printAction(j: Json) = IO { println(j.noSpaces) }
queue.receiveAs[Json](max = 10, wait = Some(1.second)).flatMap { messages =>
  messages.traverse { message =>
    printAction(message.body) *> queue.ack(message)
  }
}
// res1: IO[Vector[DeleteMessageResponse]] = IO$491079427
Using a Consumer
This library provides an SQSConsumer, which handles most of the control flow for you. It can be used like this:
val consumer: SQSConsumer[IO] = SQSConsumer(queue)
// consumer: SQSConsumer[IO] = com.meltwater.fawn.sqs.SQSConsumer$$anon$1@51da5a28
consumer.process[Json] { message =>
  printAction(message.body)
}
// res2: fs2.Stream[IO, Unit] = Stream(..)
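This gives you one message at a time, together with its headers, and acks the message for you if your action succeeds. Because the result is an fs2 Stream, nothing is consumed until the stream is compiled and run, for example with .compile.drain.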
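The "ack only if the action succeeded" behaviour described above can be illustrated with a small in-memory model. This is a minimal sketch using only the Scala standard library, not fawn's implementation: `Msg`, `InMemoryQueue`, `AckOnSuccess`, and `processBatch` are hypothetical names invented for the example, and the model ignores SQS details such as visibility timeouts.

```scala
import scala.collection.mutable
import scala.util.Try

// Hypothetical stand-in for a received message; not a fawn type.
final case class Msg(receiptHandle: String, body: String)

// Hypothetical in-memory queue; not part of fawn or the AWS SDK.
final class InMemoryQueue(initial: Seq[Msg]) {
  private val pending = mutable.LinkedHashMap.empty[String, Msg]
  initial.foreach(m => pending.update(m.receiptHandle, m))

  // Returns up to `max` messages without removing them from the queue.
  def receive(max: Int): Vector[Msg] = pending.values.take(max).toVector

  // Removes a message permanently, identified by its receipt handle.
  def delete(receiptHandle: String): Unit = {
    pending.remove(receiptHandle)
    ()
  }

  // Bodies of the messages that are still in the queue, in insertion order.
  def remaining: Vector[String] = pending.values.map(_.body).toVector
}

object AckOnSuccess {
  // Runs `action` on each received message in turn and deletes only the
  // messages whose action completed without throwing.
  def processBatch(queue: InMemoryQueue, max: Int)(action: Msg => Unit): Vector[Try[Unit]] =
    queue.receive(max).map { m =>
      val result = Try(action(m))
      if (result.isSuccess) queue.delete(m.receiptHandle)
      result
    }

  def main(args: Array[String]): Unit = {
    val queue = new InMemoryQueue(Seq(Msg("h1", "first"), Msg("h2", "boom"), Msg("h3", "third")))
    processBatch(queue, max = 10) { m =>
      if (m.body == "boom") throw new RuntimeException("handler failed") else println(m.body)
    }
    // Only the message whose handler failed is still in the queue.
    println(queue.remaining)
  }
}
```

In this model a failed message simply stays in the queue, so a later call to `processBatch` sees it again. That mirrors why deleting (acking) after the action, rather than before, avoids losing messages when the action fails.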