Customize¶
Plugin dispatcher¶
Specify the dispatcher to be used inside the plugin. The default is “akka.actor.default-dispatcher”.
Journal¶
j5ik2o.dynamo-db-journal {
plugin-dispatcher = "akka.actor.default-dispatcher"
}
Snapshot¶
j5ik2o.dynamo-db-snapshot {
plugin-dispatcher = "akka.actor.default-dispatcher"
}
State¶
j5ik2o.dynamo-db-state {
plugin-dispatcher = "akka.actor.default-dispatcher"
}
Table names¶
Specify the table name for each plugin. The defaults are as follows.
Journal¶
j5ik2o.dynamo-db-journal {
table-name = "Journal"
}
Snapshot¶
j5ik2o.dynamo-db-snapshot {
table-name = "Snapshot"
}
State¶
j5ik2o.dynamo-db-state {
table-name = "State"
}
Table column names¶
Journal¶
j5ik2o.dynamo-db-journal {
  columns-def {
    partition-key-column-name = "pkey"
    sort-key-column-name = "skey"
    persistence-id-column-name = "persistence-id"
    sequence-nr-column-name = "sequence-nr"
    deleted-column-name = "deleted"
    message-column-name = "message"
    ordering-column-name = "ordering"
    tags-column-name = "tags"
  }
}
Snapshot¶
j5ik2o.dynamo-db-snapshot {
  columns-def {
    partition-key-column-name = "pkey"
    sort-key-column-name = "skey"
    persistence-id-column-name = "persistence-id"
    sequence-nr-column-name = "sequence-nr"
    snapshot-column-name = "snapshot"
    created-column-name = "created"
  }
}
State¶
j5ik2o.dynamo-db-state {
  columns-def {
    partition-key-column-name = "pkey"
    sort-key-column-name = "skey"
    persistence-id-column-name = "persistence-id"
    sequence-nr-column-name = "sequence-nr"
    deleted-column-name = "deleted"
    payload-column-name = "payload"
    serializer-id-column-name = "serializer-id"
    serializer-manifest-column-name = "serializer-manifest"
    ordering-column-name = "ordering"
    tags-column-name = "tags"
  }
}
Index names¶
Journal¶
j5ik2o.dynamo-db-journal {
get-journal-rows-index-name = "GetJournalRowsIndex"
}
Snapshot¶
j5ik2o.dynamo-db-snapshot {
get-snapshot-rows-index-name = "GetSnapshotRowsIndex"
}
State¶
The state plugin does not have a secondary index.
Write-sharding¶
Journal¶
j5ik2o.dynamo-db-journal {
shard-count = 64
partition-key-resolver-class-name = "com.github.j5ik2o.akka.persistence.dynamodb.journal.PartitionKeyResolver$PersistenceIdBased"
sort-key-resolver-class-name = "com.github.j5ik2o.akka.persistence.dynamodb.journal.SortKeyResolver$PersistenceIdWithSeqNr"
}
Partition Key Resolver Class Name¶
partition-key-resolver-class-name specifies the implementation class that generates pkey from the PersistenceId and the Sequence Number.
The following two built-in implementations are available.
com.github.j5ik2o.akka.persistence.dynamodb.journal.PartitionKeyResolver$SequenceNumberBased (Default)
pkey = ${persistenceId}-${sequenceNumber % shardCount}
Events with the same PersistenceId are assigned to different shards when their Sequence Number differs. This sharding is specific to writes.
com.github.j5ik2o.akka.persistence.dynamodb.journal.PartitionKeyResolver$PersistenceIdBased
pkey = ${persistenceId.prefix}-${md5(persistenceId.reverse) % shardCount}
e.g. for counter-875e6ce0425e4d2b8203f3b44b9b531a, persistenceId.prefix is counter.
With this option, events with the same PersistenceId are always assigned to the same shard, so be sure to select it if you are using DynamoDB Streams or KDS (Kinesis Data Streams) for DynamoDB.
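The two pkey formulas can be sketched in plain Scala as follows. This is only an illustration of the formulas as written here, not the plugin's actual code: the object and method names are hypothetical, the prefix is assumed to be the text before the first "-", and the MD5 digest is assumed to be read as a non-negative integer. The sketch does not attempt to reproduce the exact key format the plugin stores.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

// Hypothetical helper, for illustration only (not part of the plugin).
object PartitionKeySketch {

  // SequenceNumberBased: pkey = ${persistenceId}-${sequenceNumber % shardCount}
  def sequenceNumberBased(persistenceId: String, sequenceNumber: Long, shardCount: Int): String =
    s"$persistenceId-${sequenceNumber % shardCount}"

  // PersistenceIdBased: pkey = ${persistenceId.prefix}-${md5(persistenceId.reverse) % shardCount}
  // Assumption: the prefix is everything before the first "-".
  def persistenceIdBased(persistenceId: String, shardCount: Int): String = {
    val prefix = persistenceId.takeWhile(_ != '-')
    val digest = MessageDigest
      .getInstance("MD5")
      .digest(persistenceId.reverse.getBytes(StandardCharsets.UTF_8))
    // Interpret the 16-byte digest as a non-negative integer, then take the shard.
    val shard = BigInt(1, digest) % BigInt(shardCount)
    s"$prefix-$shard"
  }
}
```

With the first function, consecutive sequence numbers of one PersistenceId land on different partition keys, which spreads writes. With the second, the key depends only on the PersistenceId, so all events of one entity stay in a single shard.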
Sort Key Resolver Class Name¶
sort-key-resolver-class-name specifies the implementation class that generates skey from the PersistenceId and the Sequence Number.
The following two built-in implementations are available.
com.github.j5ik2o.akka.persistence.dynamodb.journal.SortKeyResolver$SeqNr
skey = $sequenceNumber
An implementation in which skey is the Sequence Number.
com.github.j5ik2o.akka.persistence.dynamodb.journal.SortKeyResolver$PersistenceIdWithSeqNr
skey = ${persistenceId.body}-${sequenceNumber}
e.g. for counter-875e6ce0425e4d2b8203f3b44b9b531a, persistenceId.body is 875e6ce0425e4d2b8203f3b44b9b531a.
persistenceId.body is used as the prefix because, depending on shard-count, events of multiple persistenceIds may be stored in the same shard.
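The two skey formulas can be sketched as below. This is a minimal illustration rather than the plugin's implementation: the object and method names are hypothetical, the body is assumed to be the text after the first "-", and the sequence number is assumed to be zero-padded to 19 digits (the width of Long.MaxValue) so that string ordering of sort keys matches numeric ordering.

```scala
// Hypothetical helper, for illustration only (not part of the plugin).
object SortKeySketch {

  // Assumption: zero-pad to 19 digits so that lexicographic order equals numeric order.
  private def pad(sequenceNumber: Long): String = f"$sequenceNumber%019d"

  // SeqNr: skey = $sequenceNumber
  def seqNr(sequenceNumber: Long): String = pad(sequenceNumber)

  // PersistenceIdWithSeqNr: skey = ${persistenceId.body}-${sequenceNumber}
  // Assumption: the body is everything after the first "-".
  def persistenceIdWithSeqNr(persistenceId: String, sequenceNumber: Long): String = {
    val body = persistenceId.dropWhile(_ != '-').drop(1)
    s"$body-${pad(sequenceNumber)}"
  }
}
```

Prefixing the sort key with the body keeps the events of different entities apart when they share a partition key, while the padded sequence number keeps each entity's events in order.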
Configure your own implementation¶
If needed, specify your own implementation classes.
j5ik2o.dynamo-db-journal {
partition-key-resolver-class-name = "your class name(fqcn)"
sort-key-resolver-class-name = "your class name(fqcn)"
}
Snapshot¶
j5ik2o.dynamo-db-snapshot {
shard-count = 64
partition-key-resolver-class-name = "com.github.j5ik2o.akka.persistence.dynamodb.snapshot.PartitionKeyResolver$PersistenceIdBased"
sort-key-resolver-class-name = "com.github.j5ik2o.akka.persistence.dynamodb.snapshot.SortKeyResolver$PersistenceIdWithSeqNr"
}
Shard Count¶
shard-count is the logical number of shards. The default value is 64.
Partition Key Resolver Class Name¶
partition-key-resolver-class-name specifies the implementation class that generates pkey from the PersistenceId and the Sequence Number.
The following two built-in implementations are available.
com.github.j5ik2o.akka.persistence.dynamodb.journal.PartitionKeyResolver$SequenceNumberBased (Default)
pkey = ${persistenceId}-${sequenceNumber % shardCount}
Snapshots with the same PersistenceId are assigned to different shards when their Sequence Number differs. This sharding is specific to writes.
com.github.j5ik2o.akka.persistence.dynamodb.journal.PartitionKeyResolver$PersistenceIdBased
pkey = ${persistenceId.prefix}-${md5(persistenceId.reverse) % shardCount}
e.g. for counter-875e6ce0425e4d2b8203f3b44b9b531a, persistenceId.prefix is counter.
With this option, snapshots with the same PersistenceId are always assigned to the same shard, so be sure to select it if you are using DynamoDB Streams or KDS (Kinesis Data Streams) for DynamoDB.
Sort Key Resolver Class Name¶
sort-key-resolver-class-name specifies the implementation class that generates skey from the PersistenceId and the Sequence Number.
The following two built-in implementations are available.
com.github.j5ik2o.akka.persistence.dynamodb.journal.SortKeyResolver$SeqNr
skey = $sequenceNumber
An implementation in which skey is the Sequence Number.
com.github.j5ik2o.akka.persistence.dynamodb.journal.SortKeyResolver$PersistenceIdWithSeqNr
skey = ${persistenceId.body}-${sequenceNumber}
e.g. for counter-875e6ce0425e4d2b8203f3b44b9b531a, persistenceId.body is 875e6ce0425e4d2b8203f3b44b9b531a.
persistenceId.body is used as the prefix because, depending on shard-count, data of multiple persistenceIds may be stored in the same shard.
Configure your own implementation¶
If needed, specify your own implementation classes.
j5ik2o.dynamo-db-snapshot {
partition-key-resolver-class-name = "your class name(fqcn)"
sort-key-resolver-class-name = "your class name(fqcn)"
}
Persistent data images
| persistenceId | sequence-nr | pkey(SequenceNumberBased) | skey(SeqNr) |
|---|---|---|---|
| counter-875e6ce0425e4d2b8203f3b44b9b531a | 1 | counter-875e6ce0425e4d2b8203f3b44b9b531a-1 | 0000000000000000001 |
| counter-875e6ce0425e4d2b8203f3b44b9b531a | 2 | counter-875e6ce0425e4d2b8203f3b44b9b531a-2 | 0000000000000000002 |
| counter-875e6ce0425e4d2b8203f3b44b9b531a | 3 | counter-875e6ce0425e4d2b8203f3b44b9b531a-3 | 0000000000000000003 |
| counter-875e6ce0425e4d2b8203f3b44b9b531a | 4 | counter-875e6ce0425e4d2b8203f3b44b9b531a-4 | 0000000000000000004 |
| counter-875e6ce0425e4d2b8203f3b44b9b531a | 5 | counter-875e6ce0425e4d2b8203f3b44b9b531a-5 | 0000000000000000005 |
| persistenceId | sequence-nr | pkey(PersistenceIdBased) | skey(PersistenceIdWithSeqNr) |
|---|---|---|---|
| counter-875e6ce0425e4d2b8203f3b44b9b531a | 1 | counter-0000000000000000000000000000000000000803 | 875e6ce0425e4d2b8203f3b44b9b531a-0000000000000000001 |
| counter-875e6ce0425e4d2b8203f3b44b9b531a | 2 | counter-0000000000000000000000000000000000000803 | 875e6ce0425e4d2b8203f3b44b9b531a-0000000000000000002 |
| counter-875e6ce0425e4d2b8203f3b44b9b531a | 3 | counter-0000000000000000000000000000000000000803 | 875e6ce0425e4d2b8203f3b44b9b531a-0000000000000000003 |
| counter-875e6ce0425e4d2b8203f3b44b9b531a | 4 | counter-0000000000000000000000000000000000000803 | 875e6ce0425e4d2b8203f3b44b9b531a-0000000000000000004 |
| counter-875e6ce0425e4d2b8203f3b44b9b531a | 5 | counter-0000000000000000000000000000000000000803 | 875e6ce0425e4d2b8203f3b44b9b531a-0000000000000000005 |
| counter-a8d46579bc2f4caf8c3b8dc2db984227 | 1 | counter-0000000000000000000000000000000000000803 | a8d46579bc2f4caf8c3b8dc2db984227-0000000000000000001 |
| counter-a8d46579bc2f4caf8c3b8dc2db984227 | 2 | counter-0000000000000000000000000000000000000803 | a8d46579bc2f4caf8c3b8dc2db984227-0000000000000000002 |
| counter-a8d46579bc2f4caf8c3b8dc2db984227 | 3 | counter-0000000000000000000000000000000000000803 | a8d46579bc2f4caf8c3b8dc2db984227-0000000000000000003 |
| counter-a8d46579bc2f4caf8c3b8dc2db984227 | 4 | counter-0000000000000000000000000000000000000803 | a8d46579bc2f4caf8c3b8dc2db984227-0000000000000000004 |
| counter-a8d46579bc2f4caf8c3b8dc2db984227 | 5 | counter-0000000000000000000000000000000000000803 | a8d46579bc2f4caf8c3b8dc2db984227-0000000000000000005 |
State¶
The state plugin has neither a sort key nor a SortKeyResolver.
j5ik2o.dynamo-db-state {
shard-count = 64
partition-key-resolver-class-name = "com.github.j5ik2o.akka.persistence.dynamodb.state.PartitionKeyResolver$PersistenceIdBased"
}
Shard Count¶
shard-count is the logical number of shards. The default value is 64.
Partition Key Resolver Class Name¶
There is one built-in implementation, as follows.
com.github.j5ik2o.akka.persistence.dynamodb.journal.PartitionKeyResolver$PersistenceIdBased
This is the same implementation as for Journal and Snapshot.
Configure your own implementation¶
If needed, specify your own implementation class.
j5ik2o.dynamo-db-state {
partition-key-resolver-class-name = "your class name(fqcn)"
}
AWS Client¶
AWS Client Factory¶
The following factories are used to create AWS Clients inside the plugins.
| key | version | async/sync | default value |
|---|---|---|---|
| j5ik2o.dynamo-db-?????.v2-async-client-factory-class-name | v2 | async | com.github.j5ik2o.akka.persistence.dynamodb.utils.V2AsyncClientFactory$Default |
| j5ik2o.dynamo-db-?????.v2-sync-client-factory-class-name | v2 | sync | com.github.j5ik2o.akka.persistence.dynamodb.utils.V2SyncClientFactory$Default |
| j5ik2o.dynamo-db-?????.v2-dax-async-client-factory-class-name | v2-dax | async | com.github.j5ik2o.akka.persistence.dynamodb.utils.V2DaxAsyncClientFactory$Default |
| j5ik2o.dynamo-db-?????.v2-dax-sync-client-factory-class-name | v2-dax | sync | com.github.j5ik2o.akka.persistence.dynamodb.utils.V2DaxSyncClientFactory$Default |
| j5ik2o.dynamo-db-?????.v1-async-client-factory-class-name | v1 | async | com.github.j5ik2o.akka.persistence.dynamodb.utils.V1AsyncClientFactory$Default |
| j5ik2o.dynamo-db-?????.v1-sync-client-factory-class-name | v1 | sync | com.github.j5ik2o.akka.persistence.dynamodb.utils.V1SyncClientFactory$Default |
| j5ik2o.dynamo-db-?????.v1-dax-async-client-factory-class-name | v1-dax | async | com.github.j5ik2o.akka.persistence.dynamodb.utils.V1DaxAsyncClientFactory$Default |
| j5ik2o.dynamo-db-?????.v1-dax-sync-client-factory-class-name | v1-dax | sync | com.github.j5ik2o.akka.persistence.dynamodb.utils.V1DaxSyncClientFactory$Default |
AWS Clients are created automatically from the configured values. If you want to apply your own creation process, implement a custom factory as shown below.
This is useful for passing configuration items to the client that the plugin does not support, or for setting configuration items dynamically within the program.
package example

import akka.actor.DynamicAccess
import com.github.j5ik2o.akka.persistence.dynamodb.config.PluginConfig
import com.github.j5ik2o.akka.persistence.dynamodb.utils.V2AsyncClientFactory
import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
// V2ClientBuilderUtils is provided by the plugin and must also be imported.

class MyV2AsyncClientFactory extends V2AsyncClientFactory {
  override def create(
      dynamicAccess: DynamicAccess,
      pluginConfig: PluginConfig
  ): DynamoDbAsyncClient = {
    // The default implementation is the following.
    // V2ClientUtils.createV2AsyncClient(dynamicAccess, pluginConfig)
    // e.g. custom AWS Client initialization
    V2ClientBuilderUtils
      .setupAsync(
        dynamicAccess,
        pluginConfig
      )
      .defaultsMode(DefaultsMode.AUTO) // This is not a configuration item in the plugin.
      .build()
  }
}
j5ik2o.dynamo-db-journal.v2-async-client-factory-class-name = "example.MyV2AsyncClientFactory"
AWS Client Config¶
The configuration for AWS Clients is specified in the following section (j5ik2o.dynamo-db-?????.dynamo-db-client).
j5ik2o.dynamo-db-journal {
  dynamo-db-client {
    access-key-id = "x"
    secret-access-key = "x"
    endpoint = "http://localhost:8000/"
  }
}
access-key-id is the Access Key ID.
secret-access-key is the Secret Access Key.
endpoint is the endpoint of DynamoDB.
AWS CredentialsProvider¶
You can specify your own AwsCredentialsProvider.
aws-credentials-provider-provider-class-name specifies the provider class that generates the AwsCredentialsProvider.
aws-credentials-provider-class-name specifies the implementation class name of the AwsCredentialsProvider.
j5ik2o.dynamo-db-????? {
  dynamo-db-client {
    v2 {
      aws-credentials-provider-provider-class-name = "com.github.j5ik2o.akka.persistence.dynamodb.client.v2.AwsCredentialsProviderProvider$Default"
      aws-credentials-provider-class-name = "software.amazon.awssdk.auth.credentials.AwsCredentialsProvider"
    }
  }
}
Custom AwsCredentialsProviderProvider by using aws-credentials-provider-provider-class-name¶
package example

import akka.actor.DynamicAccess
import com.github.j5ik2o.akka.persistence.dynamodb.client.v2.AwsCredentialsProviderProvider
import com.github.j5ik2o.akka.persistence.dynamodb.config.PluginConfig
import software.amazon.awssdk.auth.credentials.{ AwsCredentialsProvider, WebIdentityTokenFileCredentialsProvider }
import scala.annotation.unused

final class MyAwsCredentialsProviderProvider(@unused dynamicAccess: DynamicAccess, @unused pluginConfig: PluginConfig)
    extends AwsCredentialsProviderProvider {
  // Use web identity token credentials only when AWS_ROLE_ARN is set in the environment.
  override def create: Option[AwsCredentialsProvider] = {
    if (sys.env.contains("AWS_ROLE_ARN"))
      Some(WebIdentityTokenFileCredentialsProvider.create())
    else
      None
  }
}
j5ik2o.dynamo-db-????? {
  dynamo-db-client {
    v2 {
      aws-credentials-provider-provider-class-name = "example.MyAwsCredentialsProviderProvider"
    }
  }
}
Custom AwsCredentialsProvider by using aws-credentials-provider-class-name¶
package example

import akka.actor.DynamicAccess
import software.amazon.awssdk.auth.credentials.{ AwsCredentials, AwsCredentialsProvider }
import com.github.j5ik2o.akka.persistence.dynamodb.config.PluginConfig

class MyAwsCredentialsProvider(dynamicAccess: DynamicAccess, pluginConfig: PluginConfig) extends AwsCredentialsProvider {
  override def resolveCredentials(): AwsCredentials = {
    // Return your custom AWS credentials here.
    ???
  }
}
Specify your custom implementation class in aws-credentials-provider-class-name.
j5ik2o.dynamo-db-????? {
  dynamo-db-client {
    v2 {
      aws-credentials-provider-class-name = "example.MyAwsCredentialsProvider"
    }
  }
}
MetricPublisher¶
Using the MetricPublisher in the V2 SDK, SDK-level metrics can be sent to Datadog and New Relic via Kamon.
package example

import akka.actor.DynamicAccess
import com.github.j5ik2o.akka.persistence.dynamodb.config.PluginConfig
import kamon.Kamon
import kamon.metric.{ Counter, Histogram, MeasurementUnit }
import software.amazon.awssdk.core.metrics.CoreMetric
import software.amazon.awssdk.metrics.{ MetricCollection, MetricPublisher }
import scala.annotation.unused
import scala.jdk.StreamConverters.StreamHasToScala

class MyMetricPublisher(
    @unused dynamicAccess: DynamicAccess,
    pluginConfig: PluginConfig
) extends MetricPublisher {

  override def publish(metricCollection: MetricCollection): Unit = {
    // Index the published metric records by metric name.
    val metricsMap = metricCollection.stream().toScala(Vector).map { mr => (mr.metric().name(), mr) }.toMap
    metricsMap(CoreMetric.OPERATION_NAME.name()).value() match {
      case "PutItem" =>
        if (metricsMap(CoreMetric.API_CALL_SUCCESSFUL.name()).value().asInstanceOf[java.lang.Boolean]) {
          val apiCallDuration = metricsMap(CoreMetric.API_CALL_DURATION.name()).value().asInstanceOf[java.time.Duration]
          val credentialsFetchDuration =
            metricsMap(CoreMetric.CREDENTIALS_FETCH_DURATION.name()).value().asInstanceOf[java.time.Duration]
          val marshallingDuration =
            metricsMap(CoreMetric.MARSHALLING_DURATION.name()).value().asInstanceOf[java.time.Duration]
          val retryCount = metricsMap(CoreMetric.RETRY_COUNT.name()).value().asInstanceOf[java.lang.Integer]
          putItemApiCallDurationHistogram.record(apiCallDuration.toMillis)
          putItemCredentialsFetchDurationHistogram.record(credentialsFetchDuration.toMillis)
          putItemMarshallingDurationHistogram.record(marshallingDuration.toMillis)
          putItemRetryCounter.increment(retryCount.toLong)
        } else {
          putItemErrorCounter.increment()
        }
      case _ =>
    }
  }

  override def close(): Unit = {}

  private val prefix = "app"

  private val apiCallDurationHistogram          = histogram(s"$prefix.aws.api-call-duration")
  private val credentialsFetchDurationHistogram = histogram(s"$prefix.aws.credentials-fetch-duration")
  private val marshallingDurationHistogram      = histogram(s"$prefix.aws.marshalling-duration")
  private val retryCounter                      = counter(s"$prefix.aws.retry-count")
  private val errorCounter                      = counter(s"$prefix.error-count")

  private val putItemApiCallDurationHistogram = apiCallDurationHistogram.withTag("Operation", "PutItem")
  private val putItemCredentialsFetchDurationHistogram = credentialsFetchDurationHistogram
    .withTag("Operation", "PutItem")
  private val putItemMarshallingDurationHistogram = marshallingDurationHistogram
    .withTag("Operation", "PutItem")
  private val putItemRetryCounter = retryCounter
    .withTag("Operation", "PutItem")
  private val putItemErrorCounter = errorCounter
    .withTag("Operation", "PutItem")

  private def histogram(metricName: String): Histogram =
    Kamon
      .histogram(metricName, MeasurementUnit.time.milliseconds)
      .withTag("TableName", pluginConfig.tableName)
      .withTag("sdk", s"java-${pluginConfig.clientConfig.clientVersion}")

  private def counter(metricName: String): Counter =
    Kamon
      .counter(metricName)
      .withTag("TableName", pluginConfig.tableName)
      .withTag("sdk", s"java-${pluginConfig.clientConfig.clientVersion}")
}
Specify your custom implementation classes in metric-publisher-class-names.
j5ik2o.dynamo-db-????? {
  v2 {
    metric-publisher-class-names = ["example.MyMetricPublisher"]
  }
}