diff --git a/src/main/scala/com/async2databricks/database/DataRepository.scala b/src/main/scala/com/async2databricks/database/DataRepository.scala index 10c5714..c1100c5 100644 --- a/src/main/scala/com/async2databricks/database/DataRepository.scala +++ b/src/main/scala/com/async2databricks/database/DataRepository.scala @@ -17,15 +17,17 @@ trait DataRepository[F[_]] { object DataRepository extends LazyLogging { def apply[F[_]: Async](xa: Transactor[F]): DataRepository[F] = - (query: String, batchSize: Int) => { - logger.info(s"Starting to stream data with query: $query") + new DataRepository[F] { + override def streamData(query: String, batchSize: Int): Stream[F, SampleData] = { + logger.info(s"Starting to stream data with query: $query") - Fragment - .const(query) - .query[SampleData] - .stream - .transact(xa) - .chunkN(batchSize) - .flatMap(chunk => Stream.chunk(chunk)) + Fragment + .const(query) + .query[SampleData] + .stream + .transact(xa) + .chunkN(batchSize) + .flatMap(chunk => Stream.chunk(chunk)) + } } }