Open-sourcing Unified User Actions

Unified User Action (UUA) is a centralized, real-time stream of user actions on Twitter, consumed by various product, ML, and marketing teams. UUA makes sure all internal teams consume the uniformed user actions data in an accurate and fast way.
This commit is contained in:
twitter-team
2023-04-10 09:34:13 -07:00
parent f1b5c32734
commit 617c8c787d
250 changed files with 25277 additions and 0 deletions

View File

@ -0,0 +1 @@
# This prevents SQ query from grabbing //:all since it traverses up once to find a BUILD

View File

@ -0,0 +1,24 @@
## Aurora deploy
## From master branch
```
aurora workflow build unified_user_actions/service/deploy/uua-partitioner-staging.workflow
```
## From your own branch
```
git push origin <LDAP>/<BRANCH>
aurora workflow build --build-branch=<LDAP>/<BRANCH> unified_user_actions/service/deploy/uua-partitioner-staging.workflow
```
* Check build status:
* Dev
* https://workflows.twitter.biz/workflow/discode/uua-partitioner-staging/
## Monitor output topic EPS
* Prod
* unified_user_actions: https://monitoring.twitter.biz/tiny/2942881
* Dev
* unified_user_action_sample1: https://monitoring.twitter.biz/tiny/2942879

View File

@ -0,0 +1,5 @@
scala_library(
sources = ["*.scala"],
tags = ["bazel-compatible"],
dependencies = [],
)

View File

@ -0,0 +1,16 @@
package com.twitter.unified_user_actions.enricher
/**
* When this exception is thrown, it means that an assumption in the enricher services
* was violated and it needs to be fixed before a production deployment.
*/
abstract class FatalException(msg: String) extends Exception(msg)
class ImplementationException(msg: String) extends FatalException(msg)
object Exceptions {
def require(requirement: Boolean, message: String): Unit = {
if (!requirement)
throw new ImplementationException("requirement failed: " + message)
}
}

View File

@ -0,0 +1,11 @@
scala_library(
sources = ["*.scala"],
tags = ["bazel-compatible"],
dependencies = [
"unified_user_actions/enricher/src/main/scala/com/twitter/unified_user_actions/enricher",
"unified_user_actions/enricher/src/main/scala/com/twitter/unified_user_actions/enricher/hydrator:base",
"unified_user_actions/enricher/src/main/scala/com/twitter/unified_user_actions/enricher/partitioner:base",
"unified_user_actions/enricher/src/main/thrift/com/twitter/unified_user_actions/enricher/internal:internal-scala",
"unified_user_actions/thrift/src/main/thrift/com/twitter/unified_user_actions:unified_user_actions-scala",
],
)

View File

@ -0,0 +1,99 @@
package com.twitter.unified_user_actions.enricher.driver
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentEnvelop
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentKey
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentStageType.Hydration
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentStageType.Repartition
import com.twitter.util.Future
import EnrichmentPlanUtils._
import com.twitter.unified_user_actions.enricher.Exceptions
import com.twitter.unified_user_actions.enricher.ImplementationException
import com.twitter.unified_user_actions.enricher.hydrator.Hydrator
import com.twitter.unified_user_actions.enricher.partitioner.Partitioner
/**
* A driver that will execute on a key, value tuple and produce an output to a Kafka topic.
*
* The output Kafka topic will depend on the current enrichment plan. In one scenario, the driver
* will output to a partitioned Kafka topic if the output needs to be repartitioned (after it has
* been hydrated 0 or more times as necessary). In another scenario, the driver will output to
* the final topic if there's no more work to be done.
*
* @param finalOutputTopic The final output Kafka topic
* @param partitionedTopic The intermediate Kafka topic used for repartitioning based on [[EnrichmentKey]]
* @param hydrator A hydrator that knows how to populate the metadata based on the current plan / instruction.
* @param partitioner A partitioner that knows how to transform the current uua event into an [[EnrichmentKey]].
*/
class EnrichmentDriver(
finalOutputTopic: Option[String],
partitionedTopic: String,
hydrator: Hydrator,
partitioner: Partitioner) {
/**
* A driver that does the following when being executed.
* It checks if we are done with enrichment plan, if not:
* - is the current stage repartitioning?
* -> remap the output key, update plan accordingly then return with the new partition key
* - is the current stage hydration?
* -> use the hydrator to hydrate the envelop, update the plan accordingly, then proceed
* recursively unless the next stage is repartitioning or this is the last stage.
*/
def execute(
key: Option[EnrichmentKey],
envelop: Future[EnrichmentEnvelop]
): Future[(Option[EnrichmentKey], EnrichmentEnvelop)] = {
envelop.flatMap { envelop =>
val plan = envelop.plan
if (plan.isEnrichmentComplete) {
val topic = finalOutputTopic.getOrElse(
throw new ImplementationException(
"A final output Kafka topic is supposed to be used but " +
"no final output topic was provided."))
Future.value((key, envelop.copy(plan = plan.markLastStageCompletedWithOutputTopic(topic))))
} else {
val currentStage = plan.getCurrentStage
currentStage.stageType match {
case Repartition =>
Exceptions.require(
currentStage.instructions.size == 1,
s"re-partitioning needs exactly 1 instruction but ${currentStage.instructions.size} was provided")
val instruction = currentStage.instructions.head
val outputKey = partitioner.repartition(instruction, envelop)
val outputValue = envelop.copy(
plan = plan.markStageCompletedWithOutputTopic(
stage = currentStage,
outputTopic = partitionedTopic)
)
Future.value((outputKey, outputValue))
case Hydration =>
Exceptions.require(
currentStage.instructions.nonEmpty,
"hydration needs at least one instruction")
// Hydration is either initialized or completed after this, failure state
// will have to be handled upstream. Any unhandled exception will abort the entire
// stage.
// This is so that if the error in unrecoverable, the hydrator can choose to return an
// un-hydrated envelop to tolerate the error.
val finalEnvelop = currentStage.instructions.foldLeft(Future.value(envelop)) {
(curEnvelop, instruction) =>
curEnvelop.flatMap(e => hydrator.hydrate(instruction, key, e))
}
val outputValue = finalEnvelop.map(e =>
e.copy(
plan = plan.markStageCompleted(stage = currentStage)
))
// continue executing other stages if it can (locally) until a terminal state
execute(key, outputValue)
case _ =>
throw new ImplementationException(s"Invalid / unsupported stage type $currentStage")
}
}
}
}
}

View File

@ -0,0 +1,71 @@
package com.twitter.unified_user_actions.enricher.driver
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentPlan
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentStage
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentStageStatus.Completion
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentStageStatus.Initialized
object EnrichmentPlanUtils {
implicit class EnrichmentPlanStatus(plan: EnrichmentPlan) {
/**
* Check each stage of the plan to know if we are done
*/
def isEnrichmentComplete: Boolean = {
plan.stages.forall(stage => stage.status == Completion)
}
/**
* Get the next stage in the enrichment process. Note, if there is none this will throw
* an exception.
*/
def getCurrentStage: EnrichmentStage = {
val next = plan.stages.find(stage => stage.status == Initialized)
next match {
case Some(stage) => stage
case None => throw new IllegalStateException("check for plan completion first")
}
}
def getLastCompletedStage: EnrichmentStage = {
val completed = plan.stages.reverse.find(stage => stage.status == Completion)
completed match {
case Some(stage) => stage
case None => throw new IllegalStateException("check for plan completion first")
}
}
/**
* Copy the current plan with the requested stage marked as complete
*/
def markStageCompletedWithOutputTopic(
stage: EnrichmentStage,
outputTopic: String
): EnrichmentPlan = {
plan.copy(
stages = plan.stages.map(s =>
if (s == stage) s.copy(status = Completion, outputTopic = Some(outputTopic)) else s)
)
}
def markStageCompleted(
stage: EnrichmentStage
): EnrichmentPlan = {
plan.copy(
stages = plan.stages.map(s => if (s == stage) s.copy(status = Completion) else s)
)
}
/**
* Copy the current plan with the last stage marked as necessary
*/
def markLastStageCompletedWithOutputTopic(
outputTopic: String
): EnrichmentPlan = {
val last = plan.stages.last
plan.copy(
stages = plan.stages.map(s =>
if (s == last) s.copy(status = Completion, outputTopic = Some(outputTopic)) else s)
)
}
}
}

View File

@ -0,0 +1,11 @@
scala_library(
sources = ["*.scala"],
tags = ["bazel-compatible"],
dependencies = [
"3rdparty/jvm/com/google/guava",
"featureswitches/dynmap/src/main/scala/com/twitter/dynmap:dynmap-core",
"featureswitches/dynmap/src/main/scala/com/twitter/dynmap/json:dynmap-json",
"graphql/thrift/src/main/thrift/com/twitter/graphql:graphql-scala",
"util/util-core:scala",
],
)

View File

@ -0,0 +1,66 @@
package com.twitter.unified_user_actions.enricher.graphql
import com.google.common.util.concurrent.RateLimiter
import com.twitter.dynmap.DynMap
import com.twitter.dynmap.json.DynMapJson
import com.twitter.finagle.stats.Counter
import com.twitter.finagle.stats.NullStatsReceiver
import com.twitter.util.logging.Logging
import com.twitter.util.Return
import com.twitter.util.Throw
import com.twitter.util.Try
/**
* @param dm The DynMap parsed from the returned Json string
*/
case class GraphqlRspErrors(dm: DynMap) extends Exception {
override def toString: String = dm.toString()
}
object GraphqlRspParser extends Logging {
private val rateLimiter = RateLimiter.create(1.0) // at most 1 log message per second
private def rateLimitedLogError(e: Throwable): Unit =
if (rateLimiter.tryAcquire()) {
error(e.getMessage, e)
}
/**
* GraphQL's response is a Json string.
* This function first parses the raw response as a Json string, then it checks if the returned
* object has the "data" field which means the response is expected. The response could also
* return a valid Json string but with errors inside it as a list of "errors".
*/
def toDynMap(
rsp: String,
invalidRspCounter: Counter = NullStatsReceiver.NullCounter,
failedReqCounter: Counter = NullStatsReceiver.NullCounter
): Try[DynMap] = {
val rawRsp: Try[DynMap] = DynMapJson.fromJsonString(rsp)
rawRsp match {
case Return(r) =>
if (r.getMapOpt("data").isDefined) Return(r)
else {
invalidRspCounter.incr()
rateLimitedLogError(GraphqlRspErrors(r))
Throw(GraphqlRspErrors(r))
}
case Throw(e) =>
rateLimitedLogError(e)
failedReqCounter.incr()
Throw(e)
}
}
/**
* Similar to `toDynMap` above, but returns an Option
*/
def toDynMapOpt(
rsp: String,
invalidRspCounter: Counter = NullStatsReceiver.NullCounter,
failedReqCounter: Counter = NullStatsReceiver.NullCounter
): Option[DynMap] =
toDynMap(
rsp = rsp,
invalidRspCounter = invalidRspCounter,
failedReqCounter = failedReqCounter).toOption
}

View File

@ -0,0 +1,11 @@
scala_library(
name = "hcache",
sources = ["*.scala"],
tags = ["bazel-compatible"],
dependencies = [
"3rdparty/jvm/com/google/guava",
"util/util-cache-guava/src/main/scala",
"util/util-cache/src/main/scala",
"util/util-stats/src/main/scala/com/twitter/finagle/stats",
],
)

View File

@ -0,0 +1,34 @@
package com.twitter.unified_user_actions.enricher.hcache
import com.google.common.cache.Cache
import com.twitter.cache.FutureCache
import com.twitter.cache.guava.GuavaCache
import com.twitter.finagle.stats.NullStatsReceiver
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.util.Future
/**
* A local cache implementation using GuavaCache.
* Underneath it uses a customized version of the EvictingCache to 1) deal with Futures, 2) add more stats.
*/
class LocalCache[K, V](
underlying: Cache[K, Future[V]],
statsReceiver: StatsReceiver = NullStatsReceiver) {
private[this] val cache = new GuavaCache(underlying)
private[this] val evictingCache: FutureCache[K, V] =
ObservedEvictingCache(underlying = cache, statsReceiver = statsReceiver)
def getOrElseUpdate(key: K)(fn: => Future[V]): Future[V] = evictingCache.getOrElseUpdate(key)(fn)
def get(key: K): Option[Future[V]] = evictingCache.get(key)
def evict(key: K, value: Future[V]): Boolean = evictingCache.evict(key, value)
def set(key: K, value: Future[V]): Unit = evictingCache.set(key, value)
def reset(): Unit =
underlying.invalidateAll()
def size: Int = evictingCache.size
}

View File

@ -0,0 +1,91 @@
package com.twitter.unified_user_actions.enricher.hcache
import com.twitter.cache.FutureCache
import com.twitter.cache.FutureCacheProxy
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.util.Future
import scala.annotation.nowarn
/**
* Adds stats and reuse the main logic of the EvictingCache.
*/
class ObservedEvictingCache[K, V](underlying: FutureCache[K, V], scopedStatsReceiver: StatsReceiver)
extends FutureCacheProxy[K, V](underlying) {
import ObservedEvictingCache._
private[this] val getsCounter = scopedStatsReceiver.counter(StatsNames.Gets)
private[this] val setsCounter = scopedStatsReceiver.counter(StatsNames.Sets)
private[this] val hitsCounter = scopedStatsReceiver.counter(StatsNames.Hits)
private[this] val missesCounter = scopedStatsReceiver.counter(StatsNames.Misses)
private[this] val evictionsCounter = scopedStatsReceiver.counter(StatsNames.Evictions)
private[this] val failedFuturesCounter = scopedStatsReceiver.counter(StatsNames.FailedFutures)
@nowarn("cat=unused")
private[this] val cacheSizeGauge = scopedStatsReceiver.addGauge(StatsNames.Size)(underlying.size)
private[this] def evictOnFailure(k: K, f: Future[V]): Future[V] = {
f.onFailure { _ =>
failedFuturesCounter.incr()
evict(k, f)
}
f // we return the original future to make evict(k, f) easier to work with.
}
override def set(k: K, v: Future[V]): Unit = {
setsCounter.incr()
super.set(k, v)
evictOnFailure(k, v)
}
override def getOrElseUpdate(k: K)(v: => Future[V]): Future[V] = {
getsCounter.incr()
var computeWasEvaluated = false
def computeWithTracking: Future[V] = v.onSuccess { _ =>
computeWasEvaluated = true
missesCounter.incr()
}
evictOnFailure(
k,
super.getOrElseUpdate(k)(computeWithTracking).onSuccess { _ =>
if (!computeWasEvaluated) hitsCounter.incr()
}
).interruptible()
}
override def get(key: K): Option[Future[V]] = {
getsCounter.incr()
val value = super.get(key)
value match {
case Some(_) => hitsCounter.incr()
case _ => missesCounter.incr()
}
value
}
override def evict(key: K, value: Future[V]): Boolean = {
val evicted = super.evict(key, value)
if (evicted) evictionsCounter.incr()
evicted
}
}
object ObservedEvictingCache {
object StatsNames {
val Gets = "gets"
val Hits = "hits"
val Misses = "misses"
val Sets = "sets"
val Evictions = "evictions"
val FailedFutures = "failed_futures"
val Size = "size"
}
/**
* Wraps an underlying FutureCache, ensuring that failed Futures that are set in
* the cache are evicted later.
*/
def apply[K, V](underlying: FutureCache[K, V], statsReceiver: StatsReceiver): FutureCache[K, V] =
new ObservedEvictingCache[K, V](underlying, statsReceiver)
}

View File

@ -0,0 +1,58 @@
package com.twitter.unified_user_actions.enricher.hydrator
import com.google.common.util.concurrent.RateLimiter
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.unified_user_actions.enricher.FatalException
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentEnvelop
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentInstruction
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentKey
import com.twitter.util.Future
import com.twitter.util.logging.Logging
abstract class AbstractHydrator(scopedStatsReceiver: StatsReceiver) extends Hydrator with Logging {
object StatsNames {
val Exceptions = "exceptions"
val EmptyKeys = "empty_keys"
val Hydrations = "hydrations"
}
private val exceptionsCounter = scopedStatsReceiver.counter(StatsNames.Exceptions)
private val emptyKeysCounter = scopedStatsReceiver.counter(StatsNames.EmptyKeys)
private val hydrationsCounter = scopedStatsReceiver.counter(StatsNames.Hydrations)
// at most 1 log message per second
private val rateLimiter = RateLimiter.create(1.0)
private def rateLimitedLogError(e: Throwable): Unit =
if (rateLimiter.tryAcquire()) {
error(e.getMessage, e)
}
protected def safelyHydrate(
instruction: EnrichmentInstruction,
keyOpt: EnrichmentKey,
envelop: EnrichmentEnvelop
): Future[EnrichmentEnvelop]
override def hydrate(
instruction: EnrichmentInstruction,
keyOpt: Option[EnrichmentKey],
envelop: EnrichmentEnvelop
): Future[EnrichmentEnvelop] = {
keyOpt
.map(key => {
safelyHydrate(instruction, key, envelop)
.onSuccess(_ => hydrationsCounter.incr())
.rescue {
case e: FatalException => Future.exception(e)
case e =>
rateLimitedLogError(e)
exceptionsCounter.incr()
Future.value(envelop)
}
}).getOrElse({
emptyKeysCounter.incr()
Future.value(envelop)
})
}
}

View File

@ -0,0 +1,36 @@
scala_library(
name = "default",
sources = [
"AbstractHydrator.scala",
"DefaultHydrator.scala",
],
tags = ["bazel-compatible"],
dependencies = [
":base",
"featureswitches/dynmap/src/main/scala/com/twitter/dynmap:dynmap-core",
"unified_user_actions/enricher/src/main/scala/com/twitter/unified_user_actions/enricher",
"unified_user_actions/enricher/src/main/scala/com/twitter/unified_user_actions/enricher/graphql",
"unified_user_actions/enricher/src/main/scala/com/twitter/unified_user_actions/enricher/hcache",
"unified_user_actions/enricher/src/main/thrift/com/twitter/unified_user_actions/enricher/internal:internal-scala",
],
)
scala_library(
name = "noop",
sources = ["NoopHydrator.scala"],
tags = ["bazel-compatible"],
dependencies = [
":base",
"unified_user_actions/enricher/src/main/scala/com/twitter/unified_user_actions/enricher",
"unified_user_actions/enricher/src/main/thrift/com/twitter/unified_user_actions/enricher/internal:internal-scala",
],
)
scala_library(
name = "base",
sources = ["Hydrator.scala"],
tags = ["bazel-compatible"],
dependencies = [
"unified_user_actions/enricher/src/main/thrift/com/twitter/unified_user_actions/enricher/internal:internal-scala",
],
)

View File

@ -0,0 +1,90 @@
package com.twitter.unified_user_actions.enricher.hydrator
import com.twitter.dynmap.DynMap
import com.twitter.finagle.stats.NullStatsReceiver
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.graphql.thriftscala.AuthHeaders
import com.twitter.graphql.thriftscala.Authentication
import com.twitter.graphql.thriftscala.Document
import com.twitter.graphql.thriftscala.GraphQlRequest
import com.twitter.graphql.thriftscala.GraphqlExecutionService
import com.twitter.graphql.thriftscala.Variables
import com.twitter.unified_user_actions.enricher.ImplementationException
import com.twitter.unified_user_actions.enricher.graphql.GraphqlRspParser
import com.twitter.unified_user_actions.enricher.hcache.LocalCache
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentEnvelop
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentIdType
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentInstruction
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentKey
import com.twitter.unified_user_actions.thriftscala.AuthorInfo
import com.twitter.unified_user_actions.thriftscala.Item
import com.twitter.util.Future
class DefaultHydrator(
cache: LocalCache[EnrichmentKey, DynMap],
graphqlClient: GraphqlExecutionService.FinagledClient,
scopedStatsReceiver: StatsReceiver = NullStatsReceiver)
extends AbstractHydrator(scopedStatsReceiver) {
private def constructGraphqlReq(
enrichmentKey: EnrichmentKey
): GraphQlRequest =
enrichmentKey.keyType match {
case EnrichmentIdType.TweetId =>
GraphQlRequest(
// see go/graphiql/M5sHxua-RDiRtTn48CAhng
document = Document.DocumentId("M5sHxua-RDiRtTn48CAhng"),
operationName = Some("TweetHydration"),
variables = Some(
Variables.JsonEncodedVariables(s"""{"rest_id": "${enrichmentKey.id}"}""")
),
authentication = Authentication.AuthHeaders(
AuthHeaders()
)
)
case _ =>
throw new ImplementationException(
s"Missing implementation for hydration of type ${enrichmentKey.keyType}")
}
private def hydrateAuthorInfo(item: Item.TweetInfo, authorId: Option[Long]): Item.TweetInfo = {
item.tweetInfo.actionTweetAuthorInfo match {
case Some(_) => item
case _ =>
item.copy(tweetInfo = item.tweetInfo.copy(
actionTweetAuthorInfo = Some(AuthorInfo(authorId = authorId))
))
}
}
override protected def safelyHydrate(
instruction: EnrichmentInstruction,
key: EnrichmentKey,
envelop: EnrichmentEnvelop
): Future[EnrichmentEnvelop] = {
instruction match {
case EnrichmentInstruction.TweetEnrichment =>
val dynMapFuture = cache.getOrElseUpdate(key) {
graphqlClient
.graphql(constructGraphqlReq(enrichmentKey = key))
.map { body =>
body.response.flatMap { r =>
GraphqlRspParser.toDynMapOpt(r)
}.get
}
}
dynMapFuture.map(map => {
val authorIdOpt =
map.getLongOpt("data.tweet_result_by_rest_id.result.core.user.legacy.id_str")
val hydratedEnvelop = envelop.uua.item match {
case item: Item.TweetInfo =>
envelop.copy(uua = envelop.uua.copy(item = hydrateAuthorInfo(item, authorIdOpt)))
case _ => envelop
}
hydratedEnvelop
})
case _ => Future.value(envelop)
}
}
}

View File

@ -0,0 +1,14 @@
package com.twitter.unified_user_actions.enricher.hydrator
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentEnvelop
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentInstruction
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentKey
import com.twitter.util.Future
trait Hydrator {
def hydrate(
instruction: EnrichmentInstruction,
key: Option[EnrichmentKey],
envelop: EnrichmentEnvelop
): Future[EnrichmentEnvelop]
}

View File

@ -0,0 +1,27 @@
package com.twitter.unified_user_actions.enricher.hydrator
import com.twitter.unified_user_actions.enricher.ImplementationException
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentEnvelop
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentInstruction
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentKey
import com.twitter.util.Future
/**
* This hydrator does nothing. If it's used by mistake for any reason, an exception will be thrown.
* Use this when you expect to have no hydration (for example, the planner shouldn't hydrate anything
* and only would perform the partitioning function).
*/
object NoopHydrator {
val OutputTopic: Option[String] = None
}
class NoopHydrator extends Hydrator {
override def hydrate(
instruction: EnrichmentInstruction,
key: Option[EnrichmentKey],
envelop: EnrichmentEnvelop
): Future[EnrichmentEnvelop] = {
throw new ImplementationException(
"NoopHydrator shouldn't be invoked when configure. Check your " +
"enrichment plan.")
}
}

View File

@ -0,0 +1,18 @@
scala_library(
name = "default",
sources = ["DefaultPartitioner.scala"],
tags = ["bazel-compatible"],
dependencies = [
":base",
"unified_user_actions/enricher/src/main/thrift/com/twitter/unified_user_actions/enricher/internal:internal-scala",
],
)
scala_library(
name = "base",
sources = ["Partitioner.scala"],
tags = ["bazel-compatible"],
dependencies = [
"unified_user_actions/enricher/src/main/thrift/com/twitter/unified_user_actions/enricher/internal:internal-scala",
],
)

View File

@ -0,0 +1,37 @@
package com.twitter.unified_user_actions.enricher.partitioner
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentEnvelop
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentIdType
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentInstruction
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentInstruction.NotificationTweetEnrichment
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentInstruction.TweetEnrichment
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentKey
import com.twitter.unified_user_actions.enricher.partitioner.DefaultPartitioner.NullKey
import com.twitter.unified_user_actions.thriftscala.Item
import com.twitter.unified_user_actions.thriftscala.NotificationContent
object DefaultPartitioner {
val NullKey: Option[EnrichmentKey] = None
}
class DefaultPartitioner extends Partitioner {
override def repartition(
instruction: EnrichmentInstruction,
envelop: EnrichmentEnvelop
): Option[EnrichmentKey] = {
(instruction, envelop.uua.item) match {
case (TweetEnrichment, Item.TweetInfo(info)) =>
Some(EnrichmentKey(EnrichmentIdType.TweetId, info.actionTweetId))
case (NotificationTweetEnrichment, Item.NotificationInfo(info)) =>
info.content match {
case NotificationContent.TweetNotification(content) =>
Some(EnrichmentKey(EnrichmentIdType.TweetId, content.tweetId))
case NotificationContent.MultiTweetNotification(content) =>
// we scarify on cache performance in this case since only a small % of
// notification content will be multi-tweet types.
Some(EnrichmentKey(EnrichmentIdType.TweetId, content.tweetIds.head))
case _ => NullKey
}
case _ => NullKey
}
}
}

View File

@ -0,0 +1,12 @@
package com.twitter.unified_user_actions.enricher.partitioner
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentEnvelop
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentInstruction
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentKey
trait Partitioner {
def repartition(
instruction: EnrichmentInstruction,
envelop: EnrichmentEnvelop
): Option[EnrichmentKey]
}

View File

@ -0,0 +1,16 @@
create_thrift_libraries(
org = "com.twitter.unified_user_actions.enricher",
base_name = "internal",
sources = ["*.thrift"],
tags = ["bazel-compatible"],
dependency_roots = [
"src/thrift/com/twitter/clientapp/gen:clientapp",
"unified_user_actions/thrift/src/main/thrift/com/twitter/unified_user_actions",
],
generate_languages = [
"java",
"scala",
],
provides_java_name = "enricher_internal-thrift-java",
provides_scala_name = "enricher_internal-thrift-scala",
)

View File

@ -0,0 +1,26 @@
namespace java com.twitter.unified_user_actions.enricher.internal.thriftjava
#@namespace scala com.twitter.unified_user_actions.enricher.internal.thriftscala
#@namespace strato com.twitter.unified_user_actions.enricher.internal
include "com/twitter/unified_user_actions/unified_user_actions.thrift"
include "enrichment_plan.thrift"
struct EnrichmentEnvelop {
/**
* An internal ID that uniquely identifies this event created during the early stages of enrichment.
* It is useful for detecting debugging, tracing & profiling the events throughout the process.
**/
1: required i64 envelopId
/**
* The UUA event to be enriched / currently being enriched / has been enriched depending on the
* stages of the enrichment process.
**/
2: unified_user_actions.UnifiedUserAction uua
/**
* The current enrichment plan. It keeps track of what is currently being enriched, what still
* needs to be done so that we can bring the enrichment process to completion.
**/
3: enrichment_plan.EnrichmentPlan plan
}(persisted='true', hasPersonalData='true')

View File

@ -0,0 +1,41 @@
namespace java com.twitter.unified_user_actions.enricher.internal.thriftjava
#@namespace scala com.twitter.unified_user_actions.enricher.internal.thriftscala
#@namespace strato com.twitter.unified_user_actions.enricher.internal
/*
* Internal key used for controling UUA enrichment & caching process. It contains very minimal
* information to allow for efficient serde, fast data look-up and to drive the partioning logics.
*
* NOTE: Don't depend on it in your application.
* NOTE: This is used internally by UUA and may change at anytime. There's no guarantee for
* backward / forward-compatibility.
* NOTE: Don't add any other metadata unless it is needed for partitioning logic. Extra enrichment
* metdata can go into the envelop.
*/
struct EnrichmentKey {
/*
* The internal type of the primary ID used for partitioning UUA data.
*
* Each type should directly correspond to an entity-level ID in UUA.
* For example, TweetInfo.actionTweetId & TweetNotification.tweetId are all tweet-entity level
* and should correspond to the same primary ID type.
**/
1: required EnrichmentIdType keyType
/**
* The primary ID. This is usually a long, for other incompatible data type such as string or
* a bytes array, they can be converted into a long using their native hashCode() function.
**/
2: required i64 id
}(persisted='true', hasPersonalData='true')
/**
* The type of the primary ID. For example, tweetId on a tweet & tweetId on a notification are
* all TweetId type. Similarly, UserID of a viewer and AuthorID of a tweet are all UserID type.
*
* The type here ensures that we will partition UUA data correctly across different entity-type
* (user, tweets, notification, etc.)
**/
enum EnrichmentIdType {
TweetId = 0
}

View File

@ -0,0 +1,52 @@
namespace java com.twitter.unified_user_actions.enricher.internal.thriftjava
#@namespace scala com.twitter.unified_user_actions.enricher.internal.thriftscala
#@namespace strato com.twitter.unified_user_actions.enricher.internal
/**
* An enrichment plan. It has multiple stages for different purposes during the enrichment process.
**/
struct EnrichmentPlan {
1: required list<EnrichmentStage> stages
}(persisted='true', hasPersonalData='false')
/**
* A stage in the enrichment process with respect to the current key. Currently it can be of 2 options:
* - re-partitioning on an id of type X
* - hydrating metadata on an id of type X
*
* A stage also moves through different statues from initialized, processing until completion.
* Each stage contains one or more instructions.
**/
struct EnrichmentStage {
1: required EnrichmentStageStatus status
2: required EnrichmentStageType stageType
3: required list<EnrichmentInstruction> instructions
// The output topic for this stage. This information is not available when the stage was
// first setup, and it's only available after the driver has finished working on
// this stage.
4: optional string outputTopic
}(persisted='true', hasPersonalData='false')
/**
* The current processing status of a stage. It should either be done (completion) or not done (initialized).
* Transient statuses such as "processing" is dangerous since we can't exactly be sure that has been done.
**/
enum EnrichmentStageStatus {
Initialized = 0
Completion = 20
}
/**
* The type of processing in this stage. For example, repartioning the data or hydrating the data.
**/
enum EnrichmentStageType {
Repartition = 0
Hydration = 10
}
enum EnrichmentInstruction {
// all enrichment based on a tweet id in UUA goes here
TweetEnrichment = 0
NotificationTweetEnrichment = 10
}

View File

@ -0,0 +1,4 @@
resources(
sources = ["*.*"],
tags = ["bazel-compatible"],
)

View File

@ -0,0 +1,45 @@
<configuration>
<!-- ===================================================== -->
<!-- Console appender for local debugging and testing -->
<!-- ===================================================== -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
</encoder>
</appender>
<!-- ===================================================== -->
<!-- Package Config -->
<!-- ===================================================== -->
<!-- Root Config -->
<root level="info">
<appender-ref ref="STDOUT"/>
</root>
<!-- Per-Package Config -->
<logger name="com.twitter" level="info"/>
<logger name="com.twitter.zookeeper.client.internal" level="warn"/>
<logger name="com.twitter.zookeeper.client.internal.ClientCnxnSocket" level="error"/>
<logger name="com.twitter.logging.ScribeHandler" level="warn"/>
<logger name="com.twitter.finatra" level="info"/>
<logger name="org.apache.kafka" level="debug"/>
<logger name="org.apache.kafka.clients" level="info"/>
<logger name="org.apache.kafka.clients.NetworkClient" level="warn"/>
<logger name="org.apache.kafka.clients.consumer.internals" level="info"/>
<logger name="org.apache.kafka.common.network" level="warn" />
<logger name="org.apache.kafka.common.security.authenticator" level="info" />
<logger name="kafka.server.KafkaConfig" level="off" />
<logger name="org.apache.kafka.clients.producer.ProducerConfig" level="off" />
<logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="off" />
<logger name="org.apache.kafka.clients.admin.AdminClientConfig" level="off" />
<logger name="org.apache.kafka.common.utils.AppInfoParser" level="off" />
<logger name="org.apache.zookeeper" level="off" />
<logger name="com.google.inject" level="info"/>
<logger name="io.netty" level="info"/>
<logger name="jdk.event" level="info"/>
<logger name="javax.security" level="info"/>
</configuration>

View File

@ -0,0 +1,12 @@
scala_library(
name = "fixture",
sources = ["EnricherFixture.scala"],
tags = ["bazel-compatible"],
dependencies = [
"3rdparty/jvm/org/scalatest",
"3rdparty/jvm/org/scalatestplus:junit",
"finatra/inject/inject-core/src/test/scala/com/twitter/inject",
"unified_user_actions/enricher/src/main/thrift/com/twitter/unified_user_actions/enricher/internal:internal-scala",
"unified_user_actions/thrift/src/main/thrift/com/twitter/unified_user_actions:unified_user_actions-scala",
],
)

View File

@ -0,0 +1,100 @@
package com.twitter.unified_user_actions.enricher
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentInstruction
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentPlan
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentStage
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentStageStatus
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentStageType
import com.twitter.unified_user_actions.thriftscala.ActionType
import com.twitter.unified_user_actions.thriftscala.AuthorInfo
import com.twitter.unified_user_actions.thriftscala.EventMetadata
import com.twitter.unified_user_actions.thriftscala.Item
import com.twitter.unified_user_actions.thriftscala.MultiTweetNotification
import com.twitter.unified_user_actions.thriftscala.NotificationContent
import com.twitter.unified_user_actions.thriftscala.NotificationInfo
import com.twitter.unified_user_actions.thriftscala.ProfileInfo
import com.twitter.unified_user_actions.thriftscala.SourceLineage
import com.twitter.unified_user_actions.thriftscala.TweetInfo
import com.twitter.unified_user_actions.thriftscala.TweetNotification
import com.twitter.unified_user_actions.thriftscala.UnifiedUserAction
import com.twitter.unified_user_actions.thriftscala.UnknownNotification
import com.twitter.unified_user_actions.thriftscala.UserIdentifier
trait EnricherFixture {
val partitionedTopic = "unified_user_actions_keyed_dev"
val tweetInfoEnrichmentPlan = EnrichmentPlan(
Seq(
// first stage: to repartition on tweet id -> done
EnrichmentStage(
EnrichmentStageStatus.Completion,
EnrichmentStageType.Repartition,
Seq(EnrichmentInstruction.TweetEnrichment),
Some(partitionedTopic)
),
// next stage: to hydrate more metadata based on tweet id -> initialized
EnrichmentStage(
EnrichmentStageStatus.Initialized,
EnrichmentStageType.Hydration,
Seq(EnrichmentInstruction.TweetEnrichment)
)
))
val tweetNotificationEnrichmentPlan = EnrichmentPlan(
Seq(
// first stage: to repartition on tweet id -> done
EnrichmentStage(
EnrichmentStageStatus.Completion,
EnrichmentStageType.Repartition,
Seq(EnrichmentInstruction.NotificationTweetEnrichment),
Some(partitionedTopic)
),
// next stage: to hydrate more metadata based on tweet id -> initialized
EnrichmentStage(
EnrichmentStageStatus.Initialized,
EnrichmentStageType.Hydration,
Seq(EnrichmentInstruction.NotificationTweetEnrichment),
)
))
def mkUUATweetEvent(tweetId: Long, author: Option[AuthorInfo] = None): UnifiedUserAction = {
UnifiedUserAction(
UserIdentifier(userId = Some(1L)),
item = Item.TweetInfo(TweetInfo(actionTweetId = tweetId, actionTweetAuthorInfo = author)),
actionType = ActionType.ClientTweetReport,
eventMetadata = EventMetadata(1234L, 2345L, SourceLineage.ServerTweetypieEvents)
)
}
def mkUUATweetNotificationEvent(tweetId: Long): UnifiedUserAction = {
mkUUATweetEvent(-1L).copy(
item = Item.NotificationInfo(
NotificationInfo(
actionNotificationId = "123456",
content = NotificationContent.TweetNotification(TweetNotification(tweetId = tweetId))))
)
}
def mkUUAMultiTweetNotificationEvent(tweetIds: Long*): UnifiedUserAction = {
mkUUATweetEvent(-1L).copy(
item = Item.NotificationInfo(
NotificationInfo(
actionNotificationId = "123456",
content = NotificationContent.MultiTweetNotification(
MultiTweetNotification(tweetIds = tweetIds))))
)
}
def mkUUATweetNotificationUnknownEvent(): UnifiedUserAction = {
mkUUATweetEvent(-1L).copy(
item = Item.NotificationInfo(
NotificationInfo(
actionNotificationId = "123456",
content = NotificationContent.UnknownNotification(UnknownNotification())))
)
}
def mkUUAProfileEvent(userId: Long): UnifiedUserAction = {
val event = mkUUATweetEvent(1L)
event.copy(item = Item.ProfileInfo(ProfileInfo(userId)))
}
}

View File

@ -0,0 +1,14 @@
junit_tests(
sources = ["*.scala"],
tags = ["bazel-compatible"],
dependencies = [
"3rdparty/jvm/org/scalatest",
"3rdparty/jvm/org/scalatestplus:junit",
"finatra/inject/inject-core/src/test/scala/com/twitter/inject",
"unified_user_actions/enricher/src/main/scala/com/twitter/unified_user_actions/enricher/driver",
"unified_user_actions/enricher/src/main/scala/com/twitter/unified_user_actions/enricher/hydrator:base",
"unified_user_actions/enricher/src/main/scala/com/twitter/unified_user_actions/enricher/partitioner:base",
"unified_user_actions/enricher/src/test/scala/com/twitter/unified_user_actions/enricher:fixture",
"util/util-core:scala",
],
)

View File

@ -0,0 +1,284 @@
package com.twitter.unified_user_actions.enricher.driver
import com.twitter.inject.Test
import com.twitter.unified_user_actions.enricher.EnricherFixture
import com.twitter.unified_user_actions.enricher.hydrator.Hydrator
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentEnvelop
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentIdType
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentInstruction
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentKey
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentPlan
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentStage
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentStageStatus
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentStageType
import com.twitter.unified_user_actions.enricher.partitioner.Partitioner
import com.twitter.util.Await
import com.twitter.util.Future
import org.scalatest.BeforeAndAfter
import org.scalatest.matchers.should.Matchers
import scala.collection.mutable
class DriverTest extends Test with Matchers with BeforeAndAfter {
object ExecutionContext {
var executionCount = 0
}
before {
ExecutionContext.executionCount = 0
}
trait Fixtures extends EnricherFixture {
val repartitionTweet = mkStage()
val repartitionNotiTweet =
mkStage(instructions = Seq(EnrichmentInstruction.NotificationTweetEnrichment))
val hydrateTweet = mkStage(stageType = EnrichmentStageType.Hydration)
val hydrateTweetMultiInstructions = mkStage(
stageType = EnrichmentStageType.Hydration,
instructions = Seq(
EnrichmentInstruction.NotificationTweetEnrichment,
EnrichmentInstruction.TweetEnrichment,
EnrichmentInstruction.NotificationTweetEnrichment,
EnrichmentInstruction.TweetEnrichment
)
)
val hydrateNotiTweet = mkStage(
stageType = EnrichmentStageType.Hydration,
instructions = Seq(EnrichmentInstruction.NotificationTweetEnrichment))
val key1 = EnrichmentKey(EnrichmentIdType.TweetId, 123L)
val tweet1 = mkUUATweetEvent(981L)
val hydrator = new MockHydrator
val partitioner = new MockPartitioner
val outputTopic = "output"
val partitionTopic = "partition"
def complete(
enrichmentStage: EnrichmentStage,
outputTopic: Option[String] = None
): EnrichmentStage = {
enrichmentStage.copy(status = EnrichmentStageStatus.Completion, outputTopic = outputTopic)
}
def mkPlan(enrichmentStages: EnrichmentStage*): EnrichmentPlan = {
EnrichmentPlan(enrichmentStages)
}
def mkStage(
status: EnrichmentStageStatus = EnrichmentStageStatus.Initialized,
stageType: EnrichmentStageType = EnrichmentStageType.Repartition,
instructions: Seq[EnrichmentInstruction] = Seq(EnrichmentInstruction.TweetEnrichment)
): EnrichmentStage = {
EnrichmentStage(status, stageType, instructions)
}
trait ExecutionCount {
val callMap: mutable.Map[Int, (EnrichmentInstruction, EnrichmentEnvelop)] =
mutable.Map[Int, (EnrichmentInstruction, EnrichmentEnvelop)]()
def recordExecution(instruction: EnrichmentInstruction, envelop: EnrichmentEnvelop): Unit = {
ExecutionContext.executionCount = ExecutionContext.executionCount + 1
callMap.put(ExecutionContext.executionCount, (instruction, envelop))
}
}
class MockHydrator extends Hydrator with ExecutionCount {
def hydrate(
instruction: EnrichmentInstruction,
key: Option[EnrichmentKey],
envelop: EnrichmentEnvelop
): Future[EnrichmentEnvelop] = {
recordExecution(instruction, envelop)
Future(envelop.copy(envelopId = ExecutionContext.executionCount))
}
}
class MockPartitioner extends Partitioner with ExecutionCount {
def repartition(
instruction: EnrichmentInstruction,
envelop: EnrichmentEnvelop
): Option[EnrichmentKey] = {
recordExecution(instruction, envelop)
Some(EnrichmentKey(EnrichmentIdType.TweetId, ExecutionContext.executionCount))
}
}
}
test("single partitioning plan works") {
new Fixtures {
val driver = new EnrichmentDriver(Some(outputTopic), partitionTopic, hydrator, partitioner)
// given a simple plan that only repartition the input and nothing else
val plan = mkPlan(repartitionTweet)
(1L to 10).foreach(id => {
val envelop = EnrichmentEnvelop(id, tweet1, plan)
// when
val actual = Await.result(driver.execute(Some(key1), Future(envelop)))
val expectedKey = Some(key1.copy(id = id))
val expectedValue =
envelop.copy(plan = mkPlan(complete(repartitionTweet, Some(partitionTopic))))
// then the result should have a new partitioned key, with the envelop unchanged except the plan is complete
// however, the output topic is the partitionTopic (since this is only a partitioning stage)
assert((expectedKey, expectedValue) == actual)
})
}
}
test("multi-stage partitioning plan works") {
new Fixtures {
val driver = new EnrichmentDriver(Some(outputTopic), partitionTopic, hydrator, partitioner)
// given a plan that chain multiple repartition stages together
val plan = mkPlan(repartitionTweet, repartitionNotiTweet)
val envelop1 = EnrichmentEnvelop(1L, tweet1, plan)
// when 1st partitioning trip
val actual1 = Await.result(driver.execute(Some(key1), Future(envelop1)))
// then the result should have a new partitioned key, with the envelop unchanged except the
// 1st stage of the plan is complete
val expectedKey1 = key1.copy(id = 1L)
val expectedValue1 =
envelop1.copy(plan =
mkPlan(complete(repartitionTweet, Some(partitionTopic)), repartitionNotiTweet))
assert((Some(expectedKey1), expectedValue1) == actual1)
// then, we reuse the last result to exercise the logics on the driver again for the 2st trip
val actual2 = Await.result(driver.execute(Some(expectedKey1), Future(expectedValue1)))
val expectedKey2 = key1.copy(id = 2L)
val expectedValue2 =
envelop1.copy(plan = mkPlan(
complete(repartitionTweet, Some(partitionTopic)),
complete(repartitionNotiTweet, Some(partitionTopic))))
assert((Some(expectedKey2), expectedValue2) == actual2)
}
}
test("single hydration plan works") {
new Fixtures {
val driver = new EnrichmentDriver(Some(outputTopic), partitionTopic, hydrator, partitioner)
// given a simple plan that only hydrate the input and nothing else
val plan = mkPlan(hydrateTweet)
(1L to 10).foreach(id => {
val envelop = EnrichmentEnvelop(id, tweet1, plan)
// when
val actual = Await.result(driver.execute(Some(key1), Future(envelop)))
val expectedValue =
envelop.copy(envelopId = id, plan = mkPlan(complete(hydrateTweet, Some(outputTopic))))
// then the result should have the same key, with the envelop hydrated & the plan is complete
// the output topic should be the final topic since this is a hydration stage and the plan is complete
assert((Some(key1), expectedValue) == actual)
})
}
}
test("single hydration with multiple instructions plan works") {
new Fixtures {
val driver = new EnrichmentDriver(Some(outputTopic), partitionTopic, hydrator, partitioner)
// given a simple plan that only hydrate the input and nothing else
val plan = mkPlan(hydrateTweetMultiInstructions)
val envelop = EnrichmentEnvelop(0L, tweet1, plan)
// when
val actual = Await.result(driver.execute(Some(key1), Future(envelop)))
val expectedValue = envelop.copy(
envelopId = 4L, // hydrate is called 4 times for 4 instructions in 1 stage
plan = mkPlan(complete(hydrateTweetMultiInstructions, Some(outputTopic))))
// then the result should have the same key, with the envelop hydrated & the plan is complete
// the output topic should be the final topic since this is a hydration stage and the plan is complete
assert((Some(key1), expectedValue) == actual)
}
}
test("multi-stage hydration plan works") {
new Fixtures {
val driver = new EnrichmentDriver(Some(outputTopic), partitionTopic, hydrator, partitioner)
// given a plan that only hydrate twice
val plan = mkPlan(hydrateTweet, hydrateNotiTweet)
val envelop = EnrichmentEnvelop(1L, tweet1, plan)
// when
val actual = Await.result(driver.execute(Some(key1), Future(envelop)))
// then the result should have the same key, with the envelop hydrated. since there's no
// partitioning stages, the driver will just recurse until all the hydration is done,
// then output to the final topic
val expectedValue =
envelop.copy(
envelopId = 2L,
plan = mkPlan(
complete(hydrateTweet),
complete(
hydrateNotiTweet,
Some(outputTopic)
) // only the last stage has the output topic
))
assert((Some(key1), expectedValue) == actual)
}
}
test("multi-stage partition+hydration plan works") {
new Fixtures {
val driver = new EnrichmentDriver(Some(outputTopic), partitionTopic, hydrator, partitioner)
// given a plan that repartition then hydrate twice
val plan = mkPlan(repartitionTweet, hydrateTweet, repartitionNotiTweet, hydrateNotiTweet)
var curEnvelop = EnrichmentEnvelop(1L, tweet1, plan)
var curKey = key1
// stage 1, partitioning on tweet should be correct
var actual = Await.result(driver.execute(Some(curKey), Future(curEnvelop)))
var expectedKey = curKey.copy(id = 1L)
var expectedValue = curEnvelop.copy(
plan = mkPlan(
complete(repartitionTweet, Some(partitionTopic)),
hydrateTweet,
repartitionNotiTweet,
hydrateNotiTweet))
assert((Some(expectedKey), expectedValue) == actual)
curEnvelop = actual._2
curKey = actual._1.get
// stage 2-3, hydrating on tweet should be correct
// and since the next stage after hydration is a repartition, it will does so correctly
actual = Await.result(driver.execute(Some(curKey), Future(curEnvelop)))
expectedKey = curKey.copy(id = 3) // repartition is done in stage 3
expectedValue = curEnvelop.copy(
envelopId = 2L, // hydration is done in stage 2
plan = mkPlan(
complete(repartitionTweet, Some(partitionTopic)),
complete(hydrateTweet),
complete(repartitionNotiTweet, Some(partitionTopic)),
hydrateNotiTweet)
)
assert((Some(expectedKey), expectedValue) == actual)
curEnvelop = actual._2
curKey = actual._1.get
// then finally, stage 4 would output to the final topic
actual = Await.result(driver.execute(Some(curKey), Future(curEnvelop)))
expectedKey = curKey // nothing's changed in the key
expectedValue = curEnvelop.copy(
envelopId = 4L,
plan = mkPlan(
complete(repartitionTweet, Some(partitionTopic)),
complete(hydrateTweet),
complete(repartitionNotiTweet, Some(partitionTopic)),
complete(hydrateNotiTweet, Some(outputTopic))
)
)
assert((Some(expectedKey), expectedValue) == actual)
}
}
}

View File

@ -0,0 +1,14 @@
junit_tests(
sources = ["*.scala"],
tags = ["bazel-compatible"],
dependencies = [
"3rdparty/jvm/org/scalatest",
"3rdparty/jvm/org/scalatestplus:junit",
"featureswitches/dynmap/src/main/scala/com/twitter/dynmap:dynmap-core",
"featureswitches/dynmap/src/main/scala/com/twitter/dynmap/json:dynmap-json",
"finatra/inject/inject-core/src/test/scala/com/twitter/inject",
"graphql/thrift/src/main/thrift/com/twitter/graphql:graphql-scala",
"unified_user_actions/enricher/src/main/scala/com/twitter/unified_user_actions/enricher/graphql",
"util/util-core:scala",
],
)

View File

@ -0,0 +1,71 @@
package com.twitter.unified_user_actions.enricher.graphql
import com.twitter.dynmap.DynMap
import com.twitter.inject.Test
import com.twitter.util.Return
import com.twitter.util.Throw
import com.twitter.util.Try
import org.scalatest.matchers.should.Matchers
class GraphqlSpecs extends Test with Matchers {
trait Fixtures {
val sampleError = """
|{
| "errors": [
| {
| "message": "Some err msg!",
| "code": 366,
| "kind": "Validation",
| "name": "QueryViolationError",
| "source": "Client",
| "tracing": {
| "trace_id": "1234567890"
| }
| }
| ]
|}""".stripMargin
val sampleValidRsp =
"""
|{
| "data": {
| "tweet_result_by_rest_id": {
| "result": {
| "core": {
| "user": {
| "legacy": {
| "id_str": "12"
| }
| }
| }
| }
| }
| }
|}
|""".stripMargin
val sampleValidRspExpected = Return(
Set(("data.tweet_result_by_rest_id.result.core.user.legacy.id_str", "12")))
val sampleErrorExpected = Throw(
GraphqlRspErrors(
DynMap.from(
"errors" -> List(
Map(
"message" -> "Some err msg!",
"code" -> 366,
"kind" -> "Validation",
"name" -> "QueryViolationError",
"source" -> "Client",
"tracing" -> Map("trace_id" -> "1234567890")
)))))
def toFlattened(testStr: String): Try[Set[(String, Any)]] =
GraphqlRspParser.toDynMap(testStr).map { dm => dm.valuesFlattened.toSet }
}
test("Graphql Response Parser") {
new Fixtures {
toFlattened(sampleValidRsp) shouldBe sampleValidRspExpected
toFlattened(sampleError) shouldBe sampleErrorExpected
}
}
}

View File

@ -0,0 +1,13 @@
junit_tests(
sources = ["*.scala"],
tags = ["bazel-compatible"],
dependencies = [
"3rdparty/jvm/com/google/guava",
"3rdparty/jvm/org/scalatest",
"3rdparty/jvm/org/scalatestplus:junit",
"finatra/inject/inject-core/src/test/scala:test-deps",
"unified_user_actions/enricher/src/main/scala/com/twitter/unified_user_actions/enricher/hcache",
"util/util-cache-guava/src/main/scala",
"util/util-cache/src/main/scala",
],
)

View File

@ -0,0 +1,153 @@
package com.twitter.unified_user_actions.enricher.hcache
import com.google.common.cache.Cache
import com.google.common.cache.CacheBuilder
import com.twitter.finagle.stats.InMemoryStatsReceiver
import com.twitter.inject.Test
import com.twitter.util.Await
import com.twitter.util.Future
import com.twitter.util.Time
import java.util.concurrent.TimeUnit
import java.lang.{Integer => JInt}
class LocalCacheTest extends Test {
trait Fixture {
val time = Time.fromMilliseconds(123456L)
val ttl = 5
val maxSize = 10
val underlying: Cache[JInt, Future[JInt]] = CacheBuilder
.newBuilder()
.expireAfterWrite(ttl, TimeUnit.SECONDS)
.maximumSize(maxSize)
.build[JInt, Future[JInt]]()
val stats = new InMemoryStatsReceiver
val cache = new LocalCache[JInt, JInt](
underlying = underlying,
statsReceiver = stats
)
def getCounts(counterName: String*): Long = stats.counter(counterName: _*)()
}
test("simple local cache works") {
new Fixture {
Time.withTimeAt(time) { _ =>
assert(cache.size === 0)
(1 to maxSize + 1).foreach { id =>
cache.getOrElseUpdate(id)(Future.value(id))
val actual = Await.result(cache.get(id).get)
assert(actual === id)
}
assert(cache.size === maxSize)
assert(getCounts("gets") === 2 * (maxSize + 1))
assert(getCounts("hits") === maxSize + 1)
assert(getCounts("misses") === maxSize + 1)
assert(getCounts("sets", "evictions", "failed_futures") === 0)
cache.reset()
assert(cache.size === 0)
}
}
}
test("getOrElseUpdate successful futures") {
new Fixture {
Time.withTimeAt(time) { _ =>
assert(cache.size === 0)
(1 to maxSize + 1).foreach { _ =>
cache.getOrElseUpdate(1) {
Future.value(1)
}
}
assert(cache.size === 1)
assert(getCounts("gets") === maxSize + 1)
assert(getCounts("hits") === maxSize)
assert(getCounts("misses") === 1)
assert(getCounts("sets", "evictions", "failed_futures") === 0)
cache.reset()
assert(cache.size === 0)
}
}
}
test("getOrElseUpdate Failed Futures") {
new Fixture {
Time.withTimeAt(time) { _ =>
assert(cache.size === 0)
(1 to maxSize + 1).foreach { id =>
cache.getOrElseUpdate(id)(Future.exception(new IllegalArgumentException("")))
assert(cache.get(id).map {
Await.result(_)
} === None)
}
assert(cache.size === 0)
assert(getCounts("gets") === 2 * (maxSize + 1))
assert(getCounts("hits", "misses", "sets") === 0)
assert(getCounts("evictions") === maxSize + 1)
assert(getCounts("failed_futures") === maxSize + 1)
}
}
}
test("Set successful Future") {
new Fixture {
Time.withTimeAt(time) { _ =>
assert(cache.size === 0)
cache.set(1, Future.value(2))
assert(Await.result(cache.get(1).get) === 2)
assert(getCounts("gets") === 1)
assert(getCounts("hits") === 1)
assert(getCounts("misses") === 0)
assert(getCounts("sets") === 1)
assert(getCounts("evictions", "failed_futures") === 0)
}
}
}
test("Evict") {
new Fixture {
Time.withTimeAt(time) { _ =>
assert(cache.size === 0)
// need to use reference here!!!
val f1 = Future.value(int2Integer(1))
val f2 = Future.value(int2Integer(2))
cache.set(1, f2)
cache.evict(1, f1)
cache.evict(1, f2)
assert(getCounts("gets", "hits", "misses") === 0)
assert(getCounts("sets") === 1)
assert(getCounts("evictions") === 1) // not 2
assert(getCounts("failed_futures") === 0)
}
}
}
test("Set Failed Futures") {
new Fixture {
Time.withTimeAt(time) { _ =>
assert(cache.size === 0)
cache.set(1, Future.exception(new IllegalArgumentException("")))
assert(cache.size === 0)
assert(getCounts("gets", "hits", "misses", "sets") === 0)
assert(getCounts("evictions") === 1)
assert(getCounts("failed_futures") === 1)
}
}
}
}

View File

@ -0,0 +1,19 @@
junit_tests(
sources = ["*.scala"],
tags = ["bazel-compatible"],
dependencies = [
"3rdparty/jvm/org/mockito:mockito-core",
"3rdparty/jvm/org/mockito:mockito-scala",
"3rdparty/jvm/org/scalatest",
"3rdparty/jvm/org/scalatestplus:junit",
"featureswitches/dynmap/src/main/scala/com/twitter/dynmap:dynmap-core",
"finatra/inject/inject-core/src/test/scala/com/twitter/inject",
"graphql/thrift/src/main/thrift/com/twitter/graphql:graphql-scala",
"unified_user_actions/enricher/src/main/scala/com/twitter/unified_user_actions/enricher",
"unified_user_actions/enricher/src/main/scala/com/twitter/unified_user_actions/enricher/graphql",
"unified_user_actions/enricher/src/main/scala/com/twitter/unified_user_actions/enricher/hydrator:default",
"unified_user_actions/enricher/src/main/scala/com/twitter/unified_user_actions/enricher/hydrator:noop",
"unified_user_actions/enricher/src/test/scala/com/twitter/unified_user_actions/enricher:fixture",
"unified_user_actions/thrift/src/main/thrift/com/twitter/unified_user_actions:unified_user_actions-scala",
],
)

View File

@ -0,0 +1,118 @@
package com.twitter.unified_user_actions.enricher.hydrator
import com.google.common.cache.CacheBuilder
import com.twitter.dynmap.DynMap
import com.twitter.graphql.thriftscala.GraphQlRequest
import com.twitter.graphql.thriftscala.GraphQlResponse
import com.twitter.graphql.thriftscala.GraphqlExecutionService
import com.twitter.inject.Test
import com.twitter.unified_user_actions.enricher.EnricherFixture
import com.twitter.unified_user_actions.enricher.FatalException
import com.twitter.unified_user_actions.enricher.hcache.LocalCache
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentEnvelop
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentIdType
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentInstruction
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentKey
import com.twitter.unified_user_actions.thriftscala.AuthorInfo
import com.twitter.util.Await
import com.twitter.util.Future
import org.mockito.ArgumentMatchers
import org.mockito.MockitoSugar
class DefaultHydratorTest extends Test with MockitoSugar {
trait Fixtures extends EnricherFixture {
val cache = new LocalCache[EnrichmentKey, DynMap](
underlying = CacheBuilder
.newBuilder()
.maximumSize(10)
.build[EnrichmentKey, Future[DynMap]]())
val client = mock[GraphqlExecutionService.FinagledClient]
val key = EnrichmentKey(EnrichmentIdType.TweetId, 1L)
val envelop = EnrichmentEnvelop(123L, mkUUATweetEvent(1L), tweetInfoEnrichmentPlan)
def mkGraphQLResponse(authorId: Long): GraphQlResponse =
GraphQlResponse(
Some(
s"""
|{
| "data": {
| "tweet_result_by_rest_id": {
| "result": {
| "core": {
| "user": {
| "legacy": {
| "id_str": "$authorId"
| }
| }
| }
| }
| }
| }
|}
|""".stripMargin
))
}
test("non-fatal errors should proceed as normal") {
new Fixtures {
val hydrator = new DefaultHydrator(cache, client)
// when graphql client encounter any exception
when(client.graphql(ArgumentMatchers.any[GraphQlRequest]))
.thenReturn(Future.exception(new IllegalStateException("any exception")))
val actual =
Await.result(hydrator.hydrate(EnrichmentInstruction.TweetEnrichment, Some(key), envelop))
// then the original envelop is expected
assert(envelop == actual)
}
}
test("fatal errors should return a future exception") {
new Fixtures {
val hydrator = new DefaultHydrator(cache, client)
// when graphql client encounter a fatal exception
when(client.graphql(ArgumentMatchers.any[GraphQlRequest]))
.thenReturn(Future.exception(new FatalException("fatal exception") {}))
val actual = hydrator.hydrate(EnrichmentInstruction.TweetEnrichment, Some(key), envelop)
// then a failed future is expected
assertFailedFuture[FatalException](actual)
}
}
test("author_id should be hydrated from graphql respond") {
new Fixtures {
val hydrator = new DefaultHydrator(cache, client)
when(client.graphql(ArgumentMatchers.any[GraphQlRequest]))
.thenReturn(Future.value(mkGraphQLResponse(888L)))
val actual = hydrator.hydrate(EnrichmentInstruction.TweetEnrichment, Some(key), envelop)
assertFutureValue(
actual,
envelop.copy(uua = mkUUATweetEvent(1L, Some(AuthorInfo(Some(888L))))))
}
}
test("when AuthorInfo is populated, there should be no hydration") {
new Fixtures {
val hydrator = new DefaultHydrator(cache, client)
when(client.graphql(ArgumentMatchers.any[GraphQlRequest]))
.thenReturn(Future.value(mkGraphQLResponse(333L)))
val expected = envelop.copy(uua =
mkUUATweetEvent(tweetId = 3L, author = Some(AuthorInfo(authorId = Some(222)))))
val actual = hydrator.hydrate(EnrichmentInstruction.TweetEnrichment, Some(key), expected)
assertFutureValue(actual, expected)
}
}
}

View File

@ -0,0 +1,12 @@
package com.twitter.unified_user_actions.enricher.hydrator
import com.twitter.inject.Test
import com.twitter.unified_user_actions.enricher.ImplementationException
class NoopHydratorTest extends Test {
test("noop hydrator should throw an error when used") {
assertThrows[ImplementationException] {
new NoopHydrator().hydrate(null, null, null)
}
}
}

View File

@ -0,0 +1,13 @@
junit_tests(
sources = ["*.scala"],
tags = ["bazel-compatible"],
dependencies = [
"3rdparty/jvm/org/scalatest",
"3rdparty/jvm/org/scalatestplus:junit",
"finatra/inject/inject-core/src/test/scala/com/twitter/inject",
"unified_user_actions/enricher/src/main/scala/com/twitter/unified_user_actions/enricher/partitioner:default",
"unified_user_actions/enricher/src/main/thrift/com/twitter/unified_user_actions/enricher/internal:internal-scala",
"unified_user_actions/enricher/src/test/scala/com/twitter/unified_user_actions/enricher:fixture",
"unified_user_actions/thrift/src/main/thrift/com/twitter/unified_user_actions:unified_user_actions-scala",
],
)

View File

@ -0,0 +1,83 @@
package com.twitter.unified_user_actions.enricher.partitioner
import com.twitter.inject.Test
import com.twitter.unified_user_actions.enricher.EnricherFixture
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentEnvelop
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentIdType
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentInstruction
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentInstruction.NotificationTweetEnrichment
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentInstruction.TweetEnrichment
import com.twitter.unified_user_actions.enricher.internal.thriftscala.EnrichmentKey
import com.twitter.unified_user_actions.enricher.partitioner.DefaultPartitioner.NullKey
import org.scalatest.prop.TableDrivenPropertyChecks
class DefaultPartitionerTest extends Test with TableDrivenPropertyChecks {
test("default partitioner should work") {
new EnricherFixture {
val partitioner = new DefaultPartitioner
val instructions = Table(
("instruction", "envelop", "expected"),
// tweet info
(
TweetEnrichment,
EnrichmentEnvelop(1L, mkUUATweetEvent(123L), tweetInfoEnrichmentPlan),
Some(EnrichmentKey(EnrichmentIdType.TweetId, 123L))),
// notification tweet info
(
NotificationTweetEnrichment,
EnrichmentEnvelop(2L, mkUUATweetNotificationEvent(234L), tweetNotificationEnrichmentPlan),
Some(EnrichmentKey(EnrichmentIdType.TweetId, 234L))),
// notification with multiple tweet info
(
NotificationTweetEnrichment,
EnrichmentEnvelop(
3L,
mkUUAMultiTweetNotificationEvent(22L, 33L),
tweetNotificationEnrichmentPlan),
Some(EnrichmentKey(EnrichmentIdType.TweetId, 22L))
) // only the first tweet id is partitioned
)
forEvery(instructions) {
(
instruction: EnrichmentInstruction,
envelop: EnrichmentEnvelop,
expected: Some[EnrichmentKey]
) =>
val actual = partitioner.repartition(instruction, envelop)
assert(expected === actual)
}
}
}
test("unsupported events shouldn't be partitioned") {
new EnricherFixture {
val partitioner = new DefaultPartitioner
val instructions = Table(
("instruction", "envelop", "expected"),
// profile uua event
(
TweetEnrichment,
EnrichmentEnvelop(1L, mkUUAProfileEvent(111L), tweetInfoEnrichmentPlan),
NullKey),
// unknown notification (not a tweet)
(
NotificationTweetEnrichment,
EnrichmentEnvelop(1L, mkUUATweetNotificationUnknownEvent(), tweetInfoEnrichmentPlan),
NullKey),
)
forEvery(instructions) {
(
instruction: EnrichmentInstruction,
envelop: EnrichmentEnvelop,
expected: Option[EnrichmentKey]
) =>
val actual = partitioner.repartition(instruction, envelop)
assert(expected === actual)
}
}
}
}