1 /*
2  *  Make.org Core API
3  *  Copyright (C) 2018 Make.org
4  *
5  * This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU Affero General Public License as
7  *  published by the Free Software Foundation, either version 3 of the
8  *  License, or (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU Affero General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Affero General Public License
16  *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
17  *
18  */
19 
20 package org.make.api.technical
21 
22 import java.util
23 import org.apache.kafka.clients.producer.Partitioner
24 import org.apache.kafka.common.Cluster
25 import org.make.api.technical.crm.SendMessages
26 import org.make.core.Sharded
27 
28 import scala.jdk.CollectionConverters._
29 
/**
  * Kafka producer partitioner that picks a partition from a sharding identifier.
  *
  * The partition is derived from the hash code of, in order of preference:
  *  - the `id` of the value, when the value is a `Sharded` or a `SendMessages`,
  *  - the record key, when it is not `null`,
  *  - the value itself, when it is not `null`,
  *  - a constant placeholder when both key and value are `null`.
  */
class MakePartitioner extends Partitioner {

  /**
    * Chooses the partition a record is sent to.
    *
    * @param topic      topic the record is produced to
    * @param key        record key, may be `null`
    * @param keyBytes   serialized key (not used by this partitioner)
    * @param value      record value, may be `null`
    * @param valueBytes serialized value (not used by this partitioner)
    * @param cluster    cluster metadata used to list the partitions of `topic`
    * @return the number of the selected partition
    * @throws IllegalStateException if the cluster metadata lists no partition at all for `topic`
    */
  override def partition(
    topic: String,
    key: scala.Any,
    keyBytes: Array[Byte],
    value: scala.Any,
    valueBytes: Array[Byte],
    cluster: Cluster
  ): Int = {

    // Prefer the partitions that are currently available. When none is available,
    // fall back to every partition known for the topic instead of failing with a
    // division by zero further down.
    val available = cluster.availablePartitionsForTopic(topic).asScala
    val candidates =
      if (available.nonEmpty) available else cluster.partitionsForTopic(topic).asScala

    // Sorted so that the index -> partition mapping does not depend on the order
    // in which the cluster metadata lists the partitions.
    val partitions = candidates.map(_.partition()).sorted

    if (partitions.isEmpty) {
      throw new IllegalStateException(s"No partition found for topic $topic")
    }

    val shardingKey = (Option(key), Option(value)) match {
      case (_, Some(v: Sharded))      => v.id
      case (_, Some(v: SendMessages)) => v.id
      case (Some(k), _)               => k
      case (_, Some(other))           => other
      case _                          => "should not happen"
    }

    // floorMod keeps the index in [0, partitions.size) even for negative hash codes.
    partitions(Math.floorMod(shardingKey.hashCode, partitions.size))

  }

  /** Nothing to release: this partitioner holds no resources. */
  override def close(): Unit = {}

  /** No configuration is read by this partitioner. */
  override def configure(configs: util.Map[String, _]): Unit = {}
}
Line Stmt Id Pos Tree Symbol Tests Code
41 6719 1248 - 1290 Apply org.apache.kafka.common.Cluster.availablePartitionsForTopic cluster.availablePartitionsForTopic(topic)
41 6459 1248 - 1324 ApplyToImplicitArgs scala.collection.SeqOps.sorted scala.jdk.CollectionConverters.ListHasAsScala[org.apache.kafka.common.PartitionInfo](cluster.availablePartitionsForTopic(topic)).asScala.map[Int](((x$1: org.apache.kafka.common.PartitionInfo) => x$1.partition())).sorted[Int](math.this.Ordering.Int)
41 5836 1303 - 1316 Apply org.apache.kafka.common.PartitionInfo.partition x$1.partition()
41 7229 1318 - 1318 Select scala.math.Ordering.Int math.this.Ordering.Int
44 7819 1418 - 1422 Select org.make.core.Sharded.id v.id
45 7399 1464 - 1468 Select org.make.api.technical.crm.SendMessages.id v.id
48 6525 1600 - 1619 Literal <nosymbol> "should not happen"
51 5825 1642 - 1710 Apply scala.Int.% obj.hashCode().%(partitions.size).+(partitions.size).%(partitions.size)
51 5714 1658 - 1673 Select scala.collection.SeqOps.size partitions.size
51 6674 1695 - 1710 Select scala.collection.SeqOps.size partitions.size
51 7091 1676 - 1691 Select scala.collection.SeqOps.size partitions.size
51 7231 1631 - 1711 Apply scala.collection.SeqOps.apply partitions.apply(obj.hashCode().%(partitions.size).+(partitions.size).%(partitions.size))
55 6348 1749 - 1751 Literal <nosymbol> ()
57 7785 1816 - 1818 Literal <nosymbol> ()