/*
 *  Make.org Core API
 *  Copyright (C) 2018 Make.org
 *
 * This program is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU Affero General Public License as
 *  published by the Free Software Foundation, either version 3 of the
 *  License, or (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *  GNU Affero General Public License for more details.
 *
 *  You should have received a copy of the GNU Affero General Public License
 *  along with this program. If not, see <https://www.gnu.org/licenses/>.
 *
 */

package org.make.api.technical

import java.util
import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster
import org.make.api.technical.crm.SendMessages
import org.make.core.Sharded

import scala.jdk.CollectionConverters._

/**
  * Kafka producer partitioner that picks the target partition from the hash code
  * of an object derived from the record (see [[partition]] for the selection order).
  */
class MakePartitioner extends Partitioner {

  /**
    * Chooses the partition a record is sent to.
    *
    * The object whose `hashCode` drives the choice is selected in this order:
    *  1. the `id` of the value when the value is a `Sharded`,
    *  1. the `id` of the value when the value is a `SendMessages`,
    *  1. the key when it is not null,
    *  1. the value when it is not null,
    *  1. a constant string when both key and value are null.
    *
    * The hash is mapped onto the sorted ids of the partitions currently available
    * for the topic. When no partition is currently available, the full partition
    * list of the topic is used instead so that the record can still be assigned.
    *
    * @param topic      topic the record is produced to
    * @param key        record key, may be null
    * @param keyBytes   serialized key (unused)
    * @param value      record value, may be null
    * @param valueBytes serialized value (unused)
    * @param cluster    cluster metadata used to list the partitions of the topic
    * @return the id of the selected partition
    * @throws IllegalStateException if the cluster metadata lists no partition at all for the topic
    */
  override def partition(
    topic: String,
    key: scala.Any,
    keyBytes: Array[Byte],
    value: scala.Any,
    valueBytes: Array[Byte],
    cluster: Cluster
  ): Int = {

    val available = cluster.availablePartitionsForTopic(topic).asScala.map(_.partition()).sorted

    // With no available partition the modulo below would divide by zero;
    // fall back to every partition declared for the topic.
    val partitions =
      if (available.nonEmpty) available
      else cluster.partitionsForTopic(topic).asScala.map(_.partition()).sorted

    if (partitions.isEmpty) {
      throw new IllegalStateException(s"No partition found for topic $topic")
    }

    val obj = (Option(key), Option(value)) match {
      case (_, Some(v: Sharded))      => v.id
      case (_, Some(v: SendMessages)) => v.id
      case (Some(k), _)               => k
      case (_, Some(other))           => other
      case _                          => "should not happen"
    }

    // floorMod keeps the index in [0, size) even for negative hash codes.
    partitions(Math.floorMod(obj.hashCode, partitions.size))

  }

  /** Nothing to release: this partitioner holds no resources. */
  override def close(): Unit = {}

  /** No configuration is read by this partitioner. */
  override def configure(configs: util.Map[String, _]): Unit = {}
}
| Line | Stmt Id | Pos | Tree | Symbol | Tests | Code |
|---|---|---|---|---|---|---|
| 41 | 6719 | 1248 - 1290 | Apply | org.apache.kafka.common.Cluster.availablePartitionsForTopic | cluster.availablePartitionsForTopic(topic) | |
| 41 | 6459 | 1248 - 1324 | ApplyToImplicitArgs | scala.collection.SeqOps.sorted | scala.jdk.CollectionConverters.ListHasAsScala[org.apache.kafka.common.PartitionInfo](cluster.availablePartitionsForTopic(topic)).asScala.map[Int](((x$1: org.apache.kafka.common.PartitionInfo) => x$1.partition())).sorted[Int](math.this.Ordering.Int) | |
| 41 | 5836 | 1303 - 1316 | Apply | org.apache.kafka.common.PartitionInfo.partition | x$1.partition() | |
| 41 | 7229 | 1318 - 1318 | Select | scala.math.Ordering.Int | math.this.Ordering.Int | |
| 44 | 7819 | 1418 - 1422 | Select | org.make.core.Sharded.id | v.id | |
| 45 | 7399 | 1464 - 1468 | Select | org.make.api.technical.crm.SendMessages.id | v.id | |
| 48 | 6525 | 1600 - 1619 | Literal | <nosymbol> | "should not happen" | |
| 51 | 5825 | 1642 - 1710 | Apply | scala.Int.% | obj.hashCode().%(partitions.size).+(partitions.size).%(partitions.size) | |
| 51 | 5714 | 1658 - 1673 | Select | scala.collection.SeqOps.size | partitions.size | |
| 51 | 6674 | 1695 - 1710 | Select | scala.collection.SeqOps.size | partitions.size | |
| 51 | 7091 | 1676 - 1691 | Select | scala.collection.SeqOps.size | partitions.size | |
| 51 | 7231 | 1631 - 1711 | Apply | scala.collection.SeqOps.apply | partitions.apply(obj.hashCode().%(partitions.size).+(partitions.size).%(partitions.size)) | |
| 55 | 6348 | 1749 - 1751 | Literal | <nosymbol> | () | |
| 57 | 7785 | 1816 - 1818 | Literal | <nosymbol> | () |