Yeni bir maven projesi oluşturuyoruz.
pom.xml:
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemalocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelversion>4.0.0</modelversion> <groupid>com.turkninja</groupid> <artifactid>flink-movie-example</artifactid> <version>1.0-SNAPSHOT</version> <properties> <java .version="">1.8</java> <flink .version="">1.8.0</flink> </properties> <dependencies> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-quickstart-java --> <dependency> <groupid>org.apache.flink</groupid> <artifactid>flink-java</artifactid> <version>${flink.version}</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --> <dependency> <groupid>org.apache.flink</groupid> <artifactid>flink-clients_2.11</artifactid> <version>${flink.version}</version> </dependency> </dependencies> </project>
Programı çalıştıracağımız main methodunun ve Movie classının bulunduğu sınıf
import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple3; import java.util.Arrays; import java.util.HashSet; import java.util.Set; /** * @author ali turgut bozkurt * Created at 5/29/2019 */ public class FilterMovie { public static void main(String[] args) throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSource<tuple3 ong="" string="">> lines = env.readCsvFile("src/main/resources/ml-latest-small/movies.csv") .ignoreFirstLine() .parseQuotedStrings('"') .ignoreInvalidLines() .types(Long.class, String.class, String.class); DataSet<movie> movies = lines.map(new MapFunction<tuple3 ong="" string="">, Movie>() { public Movie map(Tuple3<long string=""> movieLine) throws Exception { String name = movieLine.f1; String[] genres = movieLine.f2.split("\\|"); return new Movie(name, new HashSet<string>(Arrays.asList(genres))); } }); movies = movies.filter(new FilterFunction<movie>() { public boolean filter(Movie movie) throws Exception { return movie.getGenres().contains("Action"); } }); System.out.println(movies.count()); movies.print(); } public static class Movie { private String name; private Set<string> genres; public String getName() { return name; } public Movie(String name, Set<string> genres) { this.name = name; this.genres = genres; } public Set<string> getGenres() { return genres; } @Override public String toString() { return "Movie{" + "name='" + name + '\'' + ", genres=" + genres + '}'; } } }Datasetimizi buradan indiriyouruz.
zip dosyasını açıp src/main/resources altına açıyoruz.
Uygulamayı çalıştırdığımızda datasetimizde bulunan tüm action türünde filmlerin sayısını ve listesini ekranda görebiliriz.
1828
Movie{name='Jet Li's Fearless (Huo Yuan Jia) (2006)', genres=[Action, Drama]} Movie{name='Fast and the Furious: Tokyo Drift, The (Fast and the Furious 3, The) (2006)', genres=[Action, Drama, Thriller, Crime]} Movie{name='Superman Returns (2006)', genres=[Action, Sci-Fi, Adventure, IMAX]} Movie{name='Army of Shadows (L'armée des ombres) (1969)', genres=[Action, Drama, Thriller, War]} Movie{name='Snakes on a Plane (2006)', genres=[Action, Horror, Thriller, Comedy]} Movie{name='Talladega Nights: The Ballad of Ricky Bobby (2006)', genres=[Action, Comedy]} Movie{name='Night at the Museum (2006)', genres=[Action, Fantasy, IMAX, Comedy]} Movie{name='Miami Vice (2006)', genres=[Action, Drama, Thriller, Crime]} Movie{name='Crank (2006)', genres=[Action, Thriller]} Movie{name='Chaos (2005)', genres=[Action, Drama, Thriller, Crime]} Movie{name='Crime Busters (1977)', genres=[Action, Adventure, Crime, Comedy]} Movie{name='Covenant, The (2006)', genres=[Action, Horror, Thriller]} Movie{name='Flyboys (2006)', genres=[Action, Adventure, Drama, War]} Movie{name='Guardian, The (2006)', genres=[Action, Adventure, Drama]} Movie{name='Marine, The (2006)', genres=[Action, Drama, Thriller]} Movie{name='Feast (2005)', genres=[Action, Horror, Thriller, Comedy]} Movie{name='Children of Men (2006)', genres=[Action, Sci-Fi, Adventure, Drama, Thriller]} Movie{name='Casino Royale (2006)', genres=[Action, Adventure, Thriller]} Movie{name='Déjà Vu (Deja Vu) (2006)', genres=[Action, Sci-Fi, Thriller]} Movie{name='Harsh Times (2006)', genres=[Action, Drama, Crime]} Movie{name='Blood Diamond (2006)', genres=[Action, Adventure, Drama, Thriller, Crime, War]} Movie{name='Eragon (2006)', genres=[Action, Adventure, Fantasy]} Movie{name='Rocky Balboa (2006)', genres=[Action, Drama]} Movie{name='Dam Busters, The (1955)', genres=[Action, Drama, War]} Movie{name='DOA: Dead or Alive (2006)', genres=[Action, Adventure]} Movie{name='Curse of the Golden Flower (Man cheng jin dai huang jin jia) (2006)', genres=[Action, Drama]} Movie{name='Black Christmas (2006)', genres=[Action, Horror, Thriller]} ..... ..... ..... Movie{name='Transformers: Age of Extinction (2014)', genres=[Action, Sci-Fi, Adventure]} Movie{name='Purge: Anarchy, The (2014)', genres=[Action, Horror, Thriller]} Movie{name='Guardians of the Galaxy (2014)', genres=[Action, Sci-Fi, Adventure]} Movie{name='The Expendables 3 (2014)', genres=[Action, Adventure]} Movie{name='Hercules (2014)', genres=[Action, Adventure]} Movie{name='Batman: Assault on Arkham (2014)', genres=[Action, Thriller, Crime, Animation]} Movie{name='Jupiter Ascending (2015)', genres=[Action, Sci-Fi, Adventure]} Movie{name='Teenage Mutant Ninja Turtles (2014)', genres=[Action, Adventure, Comedy]} Movie{name='Revenge of the Green Dragons (2014)', genres=[Action, Drama, Crime]} Movie{name='Sin City: A Dame to Kill For (2014)', genres=[Action, Thriller, Crime]} Movie{name='Maze Runner, The (2014)', genres=[Action, Sci-Fi, Mystery]} Movie{name='Walk Among the Tombstones, A (2014)', genres=[Action, Thriller, Crime, Mystery]}
Kaynak : pluralsight