K-means is a technique for grouping data. It identifies clusters around points known as centroids, and each cluster is then treated as a separate category or class. Say we have to identify two clusters in the given data: we first place two centers at random and assign every point to its nearest center (assign step), then move the centers so that each sits at the middle of its cluster (optimize step), and repeat. We will discuss the process in detail in this post below.
There are two stages in k-means clustering approach.
- Assign ( assign points to a centroid)
- Optimize (move centroid)
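The two stages can be sketched in a few lines of plain Python. This is a minimal illustration under assumptions of my own (2-D points, squared Euclidean distance, initial centroids picked from the data, a fixed iteration count); the names `kmeans` and `squared_distance` are made up for the example.

```python
import random

def squared_distance(a, b):
    return (a[0] - b[0]) ** 2 + (a[1] - b[1]) ** 2

def kmeans(points, k, iterations=10, seed=0):
    """Toy k-means on 2-D points: alternate the assign and optimize steps."""
    rng = random.Random(seed)
    centroids = rng.sample(points, k)  # initial placement: k random data points
    clusters = []
    for _ in range(iterations):
        # Assign step: attach every point to its nearest centroid.
        clusters = [[] for _ in range(k)]
        for p in points:
            nearest = min(range(k), key=lambda i: squared_distance(p, centroids[i]))
            clusters[nearest].append(p)
        # Optimize step: move each centroid to the mean of its points.
        for i, members in enumerate(clusters):
            if members:  # keep the old centroid if a cluster ends up empty
                centroids[i] = (
                    sum(x for x, _ in members) / len(members),
                    sum(y for _, y in members) / len(members),
                )
    return centroids, clusters

points = [(1.0, 1.0), (1.5, 2.0), (1.0, 0.5), (8.0, 8.0), (9.0, 9.5), (8.5, 9.0)]
centroids, clusters = kmeans(points, k=2)
```

Changing `seed` changes the starting centroids, which is a simple way to see how sensitive the result is to the initial placement on less well-separated data.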
The biggest challenge with k-means is that the resulting clusters may be totally different depending on the initial placement of the centroids. You may want to run the clustering several times with different starting points and then combine (ensemble) the results.
Questions to ask are:
- How do we select the number of centroids for the data?
- What is a good initial placement of the centroids?
- How many iterations of assign and optimize do you want to perform?
- Given a fixed set of training data and a fixed number of clusters, the output may not always be the same, as it depends on the initial placement of the centroids.
- Local minimum problem: k-means behaves like a local hill-climbing algorithm, so it can get stuck in a local optimum.
Single Linkage Clustering (SLC), a form of Hierarchical Agglomerative Clustering
- consider each object as a cluster itself
- define inter cluster distance as distance between closest two points in two clusters
- merge two closest clusters
- iterate n-k times to form k clusters ( n is total number of points)
SLC is deterministic.
Average Link Clustering algorithm: take the average distance between the points of the two clusters as the inter-cluster distance (step 2 above).
Max Link Clustering algorithm: take the distance between the two farthest points of the two clusters (step 2 above).
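The agglomerative procedure and its three linkage choices can be sketched as follows. This is an illustrative toy on 1-D points, not a reference implementation; the function name `agglomerate` and the `link` parameter are assumptions made for the example.

```python
def agglomerate(points, k, link=min):
    """Merge the two closest clusters until k remain (n - k merges).

    link=min gives single link, link=max gives max link, and an
    averaging function gives average link.
    """
    clusters = [[p] for p in points]  # every point starts as its own cluster

    def distance(a, b):
        # Inter-cluster distance, computed from all pairwise point distances.
        return link([abs(x - y) for x in a for y in b])

    while len(clusters) > k:
        # Find the pair of clusters with the smallest inter-cluster distance.
        pairs = [(i, j) for i in range(len(clusters)) for j in range(i + 1, len(clusters))]
        i, j = min(pairs, key=lambda pair: distance(clusters[pair[0]], clusters[pair[1]]))
        clusters[i] = clusters[i] + clusters[j]
        del clusters[j]
    return clusters

def average(distances):
    return sum(distances) / len(distances)

data = [1.0, 1.2, 1.9, 8.0, 8.3, 15.0]
single = agglomerate(data, k=3)                 # single link
complete = agglomerate(data, k=3, link=max)     # max link
avg = agglomerate(data, k=3, link=average)      # average link
```

On this small, well-separated data set all three linkages happen to produce the same grouping; they diverge on elongated or noisy clusters.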
Think of the scenario where a point lies exactly in between two clusters. With SLC, it might end up in either of the two clusters, depending on how the algorithm breaks the tie.
- How do we know whether a consumer has consumed a Kafka message or not?
Kafka does not keep track of which messages have been consumed; the consumer has to track this itself. If a consumer fails, the message needs to be reprocessed by the consumer itself.
You can control where in the Kafka topic the consumer begins to read by setting KafkaConfig.startOffsetTime as follows:
- kafka.api.OffsetRequest.EarliestTime(): read from the beginning of the topic (i.e. from the oldest messages onwards)
- kafka.api.OffsetRequest.LatestTime(): read from the end of the topic (i.e. any new messages that are being written to the topic)
- A Unix timestamp aka seconds since the epoch (e.g. via
- Can we delete a message from a topic once a consumer has read it?
- No. Kafka does not support deleting individual messages; a consumer can only move its read offset forward. If a message should be skipped by a whole set of consumers once one of them has read it, put all of those consumers in the same consumer group.
- Kafka Consumer offset:
- Older consumers commit their offsets to ZooKeeper. ZooKeeper does not scale extremely well (especially for writes) when there are a large number of offsets (i.e., consumer-count * partition-count). Fortunately, Kafka now provides an ideal mechanism for storing consumer offsets. Consumers can commit their offsets in Kafka by writing them to a durable (replicated) and highly available topic. Consumers can fetch offsets by reading from this topic (although we provide an in-memory offsets cache for faster access). That is, offset commits are regular producer requests (which are inexpensive) and offset fetches are fast memory lookups.
- Kafka consumers request messages from a Kafka broker via a call to poll(), and their progress is tracked via offsets. Each message within each partition of each topic has a so-called offset assigned: its logical sequence number within the partition. A KafkaConsumer tracks its current offset for each partition that is assigned to it. Note that the Kafka brokers are not aware of the current offsets of the consumers. Thus, on poll() the consumer needs to send its current offsets to the broker, so that the broker can return the corresponding messages, i.e., messages with larger consecutive offsets. For example, let us assume we have a single-partition topic and a single consumer with current offset 5. On poll() the consumer sends its offset to the broker and the broker returns messages for offsets 6, 7, 8, …
Because consumers track their offsets themselves, this information could get lost if a consumer fails. Thus, offsets must be stored reliably, so that on restart a consumer can pick up its old offset and resume where it left off. In Kafka, there is built-in support for this via offset commits. The new KafkaConsumer can commit its current offset to Kafka, and Kafka stores those offsets in a special topic called __consumer_offsets. Storing the offsets within a Kafka topic is not just fault-tolerant, but also allows partitions to be reassigned to other consumers during a rebalance. Because all consumers of a consumer group can access all committed offsets of all partitions, on rebalance a consumer that gets a new partition assigned just reads the committed offset of this partition from the __consumer_offsets topic and resumes where the old consumer left off.
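The interplay of poll(), consumer-side offset tracking, and offset commits can be illustrated with a toy in-memory model. This is not the Kafka client API: `ToyBroker`, `ToyConsumer`, and their methods are hypothetical names, and the model assumes a single partition and a consumer that tracks the offset of the last message it received.

```python
class ToyBroker:
    """In-memory stand-in for a single-partition topic plus a committed-offsets store."""

    def __init__(self, messages):
        self.log = list(messages)   # a message's offset is its index in the log
        self.committed = {}         # group id -> last committed offset

    def fetch(self, after_offset, max_records=3):
        # Return (offset, message) pairs with offsets larger than after_offset.
        start = after_offset + 1
        return list(enumerate(self.log))[start:start + max_records]


class ToyConsumer:
    def __init__(self, broker, group_id):
        self.broker = broker
        self.group_id = group_id
        # On (re)start, resume from the committed offset; -1 means nothing consumed yet.
        self.offset = broker.committed.get(group_id, -1)

    def poll(self):
        records = self.broker.fetch(self.offset)
        if records:
            self.offset = records[-1][0]  # the consumer, not the broker, tracks progress
        return records

    def commit(self):
        self.broker.committed[self.group_id] = self.offset


broker = ToyBroker(["m%d" % i for i in range(10)])
first = ToyConsumer(broker, "g1")
first.poll()            # receives offsets 0, 1, 2
first.commit()          # offset 2 is now stored reliably
first.poll()            # receives offsets 3, 4, 5 but "crashes" before committing

second = ToyConsumer(broker, "g1")   # replacement consumer in the same group
replayed = second.poll()             # resumes after the last committed offset
```

The uncommitted batch is delivered again to the replacement consumer, which is why processing without further safeguards is at-least-once rather than exactly-once.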
- Consumer Group:
- A group of consumers collaborate to consume messages.
- A Consumer Group based application may run on several nodes, and when they start up they coordinate with each other in order to split up the work.
- Kafka scales topic consumption by distributing partitions among a consumer group, which is a set of consumers sharing a common group identifier.
- Each partition in the topic is assigned to exactly one member in the group.
- While the old consumer depended on Zookeeper for group management, the new consumer uses a group coordination protocol built into Kafka itself. For each group, one of the brokers is selected as the group coordinator. The coordinator is responsible for managing the state of the group. Its main job is to mediate partition assignment when new members arrive, old members depart, and when topic metadata changes. The act of reassigning partitions is known as rebalancing the group.
- The messages in each partition log are then read sequentially. As the consumer makes progress, it commits the offsets of messages it has successfully processed. When a partition gets reassigned to another consumer in the group, the initial position is set to the last committed offset.
- A consumer can read from more than one partition, including partitions of different topics.
- Consumers work as part of a consumer group: one or more consumers that work together to consume a topic. The group ensures that each partition is consumed by only one member. Consumers can scale horizontally to consume topics with a large number of messages. Additionally, if a single consumer fails, the remaining members of the group will rebalance the partitions being consumed to take over for the missing member.
- Multiple consumers can form a group and jointly consume a single topic. Each consumer in the same group is given a shared group_id.
The consumers in a group divide up the partitions as fairly as possible; each partition is consumed by exactly one consumer in the consumer group.
- In addition to the group_id which is shared by all consumers in a group, each consumer is given a transient, unique consumer_id (of the form hostname:uuid) for identification purposes.
- Consumers track the maximum offset they have consumed in each partition. This value is stored in a ZooKeeper directory:
/consumers/[group_id]/offsets/[topic]/[partition_id] -> offset_counter_value (persistent node)
- How do we ensure there are no duplicate records in Kafka?
- Kafka guarantees at-least-once delivery by default and allows the user to implement at most once delivery by disabling retries on the producer and committing its offset prior to processing a batch of messages. Exactly-once delivery requires co-operation with the destination storage system but Kafka provides the offset which makes implementing this straight-forward.
- There are two approaches to getting exactly once semantics during data production: 1. Use a single-writer per partition and every time you get a network error check the last message in that partition to see if your last write succeeded 2. Include a primary key (UUID or something) in the message and deduplicate on the consumer.
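The second approach (a primary key in the message, with deduplication on the consumer) might look like the sketch below. It is only an illustration: the message shape, `produce`, and `DedupingConsumer` are hypothetical, and a real consumer would keep the set of seen keys in durable storage rather than in memory.

```python
import uuid

def produce(payload):
    # Hypothetical message shape: a unique key travels with the payload.
    return {"id": str(uuid.uuid4()), "payload": payload}

class DedupingConsumer:
    def __init__(self):
        self.seen_ids = set()   # assumption: in practice this lives in durable storage
        self.processed = []

    def handle(self, message):
        if message["id"] in self.seen_ids:
            return False        # duplicate delivery: skip it
        self.seen_ids.add(message["id"])
        self.processed.append(message["payload"])
        return True

msg = produce("order-42")
consumer = DedupingConsumer()
# The second call simulates the same message arriving again after a producer retry.
results = [consumer.handle(msg), consumer.handle(msg)]
```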
- What if the number of consumers is greater than the number of partitions?
- Some consumers will sit idle and not get any data.
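A simple round-robin sketch shows why extra consumers stay idle. Kafka's actual assignment strategies are more involved; `assign_partitions` is a hypothetical helper that only preserves the rule that each partition goes to exactly one member of the group.

```python
def assign_partitions(partitions, consumers):
    """Round-robin sketch: each partition goes to exactly one consumer in the group."""
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(partitions):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment

# Three partitions shared by a group of four consumers.
assignment = assign_partitions([0, 1, 2], ["c1", "c2", "c3", "c4"])
idle = [consumer for consumer, parts in assignment.items() if not parts]
```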
- Kafka message format
Whether we should compress our data while storing it on HDFS is an important question, as it has a significant impact on performance while processing data. Compressing data saves storage space and reduces network transfer throughout the cluster.
Codec is the term used for a compressor/decompressor: an implementation of a compression/decompression algorithm. Codecs differ both in the speed at which they compress and decompress and in the degree to which they can compress data.
Splittable compression means that a compressed file can be split if it exceeds the block size and processed in parallel by different mappers.
Splittable compression is only a factor for text files. For binary files, Hadoop compression codecs compress data within a binary-encoded container, depending on the file type (for example, a SequenceFile, Avro, or ProtocolBuffer).
If the input file to a MapReduce job contains compressed data, the time that is needed to read that data from HDFS is reduced and job performance is enhanced. The input data is decompressed automatically when it is being read by MapReduce.
We can use compression at any stage: for storing files in HDFS, for storing intermediate results (to save network transfer time and load), and for saving the final output of the processing.
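The speed versus compression-ratio trade-off between codecs can be observed with the codecs that ship in Python's standard library. This is only an illustration: Hadoop codecs such as Snappy or LZO are not included, the sample data is made up, and timings will vary by machine.

```python
import bz2
import lzma
import time
import zlib

# Made-up, highly repetitive log-like data (about 900 KB).
data = b"2024-01-01,INFO,user logged in,region=eu-west\n" * 20000

codecs = {
    "zlib": (zlib.compress, zlib.decompress),
    "bz2": (bz2.compress, bz2.decompress),
    "lzma": (lzma.compress, lzma.decompress),
}

results = {}
for name, (compress, decompress) in codecs.items():
    start = time.perf_counter()
    packed = compress(data)
    elapsed = time.perf_counter() - start
    restored = decompress(packed)
    # Store the compressed/original size ratio, the compression time,
    # and whether the round trip was lossless.
    results[name] = (len(packed) / len(data), elapsed, restored == data)
    print(f"{name}: ratio={results[name][0]:.4f}, compress time={elapsed:.3f}s")
```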
Following table summarizes different compression techniques:
||Degree of Compression
||when data is just for historical storage. rarely query
|| fastest compression algo.
||No, unless indexed
HDFS is an integral part of the Hadoop ecosystem. It is used to store files. Just like in a normal file system, we can store data in HDFS in many different file formats. I will briefly describe some of the common formats below.
Importance of Selecting right storage format:
- Faster read times
- Faster write times
- Splittable files (so you don’t need to read the whole file, just a part of it)
- Schema evolution support (allowing you to change the fields in a dataset)
- Advanced compression support (compress the files with a compression codec without sacrificing these features)
In MapReduce, file format support is provided by the InputFormat and OutputFormat classes. You can specify them in the job configuration.
|Plain text storage
|Most common format.
Default format for Hive.
Each line is a record.
If you want to use compression, use a file-level compression codec that supports splitting, such as bzip2.
|easy to create and understand
||Can waste lot of space for the columns which do not have data
||Originally designed for mapreduce
Encode a key and a value for each record
internally, the temporary outputs of maps are stored using SequenceFile.
Records are stored in a binary format that is smaller than a text-based format
Traditional mapreduce binary file format
Stores Keys and Values as a class
Not good for Hive, which has SQL types
Hive always stores entire line as a value
Default block size is 1 MB
Need to read and Decompress all the fields
|One benefit of sequence files is that they support block-level compression, so you can compress the contents of the file while also maintaining the ability to split the file into segments for multiple map tasks.
Easiest next to text file.
|The problem with this is that if you add or change fields in your Writable class it will not be backwards compatible with the data stored in the sequence file.
Not good for Hive, Append only like other data formats
| RC File
(Record Columnar Files)
| First columnar file format adopted in Hadoop
Like columnar databases, the RC file enjoys significant compression and query performance benefits. However, the current serdes for RC files in Hive and other tools do not support schema evolution. In order to add a column to your data you must rewrite every pre-existing RC file. Also, although RC files are good for query, writing an RC file requires more memory and computation than non-columnar file formats. They are generally slower to write.
RCFILE first stores the metadata of a row split, as the key part of a record, and all the data of a row split as the value part. This means that RCFILE encourages column oriented storage rather than row oriented storage
||slower to write
do not support schema evolution i.e. you need to rewrite old RC files if schema changes.
(Optimized Row Columnar)Input Format
|Reduces data size by up to 75%
Data processing is faster than with text or sequence formats
An ORC file contains row data in groups called stripes, along with a file footer. The ORC format improves performance when Hive is processing the data. We cannot load data into an ORCFILE table directly. First we need to load the data into another table, and then we need to overwrite it into our newly created ORCFILE table.
ORC files enjoy the same benefits and limitations as RC files just done better for Hadoop. This means ORC files compress better than RC files, enabling faster queries. However, they still don’t support schema evolution.
|| Apache Avro is a language-neutral data serialization system
|| Parquet Files are yet another columnar file format that originated from Hadoop creator Doug Cutting’s Trevni project. Like RC and ORC, Parquet enjoys compression and query performance benefits, and is generally slower to write than non-columnar file formats. However, unlike RC and ORC files Parquet serdes support limited schema evolution. In Parquet, new columns can be added at the end of the structure. At present, Hive and Impala are able to query newly added columns
This is an aggregated collection from the web. Your contributions are most welcome; please add comments in the comment box below the post.
- What is the difference between “SORT BY”, “ORDER BY”, “GROUP BY” , “DISTRIBUTE BY” and “CLUSTER BY” in hive?
- SORT BY sorts data per reducer. It provides ordering of the rows within a reducer. If there is more than one reducer, “sort by” may give partially ordered final results. Hive uses the columns in SORT BY to sort the rows before feeding the rows to a reducer. (Example: SELECT key, value FROM src SORT BY key ASC, value DESC)
- ORDER BY: guarantees total ordering of the data, but to do so all data has to be passed to a single reducer, which is normally unacceptable. Therefore, in strict mode, Hive makes it compulsory to use LIMIT with ORDER BY so that the reducer does not get overburdened. (Example: SELECT key, value FROM src ORDER BY key ASC, value DESC)
- GROUP BY: groups records by the specified columns, so that aggregation functions (such as SUM, COUNT, AVG, etc.) can be applied to the non-grouped columns.
- DISTRIBUTE BY: Hive uses the columns in DISTRIBUTE BY to distribute the rows among reducers. All rows with the same DISTRIBUTE BY column values go to the same reducer, so each of the N reducers gets a non-overlapping set of keys, but the output of each reducer is not sorted. You end up with N or more unsorted files with non-overlapping keys. (For example, a reducer will receive all x1 key records, but they may not be in sequence: x1, x2, x1, x1.)
- CLUSTER BY: a combination of DISTRIBUTE BY and SORT BY. It ensures that all records with the same key go to the same reducer and that they are sorted. (In the example above, records with x1 keys are guaranteed to be received by the same reducer in order: x1, x1, x1, x2.)
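The difference between these clauses can be simulated outside Hive with plain Python lists standing in for reducers. This is a rough model under assumptions of my own: integer keys, `hash(key) % num_reducers` as the partitioning rule, and hypothetical function names. It does not reproduce Hive's actual partitioner.

```python
def distribute_by(rows, key, num_reducers):
    """DISTRIBUTE BY: rows with the same key land on the same reducer, unsorted."""
    reducers = [[] for _ in range(num_reducers)]
    for row in rows:
        reducers[hash(row[key]) % num_reducers].append(row)
    return reducers

def sort_by(reducers, key):
    """SORT BY: order rows within each reducer only."""
    return [sorted(rows, key=lambda row: row[key]) for rows in reducers]

def cluster_by(rows, key, num_reducers):
    """CLUSTER BY: DISTRIBUTE BY followed by SORT BY on the same column."""
    return sort_by(distribute_by(rows, key, num_reducers), key)

def order_by(rows, key):
    """ORDER BY: a single reducer sees every row, giving a total order."""
    return sorted(rows, key=lambda row: row[key])

def keys_of(reducers):
    return [[row["k"] for row in rows] for rows in reducers]

rows = [{"k": k} for k in [3, 1, 4, 1, 2, 3, 4, 2]]
distributed = keys_of(distribute_by(rows, "k", 2))   # grouped per reducer, unsorted
clustered = keys_of(cluster_by(rows, "k", 2))        # grouped and sorted per reducer
ordered = [row["k"] for row in order_by(rows, "k")]  # one globally sorted list
```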
- If we use LIMIT 1 in a Hive query, will a reducer run or not?
- Yes. LIMIT does not prevent the reducers from running. One record is selected arbitrarily once reducer execution completes.
- Difference between “Internal Table” and “External Table”:
- Internal tables (also known as managed tables) are the ones managed by Hive. When we create a managed table on some data, the data is moved to Hive's default location, i.e. its warehouse. If we drop a managed Hive table, the data is deleted from the Hive store.
- External tables are the ones managed by the user. The table can be created at any custom location and no data movement is involved. Consequently, when you drop such a table, no data is deleted; only the table schema is dropped.
- Hive Performance Optimization: Some of the rules are:
- Use Partitioning and Bucketing with Data and Domain knowledge
- Avoid joins where possible, or use map-side joins. Make use of parallel execution in Hive to fully utilize the cluster; independent queries can run in parallel.
- Use Apache Tez execution engine instead of mapreduce.
- Use ORC File format: ORC supports compressed storage (with ZLIB or SNAPPY) as well as uncompressed storage.
- Use Vectorization: ( Introduced in 0.13)
- Perform query on a batch of records ( e.g. batch size of 1024) rather than on each record.
set hive.vectorized.execution.enabled = true;
set hive.vectorized.execution.reduce.enabled = true;
- Does a select * query run a MapReduce job?
- No. The hive.fetch.task.conversion property lets Hive skip the MapReduce job, and its start-up latency, for simple queries such as SELECT, FILTER, LIMIT, etc.
- What does explode do in Hive?
- It converts an array of items into rows, one row per item.
- How can we connect to Hive from different clients?
- Thrift server : for c++, python…
- Jdbc driver
- odbc driver
- How are dates stored in Hive?
- In java.sql.Timestamp format
- Wordcount in hive:
select word, count(*) from input lateral view explode(split(text, " ")) ltable as word group by word;
create table wordcount (sentence string);
load data local inpath '/data' into table wordcount;
select explode(split(sentence, " ")) as each_word from wordcount;
select each_word, count(*) from (select explode(split(sentence, " ")) as each_word from wordcount) w group by each_word;
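For readers less familiar with explode, the same word count can be mirrored in plain Python. The sentences are made-up sample data; the list comprehension plays the role of explode(split(...)) and Counter plays the role of GROUP BY with count(*).

```python
from collections import Counter

sentences = ["the quick brown fox", "the lazy dog", "the fox"]

# explode(split(sentence, " ")): one output row per array element.
words = [word for sentence in sentences for word in sentence.split(" ")]

# group by word with count(*).
counts = Counter(words)
```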
- Difference between UDF, UDAF and UDTF
- Joins in Hive
The following are model evaluation metrics.
- Mean absolute error:
- Problem: not differentiable at zero (differentiability is required for gradient descent)
- Mean squared error
- R2 score
- 1: perfect model
- 0: no learning (no better than always predicting the mean)
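The three regression metrics can be computed by hand as below. The function names are local to this sketch (they happen to match common library names but are defined here), and the sample values are made up.

```python
def mean_absolute_error(y_true, y_pred):
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)

def mean_squared_error(y_true, y_pred):
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)

def r2_score(y_true, y_pred):
    mean = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))  # residual sum of squares
    ss_tot = sum((t - mean) ** 2 for t in y_true)               # total sum of squares
    return 1 - ss_res / ss_tot

y_true = [3.0, 5.0, 7.0, 9.0]
y_pred = [2.0, 5.0, 8.0, 9.0]

mae = mean_absolute_error(y_true, y_pred)
mse = mean_squared_error(y_true, y_pred)
perfect = r2_score(y_true, y_true)        # predictions equal to the targets
baseline = r2_score(y_true, [6.0] * 4)    # always predicting the mean of y_true
```

The two R2 values correspond to the "perfect model" and "no learning" reference points listed above.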
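- Confusion Matrix: (the second letter indicates the predicted class and the first letter indicates whether that prediction was correct. E.g. TP means the actual class is positive and we predicted positive; FP means the actual class is negative but we predicted positive)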
- Accuracy: Total correctly identified out of all
- Accuracy = (TP + TN) / (TP + TN + FN + FP)
- Accuracy is not a good metric when we have skewed data, i.e. when one class is much more common in the data than the other.
- Precision: Out of all predicted positive, how many are actual positive
- Precision = TP / (TP + FP)
- Example: a spam detection model should be a high-precision model, as we are interested in how many of the messages we label as spam really are spam.
- Recall: Out of all actual positive, how many we have predicted as positive
- Recall = TP / (TP + FN)
- Example: a medical model should be a high-recall model, as we are more interested in correctly diagnosing every patient who really is sick.
- F1 Score: How to combine Precision and Recall into one number.
- Based on Harmonic mean of two numbers.
- F1 Score = 2 * (Precision * Recall) / (Precision + Recall)
- The F1 score lies closer to the smaller of the two numbers (precision or recall).
- F-Beta Score: handles the situation where we are more concerned about one of precision or recall than the other. It is controlled by a parameter called β (beta).
- F-Beta Score = (1 + β^2) * (Precision * Recall) / (β^2 * Precision + Recall)
- β = 0 => Precision
- β = infinity => Recall
- For other values of β: if they are close to 0, we get something close to precision; if they are large numbers, we get something close to recall; and if β = 1, we get the harmonic mean of precision and recall.
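A short sketch makes the classification formulas concrete. The confusion-matrix counts are made up, the helper names are hypothetical, and `f_beta` uses the standard form (1 + β^2) * P * R / (β^2 * P + R), which reduces to precision at β = 0 and approaches recall as β grows.

```python
def precision_recall(tp, fp, fn):
    precision = tp / (tp + fp)   # of everything predicted positive, how much is right
    recall = tp / (tp + fn)      # of everything actually positive, how much we found
    return precision, recall

def f_beta(precision, recall, beta):
    b2 = beta ** 2
    return (1 + b2) * precision * recall / (b2 * precision + recall)

tp, tn, fp, fn = 8, 85, 2, 5     # made-up counts for 100 examples

accuracy = (tp + tn) / (tp + tn + fp + fn)
precision, recall = precision_recall(tp, fp, fn)
f1 = f_beta(precision, recall, beta=1)             # harmonic mean of the two
near_precision = f_beta(precision, recall, beta=0)
near_recall = f_beta(precision, recall, beta=1000)
```

Here accuracy is high even though recall is modest, which is the skewed-data problem mentioned above.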
- ROC Curve (Receiver Operating Characteristic):
- It is the curve of True Positive Rate plotted against False Positive Rate
- False Positive Rate = False Positives / Total actual negatives = FP / (FP + TN)
- True Positive Rate = True Positives / Total actual positives = TP / (TP + FN)
- Roc area under the curve for a perfect model = 1
- Roc area under the curve for a good model = ~0.8
- Roc area under the curve for a random model = ~0.5
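The curve and its area can be computed by sweeping a decision threshold over the model's scores. This is a minimal sketch with made-up labels and scores; it assumes binary labels (1 = positive), distinct scores (ties are not handled), and uses hypothetical helper names `roc_points` and `auc`.

```python
def roc_points(y_true, scores):
    """Return (FPR, TPR) points obtained by lowering the decision threshold."""
    positives = sum(y_true)
    negatives = len(y_true) - positives
    points = [(0.0, 0.0)]
    tp = fp = 0
    # Visit examples from the highest score to the lowest.
    for _, label in sorted(zip(scores, y_true), reverse=True):
        if label == 1:
            tp += 1
        else:
            fp += 1
        points.append((fp / negatives, tp / positives))
    return points

def auc(points):
    # Trapezoidal rule over consecutive ROC points.
    return sum((x2 - x1) * (y1 + y2) / 2 for (x1, y1), (x2, y2) in zip(points, points[1:]))

y_true = [1, 1, 0, 1, 0, 0]
scores = [0.9, 0.8, 0.7, 0.6, 0.4, 0.2]

area = auc(roc_points(y_true, scores))
perfect_area = auc(roc_points([1, 1, 0, 0], [0.9, 0.8, 0.3, 0.1]))
```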
- Grid search is an exhaustive search algorithm, and it searches over all combinations of the parameters we specify to find the optimum combination that yields the best performance.
- Grid search is the technique used to find the best parameters for a model, or the best model. In this technique we specify all the values we want to test for each hyperparameter of the model (for example, different depths for a tree-based model or different kernels for an SVM); the algorithm then evaluates every possible combination of these values and picks the best-performing one.
- Due to its exhaustive search nature, grid search can be computationally expensive, especially when data size is large and model is complicated. Sometimes we resort to randomized search in this case to search only some combinations of the parameters.
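The exhaustive nature of grid search is easy to see in a hand-rolled version. This is a sketch, not a library API: `grid_search` and `toy_score` are hypothetical, and `toy_score` merely stands in for "train a model with these hyperparameters and return its validation score".

```python
from itertools import product

def grid_search(param_grid, score_fn):
    """Evaluate every combination in param_grid and keep the best-scoring one."""
    names = list(param_grid)
    best_params, best_score = None, float("-inf")
    evaluated = 0
    for values in product(*(param_grid[name] for name in names)):
        params = dict(zip(names, values))
        score = score_fn(**params)
        evaluated += 1
        if score > best_score:
            best_params, best_score = params, score
    return best_params, best_score, evaluated

# Hypothetical scoring function with its peak at max_depth=4, min_samples=10.
def toy_score(max_depth, min_samples):
    return -(max_depth - 4) ** 2 - (min_samples - 10) ** 2

best_params, best_score, evaluated = grid_search(
    {"max_depth": [2, 4, 6, 8], "min_samples": [5, 10, 20]}, toy_score
)
```

The number of evaluations is the product of the list lengths, which is why the cost grows quickly as more hyperparameters or values are added.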
- Randomized Search:
- K-fold cross validation:
- In the K-fold validation technique, we split our training data into k parts. We then train the model on k-1 parts and validate it on the remaining part, repeating this so that each part is used once for validation. In each iteration we record the error. The average error over all k rounds is known as the k-fold cross-validation error.
- This method of evaluation is good for optimization, as the model gets to see all of the training data for learning while always being validated on a part it was not trained on. We should aim to reduce the cross-validation error (the average error over all k iterations) to get the best model.
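The procedure can be sketched without any library. The helper names are hypothetical, the folds are contiguous (no shuffling), and the "model" used in the example simply predicts the mean of its training targets, so it only illustrates the bookkeeping.

```python
def k_fold_indices(n_samples, k):
    """Yield (train_indices, validation_indices) for each of the k folds."""
    sizes = [n_samples // k + (1 if i < n_samples % k else 0) for i in range(k)]
    start = 0
    for size in sizes:
        validation = list(range(start, start + size))
        train = [i for i in range(n_samples) if i < start or i >= start + size]
        yield train, validation
        start += size

def cross_validation_error(xs, ys, k, fit, error):
    errors = []
    for train, validation in k_fold_indices(len(xs), k):
        model = fit([xs[i] for i in train], [ys[i] for i in train])
        errors.append(error(model, [xs[i] for i in validation], [ys[i] for i in validation]))
    return sum(errors) / k   # average of the k per-fold errors

# Toy "model": always predict the mean of the training targets.
def fit_mean(train_x, train_y):
    return sum(train_y) / len(train_y)

def mse(model, val_x, val_y):
    return sum((y - model) ** 2 for y in val_y) / len(val_y)

xs = [0, 1, 2, 3, 4, 5]
ys = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
cv_error = cross_validation_error(xs, ys, k=3, fit=fit_mean, error=mse)
```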
- How to decide Value of K:
The notebook is a web application that allows you to combine explanatory text, math equations, code, and visualizations all in one easily sharable document.
conda install jupyter notebook
Launching the notebook: use the command below to launch the notebook app. This will open a web browser at http://localhost:8888 (default settings).