We start by creating a new Maven project.
pom.xml:
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.turkninja</groupId>
    <artifactId>flink-movie-example</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <java.version>1.8</java.version>
        <flink.version>1.8.0</flink.version>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>
The class below contains the main method that runs the program, together with the nested Movie class:
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple3;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/**
 * @author ali turgut bozkurt
 * Created at 5/29/2019
 */
public class FilterMovie {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read movies.csv as (id, title, genres) tuples, skipping the header line.
        DataSource<Tuple3<Long, String, String>> lines = env.readCsvFile("src/main/resources/ml-latest-small/movies.csv")
                .ignoreFirstLine()
                .parseQuotedStrings('"')
                .ignoreInvalidLines()
                .types(Long.class, String.class, String.class);

        // Convert each tuple into a Movie; the genres column is pipe-separated.
        DataSet<Movie> movies = lines.map(new MapFunction<Tuple3<Long, String, String>, Movie>() {
            @Override
            public Movie map(Tuple3<Long, String, String> movieLine) throws Exception {
                String name = movieLine.f1;
                String[] genres = movieLine.f2.split("\\|");
                return new Movie(name, new HashSet<String>(Arrays.asList(genres)));
            }
        });

        // Keep only the movies whose genre set contains "Action".
        movies = movies.filter(new FilterFunction<Movie>() {
            @Override
            public boolean filter(Movie movie) throws Exception {
                return movie.getGenres().contains("Action");
            }
        });

        System.out.println(movies.count());
        movies.print();
    }

    public static class Movie {
        private String name;
        private Set<String> genres;

        public String getName() {
            return name;
        }

        public Movie(String name, Set<String> genres) {
            this.name = name;
            this.genres = genres;
        }

        public Set<String> getGenres() {
            return genres;
        }

        @Override
        public String toString() {
            return "Movie{" +
                    "name='" + name + '\'' +
                    ", genres=" + genres +
                    '}';
        }
    }
}
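The map step above turns the pipe-separated genres column into a Set<String>, and the filter step keeps only the records whose set contains "Action". The following is a minimal plain-Java sketch of that per-record logic without Flink, which may be handy for testing it in isolation. The class GenreSplitSketch and its methods are hypothetical names introduced here, not part of the original example, and the sample genre strings are illustrative.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper class, not part of the original example.
final class GenreSplitSketch {

    // Splits a genres column such as "Action|Drama" into a set.
    // The pipe is escaped because String.split takes a regular expression,
    // where an unescaped "|" means alternation.
    static Set<String> parseGenres(String genresColumn) {
        return new HashSet<>(Arrays.asList(genresColumn.split("\\|")));
    }

    // Mirrors the FilterFunction: true when the genre set contains "Action".
    static boolean isAction(String genresColumn) {
        return parseGenres(genresColumn).contains("Action");
    }

    public static void main(String[] args) {
        System.out.println(isAction("Action|Drama"));   // true
        System.out.println(isAction("Comedy|Romance")); // false
    }
}
```

Note that the check is an exact, case-sensitive match on the genre name, so a value such as "action" would not pass the filter.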
We download the dataset and extract the .zip file under src/main/resources, so that the file read by the code ends up at src/main/resources/ml-latest-small/movies.csv.
When we run the application, it prints the number of movies in the dataset that belong to the Action genre, followed by the list of those movies:
1828
Movie{name='Jet Li's Fearless (Huo Yuan Jia) (2006)', genres=[Action, Drama]}
Movie{name='Fast and the Furious: Tokyo Drift, The (Fast and the Furious 3, The) (2006)', genres=[Action, Drama, Thriller, Crime]}
Movie{name='Superman Returns (2006)', genres=[Action, Sci-Fi, Adventure, IMAX]}
Movie{name='Army of Shadows (L'armée des ombres) (1969)', genres=[Action, Drama, Thriller, War]}
Movie{name='Snakes on a Plane (2006)', genres=[Action, Horror, Thriller, Comedy]}
Movie{name='Talladega Nights: The Ballad of Ricky Bobby (2006)', genres=[Action, Comedy]}
Movie{name='Night at the Museum (2006)', genres=[Action, Fantasy, IMAX, Comedy]}
Movie{name='Miami Vice (2006)', genres=[Action, Drama, Thriller, Crime]}
Movie{name='Crank (2006)', genres=[Action, Thriller]}
Movie{name='Chaos (2005)', genres=[Action, Drama, Thriller, Crime]}
Movie{name='Crime Busters (1977)', genres=[Action, Adventure, Crime, Comedy]}
Movie{name='Covenant, The (2006)', genres=[Action, Horror, Thriller]}
Movie{name='Flyboys (2006)', genres=[Action, Adventure, Drama, War]}
Movie{name='Guardian, The (2006)', genres=[Action, Adventure, Drama]}
Movie{name='Marine, The (2006)', genres=[Action, Drama, Thriller]}
Movie{name='Feast (2005)', genres=[Action, Horror, Thriller, Comedy]}
Movie{name='Children of Men (2006)', genres=[Action, Sci-Fi, Adventure, Drama, Thriller]}
Movie{name='Casino Royale (2006)', genres=[Action, Adventure, Thriller]}
Movie{name='Déjà Vu (Deja Vu) (2006)', genres=[Action, Sci-Fi, Thriller]}
Movie{name='Harsh Times (2006)', genres=[Action, Drama, Crime]}
Movie{name='Blood Diamond (2006)', genres=[Action, Adventure, Drama, Thriller, Crime, War]}
Movie{name='Eragon (2006)', genres=[Action, Adventure, Fantasy]}
Movie{name='Rocky Balboa (2006)', genres=[Action, Drama]}
Movie{name='Dam Busters, The (1955)', genres=[Action, Drama, War]}
Movie{name='DOA: Dead or Alive (2006)', genres=[Action, Adventure]}
Movie{name='Curse of the Golden Flower (Man cheng jin dai huang jin jia) (2006)', genres=[Action, Drama]}
Movie{name='Black Christmas (2006)', genres=[Action, Horror, Thriller]}
.....
.....
.....
Movie{name='Transformers: Age of Extinction (2014)', genres=[Action, Sci-Fi, Adventure]}
Movie{name='Purge: Anarchy, The (2014)', genres=[Action, Horror, Thriller]}
Movie{name='Guardians of the Galaxy (2014)', genres=[Action, Sci-Fi, Adventure]}
Movie{name='The Expendables 3 (2014)', genres=[Action, Adventure]}
Movie{name='Hercules (2014)', genres=[Action, Adventure]}
Movie{name='Batman: Assault on Arkham (2014)', genres=[Action, Thriller, Crime, Animation]}
Movie{name='Jupiter Ascending (2015)', genres=[Action, Sci-Fi, Adventure]}
Movie{name='Teenage Mutant Ninja Turtles (2014)', genres=[Action, Adventure, Comedy]}
Movie{name='Revenge of the Green Dragons (2014)', genres=[Action, Drama, Crime]}
Movie{name='Sin City: A Dame to Kill For (2014)', genres=[Action, Thriller, Crime]}
Movie{name='Maze Runner, The (2014)', genres=[Action, Sci-Fi, Mystery]}
Movie{name='Walk Among the Tombstones, A (2014)', genres=[Action, Thriller, Crime, Mystery]}
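Several titles in the output contain commas (for example 'Covenant, The (2006)'), which is presumably why the reader is configured with parseQuotedStrings('"'): a plain split on commas would cut such a title in two. The sketch below illustrates the idea with a simplified quote-aware splitter in plain Java. It is not Flink's implementation; the class QuotedCsvSketch is a hypothetical name, the sample line (including its id value) is made up for illustration, and escaped quotes inside a field are deliberately not handled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; Flink's CSV reader has its own parser.
final class QuotedCsvSketch {

    // Splits one CSV line on commas, treating commas inside double quotes
    // as part of the field. Simplification: escaped quotes are not handled.
    static List<String> splitLine(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c == '"') {
                inQuotes = !inQuotes;            // toggle quoted state, drop the quote
            } else if (c == ',' && !inQuotes) {
                fields.add(current.toString());  // field boundary outside quotes
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());          // last field
        return fields;
    }

    public static void main(String[] args) {
        // Illustrative three-column line (id, title, genres); the id is made up.
        String line = "1,\"Covenant, The (2006)\",Action|Horror|Thriller";
        System.out.println(line.split(",").length);  // naive split yields 4 pieces
        System.out.println(splitLine(line).size());  // quote-aware split yields 3 fields
        System.out.println(splitLine(line).get(1));  // Covenant, The (2006)
    }
}
```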
Source: Pluralsight
