[DISCUSS] Data loading improvement

Jacky Li
I want to propose the following improvements to the data loading process:

Currently, different steps use different data layouts in CarbonRow, and the data is converted back and forth between steps. This makes it hard for developers to understand the data structure used in each step, and it increases memory usage because some steps perform unnecessary data copying. So I suggest improving it as follows:
   
   1) input step: read the input and create a CarbonRow in which all fields are of string type

   2) convert step: convert each string to byte[] according to its data type. This step has a compressing effect on the input row, so it saves memory, and it also takes care of null values:
        if it is a dictionary dimension, convert it to a surrogate key;
        if it is a no-dictionary dimension, convert it to a byte[] representation;
        if it is a complex dimension, convert it to a byte[] representation;
        if it is a measure, convert it to an Object (Integer, Long, Double, ...) according to the schema  —> change this to byte[] instead of storing an Object, to avoid boxing/unboxing and save memory

        The conversion happens by updating the fields of the CarbonRow in place; no new CarbonRow should be created. However, there is currently a copy operation of the input CarbonRow for bad record handling  —>  do not copy the row; convert the field back to its original value only if it is a bad record
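        As a rough illustration of the proposed in-place conversion (the CarbonRow stand-in, the field types, and the dictionary lookup below are simplified assumptions for the sketch, not the actual CarbonData classes):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch only: a simplified row whose Object[] fields are converted in place,
// so no new row object is allocated per record.
public class ConvertStepSketch {

  static class CarbonRowStub {                 // stand-in for the real CarbonRow
    final Object[] data;
    CarbonRowStub(Object[] data) { this.data = data; }
  }

  enum FieldType { DICTIONARY, NO_DICTIONARY, MEASURE_LONG }

  /** Converts every field in place; returns false if the row is a bad record. */
  static boolean convertInPlace(CarbonRowStub row, FieldType[] schema) {
    for (int i = 0; i < schema.length; i++) {
      String value = (String) row.data[i];
      try {
        switch (schema[i]) {
          case DICTIONARY:       // dictionary dimension -> surrogate key
            row.data[i] = lookupSurrogateKey(value);
            break;
          case NO_DICTIONARY:    // no-dictionary / complex dimension -> byte[]
            row.data[i] = value.getBytes(StandardCharsets.UTF_8);
            break;
          case MEASURE_LONG:     // measure -> byte[] instead of a boxed Long
            row.data[i] = ByteBuffer.allocate(Long.BYTES)
                .putLong(Long.parseLong(value)).array();
            break;
        }
      } catch (NumberFormatException e) {
        return false;            // bad record: the field can be converted back to its value
      }
    }
    return true;
  }

  static int lookupSurrogateKey(String value) {
    return value.hashCode();     // placeholder for the real dictionary lookup
  }
}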

   3) sort step:
      improvement 1: sort the collected input CarbonRows. Currently this is done by copying each row object into an internal buffer and sorting that buffer. —> to avoid copying the CarbonRows, we should create this buffer (together with a RowID array) in the input step and output only the sorted RowIDs, swapping values in the RowID array according to the row values. If it is a merge sort, then write to file based on this sorted RowID array when spilling to disk. So no copy of the CarbonRow is required (see the sketch after this step).

     improvement 2: when spilling to disk, the current code changes the field order in CarbonRow and writes it as a 3-element array [global dictionary dimensions, plain and complex dimensions, measure columns], because the merger expects this layout. —> I think this is unnecessary; we can add serialization/deserialization capability to CarbonRow and use CarbonRow directly. In the case of a no-sort table, this also avoids the conversion in the write step.
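     A minimal sketch of sorting RowIDs instead of rows (the buffer and comparator types are assumptions; the real step would compare on the configured sort columns):

import java.util.Comparator;

// Sketch only: keep the rows where they are and sort a RowID array by swapping ids,
// so spilling to disk can simply follow the sorted id order without copying rows.
public class RowIdSortSketch {

  /** Sorts rowIds by comparing the rows they point to; the rows themselves are never copied. */
  static void sortRowIds(int[] rowIds, Object[][] rowBuffer, Comparator<Object[]> cmp) {
    // insertion sort for clarity; the real sort step could use quicksort or merge sort
    for (int i = 1; i < rowIds.length; i++) {
      int id = rowIds[i];
      int j = i - 1;
      while (j >= 0 && cmp.compare(rowBuffer[rowIds[j]], rowBuffer[id]) > 0) {
        rowIds[j + 1] = rowIds[j];
        j--;
      }
      rowIds[j + 1] = id;
    }
  }
}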

   4) write step:
      currently it collects one page of data (32K rows) and starts a Producer, which is actually the encoding process for one page. To support parallel processing, after the page data is encoded it is put into a queue that is taken by the Consumer; the Consumer collects pages up to one blocklet size (configurable, say 64MB) and writes them to CarbonData files.
     
      improvement 1: there is an unnecessary data copy and re-ordering of the fields of the row: it converts the row to [measure columns, plain and complex dimensions, global dictionary dimensions], which is different from what the sort step outputs. —> so I suggest using CarbonRow only; no new row object should be created here.

      improvement 2: there are multiple traversals of the page data in the current code. —> we should change this to: first convert the CarbonRows to a ColumnarPage, which is the columnar representation of all columns in one page, and collect the start/end key and statistics while doing this columnar conversion; then apply the inverted index, RLE, and compression processes on the ColumnarPage object.
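      A sketch of this single-pass conversion (this ColumnarPage stand-in and the long-typed columns are simplifications assumed for illustration; the real page would also carry the start/end key):

// Sketch only: one traversal over the rows of a page that both builds the
// columnar layout and collects min/max statistics, instead of several passes.
public class ColumnarConversionSketch {

  static class ColumnarPageStub {
    final long[][] columns;       // [columnIndex][rowIndex], values simplified to long
    final long[] min, max;        // per-column statistics collected during conversion
    ColumnarPageStub(int numColumns, int numRows) {
      columns = new long[numColumns][numRows];
      min = new long[numColumns];
      max = new long[numColumns];
      java.util.Arrays.fill(min, Long.MAX_VALUE);
      java.util.Arrays.fill(max, Long.MIN_VALUE);
    }
  }

  static ColumnarPageStub toColumnarPage(long[][] rows, int numColumns) {
    ColumnarPageStub page = new ColumnarPageStub(numColumns, rows.length);
    for (int r = 0; r < rows.length; r++) {
      for (int c = 0; c < numColumns; c++) {
        long v = rows[r][c];
        page.columns[c][r] = v;                    // transpose row -> column
        if (v < page.min[c]) page.min[c] = v;      // statistics collected in the same pass
        if (v > page.max[c]) page.max[c] = v;
      }
    }
    // inverted index, RLE and compression would then operate on page.columns
    return page;
  }
}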

      improvement 3: we want to open up some interfaces to let developers add more page encodings, statistics, and page compressions. These interfaces will act like callbacks, so a developer can write a new encoding/statistics/compression method and the carbon loading process will invoke it in this step. The interfaces will look like:

/**
 *  Codec for a column page. Implementations should not keep state across pages;
 *  the caller will reuse the same object to encode multiple pages.
 */
interface PageCodec {
  /** Codec name will be stored in BlockletHeader (DataChunk3) */
  String getName();
  void startPage(int pageID);
  void processColumn(ColumnBatch batch);
  byte[] endPage(int pageID);
  ColumnBatch decode(byte[] encoded);
}

/** Compressor of the page data, the flow is encode->compress, and decompress->decode */
interface PageCompressor {
  /** Compressor name will be stored in BlockletHeader (DataChunk3) */
  String getName();
  byte[] compress(byte[] encodedData);
  byte[] decompress(byte[] data);
}

/** Calculate the statistics for a column page and blocklet */
interface Statistics {
  /** Statistics name will be stored in BlockletHeader (DataChunk3) */
  String getName();
  void startPage(int pageID);
  void endPage(int pageID);
  void startBlocklet(int blockletID);
  void endBlocklet(int blockletID);
 
  /** Update the stats for the input batch */
  void update(ColumnBatch batch);

  /** Output will be written to DataChunk2 in BlockletHeader (DataChunk3) */
  int[] getPageStatistics();

  /** Output will be written to Footer */
  int[] getBlockletStatistics();
}
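
For example, a new encoding could plug in roughly as below. This is a hypothetical run-length codec that mirrors the proposed PageCodec methods; ColumnBatch is stubbed as a plain int[] holder, since the real batch type would come from the loading framework:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical example following the proposed PageCodec shape; not the real API.
class ColumnBatchStub {
  final int[] values;
  ColumnBatchStub(int[] values) { this.values = values; }
}

class RunLengthPageCodec {
  private final List<int[]> runs = new ArrayList<>();   // (value, count) pairs for the current page

  public String getName() { return "RLE_EXAMPLE"; }     // would be stored in BlockletHeader (DataChunk3)

  public void startPage(int pageID) { runs.clear(); }

  public void processColumn(ColumnBatchStub batch) {
    for (int v : batch.values) {
      int last = runs.size() - 1;
      if (last >= 0 && runs.get(last)[0] == v) {
        runs.get(last)[1]++;                             // extend the current run
      } else {
        runs.add(new int[] {v, 1});                      // start a new run
      }
    }
  }

  public byte[] endPage(int pageID) {
    ByteBuffer out = ByteBuffer.allocate(runs.size() * 8);
    for (int[] run : runs) {
      out.putInt(run[0]).putInt(run[1]);                 // serialize (value, count)
    }
    return out.array();
  }

  public ColumnBatchStub decode(byte[] encoded) {
    ByteBuffer in = ByteBuffer.wrap(encoded);
    List<Integer> values = new ArrayList<>();
    while (in.remaining() >= 8) {
      int value = in.getInt();
      int count = in.getInt();
      for (int i = 0; i < count; i++) values.add(value);
    }
    int[] result = new int[values.size()];
    for (int i = 0; i < result.length; i++) result[i] = values.get(i);
    return new ColumnBatchStub(result);
  }
}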

And there should be a partition step added somewhere to support the partition feature (CARBONDATA-910); whether it goes before the input step or after the convert step depends on whether we implement the partition shuffling in the Spark layer or the Carbon layer. What is the current thinking on this? @CaiQiang @Lionel

What do you think about these improvements?

Regards,
Jacky





Re: [DISCUSS] Data loading improvement

Liang Chen
Administrator
Jacky, thank you for listing these constructive improvements to data loading.

I agree with considering all these improvement points; only the one below
gives me some concerns.
Before opening up interfaces for data loading, we need to define more
clearly what roles block/blocklet/page each play; then we could consider
some interfaces at the block level, some at the blocklet level, and some at
the page level.

Let us take up these improvements in the coming release.
--------------------------------------------------------------------------------------------------------------------
 improvement 3: we want to open up some interfaces to let developers add
more page encodings, statistics, and page compressions. These interfaces
will act like callbacks, so a developer can write a new
encoding/statistics/compression method and the carbon loading process will
invoke it in this step.

Regards
Liang
