Bitmap-Based Data Processing in MaxCompute

Image for post
Image for post

By Qu Ning.

Bitmap is a technology commonly used by data developers to encode and compress user data. With the rapid processing speeds of AND, OR, and NOT operations of bitmaps, developers can filter user by such user information as profile tags and analyze weekly activity.

This article has an example that illustrates how you can encode and compute bitmaps of active user IDs from different dates using the MapReduce module of MaxCompute. We hope that this example can be helpful to you or to any other developer.

Consider the code example below:

import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
import org.roaringbitmap.RoaringBitmap;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Iterator;
public class bitmapDemo2
{
public static class BitMapper extends MapperBase { Record key;
Record value;
@Override
public void setup(TaskContext context) throws IOException {
key = context.createMapOutputKeyRecord();
value = context.createMapOutputValueRecord();
}
@Override
public void map(long recordNum, Record record, TaskContext context)
throws IOException
{
RoaringBitmap mrb=new RoaringBitmap();
long AID=0;
{
{
{
{
AID=record.getBigint("id");
mrb.add((int) AID);
//获取key
key.set(new Object[] {record.getString("active_date")});
}
}
}
}
ByteBuffer outbb = ByteBuffer.allocate(mrb.serializedSizeInBytes());
mrb.serialize(new DataOutputStream(new OutputStream(){
ByteBuffer mBB;
OutputStream init(ByteBuffer mbb) {mBB=mbb; return this;}
public void close() {}
public void flush() {}
public void write(int b) {
mBB.put((byte) b);}
public void write(byte[] b) {mBB.put(b);}
public void write(byte[] b, int off, int l) {mBB.put(b,off,l);}
}.init(outbb)));
String serializedstring = Base64.getEncoder().encodeToString(outbb.array());
value.set(new Object[] {serializedstring});
context.write(key, value);
}
}
public static class BitReducer extends ReducerBase {
private Record result = null;
public void setup(TaskContext context) throws IOException {
result = context.createOutputRecord();
}
public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
long fcount = 0;
RoaringBitmap rbm=new RoaringBitmap();
while (values.hasNext())
{
Record val = values.next();
ByteBuffer newbb = ByteBuffer.wrap(Base64.getDecoder().decode((String)val.get(0)));
ImmutableRoaringBitmap irb = new ImmutableRoaringBitmap(newbb);
RoaringBitmap p= new RoaringBitmap(irb);
rbm.or(p);
}
ByteBuffer outbb = ByteBuffer.allocate(rbm.serializedSizeInBytes());
rbm.serialize(new DataOutputStream(new OutputStream(){
ByteBuffer mBB;
OutputStream init(ByteBuffer mbb) {mBB=mbb; return this;}
public void close() {}
public void flush() {}
public void write(int b) {
mBB.put((byte) b);}
public void write(byte[] b) {mBB.put(b);}
public void write(byte[] b, int off, int l) {mBB.put(b,off,l);}
}.init(outbb)));
String serializedstring = Base64.getEncoder().encodeToString(outbb.array());
result.set(0, key.get(0));
result.set(1, serializedstring);
context.write(result);
}
}
public static void main( String[] args ) throws OdpsException
{
System.out.println("begin.........");
JobConf job = new JobConf();

job.setMapperClass(BitMapper.class);
job.setReducerClass(BitReducer.class);
job.setMapOutputKeySchema(SchemaUtils.fromString("active_date:string"));
job.setMapOutputValueSchema(SchemaUtils.fromString("id:string"));
InputUtils.addTable(TableInfo.builder().tableName("bitmap_source").cols(new String[] {"id","active_date"}).build(), job);
// +------------+-------------+
// | id | active_date |
// +------------+-------------+
// | 1 | 20190729 |
// | 2 | 20190729 |
// | 3 | 20190730 |
// | 4 | 20190801 |
// | 5 | 20190801 |
// +------------+-------------+
OutputUtils.addTable(TableInfo.builder().tableName("bitmap_target").build(), job);
// +-------------+------------+
// | active_date | bit_map |
// +-------------+------------+
// 20190729,OjAAAAEAAAAAAAEAEAAAAAEAAgA=3D
// 20190730,OjAAAAEAAAAAAAAAEAAAAAMA
// 20190801,OjAAAAEAAAAAAAEAEAAAAAQABQA=3D
JobClient.runJob(job);
}
}

Now lets talk about this code. After packaging Java applications and uploading the package to a MaxCompute project, developers can call this MapReduce job, the one given above, in MaxCompute. For data in the input table, user IDs are encoded by using the date as the key, and an OR operation is performed on the bitmap-encoded user IDs of the same date. Alternatively, an AND operation can be performed as required, for example, in retention cases. Then, processed data is written to the target structural table for further processing.

Original Source

Written by

Follow me to keep abreast with the latest technology news, industry insights, and developer trends.

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store