I ran a sample pipeline to compare StreamSerializer with Portable, and found that StreamSerializer is not much different from Portable; at times it is even slower.

I checked with the Jet team, who said StreamSerializer makes more sense when combined with a high-performance serialization library such as protobuf. So I tried again, using protobuf inside the StreamSerializer, and the result is amazing!
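One well-known ingredient of protobuf's compact binary format is the base-128 varint, which stores small integers in fewer bytes than a fixed 4- or 8-byte field. The sketch below is a stdlib-only Kotlin illustration of that encoding, not the protobuf library itself, and it was not measured here as the cause of the speed-up.

```kotlin
// Minimal sketch of protobuf-style base-128 varint encoding (stdlib only).
// Illustrates the wire format; it is not the protobuf library.

fun encodeVarint(value: Long): ByteArray {
    val out = java.io.ByteArrayOutputStream()
    var v = value
    // Emit 7 bits at a time, lowest bits first; the high bit (0x80) marks
    // that more bytes follow. ushr shifts in zeros, so the loop terminates.
    while ((v and 0x7FL.inv()) != 0L) {
        out.write(((v and 0x7FL) or 0x80L).toInt())
        v = v ushr 7
    }
    out.write(v.toInt())
    return out.toByteArray()
}

fun decodeVarint(bytes: ByteArray): Long {
    var result = 0L
    var shift = 0
    for (b in bytes) {
        // Take the low 7 bits of each byte and place them at the next offset.
        result = result or ((b.toLong() and 0x7FL) shl shift)
        if ((b.toInt() and 0x80) == 0) break // last byte of this varint
        shift += 7
    }
    return result
}

fun main() {
    for (n in listOf(1L, 150L, 10_000L, 1_000_000L)) {
        val enc = encodeVarint(n)
        println("$n -> ${enc.size} byte(s), round-trip ${decodeVarint(enc)}")
    }
}
```

For example, 150 encodes to the two bytes `0x96 0x01`, and 1,000,000 needs three bytes instead of a fixed eight.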

Pipeline Cost

```
# test is running locally, 1 jet node
# the measurement is the pipeline completion time
# with StreamSerializer
time cost is 41503ms // just start jet
time cost is 34506ms // submit job again, run 2nd time
time cost is 29617ms // submit job again, run 3rd time
time cost is 32847ms // submit job again, run 4th time
time cost is 37604ms // restart jet, run again
time cost is 31349ms // submit job again
time cost is 32874ms // submit job again
time cost is 32874ms // restart jet, run again
time cost is 39657ms // restart jet, run again
# with Portable
time cost is 33360ms // just start jet
time cost is 31187ms // submit job again
time cost is 34705ms // restart jet
time cost is 30896ms // submit job again
time cost is 31170ms // submit job again
time cost is 32517ms // restart jet
time cost is 30114ms // submit job again
# with Protobuf
time cost is 8995ms // cold start
time cost is 7715ms
time cost is 7398ms
```
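To put the runs side by side, here is a small Kotlin sketch that averages the logged completion times per serializer. It uses only the numbers above; the mix of cold starts and warm runs differs between the three groups (and Protobuf has only three runs), so treat the means as rough.

```kotlin
// Pipeline completion times (ms) copied from the log above.
val runsMs: Map<String, List<Int>> = mapOf(
    "StreamSerializer" to listOf(41503, 34506, 29617, 32847, 37604, 31349, 32874, 32874, 39657),
    "Portable" to listOf(33360, 31187, 34705, 30896, 31170, 32517, 30114),
    "Protobuf" to listOf(8995, 7715, 7398)
)

fun main() {
    val portableMean = runsMs.getValue("Portable").average()
    for ((name, runs) in runsMs) {
        val mean = runs.average()
        // Relative to Portable: above 1.0 is slower, below 1.0 is faster.
        println("%-16s mean %.0f ms (%.2fx Portable)".format(name, mean, mean / portableMean))
    }
}
```

With the logged runs this gives means of about 34,759 ms (StreamSerializer), 31,993 ms (Portable) and 8,036 ms (Protobuf), i.e. roughly 1.09x and 0.25x the Portable time.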

With StreamSerializer, I noticed that putting an object into the map also calls the read function, although I only expected write to be called. The reason is that put() has to return the old value, if there is one, so it triggers read. To avoid this, use set(key, value), which does not return the old value.
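The put/set difference can be illustrated with a toy, stdlib-only model (the class names are hypothetical and this is not Hazelcast's implementation): values are kept as bytes, put() returns the previous value and therefore has to deserialize it, while set() stores the new value without reading the old one. In Hazelcast the corresponding calls are `IMap.put(key, value)` and `IMap.set(key, value)`.

```kotlin
// Toy model, NOT Hazelcast code: counts serializer calls so the extra
// read caused by put() becomes visible.
class CountingSerializer {
    var writes = 0
    var reads = 0
    fun write(value: String): ByteArray { writes++; return value.toByteArray(Charsets.UTF_8) }
    fun read(bytes: ByteArray): String { reads++; return String(bytes, Charsets.UTF_8) }
}

// Hypothetical map that stores values in serialized (binary) form.
class ToyBinaryMap(private val serializer: CountingSerializer) {
    private val store = HashMap<String, ByteArray>()

    // Like put(): stores the new value and returns the deserialized old one.
    fun put(key: String, value: String): String? {
        val old = store.put(key, serializer.write(value))
        return old?.let { serializer.read(it) }
    }

    // Like set(): stores the new value and discards the old one unread.
    fun set(key: String, value: String) {
        store[key] = serializer.write(value)
    }
}

fun main() {
    val s = CountingSerializer()
    val map = ToyBinaryMap(s)
    map.put("car-1", "v1") // no old value: 1 write, 0 reads
    map.put("car-1", "v2") // overwrite via put: 1 write, 1 read
    map.set("car-1", "v3") // overwrite via set: 1 write, 0 reads
    println("writes=${s.writes}, reads=${s.reads}") // writes=3, reads=1
}
```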

For reference, the pipeline looks like this:

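```kotlin
import com.hazelcast.jet.Traversers
import com.hazelcast.jet.datamodel.Tuple2
import com.hazelcast.jet.pipeline.Sinks
import com.hazelcast.jet.pipeline.Sources

pipeline.readFrom(Sources.remoteMap<String, Car>(
        Car, // name of the source map; there are 10k cars in it
        ClusterUtils.getCacheClientConfig()
    ))
    // "bomb": fan each car out into 1,000 (client, car) tuples
    .flatMap { (_, car) ->
        val bomb = MutableList<Tuple2<Client, Car>>(1_000) {
            Tuple2.tuple2(car.client, car)
        }
        Traversers.traverseIterable(bomb)
    }
    .map { (client, car) ->
        // pretend to have some other processing here
        car
    }
    .writeTo(Sinks.remoteMap(
        WareHouse,                           // name of the sink map
        ClusterUtils.getCacheClientConfig(),
        { it.getIdentity() }                 // key: the car's identity
    ) {
        it                                   // value: the car itself
    })
```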
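The flatMap step is what makes this pipeline heavy: 10,000 cars × 1,000 copies = 10,000,000 items flow through map() and into the sink, and each one has to be serialized to reach the remote WareHouse map. The stdlib-only sketch below reproduces just that data shape, with hypothetical Car and Client stand-ins and no Jet involved, assuming getIdentity() depends only on the car, so all 1,000 copies of a car overwrite the same key.

```kotlin
// Hypothetical stand-ins for the post's Client and Car types.
data class Client(val name: String)
data class Car(val id: Int, val client: Client) {
    fun getIdentity(): Int = id
}

// Mirrors flatMap + map + sink: each car is copied `copies` times and every
// copy is "written" to a map keyed by the car's identity.
// Returns (number of writes, number of distinct keys in the sink map).
fun simulate(cars: List<Car>, copies: Int): Pair<Long, Int> {
    val warehouse = HashMap<Int, Car>()
    var writes = 0L
    cars.asSequence()
        .flatMap { car -> List(copies) { car.client to car }.asSequence() }
        .map { (_, car) -> car }
        .forEach { car ->
            warehouse[car.getIdentity()] = car
            writes++
        }
    return writes to warehouse.size
}

fun main() {
    val cars = List(10_000) { Car(it, Client("client-${it % 100}")) }
    val (writes, distinctKeys) = simulate(cars, 1_000)
    println("writes=$writes, distinct keys=$distinctKeys")
}
```

With the post's numbers, main() reports 10,000,000 writes but only 10,000 distinct keys, so serializer cost is paid ten million times for a map that ends up holding ten thousand entries.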

Serialization Cost

The log below shows the time cost of putting different volumes of data into the IMDG:

```
## 10k items
21Apr15 13:51:24.998 INFO [main] mock-orderA:58 - Portable: use putAll() to save 10000 items in one go, time cost: 296_524_370ns
Portable Total: create + put 10000 items, cost 1_160_117_414ns
21Apr15 13:51:25.187 INFO [main] mock-OrderSourceB:72 - Protobuf: use putAll() to save 10000 items in one go, time cost: 151_668_284ns
Protobuf Total: create + put 10000 items, cost 185_742_013ns
Protobuf Total/Portable Total: 16.010621921411825%

## 100k items
21Apr15 13:53:10.267 INFO [main] mock-orderA:58 - Portable: use putAll() to save 100000 items in one go, time cost: 1_394_450_550ns
Portable Total: create + put 100000 items, cost 2_567_479_807ns
21Apr15 13:53:10.993 INFO [main] mock-OrderSourceB:72 - Protobuf: use putAll() to save 100000 items in one go, time cost: 612_507_310ns
Protobuf Total: create + put 100000 items, cost 722_944_795ns
Protobuf Total/Portable Total: 28.157759723327008%

## 1M items
21Apr15 13:54:06.720 INFO [main] mock-orderA:58 - Portable: use putAll() to save 1000000 items in one go, time cost: 8_669_311_873ns
Portable Total: create + put 1000000 items, cost 11_535_176_693ns
21Apr15 13:54:13.361 INFO [main] mock-OrderSourceB:72 - Protobuf: use putAll() to save 1000000 items in one go, time cost: 5_935_409_048ns
Protobuf Total: create + put 1000000 items, cost 6_636_865_991ns
Protobuf Total/Portable Total: 57.53588495118165%
```
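The ratios in the log can be re-derived from the "Total" lines, and dividing each total by the item count gives an average cost per item. A small Kotlin sketch using only the logged numbers:

```kotlin
// Item count and "create + put" totals (ns) copied from the log above.
data class Run(val items: Long, val portableNs: Long, val protobufNs: Long)

val runs = listOf(
    Run(10_000, 1_160_117_414, 185_742_013),
    Run(100_000, 2_567_479_807, 722_944_795),
    Run(1_000_000, 11_535_176_693, 6_636_865_991)
)

fun main() {
    for (r in runs) {
        // Same ratio the log prints: Protobuf total as a percentage of Portable total.
        val ratioPct = 100.0 * r.protobufNs / r.portableNs
        val portablePerItem = r.portableNs.toDouble() / r.items
        val protobufPerItem = r.protobufNs.toDouble() / r.items
        println(
            "%,d items: Portable %,.0f ns/item, Protobuf %,.0f ns/item, ratio %.2f%%"
                .format(r.items, portablePerItem, protobufPerItem, ratioPct)
        )
    }
}
```

Per item, Portable drops from about 116,012 ns at 10k items to about 11,535 ns at 1M, while Protobuf drops from about 18,574 ns to about 6,637 ns. Portable's per-item cost falls faster, which is why the Protobuf/Portable ratio rises from 16% to 58% as the volume grows.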
